1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import random
23
24 from twisted.internet import defer, reactor
25 from twisted.python import reflect
26
27
28 from flumotion.common import errors
29
30 __version__ = "$Rev: 7901 $"
31
32
33
34
35
37
38 def wrapper(*args, **kwargs):
39 gen = proc(*args, **kwargs)
40 result = defer.Deferred()
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 result.__callbacks = result.callbacks
60
61 def with_saved_callbacks(proc, *_args, **_kwargs):
62 saved_callbacks, saved_called = result.callbacks, result.called
63 result.callbacks, result.called = result.__callbacks, False
64 proc(*_args, **_kwargs)
65 result.callbacks, result.called = saved_callbacks, saved_called
66
67
68
69 def default_errback(failure, d):
70
71
72 if failure.check(errors.HandledException):
73 return failure
74
75 def print_traceback(f):
76 import traceback
77 print 'flumotion.twisted.defer: ' + \
78 'Unhandled error calling', proc.__name__, ':', f.type
79 traceback.print_exc()
80 with_saved_callbacks(lambda: d.addErrback(print_traceback))
81 raise
82 result.addErrback(default_errback, result)
83
84 def generator_next():
85 try:
86 x = gen.next()
87 if isinstance(x, defer.Deferred):
88 x.addCallback(callback, x).addErrback(errback, x)
89 else:
90 result.callback(x)
91 except StopIteration:
92 result.callback(None)
93 except Exception, e:
94 result.errback(e)
95
96 def errback(failure, d):
97
98 def raise_error():
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113 k, v = failure.parents[-1], failure.value
114 try:
115 if isinstance(k, str):
116 k = reflect.namedClass(k)
117 if isinstance(v, tuple):
118 e = k(*v)
119 else:
120 e = k(v)
121 except Exception:
122 e = Exception('%s: %r' % (failure.type, v))
123 raise e
124 d.value = raise_error
125 generator_next()
126
127 def callback(result, d):
128 d.value = lambda: result
129 generator_next()
130
131 generator_next()
132
133 return result
134
135 return wrapper
136
137
141
142
144 """
145 Return a deferred which will fire from a callLater after d fires
146 """
147
148 def fire(result, d):
149 reactor.callLater(0, d.callback, result)
150 res = defer.Deferred()
151 deferred.addCallback(fire, res)
152 return res
153
154
156 """
157 I am a helper class to make sure that the deferred is fired only once
158 with either a result or exception.
159
160 @ivar d: the deferred that gets fired as part of the resolution
161 @type d: L{twisted.internet.defer.Deferred}
162 """
163
165 self.d = defer.Deferred()
166 self.fired = False
167
169 """
170 Clean up any resources related to the resolution.
171 Subclasses can implement me.
172 """
173 pass
174
176 """
177 Make the result succeed, triggering the callbacks with
178 the given result. If a result was already reached, do nothing.
179 """
180 if not self.fired:
181 self.fired = True
182 self.cleanup()
183 self.d.callback(result)
184
186 """
187 Make the result fail, triggering the errbacks with the given exception.
188 If a result was already reached, do nothing.
189 """
190 if not self.fired:
191 self.fired = True
192 self.cleanup()
193 self.d.errback(exception)
194
195
197 """
198 Provides a mechanism to attempt to run some deferred operation until it
199 succeeds. On failure, the operation is tried again later, exponentially
200 backing off.
201 """
202 maxDelay = 1800
203 initialDelay = 5.0
204
205 factor = 2.7182818284590451
206 jitter = 0.11962656492
207 delay = None
208
209 - def __init__(self, deferredCreate, *args, **kwargs):
210 """
211 Create a new RetryingDeferred. Will call
212 deferredCreate(*args, **kwargs) each time a new deferred is needed.
213 """
214 self._create = deferredCreate
215 self._args = args
216 self._kwargs = kwargs
217
218 self._masterD = None
219 self._running = False
220 self._callId = None
221
223 """
224 Start trying. Returns a deferred that will fire when this operation
225 eventually succeeds. That deferred will only errback if this
226 RetryingDeferred is cancelled (it will then errback with the result of
227 the next attempt if one is in progress, or a CancelledError.
228 # TODO: yeah?
229 """
230 self._masterD = defer.Deferred()
231 self._running = True
232 self.delay = None
233
234 self._retry()
235
236 return self._masterD
237
246
248 self._callId = None
249 d = self._create(*self._args, **self._kwargs)
250 d.addCallbacks(self._success, self._failed)
251
253
254 self._masterD.callback(val)
255 self._masterD = None
256
258 if self._running:
259 next = self._nextDelay()
260 self._callId = reactor.callLater(next, self._retry)
261 else:
262 self._masterD.errback(failure)
263 self._masterD = None
264
277