Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Module defining the retry_factory function"""
import asyncio
import inspect
import itertools
import random
import time
from dataclasses import dataclass
from functools import update_wrapper
from typing import Any, overload

from tubthumper import _types
from tubthumper._types import AwaitableCallable, RetryCallable, T
# Names exported by a star-import of this module; only RetryError is listed.
__all__ = ["RetryError"]

# Raised after the ``for ... in itertools.count()`` retry loops in this module.
# Those loops are only left via ``return`` or an exception, so reaching this
# raise would mean the counter stopped iterating.
COUNTER_EXCEPTION = RuntimeError("Infinite retry counter stopped iteration")
class RetryError(Exception):
    """Exception raised when a retry or time limit is reached.

    ``_process_exception`` raises it, chained from the exception that caused
    the last failed attempt, when the retry limit or the time limit is hit and
    the config's ``reraise`` flag is falsy.
    """
@dataclass(frozen=True)
class RetryConfig:
    """Config class for retry logic

    Immutable (``frozen=True``) bundle of settings read by the retry wrappers
    and by ``_process_exception``.
    """

    # Exception type(s) caught by the retry wrappers; anything else propagates.
    exceptions: _types.Exceptions
    # Retrying stops once the zero-based attempt counter is >= this value.
    retry_limit: _types.RetryLimit
    # Added to time.perf_counter() at call time to form the retry deadline.
    time_limit: _types.TimeLimit
    # Backoff used for the first retry, before exponential growth and jitter.
    init_backoff: _types.InitBackoff
    # Base raised to the attempt counter to scale init_backoff.
    exponential: _types.Exponential
    # When truthy, the backoff is multiplied by random.random().
    jitter: _types.Jitter
    # When truthy, the original exception is re-raised instead of RetryError.
    reraise: _types.Reraise
    # Level passed to logger.log() when a retry is scheduled.
    log_level: _types.LogLevel
    # Object whose .log() method receives the retry message.
    logger: _types.Logger
@overload
def retry_factory(
    func: AwaitableCallable[T],
    retry_config: RetryConfig,
) -> AwaitableCallable[T]:
    ...


@overload
def retry_factory(
    func: RetryCallable[T],
    retry_config: RetryConfig,
) -> RetryCallable[T]:
    ...


def retry_factory(func, retry_config):  # type: ignore
    """
    Function that produces a retry_function given a function to retry,
    and config to determine retry logic.

    Coroutine functions get an ``async`` wrapper that backs off with
    ``asyncio.sleep``; every other callable gets a blocking wrapper that backs
    off with ``time.sleep``. The wrapper takes over the metadata of ``func``
    (name, docstring, ``__wrapped__``, ...) via ``functools.update_wrapper``.

    Args:
        func: The callable (plain or coroutine function) to retry.
        retry_config: Settings controlling which exceptions are retried, the
            limits, the backoff and the logging.

    Returns:
        A callable with the same call signature as ``func`` that retries it.
    """
    # inspect.iscoroutinefunction is the supported spelling of this check:
    # asyncio.iscoroutinefunction is deprecated since Python 3.14 and is
    # scheduled for removal.
    if inspect.iscoroutinefunction(func):
        retry_func = _async_retry_factory(func, retry_config)
    else:
        retry_func = _sync_retry_factory(func, retry_config)
    update_wrapper(retry_func, func)
    return retry_func
def _async_retry_factory(
    func: AwaitableCallable[T],
    retry_config: RetryConfig,
) -> AwaitableCallable[T]:
    """Wrap the coroutine function ``func`` in an awaitable retry loop.

    The returned coroutine function awaits ``func``; whenever that raises one
    of ``retry_config.exceptions`` it asks ``_process_exception`` for the
    backoff (or lets it raise to end the retrying) and then awaits
    ``asyncio.sleep`` for that long before the next attempt.
    """

    async def retry_func(*args: Any, **kwargs: Any) -> T:
        # The deadline is computed inside the wrapper, so each call gets its
        # own time budget.
        deadline = _get_timeout(retry_config.time_limit)
        attempts = itertools.count()
        for attempt in attempts:
            try:
                outcome = await func(*args, **kwargs)
            except retry_config.exceptions as error:
                delay = _process_exception(retry_config, error, attempt, deadline)
                await asyncio.sleep(delay)
            else:
                return outcome
        # Only reachable if the counter above stops iterating.
        raise COUNTER_EXCEPTION

    return retry_func
def _sync_retry_factory(
    func: RetryCallable[T],
    retry_config: RetryConfig,
) -> RetryCallable[T]:
    """Wrap the plain callable ``func`` in a blocking retry loop.

    The returned function calls ``func``; whenever that raises one of
    ``retry_config.exceptions`` it asks ``_process_exception`` for the backoff
    (or lets it raise to end the retrying) and then blocks in ``time.sleep``
    for that long before the next attempt.
    """

    def retry_func(*args: Any, **kwargs: Any) -> T:
        # The deadline is computed inside the wrapper, so each call gets its
        # own time budget.
        deadline = _get_timeout(retry_config.time_limit)
        attempts = itertools.count()
        for attempt in attempts:
            try:
                outcome = func(*args, **kwargs)
            except retry_config.exceptions as error:
                delay = _process_exception(retry_config, error, attempt, deadline)
                time.sleep(delay)
            else:
                return outcome
        # Only reachable if the counter above stops iterating.
        raise COUNTER_EXCEPTION

    return retry_func
def _process_exception(
    retry_config: RetryConfig, exc: Exception, retry_count: int, retry_timeout: float
) -> float:
    """Decide whether a failed attempt is retried and, if so, after how long.

    Args:
        retry_config: Settings for limits, backoff, jitter and logging.
        exc: The exception caught from the wrapped callable.
        retry_count: Zero-based counter of the attempt that just failed.
        retry_timeout: ``time.perf_counter`` reading past which no further
            retry may be scheduled.

    Returns:
        The number of seconds the caller should wait before the next attempt.

    Raises:
        RetryError: The retry limit is reached, or waiting out the backoff
            would pass ``retry_timeout``, and ``retry_config.reraise`` is
            falsy. It is chained from ``exc``.
        Exception: ``exc`` itself, under the same conditions, when
            ``retry_config.reraise`` is truthy.
    """
    if retry_count >= retry_config.retry_limit:
        if not retry_config.reraise:
            raise RetryError(
                f"Retry limit {retry_config.retry_limit} reached"
            ) from exc
        raise exc

    growth = retry_config.exponential**retry_count
    delay = retry_config.init_backoff * growth
    if retry_config.jitter:
        delay = delay * random.random()

    # Give up now if the wait itself would end after the deadline.
    wake_time = time.perf_counter() + delay
    if wake_time > retry_timeout:
        if not retry_config.reraise:
            raise RetryError(
                f"Time limit {retry_config.time_limit} exceeded"
            ) from exc
        raise exc

    message = (
        f"Function threw exception below on try {retry_count + 1}, "
        f"retrying in {delay:n} seconds"
    )
    # exc_info=True makes logging attach the exception currently being
    # handled, so this must be called from within the caller's except block.
    retry_config.logger.log(retry_config.log_level, message, exc_info=True)
    return delay
def _get_timeout(time_limit: _types.TimeLimit) -> float:
    """Return the ``time.perf_counter`` reading at which ``time_limit`` elapses."""
    started = time.perf_counter()
    return started + time_limit