Skip to content

timer_queue.py

A simple thread-safe timer queue implementation with O(log n) time complexity.

TEARDOWN_SENTINEL = None module-attribute

__all__ = ['Timer', 'TimerQueueStruct', 'TimerQueue'] module-attribute

Timer

Timer wraps the callback and timestamp related attributes.

Source code in solnlib/timer_queue.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class Timer:
    """A callback bundled with its expiration time and repeat interval.

    Equality and hashing depend only on ``ident``; ordering compares the
    ``(when, ident)`` pair.
    """

    # Last identity handed out automatically; guarded by ``_lock``.
    _ident = 0
    _lock = threading.Lock()

    def __init__(self, callback: Callable, when: int, interval: int, ident: int = None):
        """Initializes Timer.

        Arguments:
            callback: Arbitrary callable object.
            when: The first expiration time, seconds since epoch.
            interval: Timer interval; 0 means a one-time timer, any other
                value makes the timer periodic.
            ident: (optional) Timer identity. When omitted, the next value
                of a class-wide counter is used.
        """
        if ident is None:
            # Bump the shared counter under the lock and take the new value.
            with Timer._lock:
                Timer._ident += 1
                ident = Timer._ident

        self._callback = callback
        self.when = when
        self.interval = interval
        self.ident = ident

    def update_expiration(self):
        """Push ``when`` forward by one ``interval``."""
        step = self.interval
        self.when += step

    def __hash__(self):
        ident = self.ident
        return hash(ident)

    def __eq__(self, other):
        if not isinstance(other, Timer):
            return False
        return self.ident == other.ident

    def __lt__(self, other):
        mine = (self.when, self.ident)
        theirs = (other.when, other.ident)
        return mine < theirs

    def __le__(self, other):
        mine = (self.when, self.ident)
        theirs = (other.when, other.ident)
        return mine <= theirs

    def __gt__(self, other):
        mine = (self.when, self.ident)
        theirs = (other.when, other.ident)
        return mine > theirs

    def __ge__(self, other):
        mine = (self.when, self.ident)
        theirs = (other.when, other.ident)
        return mine >= theirs

    def __call__(self):
        """Run the wrapped callback; its return value is discarded."""
        callback = self._callback
        callback()

ident = Timer._ident + 1 instance-attribute

interval = interval instance-attribute

when = when instance-attribute

__call__()

Source code in solnlib/timer_queue.py
79
80
def __call__(self):
    """Run the wrapped callback with no arguments; its result is discarded."""
    callback = self._callback
    callback()

__eq__(other)

Source code in solnlib/timer_queue.py
64
65
def __eq__(self, other):
    """Two timers are equal when both are ``Timer`` objects with the same ``ident``."""
    if not isinstance(other, Timer):
        return False
    return self.ident == other.ident

__ge__(other)

Source code in solnlib/timer_queue.py
76
77
def __ge__(self, other):
    """Order by expiration time first, then by identity."""
    mine = (self.when, self.ident)
    theirs = (other.when, other.ident)
    return mine >= theirs

__gt__(other)

Source code in solnlib/timer_queue.py
73
74
def __gt__(self, other):
    """Order by expiration time first, then by identity."""
    mine = (self.when, self.ident)
    theirs = (other.when, other.ident)
    return mine > theirs

__hash__()

Source code in solnlib/timer_queue.py
61
62
def __hash__(self):
    """Hash on ``ident`` only, matching the identity-based equality."""
    ident = self.ident
    return hash(ident)

__init__(callback, when, interval, ident=None)

Initializes Timer.

Parameters:

Name Type Description Default
callback Callable

Arbitrary callable object.

required
when int

The first expiration time, seconds since epoch.

required
interval int

Timer interval; if it equals 0 the timer fires only once, otherwise the timer is executed periodically.

required
ident int

(optional) Timer identity.

None
Source code in solnlib/timer_queue.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def __init__(self, callback: Callable, when: int, interval: int, ident: int = None):
    """Initializes Timer.

    Arguments:
        callback: Arbitrary callable object.
        when: The first expiration time, seconds since epoch.
        interval: Timer interval; 0 means a one-time timer, any other
            value makes the timer periodic.
        ident: (optional) Timer identity. When omitted, the next value of
            the class-wide counter is used.
    """
    if ident is None:
        # Bump the shared counter under the lock and take the new value.
        with Timer._lock:
            Timer._ident += 1
            ident = Timer._ident

    self._callback = callback
    self.when = when
    self.interval = interval
    self.ident = ident

__le__(other)

Source code in solnlib/timer_queue.py
70
71
def __le__(self, other):
    """Order by expiration time first, then by identity."""
    mine = (self.when, self.ident)
    theirs = (other.when, other.ident)
    return mine <= theirs

__lt__(other)

Source code in solnlib/timer_queue.py
67
68
def __lt__(self, other):
    """Order by expiration time first, then by identity."""
    mine = (self.when, self.ident)
    theirs = (other.when, other.ident)
    return mine < theirs

update_expiration()

Source code in solnlib/timer_queue.py
58
59
def update_expiration(self):
    """Push ``when`` forward by one ``interval``."""
    step = self.interval
    self.when += step

TimerQueue

A simple timer queue implementation.

It runs a separate thread to handle timers. Note: to use this timer queue effectively, timer callbacks should be short; otherwise they will delay the execution of other timers. A typical production scenario is that the timers are simple functions which inject themselves into a task queue and are then picked up by a threading/process pool to execute, as shown below:

Timers --enqueue---> TimerQueue --------expiration-----------
                                                            |
                                                            |
                                                           \|/
Threading/Process Pool <---- TaskQueue <--enqueue-- Timers' callback (nonblocking)

Examples:

>>> from solnlib import timer_queue
>>> tq = timer_queue.TimerQueue()
>>> tq.start()
>>> t = tq.add_timer(my_func, time.time(), 10)
>>> # do other stuff
>>> tq.stop()
Source code in solnlib/timer_queue.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
class TimerQueue:
    r"""A simple timer queue implementation.

    It runs a separate thread to handle timers. Note: to use this timer
    queue effectively, timer callbacks should be short; otherwise they will
    delay the execution of other timers. A typical production scenario is
    that the timers are simple functions which inject themselves into a task
    queue and are then picked up by a threading/process pool to execute, as
    shown below:

        Timers --enqueue---> TimerQueue --------expiration-----------
                                                                    |
                                                                    |
                                                                   \|/
        Threading/Process Pool <---- TaskQueue <--enqueue-- Timers' callback (nonblocking)

    Examples:
           >>> from solnlib import timer_queue
           >>> tq = timer_queue.TimerQueue()
           >>> tq.start()
           >>> t = tq.add_timer(my_func, time.time(), 10)
           >>> # do other stuff
           >>> tq.stop()
    """

    def __init__(self):
        self._timers = TimerQueueStruct()
        # Guards every access to ``self._timers``.
        self._lock = threading.Lock()
        # Used to interrupt the worker's sleep (new timer or teardown).
        self._wakeup_queue = Queue.Queue()
        self._thr = threading.Thread(target=self._check_and_execute)
        self._thr.daemon = True
        self._started = False

    def start(self):
        """Start the timer queue.

        Does nothing if the queue is already running. After a ``stop()``
        a new worker thread is created, because a ``threading.Thread``
        object can only be started once.
        """

        if self._started:
            return

        if self._thr.ident is not None:
            # The previous worker has already run; build a fresh one.
            self._thr = threading.Thread(target=self._check_and_execute)
            self._thr.daemon = True

        self._started = True
        self._thr.start()
        logging.info("TimerQueue started.")

    def stop(self):
        """Stop the timer queue and wait for the worker thread to exit.

        Does nothing if the queue is not running.
        """

        if not self._started:
            return
        # Mark the queue as stopped so that a repeated stop() is a no-op
        # and a later start() is not silently ignored.
        self._started = False

        self._wakeup(TEARDOWN_SENTINEL)
        self._thr.join()

    def add_timer(
        self, callback: Callable, when: int, interval: int, ident: int = None
    ) -> Timer:
        """Add timer to the queue.

        Arguments:
            callback: Arbitrary callable object.
            when: The first expiration time, seconds since epoch.
            interval: Timer interval, if equals 0, one time timer, otherwise
                the timer will be periodically executed
            ident: (optional) Timer identity.

        Returns:
            A timer object which should not be manipulated directly by
                clients. Used to delete/update the timer.
        """

        with self._lock:
            timer = self._timers.add_timer(callback, when, interval, ident)
        # Wake the worker so it recomputes its sleep time.
        self._wakeup()
        return timer

    def remove_timer(self, timer: Timer):
        """Remove timer from the queue.

        Arguments:
            timer: Timer object to remove.
        """

        with self._lock:
            self._timers.remove_timer(timer)

    def _check_and_execute(self):
        """Worker loop: run expired timers, re-arm them, then sleep."""
        wakeup_queue = self._wakeup_queue
        while True:
            next_expired_time, expired_timers = self._get_expired_timers()
            for timer in expired_timers:
                try:
                    # Note, please make timer callback effective/short
                    timer()
                except Exception:
                    # A failing callback must not kill the worker thread.
                    logging.error(traceback.format_exc())

            self._reset_timers(expired_timers)

            sleep_time = _calc_sleep_time(next_expired_time)
            try:
                wakeup = wakeup_queue.get(timeout=sleep_time)
            except Queue.Empty:
                # Slept the full duration; go check for expired timers.
                continue
            if wakeup is TEARDOWN_SENTINEL:
                break
        logging.info("TimerQueue stopped.")

    def _get_expired_timers(self):
        with self._lock:
            return self._timers.get_expired_timers()

    def _reset_timers(self, expired_timers):
        with self._lock:
            has_new_timer = self._timers.reset_timers(expired_timers)

        if has_new_timer:
            self._wakeup()

    def _wakeup(self, something="not_None"):
        # Any value other than TEARDOWN_SENTINEL just interrupts the sleep.
        self._wakeup_queue.put(something)

__init__()

Source code in solnlib/timer_queue.py
218
219
220
221
222
223
224
def __init__(self):
    """Create the timer store, its lock, the wakeup queue and the (not yet started) daemon worker."""
    self._started = False
    self._lock = threading.Lock()
    self._timers = TimerQueueStruct()
    self._wakeup_queue = Queue.Queue()
    worker = threading.Thread(target=self._check_and_execute, daemon=True)
    self._thr = worker

add_timer(callback, when, interval, ident=None)

Add timer to the queue.

Parameters:

Name Type Description Default
callback Callable

Arbitrary callable object.

required
when int

The first expiration time, seconds since epoch.

required
interval int

Timer interval; if it equals 0 the timer fires only once, otherwise the timer is executed periodically.

required
ident int

(optional) Timer identity.

None

Returns:

Type Description
Timer

A timer object which should not be manipulated directly by clients. Used to delete/update the timer.

Source code in solnlib/timer_queue.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def add_timer(
    self, callback: Callable, when: int, interval: int, ident: int = None
) -> Timer:
    """Register a new timer and wake the worker.

    Arguments:
        callback: Arbitrary callable object.
        when: The first expiration time, seconds since epoch.
        interval: Timer interval; 0 means a one-time timer, any other
            value makes the timer periodic.
        ident: (optional) Timer identity.

    Returns:
        The timer object created by the underlying structure. Clients
            should not manipulate it directly; it is used to delete/update
            the timer.
    """

    self._lock.acquire()
    try:
        created = self._timers.add_timer(callback, when, interval, ident)
    finally:
        self._lock.release()

    # Wake up only after the lock has been released.
    self._wakeup()
    return created

remove_timer(timer)

Remove timer from the queue.

Parameters:

Name Type Description Default
timer Timer

Timer object to remove.

required
Source code in solnlib/timer_queue.py
268
269
270
271
272
273
274
275
276
def remove_timer(self, timer: Timer):
    """Remove a timer from the queue while holding the queue lock.

    Arguments:
        timer: Timer object to remove.
    """

    self._lock.acquire()
    try:
        self._timers.remove_timer(timer)
    finally:
        self._lock.release()

start()

Start the timer queue.

Source code in solnlib/timer_queue.py
226
227
228
229
230
231
232
233
234
def start(self):
    """Start the worker thread unless the queue is already marked as started."""

    if not self._started:
        self._started = True
        self._thr.start()
        logging.info("TimerQueue started.")

stop()

Stop the timer queue.

Source code in solnlib/timer_queue.py
236
237
238
239
240
241
242
243
244
def stop(self):
    """Stop the timer queue and wait for the worker thread to exit.

    Does nothing if the queue is not marked as started.
    """

    if not self._started:
        return
    # Clear the flag (the original re-set it to True), so that a repeated
    # stop() is a no-op instead of sending another teardown request.
    self._started = False

    self._wakeup(TEARDOWN_SENTINEL)
    self._thr.join()

TimerQueueStruct

The underlying data structure for TimerQueue.

Source code in solnlib/timer_queue.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
class TimerQueueStruct:
    """The underlying data structure for TimerQueue.

    Holds the pending timers in a sorted container plus a "cancelling"
    map for timers that were removed while they were out of the container
    (i.e. handed out by ``get_expired_timers`` and not yet reset).
    """

    def __init__(self):
        self._timers = sc.SortedSet()
        # ident -> Timer for timers cancelled while not in ``_timers``.
        self._cancelling_timers = {}

    def add_timer(
        self, callback: Callable, when: int, interval: int, ident: int
    ) -> Timer:
        """Add timer to the data structure.

        Arguments:
            callback: Arbitrary callable object.
            when: The first expiration time, seconds since epoch.
            interval: Timer interval, if equals 0, one time timer, otherwise
                the timer will be periodically executed
            ident: (optional) Timer identity.

        Returns:
            A timer object which should not be manipulated directly by
                clients. Used to delete/update the timer.
        """

        timer = Timer(callback, when, interval, ident)
        self._timers.add(timer)
        return timer

    def remove_timer(self, timer: Timer):
        """Remove timer from data structure.

        If the timer is not currently in the container it is recorded in
        the cancelling map so that the next ``reset_timers`` call does not
        re-add it.

        Arguments:
            timer: Timer object which is returned by ``TimerQueueStruct.add_timer``.
        """

        try:
            self._timers.remove(timer)
        except (KeyError, ValueError):
            # Set-style containers raise KeyError for a missing member;
            # ValueError is kept for list-style containers.
            logging.info(
                "Timer=%s is not in queue, move it to cancelling " "list", timer.ident
            )
            self._cancelling_timers[timer.ident] = timer

    def get_expired_timers(self) -> Tuple:
        """Pop the timers that have expired.

        Assumes ``self._timers`` iterates in ascending ``when`` order (the
        prefix deletion below depends on it), so scanning stops at the
        first timer that has not expired yet.

        Returns:
            A tuple ``(next_expired_time, expired_timers)``:
            ``next_expired_time`` is the ``when`` of the earliest remaining
            timer, or 0 if none remain; ``expired_timers`` is a list of the
            removed timers, empty if nothing has expired.
        """

        now = time()
        expired_timers = []
        for timer in self._timers:
            if timer.when > now:
                break
            expired_timers.append(timer)

        if expired_timers:
            del self._timers[: len(expired_timers)]

        next_expired_time = self._timers[0].when if self._timers else 0
        return next_expired_time, expired_timers

    def reset_timers(self, expired_timers: List[Timer]) -> bool:
        """Re-add the expired periodical timers to data structure for next
        round scheduling.

        Timers recorded in the cancelling map are skipped, and the map is
        cleared afterwards.

        Arguments:
            expired_timers: List of expired timers.

        Returns:
            True if there are timers added, False otherwise.
        """

        has_new_timer = False
        cancelling_timers = self._cancelling_timers
        for timer in expired_timers:
            if timer.ident in cancelling_timers:
                continue
            if timer.interval:
                # Repeated timer
                timer.update_expiration()
                self._timers.add(timer)
                has_new_timer = True
        cancelling_timers.clear()
        return has_new_timer

    def check_and_execute(self) -> float:
        """Get expired timers and execute callbacks for the timers.

        Exceptions raised by a callback are logged and do not stop the
        remaining callbacks.

        Returns:
            The value of ``_calc_sleep_time`` for the next expiration time.
        """

        (next_expired_time, expired_timers) = self.get_expired_timers()
        for timer in expired_timers:
            try:
                timer()
            except Exception:
                logging.error(traceback.format_exc())

        self.reset_timers(expired_timers)
        return _calc_sleep_time(next_expired_time)

__init__()

Source code in solnlib/timer_queue.py
89
90
91
def __init__(self):
    """Create an empty sorted timer container and an empty cancelling map."""
    self._cancelling_timers = dict()
    self._timers = sc.SortedSet()

add_timer(callback, when, interval, ident)

Add timer to the data structure.

Parameters:

Name Type Description Default
callback Callable

Arbitrary callable object.

required
when int

The first expiration time, seconds since epoch.

required
interval int

Timer interval; if it equals 0 the timer fires only once, otherwise the timer is executed periodically.

required
ident int

(optional) Timer identity.

required

Returns:

Type Description
Timer

A timer object which should not be manipulated directly by clients. Used to delete/update the timer.

Source code in solnlib/timer_queue.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def add_timer(
    self, callback: Callable, when: int, interval: int, ident: int
) -> Timer:
    """Build a ``Timer`` from the arguments and insert it into the container.

    Arguments:
        callback: Arbitrary callable object.
        when: The first expiration time, seconds since epoch.
        interval: Timer interval; 0 means a one-time timer, any other
            value makes the timer periodic.
        ident: (optional) Timer identity.

    Returns:
        The newly created timer. Clients should not manipulate it
            directly; it is used to delete/update the timer.
    """

    new_timer = Timer(callback, when, interval, ident)
    self._timers.add(new_timer)
    return new_timer

check_and_execute()

Get expired timers and execute callbacks for the timers.

Returns:

Type Description
float

Duration of next expired timer.

Source code in solnlib/timer_queue.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def check_and_execute(self) -> float:
    """Run the callbacks of all expired timers, then re-arm them.

    An exception raised by a callback is logged and the remaining
    callbacks still run.

    Returns:
        The value of ``_calc_sleep_time`` for the next expiration time.
    """

    batch = self.get_expired_timers()
    next_expired_time, expired_timers = batch

    for expired in expired_timers:
        try:
            expired()
        except Exception:
            logging.error(traceback.format_exc())

    self.reset_timers(expired_timers)
    sleep_time = _calc_sleep_time(next_expired_time)
    return sleep_time

get_expired_timers()

Get a list of expired timers.

Returns:

Type Description
Tuple

A tuple (next_expired_time, expired_timers); expired_timers is an empty list if no timers have expired.

Source code in solnlib/timer_queue.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def get_expired_timers(self) -> Tuple:
    """Pop the timers that have expired.

    Assumes ``self._timers`` iterates in ascending ``when`` order (the
    prefix deletion below depends on it), so scanning stops at the first
    timer that has not expired yet instead of visiting every timer.

    Returns:
        A tuple ``(next_expired_time, expired_timers)``:
        ``next_expired_time`` is the ``when`` of the earliest remaining
        timer, or 0 if none remain; ``expired_timers`` is a list of the
        removed timers, empty if nothing has expired.
    """

    now = time()
    expired_timers = []
    for timer in self._timers:
        if timer.when > now:
            break
        expired_timers.append(timer)

    if expired_timers:
        del self._timers[: len(expired_timers)]

    next_expired_time = self._timers[0].when if self._timers else 0
    return next_expired_time, expired_timers

remove_timer(timer)

Remove timer from data structure.

Parameters:

Name Type Description Default
timer Timer

Timer object which is returned by TimerQueueStruct.add_timer.

required
Source code in solnlib/timer_queue.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def remove_timer(self, timer: Timer):
    """Remove timer from data structure.

    If the timer is not currently in the container it is recorded in the
    cancelling map instead, keyed by its ``ident``. A timer that was
    removed successfully is not recorded there.

    Arguments:
        timer: Timer object which is returned by ``TimerQueueStruct.add_timer``.
    """

    try:
        self._timers.remove(timer)
    except (KeyError, ValueError):
        # Set-style containers raise KeyError for a missing member;
        # ValueError is kept for list-style containers.
        logging.info(
            "Timer=%s is not in queue, move it to cancelling " "list", timer.ident
        )
        self._cancelling_timers[timer.ident] = timer

reset_timers(expired_timers)

Re-add the expired periodical timers to data structure for next round scheduling.

Parameters:

Name Type Description Default
expired_timers List[Timer]

List of expired timers.

required

Returns:

Type Description
bool

True if there are timers added, False otherwise.

Source code in solnlib/timer_queue.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def reset_timers(self, expired_timers: List[Timer]) -> bool:
    """Put expired periodic timers back for their next round.

    Timers whose ``ident`` is in the cancelling map are skipped, as are
    timers with a zero interval. The cancelling map is emptied afterwards.

    Arguments:
        expired_timers: List of expired timers.

    Returns:
        True if at least one timer was re-added, False otherwise.
    """

    cancelled = self._cancelling_timers
    readded = False
    for candidate in expired_timers:
        if candidate.ident not in cancelled and candidate.interval:
            # Periodic timer: move it to its next expiration and re-queue.
            candidate.update_expiration()
            self._timers.add(candidate)
            readded = True
    cancelled.clear()
    return readded