Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Module defining the retry_factory function""" 

2 

3import asyncio 

4import itertools 

5import random 

6import time 

7from dataclasses import dataclass 

8from functools import update_wrapper 

9from typing import Any, overload 

10 

11from tubthumper import _types 

12from tubthumper._types import AwaitableCallable, RetryCallable, T 

13 

14__all__ = ["RetryError"] 

15 

# Raised after the retry loops below if ``itertools.count()`` ever stops
# yielding values, so the wrappers never fall off the end without a result.
COUNTER_EXCEPTION = RuntimeError(
    "Infinite retry counter stopped iteration"
)

17 

18 

class RetryError(Exception):
    """Signal that retrying stopped because a retry or time limit was hit.

    It is raised ``from`` the exception of the last failed attempt, so that
    exception is available as ``__cause__``.
    """

21 

22 

@dataclass(frozen=True)
class RetryConfig:
    """Immutable bundle of the settings that drive the retry loop.

    Attributes:
        exceptions: Exception type(s) that trigger a retry; any other
            exception propagates out of the wrapped call untouched.
        retry_limit: Retry count at which the wrapper stops retrying.
        time_limit: Seconds, measured with ``time.perf_counter``, within
            which the next backoff must still fit.
        init_backoff: Backoff in seconds used after the first failure.
        exponential: Growth base of the backoff, applied as
            ``init_backoff * exponential ** retry_count``.
        jitter: When truthy, each backoff is scaled by ``random.random()``.
        reraise: When truthy, the caught exception is re-raised once a
            limit is hit instead of raising ``RetryError``.
        log_level: Level passed to ``logger.log`` for every retry message.
        logger: Object whose ``log`` method receives the retry messages.
    """

    exceptions: _types.Exceptions
    retry_limit: _types.RetryLimit
    time_limit: _types.TimeLimit
    init_backoff: _types.InitBackoff
    exponential: _types.Exponential
    jitter: _types.Jitter
    reraise: _types.Reraise
    log_level: _types.LogLevel
    logger: _types.Logger

36 

37 

@overload
def retry_factory(
    func: AwaitableCallable[T],
    retry_config: RetryConfig,
) -> AwaitableCallable[T]:
    ...


@overload
def retry_factory(
    func: RetryCallable[T],
    retry_config: RetryConfig,
) -> RetryCallable[T]:
    ...


def retry_factory(func, retry_config):  # type: ignore
    """
    Build a retrying wrapper around ``func`` driven by ``retry_config``.

    Coroutine functions get an ``async`` wrapper that awaits between
    attempts; every other callable gets a blocking wrapper. In both cases
    the wrapper takes on ``func``'s metadata via ``functools.update_wrapper``.

    Args:
        func: The function (sync or ``async def``) to retry.
        retry_config: Settings that decide when and how long to retry.

    Returns:
        The wrapper, which has the same call signature as ``func``.
    """
    # Local import keeps this function self-contained. The inspect variant
    # is used because asyncio.iscoroutinefunction is deprecated since
    # Python 3.14 and emits a DeprecationWarning when called.
    import inspect

    if inspect.iscoroutinefunction(func):
        retry_func = _async_retry_factory(func, retry_config)
    else:
        retry_func = _sync_retry_factory(func, retry_config)
    update_wrapper(retry_func, func)
    return retry_func

65 

66 

def _async_retry_factory(
    func: AwaitableCallable[T],
    retry_config: RetryConfig,
) -> AwaitableCallable[T]:
    """Return a coroutine function that retries ``func`` per ``retry_config``."""

    async def retry_func(*args: Any, **kwargs: Any) -> T:
        # The deadline is fixed once per call, before the first attempt.
        deadline = _get_timeout(retry_config.time_limit)
        for attempt in itertools.count():
            try:
                result = await func(*args, **kwargs)
            except retry_config.exceptions as error:
                # Raises when a limit is exhausted; otherwise yields the
                # number of seconds to wait before the next attempt.
                delay = _process_exception(
                    retry_config=retry_config,
                    exc=error,
                    retry_count=attempt,
                    retry_timeout=deadline,
                )
                await asyncio.sleep(delay)
            else:
                return result
        raise COUNTER_EXCEPTION

    return retry_func

84 

85 

def _sync_retry_factory(
    func: RetryCallable[T],
    retry_config: RetryConfig,
) -> RetryCallable[T]:
    """Return a blocking function that retries ``func`` per ``retry_config``."""

    def retry_func(*args: Any, **kwargs: Any) -> T:
        # The deadline is fixed once per call, before the first attempt.
        deadline = _get_timeout(retry_config.time_limit)
        for attempt in itertools.count():
            try:
                result = func(*args, **kwargs)
            except retry_config.exceptions as error:
                # Raises when a limit is exhausted; otherwise yields the
                # number of seconds to wait before the next attempt.
                delay = _process_exception(
                    retry_config=retry_config,
                    exc=error,
                    retry_count=attempt,
                    retry_timeout=deadline,
                )
                time.sleep(delay)
            else:
                return result
        raise COUNTER_EXCEPTION

    return retry_func

103 

104 

def _process_exception(
    retry_config: RetryConfig, exc: Exception, retry_count: int, retry_timeout: float
) -> float:
    """Decide whether a failed attempt may be retried and for how long to wait.

    Args:
        retry_config: Settings controlling limits, backoff and logging.
        exc: The exception caught from the failed attempt.
        retry_count: Zero-based index of the attempt that just failed.
        retry_timeout: ``time.perf_counter`` reading at which the time limit
            expires.

    Returns:
        The number of seconds to sleep before the next attempt.

    Raises:
        RetryError: If the retry limit is reached, or the backoff would end
            after ``retry_timeout``, and ``retry_config.reraise`` is falsy.
        Exception: ``exc`` itself, in those same situations, when
            ``retry_config.reraise`` is truthy.
    """
    # Give up once the configured number of retries has been used.
    if retry_count >= retry_config.retry_limit:
        if not retry_config.reraise:
            raise RetryError(f"Retry limit {retry_config.retry_limit} reached") from exc
        raise exc

    # Exponential backoff, optionally scaled down by a random factor.
    backoff = retry_config.init_backoff * retry_config.exponential ** retry_count
    if retry_config.jitter:
        backoff *= random.random()

    # Give up if waiting out the backoff would overshoot the deadline.
    if retry_timeout < (time.perf_counter() + backoff):
        if not retry_config.reraise:
            raise RetryError(f"Time limit {retry_config.time_limit} exceeded") from exc
        raise exc

    message = (
        f"Function threw exception below on try {retry_count + 1}, "
        f"retrying in {backoff:n} seconds"
    )
    # exc_info=True attaches the exception currently being handled.
    retry_config.logger.log(retry_config.log_level, message, exc_info=True)
    return backoff

126 

127 

def _get_timeout(time_limit: _types.TimeLimit) -> float:
    """Return the ``time.perf_counter`` reading ``time_limit`` seconds from now."""
    started_at = time.perf_counter()
    return started_at + time_limit