[go: up one dir, main page]

File: twisted_utils.py

package info (click to toggle)
txwinrm 1.3.3-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 3,480 kB
  • sloc: python: 4,449; xml: 517; sh: 59; makefile: 12
file content (53 lines) | stat: -rw-r--r-- 1,845 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
##############################################################################
#
# Copyright (C) Zenoss, Inc. 2016, all rights reserved.
#
# This content is made available according to terms specified in
# License.zenoss under the directory where your Zenoss product is installed.
#
##############################################################################

from twisted.internet import defer, error, reactor
from twisted.python import failure


def with_timeout(fn, args=None, kwargs=None, seconds=None, exception_class=error.TimeoutError):
    """Execute asynchronous function fn(*args, **kwargs) with a timeout.

    fn is called immediately and the value it returns is handed to
    add_timeout together with seconds and exception_class.

    Arguments:
        fn: callable to invoke; its return value is passed to add_timeout
            as the deferred to guard.
        args: optional sequence of positional arguments for fn. None (the
            default) means "no positional arguments".
        kwargs: optional mapping of keyword arguments for fn. None (the
            default) means "no keyword arguments".
        seconds: forwarded unchanged to add_timeout.
            NOTE(review): the default of None is forwarded as-is; callers
            are presumably expected to always supply a real delay.
        exception_class: forwarded unchanged to add_timeout.

    Returns whatever add_timeout returns.
    """
    # The defaults are None (not mutable literals), so normalise them here;
    # unpacking None with * or ** would raise TypeError before fn is called.
    if args is None:
        args = ()
    if kwargs is None:
        kwargs = {}

    return add_timeout(
        deferred=fn(*args, **kwargs),
        seconds=seconds,
        exception_class=exception_class)


def add_timeout(deferred, seconds, exception_class=error.TimeoutError):
    """Return new Deferred that will errback exception_class after seconds.

    The returned Deferred mirrors the outcome of deferred when that outcome
    arrives before the timeout. If the timeout fires first, deferred is
    cancelled and the returned Deferred errbacks with exception_class()
    (unless cancelling deferred produced a real result, which is then
    propagated instead).

    Cancelling the returned Deferred stops the pending timeout and cancels
    deferred as well.

    Arguments:
        deferred: the Deferred to guard. A callback is added to it that
            consumes its result, so it should not be used by the caller
            afterwards.
        seconds: delay passed to reactor.callLater.
        exception_class: zero-argument callable producing the exception
            used when the timeout fires.
    """

    def cancel_wrapped(_):
        # The caller gave up on the result: don't leave the timer pending
        # in the reactor, and stop the underlying operation too.
        if delayed_timeout.active():
            delayed_timeout.cancel()

        deferred.cancel()

    deferred_with_timeout = defer.Deferred(cancel_wrapped)

    def fire_timeout():
        # Cancelling normally runs handle_result synchronously; by then the
        # delayed call is no longer active, so our own CancelledError is
        # swallowed there rather than propagated.
        deferred.cancel()

        if not deferred_with_timeout.called:
            deferred_with_timeout.errback(exception_class())

    delayed_timeout = reactor.callLater(seconds, fire_timeout)

    def handle_result(result):
        is_failure = isinstance(result, failure.Failure)
        is_cancelled = is_failure and isinstance(result.value, defer.CancelledError)

        if delayed_timeout.active():
            # Cancel the timeout since a result came before it fired.
            delayed_timeout.cancel()
        elif is_cancelled:
            # Don't propagate cancellations that we caused.
            return

        if deferred_with_timeout.called:
            # The timeout (or a cancel by the caller) already resolved the
            # returned Deferred; firing it again would raise
            # AlreadyCalledError, so drop the late result.
            return

        # Propagate remaining results.
        if is_failure:
            deferred_with_timeout.errback(result)
        else:
            deferred_with_timeout.callback(result)

    deferred.addBoth(handle_result)

    return deferred_with_timeout