[go: up one dir, main page]

File: stream.py

package info (click to toggle)
txtorcon 0.11.0-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 972 kB
  • ctags: 1,284
  • sloc: python: 7,531; makefile: 205
file content (271 lines) | stat: -rw-r--r-- 9,951 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
"""
Contains an implementation of a :class:`Stream` abstraction used by
:class:`TorState` to represent all streams in Tor's state. There is
also an interface called :class:`interface.IStreamListener` for
listening for stream updates (see also
:meth:`TorState.add_stream_listener`) and the interface called
:class:`interface.IStreamAttacher` used by :class:`TorState` as a way
to attach streams to circuits "by hand".

"""

from twisted.python import log
from twisted.internet import defer
from txtorcon.interface import ICircuitContainer, IStreamListener
from txtorcon.util import find_keywords, maybe_ip_addr


class Stream(object):
    """
    Represents an active stream in Tor's state (:class:`txtorcon.TorState`).

    :ivar circuit:
        Streams will generally be attached to circuits pretty
        quickly. If they are attached, circuit will be a
        :class:`txtorcon.Circuit` instance or None if this stream
        isn't yet attached to a circuit.

    :ivar state:
        Tor's idea of the stream's state, one of:
          - NEW: New request to connect
          - NEWRESOLVE: New request to resolve an address
          - REMAP: Address re-mapped to another
          - SENTCONNECT: Sent a connect cell along a circuit
          - SENTRESOLVE: Sent a resolve cell along a circuit
          - SUCCEEDED: Received a reply; stream established
          - FAILED: Stream failed and not retriable
          - CLOSED: Stream closed
          - DETACHED: Detached from circuit; still retriable

    :ivar target_host:
        Something like www.example.com -- the host the stream is destined for.

    :ivar target_port:
        The port the stream will exit to.

    :ivar target_addr:
        Target address, looked up (usually) by Tor (e.g. 127.0.0.1).

    :ivar id:
        The ID of this stream, a number (or None if unset).
    """

    def __init__(self, circuitcontainer):
        """
        :param circuitcontainer: an object which implements
            :class:`interface.ICircuitContainer`; it is used to look up
            circuits by ID and to issue the close-stream command.
        """

        self.circuit_container = ICircuitContainer(circuitcontainer)

        ## FIXME: Sphinx doesn't seem to understand these variable
        ## docstrings, so consolidate with above if Sphinx is the
        ## answer -- actually it does, so long as the :ivar: things
        ## are never mentioned it seems.

        self.id = None
        """An int, Tor's ID for this :class:`txtorcon.Stream` (None
        until the first update arrives)."""

        self.state = None
        """A string, Tor's idea of the state of this
        :class:`txtorcon.Stream`"""

        self.target_host = None
        """Usually a hostname, but sometimes an IP address (e.g. when
        we query existing state from Tor)"""

        self.target_addr = None
        """If available, the IP address we're connecting to (if None,
        see target_host instead)."""

        self.target_port = 0
        """The port we're connecting to."""

        self.circuit = None
        """If we've attached to a :class:`txtorcon.Circuit`, this will
        be an instance of :class:`txtorcon.Circuit` (otherwise None)."""

        self.listeners = []
        """A list of all connected
        :class:`txtorcon.interface.IStreamListener` instances."""

        self.source_addr = None
        """If available, the address from which this Stream originated
        (e.g. local process, etc). See get_process() also."""

        self.source_port = 0
        """If available, the port from which this Stream
        originated. See get_process() also."""

        self.flags = {}
        """All flags from last update to this Stream. str->str"""

        self._closing_deferred = None
        """Internal. Holds Deferred that will callback when this
        stream is CLOSED, FAILED (or DETACHED??)"""

    def listen(self, listen):
        """
        Attach an :class:`txtorcon.interface.IStreamListener` to this stream.

        See also :meth:`txtorcon.TorState.add_stream_listener` to
        listen to all streams.

        Adding the same listener more than once has no effect.

        :param listen: something that knows
            :class:`txtorcon.interface.IStreamListener`
        """

        listener = IStreamListener(listen)
        if listener not in self.listeners:
            self.listeners.append(listener)

    def unlisten(self, listener):
        """
        Stop delivering updates for this stream to ``listener``.

        :param listener: a listener previously passed to :meth:`listen`
        :raises ValueError: if ``listener`` is not currently registered
        """
        self.listeners.remove(listener)

    def close(self, **kw):
        """
        This asks Tor to close the underlying stream object. See
        :meth:`txtorcon.interface.ITorControlProtocol.close_stream`
        for details.

        Although Tor currently takes no flags, it allows you to; any
        keyword arguments are passed through as flags.

        NOTE that the callback delivered from this method only
        callbacks after the underlying stream is really destroyed
        (*not* just when the CLOSESTREAM command has successfully
        completed).

        :return: a Deferred which fires with this Stream once an
            update moves it to CLOSED or FAILED.
        """

        # NOTE(review): a second close() before the first completes
        # replaces the pending Deferred, so the earlier one never fires.
        self._closing_deferred = defer.Deferred()

        def close_command_is_queued(*args):
            # Chain the command's Deferred onto the "really closed" one.
            return self._closing_deferred
        d = self.circuit_container.close_stream(self, **kw)
        d.addCallback(close_command_is_queued)
        return self._closing_deferred

    def _create_flags(self, kw):
        "this clones the kw dict, adding a lower-case version of every key (duplicated in circuit.py; consider putting in util?)"

        flags = {}
        for key, value in kw.items():
            flags[key] = value
            flags[key.lower()] = value
        return flags

    def _notify(self, method_name, *args, **kwargs):
        """
        Internal. Call ``method_name`` on every listener with the given
        arguments.

        A snapshot of the listener list is iterated so that a listener
        may call :meth:`unlisten` from inside its callback without
        causing the following listener to be skipped. Exceptions raised
        by a listener propagate to the caller.
        """
        for listener in list(self.listeners):
            getattr(listener, method_name)(*args, **kwargs)

    def update(self, args):
        """
        Apply one stream-status update to this object and notify listeners.

        :param args: a sequence of strings: ``args[0]`` is the stream
            ID, ``args[1]`` the new state, ``args[2]`` the circuit ID
            (0 meaning "no circuit") and ``args[3]`` the target as
            ``host:port``. The whole sequence is also handed to
            ``find_keywords`` (presumably extracting ``KEY=value``
            items -- confirm against txtorcon.util) and the result is
            stored in ``self.flags``.

        :raises RuntimeError: if the update is for a different stream
            ID than the one already recorded, or if the state is not
            one of the known states.
        """
        ## print "update",self.id,args

        if self.id is None:
            self.id = int(args[0])
        elif self.id != int(args[0]):
            raise RuntimeError("Update for wrong stream.")

        kw = find_keywords(args)
        self.flags = kw

        if 'SOURCE_ADDR' in kw:
            # Split on the LAST colon so addresses that themselves
            # contain colons keep everything before the port.
            last_colon = kw['SOURCE_ADDR'].rfind(':')
            self.source_addr = kw['SOURCE_ADDR'][:last_colon]
            if self.source_addr != '(Tor_internal)':
                self.source_addr = maybe_ip_addr(self.source_addr)
            self.source_port = int(kw['SOURCE_ADDR'][last_colon + 1:])

        self.state = args[1]
        if self.state in ['NEW', 'NEWRESOLVE', 'SUCCEEDED']:
            # Only record the target the first time we learn it.
            if self.target_host is None:
                last_colon = args[3].rfind(':')
                self.target_host = args[3][:last_colon]
                self.target_port = int(args[3][last_colon + 1:])

            self.target_port = int(self.target_port)
            if self.state == 'NEW':
                if self.circuit is not None:
                    log.err(RuntimeError("Weird: circuit valid in NEW"))
                self._notify('stream_new', self)
            else:
                # NOTE(review): NEWRESOLVE also lands here, so listeners
                # get stream_succeeded() for it -- confirm that this is
                # intended before changing it.
                self._notify('stream_succeeded', self)

        elif self.state == 'REMAP':
            self.target_addr = maybe_ip_addr(args[3][:args[3].rfind(':')])

        elif self.state == 'CLOSED':
            if self.circuit:
                self.circuit.streams.remove(self)
            self.circuit = None
            self.maybe_call_closing_deferred()
            flags = self._create_flags(kw)
            self._notify('stream_closed', self, **flags)

        elif self.state == 'FAILED':
            if self.circuit:
                self.circuit.streams.remove(self)
            self.circuit = None
            self.maybe_call_closing_deferred()
            # build lower-case version of all flags
            flags = self._create_flags(kw)
            self._notify('stream_failed', self, **flags)

        elif self.state == 'DETACHED':
            if self.circuit:
                self.circuit.streams.remove(self)
                self.circuit = None

            ## FIXME does this count as closed?
            ##self.maybe_call_closing_deferred()
            flags = self._create_flags(kw)
            self._notify('stream_detach', self, **flags)

        elif self.state in ['SENTCONNECT', 'SENTRESOLVE']:
            # Nothing state-specific to do; circuit attachment is
            # handled below. (NEWRESOLVE is handled by the first branch.)
            pass

        else:
            raise RuntimeError("Unknown state: %s" % self.state)

        ## see if we attached to a circuit. I believe this only
        ## happens on a SENTCONNECT or REMAP. DETACHED is excluded so
        ## we don't immediately re-add the circuit we just detached
        ## from
        if self.state not in ['CLOSED', 'FAILED', 'DETACHED']:
            cid = int(args[2])
            if cid == 0:
                # Circuit ID 0: the stream is not on any circuit.
                if self.circuit and self in self.circuit.streams:
                    self.circuit.streams.remove(self)
                self.circuit = None

            elif self.circuit is None:
                self.circuit = self.circuit_container.find_circuit(cid)
                if self not in self.circuit.streams:
                    self.circuit.streams.append(self)
                    self._notify('stream_attach', self, self.circuit)

            elif self.circuit.id != cid:
                # Already attached, but Tor reports a different circuit:
                # log it and keep the circuit we have.
                log.err(RuntimeError('Circuit ID changed from %d to %d.' % (self.circuit.id, cid)))

    def maybe_call_closing_deferred(self):
        """
        Used internally to callback on the _closing_deferred if it
        exists.

        The Deferred is fired with this Stream and then forgotten, so
        it is called back at most once.
        """

        if self._closing_deferred:
            self._closing_deferred.callback(self)
            self._closing_deferred = None

    def __str__(self):
        """
        Return a short human-readable description of this stream.

        The stream ID is formatted with ``%s`` so that a stream which
        has not received an update yet (``id`` is None) can still be
        printed.
        """
        c = ''
        if self.circuit:
            c = 'on %d ' % self.circuit.id
        return "<Stream %s %s %s%s -> %s port %d>" % (self.state,
                                                      self.id,
                                                      c,
                                                      self.target_host,
                                                      str(self.target_addr),
                                                      self.target_port)