[go: up one dir, main page]

File: stomp_11.py

package info (click to toggle)
stomper 0.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd, stretch
  • size: 252 kB
  • ctags: 296
  • sloc: python: 1,363; makefile: 12
file content (549 lines) | stat: -rw-r--r-- 14,464 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
"""
This is a python client implementation of the STOMP protocol.

It aims to be transport layer neutral. This module provides functions to
create and parse STOMP messages in a programmatic fashion.

The examples package contains two examples using twisted as the transport
framework. Other frameworks can be used and I may add other examples as
time goes on.

The STOMP protocol specification maybe found here:

 * http://stomp.codehaus.org/Protocol

I've looked at the stomp client by Jason R. Briggs and have based the message
generation on how his client does it. The client can be found at the follow
address however it isn't a dependancy.

 * http://www.briggs.net.nz/log/projects/stomppy

In testing this library I run against ActiveMQ project. The server runs
in java, however its fairly standalone and easy to set up. The projects
page is here:

 * http://activemq.apache.org/


(c) Oisin Mulvihill, 2007-07-26.
    Ralph Bean, 2014-09-09.
License: http://www.apache.org/licenses/LICENSE-2.0

"""
import re
import uuid
import types
import logging


import utils
import stompbuffer

# This is used as a return from message responses functions.
# It is used more for readability more then anything or reason.
NO_RESPONSE_NEEDED = ''

# For backwards compatibility
NO_REPONSE_NEEDED = ''


# The version of the protocol we implement.
STOMP_VERSION = '1.1'

# Message terminator:
NULL = '\x00'

# STOMP Spec v1.1 valid commands:
VALID_COMMANDS = [
    'ABORT', 'ACK', 'BEGIN', 'COMMIT',
    'CONNECT', 'CONNECTED', 'DISCONNECT', 'MESSAGE',
    'NACK', 'SEND', 'SUBSCRIBE', 'UNSUBSCRIBE',
    'RECEIPT', 'ERROR',
]


def get_log():
    return logging.getLogger("stomper")


class FrameError(Exception):
    """Raise for problem with frame generation or parsing.
    """


class Frame(object):
    """This class is used to create or read STOMP message frames.

    The method pack() is used to create a STOMP message ready
    for transmission.

    The method unpack() is used to read a STOMP message into
    a frame instance. It uses the unpack_frame(...) function
    to do the initial parsing.

    The frame has three important member variables:

      * cmd
      * headers
      * body

    The 'cmd' is a property that represents the STOMP message
    command. When you assign this a check is done to make sure
    its one of the VALID_COMMANDS. If not then FrameError will
    be raised.

    The 'headers' is a dictionary which the user can added to
    if needed. There are no restrictions or checks imposed on
    what values are inserted.

    The 'body' is just a member variable that the body text
    is assigned to.

    """
    def __init__(self):
        """Setup the internal state."""
        self._cmd = ''
        self.body = ''
        self.headers = {}

    def getCmd(self):
        """Don't use _cmd directly!"""
        return self._cmd

    def setCmd(self, cmd):
        """Check the cmd is valid, FrameError will be raised if its not."""
        cmd = cmd.upper()
        if cmd not in VALID_COMMANDS:
            raise FrameError("The cmd '%s' is not valid! It must be one of '%s' (STOMP v%s)." % (
                cmd, VALID_COMMANDS, STOMP_VERSION)
            )
        else:
            self._cmd = cmd

    cmd = property(getCmd, setCmd)

    def pack(self):
        """Called to create a STOMP message from the internal values.
        """
        headers = ''.join(
            ['%s:%s\n' % (f, v) for f, v in sorted(self.headers.items())]
        )
        stomp_message = "%s\n%s\n%s%s\n" % (self._cmd, headers, self.body, NULL)

        return stomp_message


    def unpack(self, message):
        """Called to extract a STOMP message into this instance.

        message:
            This is a text string representing a valid
            STOMP (v1.1) message.

        This method uses unpack_frame(...) to extract the
        information, before it is assigned internally.

        retuned:
            The result of the unpack_frame(...) call.

        """
        if not message:
            raise FrameError("Unpack error! The given message isn't valid '%s'!" % message)

        msg = unpack_frame(message)

        self.cmd = msg['cmd']
        self.headers = msg['headers']

        # Assign directly as the message will have the null
        # character in the message already.
        self.body = msg['body']

        return msg


def unpack_frame(message):
    """Called to unpack a STOMP message into a dictionary.

    returned = {
        # STOMP Command:
        'cmd' : '...',

        # Headers e.g.
        'headers' : {
            'destination' : 'xyz',
            'message-id' : 'some event',
            :
            etc,
        }

        # Body:
        'body' : '...1234...\x00',
    }

    """
    body = []
    returned = dict(cmd='', headers={}, body='')

    breakdown = message.split('\n')

    # Get the message command:
    returned['cmd'] = breakdown[0]
    breakdown = breakdown[1:]

    def headD(field):
        # find the first ':' everything to the left of this is a
        # header, everything to the right is data:
        index = field.find(':')
        if index:
            header = field[:index].strip()
            data = field[index+1:].strip()
#            print "header '%s' data '%s'" % (header, data)
            returned['headers'][header.strip()] = data.strip()

    def bodyD(field):
        field = field.strip()
        if field:
            body.append(field)

    # Recover the header fields and body data
    handler = headD
    for field in breakdown:
#        print "field:", field
        if field.strip() == '':
            # End of headers, it body data next.
            handler = bodyD
            continue

        handler(field)

    # Stich the body data together:
#    print "1. body: ", body
    body = "".join(body)
    returned['body'] = body.replace('\x00', '')

#    print "2. body: <%s>" % returned['body']

    return returned


def abort(transactionid):
    """STOMP abort transaction command.

    Rollback whatever actions in this transaction.

    transactionid:
        This is the id that all actions in this transaction.

    """
    return "ABORT\ntransaction:%s\n\n\x00\n" % transactionid


def ack(messageid, subscriptionid, transactionid=None):
    """STOMP acknowledge command.

    Acknowledge receipt of a specific message from the server.

    messageid:
        This is the id of the message we are acknowledging,
        what else could it be? ;)

    subscriptionid:
        This is the id of the subscription that applies to the message.

    transactionid:
        This is the id that all actions in this transaction
        will have. If this is not given then a random UUID
        will be generated for this.

    """
    header = 'subscription:%s\nmessage-id:%s' % (subscriptionid, messageid)

    if transactionid:
        header += '\ntransaction:%s' % transactionid

    return "ACK\n%s\n\n\x00\n" % header


def nack(messageid, subscriptionid, transactionid=None):
    """STOMP negative acknowledge command.

    NACK is the opposite of ACK. It is used to tell the server that the client
    did not consume the message. The server can then either send the message to
    a different client, discard it, or put it in a dead letter queue. The exact
    behavior is server specific.

    messageid:
        This is the id of the message we are acknowledging,
        what else could it be? ;)

    subscriptionid:
        This is the id of the subscription that applies to the message.

    transactionid:
        This is the id that all actions in this transaction
        will have. If this is not given then a random UUID
        will be generated for this.

    """
    header = 'subscription:%s\nmessage-id:%s' % (subscriptionid, messageid)

    if transactionid:
        header += '\ntransaction:%s' % transactionid

    return "NACK\n%s\n\n\x00\n" % header


def begin(transactionid=None):
    """STOMP begin command.

    Start a transaction...

    transactionid:
        This is the id that all actions in this transaction
        will have. If this is not given then a random UUID
        will be generated for this.

    """
    if not transactionid:
        # Generate a random UUID:
        transactionid = uuid.uuid4()

    return "BEGIN\ntransaction:%s\n\n\x00\n" % transactionid


def commit(transactionid):
    """STOMP commit command.

    Do whatever is required to make the series of actions
    permanent for this transactionid.

    transactionid:
        This is the id that all actions in this transaction.

    """
    return "COMMIT\ntransaction:%s\n\n\x00\n" % transactionid


def connect(username, password, host, heartbeats=(0,0)):
    """STOMP connect command.

    username, password:
        These are the needed auth details to connect to the
        message server.

    After sending this we will receive a CONNECTED
    message which will contain our session id.

    """
    if len(heartbeats) != 2:
        raise ValueError('Invalid heartbeat %r' % heartbeats)
    cx, cy = heartbeats
    return "CONNECT\naccept-version:1.1\nhost:%s\nheart-beat:%i,%i\nlogin:%s\npasscode:%s\n\n\x00\n" % (host, cx, cy, username, password)


def disconnect(receipt=None):
    """STOMP disconnect command.

    Tell the server we finished and we'll be closing the
    socket soon.

    """
    if not receipt:
        receipt = uuid.uuid4()
    return "DISCONNECT\nreceipt:%s\n\x00\n" % receipt


def send(dest, msg, transactionid=None, content_type='text/plain'):
    """STOMP send command.

    dest:
        This is the channel we wish to subscribe to

    msg:
        This is the message body to be sent.

    transactionid:
        This is an optional field and is not needed
        by default.

    """
    transheader = ''

    if transactionid:
        transheader = 'transaction:%s\n' % transactionid

    return "SEND\ndestination:%s\ncontent-type:%s\n%s\n%s\x00\n" % (
        dest, content_type, transheader, msg)


def subscribe(dest, idx, ack='auto'):
    """STOMP subscribe command.

    dest:
        This is the channel we wish to subscribe to

    idx:
        The ID that should uniquely identify the subscription

    ack: 'auto' | 'client'
        If the ack is set to client, then messages received will
        have to have an acknowledge as a reply. Otherwise the server
        will assume delivery failure.

    """
    return "SUBSCRIBE\nid:%s\ndestination:%s\nack:%s\n\n\x00\n" % (
        idx, dest, ack)


def unsubscribe(idx):
    """STOMP unsubscribe command.

    idx:
        This is the id of the subscription

    Tell the server we no longer wish to receive any
    further messages for the given subscription.

    """
    return "UNSUBSCRIBE\nid:%s\n\n\x00\n" % idx


class Engine(object):
    """This is a simple state machine to return a response to received
    message if needed.

    """
    def __init__(self, testing=False):
        self.testing = testing

        self.log = logging.getLogger("stomper.Engine")

        self.sessionId = ''

        # Entry Format:
        #
        #    COMMAND : Handler_Function
        #
        self.states = {
            'CONNECTED' : self.connected,
            'MESSAGE' : self.ack,
            'ERROR' : self.error,
            'RECEIPT' : self.receipt,
        }


    def react(self, msg):
        """Called to provide a response to a message if needed.

        msg:
            This is a dictionary as returned by unpack_frame(...)
            or it can be a straight STOMP message. This function
            will attempt to determine which an deal with it.

        returned:
            A message to return or an empty string.

        """
        returned = ""

        # If its not a string assume its a dict.
        mtype = type(msg)
        if mtype in types.StringTypes:
            msg = unpack_frame(msg)
        elif mtype == types.DictType:
            pass
        else:
            raise FrameError("Unknown message type '%s', I don't know what to do with this!" % mtype)

        if self.states.has_key(msg['cmd']):
#            print("reacting to message - %s" % msg['cmd'])
            returned = self.states[msg['cmd']](msg)

        return returned


    def connected(self, msg):
        """No response is needed to a connected frame.

        This method stores the session id as the
        member sessionId for later use.

        returned:
            NO_RESPONSE_NEEDED

        """
        self.sessionId = msg['headers']['session']
        #print "connected: session id '%s'." % self.sessionId

        return NO_RESPONSE_NEEDED


    def ack(self, msg):
        """Called when a MESSAGE has been received.

        Override this method to handle received messages.

        This function will generate an acknowledge message
        for the given message and transaction (if present).

        """
        message_id = msg['headers']['message-id']
        subscription = msg['headers']['subscription']

        transaction_id = None
        if msg['headers'].has_key('transaction-id'):
            transaction_id = msg['headers']['transaction-id']

#        print "acknowledging message id <%s>." % message_id

        return ack(message_id, subscription, transaction_id)


    def error(self, msg):
        """Called to handle an error message received from the server.

        This method just logs the error message

        returned:
            NO_RESPONSE_NEEDED

        """
        body = msg['body'].replace(NULL, '')

        brief_msg = ""
        if msg['headers'].has_key('message'):
            brief_msg = msg['headers']['message']

        self.log.error("Received server error - message%s\n\n%s" % (brief_msg, body))

        returned = NO_RESPONSE_NEEDED
        if self.testing:
            returned = 'error'

        return returned


    def receipt(self, msg):
        """Called to handle a receipt message received from the server.

        This method just logs the receipt message

        returned:
            NO_RESPONSE_NEEDED

        """
        body = msg['body'].replace(NULL, '')

        brief_msg = ""
        if msg['headers'].has_key('receipt-id'):
            brief_msg = msg['headers']['receipt-id']

        self.log.info("Received server receipt message - receipt-id:%s\n\n%s" % (brief_msg, body))

        returned = NO_RESPONSE_NEEDED
        if self.testing:
            returned = 'receipt'

        return returned