1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508
|
"""
This is a python client implementation of the STOMP protocol.
It aims to be transport layer neutral. This module provides functions to
create and parse STOMP messages in a programmatic fashion.
The examples package contains two examples using twisted as the transport
framework. Other frameworks can be used and I may add other examples as
time goes on.
The STOMP protocol specification may be found here:
* http://stomp.codehaus.org/Protocol
I've looked at the stomp client by Jason R. Briggs and have based the message
generation on how his client does it. The client can be found at the following
address; however, it isn't a dependency.
* http://www.briggs.net.nz/log/projects/stomppy
In testing this library I run against the ActiveMQ project. The server runs
in Java; however, it's fairly standalone and easy to set up. The project's
page is here:
* http://activemq.apache.org/
(c) Oisin Mulvihill, 2007-07-26.
License: http://www.apache.org/licenses/LICENSE-2.0
"""
import re
import uuid
import types
import logging
import utils
import stompbuffer
# Value returned by the message response functions when nothing needs to
# be sent back. It is just an empty string; the name exists purely for
# readability.
NO_RESPONSE_NEEDED = ''
# Misspelled alias of NO_RESPONSE_NEEDED, kept for backwards compatibility.
NO_REPONSE_NEEDED = ''
# The version of the protocol we implement.
STOMP_VERSION = '1.0'
# Message terminator: every generated frame ends its body with this character.
NULL = '\x00'
# STOMP Spec v1.0 valid commands. Frame.setCmd() rejects anything not
# listed here.
VALID_COMMANDS = [
'ABORT', 'ACK', 'BEGIN', 'COMMIT',
'CONNECT', 'CONNECTED', 'DISCONNECT', 'MESSAGE',
'SEND', 'SUBSCRIBE', 'UNSUBSCRIBE',
'RECEIPT', 'ERROR',
]
def get_log():
    """Return the logger registered under the name 'stomper'."""
    logger_name = "stomper"
    return logging.getLogger(logger_name)
class FrameError(Exception):
    """Raised when a STOMP frame cannot be generated or parsed."""
class Frame(object):
    """A single STOMP message frame that can be built or parsed.

    pack() turns the frame into a STOMP message string ready for
    transmission. unpack() loads a STOMP message string into this
    instance, using the unpack_frame(...) function for the parsing.

    The members of interest are:

      * cmd
      * headers
      * body

    'cmd' is a property holding the STOMP command. Assigning to it
    upper-cases the value and checks it against VALID_COMMANDS;
    FrameError is raised if it is not one of them.

    'headers' is a plain dictionary the user may add to. Nothing
    stored in it is checked.

    'body' is a plain member variable holding the body text.

    """
    def __init__(self):
        """Start with no headers, an empty body and an empty command."""
        self.headers = {}
        self.body = ''
        self._cmd = ''

    def getCmd(self):
        """Getter behind the cmd property; use 'cmd' rather than _cmd."""
        return self._cmd

    def setCmd(self, cmd):
        """Setter behind the cmd property.

        The value is upper-cased before it is checked and stored.
        FrameError is raised when it is not in VALID_COMMANDS.

        """
        candidate = cmd.upper()
        if candidate in VALID_COMMANDS:
            self._cmd = candidate
            return
        raise FrameError("The cmd '%s' is not valid! It must be one of '%s' (STOMP v%s)." % (
            candidate, VALID_COMMANDS, STOMP_VERSION)
        )

    cmd = property(getCmd, setCmd)

    def pack(self):
        """Build the STOMP message string for the current values.

        The layout is the command line, one 'name:value' line per
        header (sorted by header name), a blank line, the body, the
        NULL terminator and a final newline.

        returned:
            The STOMP message as a string.

        """
        header_lines = []
        for name, value in sorted(self.headers.items()):
            header_lines.append('%s:%s\n' % (name, value))
        header_text = ''.join(header_lines)
        return "%s\n%s\n%s%s\n" % (self._cmd, header_text, self.body, NULL)

    def unpack(self, message):
        """Load a STOMP message string into this instance.

        message:
            A text string representing a STOMP (v1.0) message.
            An empty (falsy) message raises FrameError.

        The parsing is done by unpack_frame(...). The command goes
        through the cmd property, so FrameError is also raised when
        it is not one of the VALID_COMMANDS.

        returned:
            The dictionary produced by the unpack_frame(...) call.

        """
        if message:
            parsed = unpack_frame(message)
            # Assigning via the property validates the command.
            self.cmd = parsed['cmd']
            self.headers = parsed['headers']
            # The body is stored exactly as unpack_frame(...) returned it.
            self.body = parsed['body']
            return parsed
        raise FrameError("Unpack error! The given message isn't valid '%s'!" % message)
def unpack_frame(message):
    """Called to unpack a STOMP message into a dictionary.

    message:
        The text of a STOMP message: the command on the first line,
        then 'name:value' header lines, a blank line and the body.

    returned = {
        # STOMP Command (the first line, exactly as received):
        'cmd' : '...',

        # Headers e.g.
        'headers' : {
            'destination' : 'xyz',
            'message-id' : 'some event',
            :
            etc,
        }

        # Body, with the null terminator removed:
        'body' : '...1234...',
    }

    Header names and values are stripped of surrounding whitespace.
    Header lines without a ':' or with nothing before the ':' are
    ignored. Body lines are stripped and joined together without a
    separator, and blank lines are dropped.

    """
    lines = message.split('\n')
    returned = dict(cmd=lines[0], headers={}, body='')
    headers = returned['headers']
    body = []

    # The first blank line ends the headers; every later non-blank
    # line is body data.
    in_body = False
    for field in lines[1:]:
        stripped = field.strip()
        if not stripped:
            in_body = True
            continue

        if in_body:
            body.append(stripped)
            continue

        # Everything left of the first ':' is the header name and
        # everything right of it is the data. str.find() returns -1
        # when there is no ':', so test for a positive index rather
        # than truthiness.
        index = field.find(':')
        if index > 0:
            headers[field[:index].strip()] = field[index + 1:].strip()

    # Stitch the body data together and drop the frame terminator.
    returned['body'] = "".join(body).replace('\x00', '')

    return returned
def abort(transactionid):
    """Build the STOMP ABORT frame for a transaction.

    ABORT rolls back whatever actions are in the transaction.

    transactionid:
        The id of the transaction to abort.

    returned:
        The ABORT message as a string.

    """
    frame = "ABORT\ntransaction: %s\n\n\x00\n" % transactionid
    return frame
def ack(messageid, transactionid=None):
    """Build the STOMP ACK frame acknowledging a received message.

    messageid:
        The id of the message being acknowledged.

    transactionid:
        Optional. When given (and truthy) a 'transaction' header
        carrying it is added after the 'message-id' header. When
        omitted, no transaction header is sent.

    returned:
        The ACK message as a string.

    """
    plain_header = 'message-id: %s' % messageid
    header = (
        'message-id: %s\ntransaction: %s' % (messageid, transactionid)
        if transactionid else plain_header
    )
    return "ACK\n%s\n\n\x00\n" % header
def begin(transactionid=None):
    """Build the STOMP BEGIN frame that starts a transaction.

    transactionid:
        The id that identifies the transaction. If this is not
        given (or is falsy) a random UUID is generated with
        uuid.uuid4() and used instead.

    returned:
        The BEGIN message as a string.

    """
    # Fall back to a random UUID when the caller supplied no id.
    chosen_id = transactionid or uuid.uuid4()
    return "BEGIN\ntransaction: %s\n\n\x00\n" % chosen_id
def commit(transactionid):
    """Build the STOMP COMMIT frame for a transaction.

    COMMIT asks for the series of actions in the transaction to
    be made permanent.

    transactionid:
        The id of the transaction to commit.

    returned:
        The COMMIT message as a string.

    """
    frame = "COMMIT\ntransaction: %s\n\n\x00\n" % transactionid
    return frame
def connect(username, password):
    """Build the STOMP CONNECT frame.

    username, password:
        The auth details for the message server. They are sent
        as the 'login' and 'passcode' headers.

    The server is expected to answer with a CONNECTED message
    containing our session id.

    returned:
        The CONNECT message as a string.

    """
    credentials = (username, password)
    return "CONNECT\nlogin:%s\npasscode:%s\n\n\x00\n" % credentials
def disconnect():
    """Build the STOMP DISCONNECT frame.

    This tells the server we have finished and will be closing
    the socket soon.

    returned:
        The DISCONNECT message as a string.

    """
    frame = "DISCONNECT\n\n\x00\n"
    return frame
def send(dest, msg, transactionid=None):
    """Build the STOMP SEND frame.

    dest:
        The destination the message is sent to.

    msg:
        The message body to be sent.

    transactionid:
        Optional. When given (and truthy) a 'transaction' header
        carrying it is added after the destination header.

    returned:
        The SEND message as a string.

    """
    transheader = ('transaction: %s\n' % transactionid) if transactionid else ''
    parts = (dest, transheader, msg)
    return "SEND\ndestination: %s\n%s\n%s\x00\n" % parts
def subscribe(dest, ack='auto'):
    """Build the STOMP SUBSCRIBE frame.

    dest:
        The channel we wish to subscribe to.

    ack: 'auto' | 'client'
        Sent as the 'ack' header. With 'client', received messages
        have to be acknowledged in reply; otherwise the server
        will assume delivery failure.

    returned:
        The SUBSCRIBE message as a string.

    """
    header_values = (dest, ack)
    return "SUBSCRIBE\ndestination: %s\nack: %s\n\n\x00\n" % header_values
def unsubscribe(dest):
    """Build the STOMP UNSUBSCRIBE frame.

    dest:
        The channel we no longer wish to be subscribed to.

    This tells the server we do not want any further messages
    for the given subscription.

    returned:
        The UNSUBSCRIBE message as a string.

    """
    frame = "UNSUBSCRIBE\ndestination:%s\n\n\x00\n" % dest
    return frame
class Engine(object):
    """This is a simple state machine to return a response to a received
    message if needed.

    Received commands are dispatched through the 'states' dictionary,
    which maps a STOMP command to the method that handles it. Commands
    without an entry produce an empty response.

    """
    def __init__(self, testing=False):
        """Set up the logger, the session id and the dispatch table.

        testing:
            When True, error() and receipt() return the marker
            strings 'error' and 'receipt' instead of
            NO_RESPONSE_NEEDED.

        """
        self.testing = testing

        self.log = logging.getLogger("stomper.Engine")

        self.sessionId = ''

        # Entry Format:
        #
        #    COMMAND : Handler_Function
        #
        self.states = {
            'CONNECTED' : self.connected,
            'MESSAGE' : self.ack,
            'ERROR' : self.error,
            'RECEIPT' : self.receipt,
        }

    def react(self, msg):
        """Called to provide a response to a message if needed.

        msg:
            This is a dictionary as returned by unpack_frame(...)
            or it can be a straight STOMP message string, which is
            first parsed with unpack_frame(...). Subclasses of the
            string and dict types are accepted as well. Anything
            else raises FrameError.

        returned:
            A message to return or an empty string.

        """
        returned = ""

        # types.StringTypes is (str, unicode) on Python 2; fall back to
        # plain str on interpreters where the types module lacks it.
        string_types = getattr(types, 'StringTypes', (str,))

        if isinstance(msg, string_types):
            msg = unpack_frame(msg)

        elif not isinstance(msg, dict):
            raise FrameError("Unknown message type '%s', I don't know what to do with this!" % type(msg))

        if msg['cmd'] in self.states:
            returned = self.states[msg['cmd']](msg)

        return returned

    def connected(self, msg):
        """No response is needed to a connected frame.

        This method stores the 'session' header as the member
        sessionId for later use. KeyError is raised if the header
        is not present.

        returned:
            NO_RESPONSE_NEEDED

        """
        self.sessionId = msg['headers']['session']

        return NO_RESPONSE_NEEDED

    def ack(self, msg):
        """Called when a MESSAGE has been received.

        Override this method to handle received messages.

        This function will generate an acknowledge message for the
        'message-id' header of the given message, including the
        'transaction-id' header as the transaction if it is present.

        returned:
            The ACK message as a string.

        """
        message_id = msg['headers']['message-id']

        # None when the header is absent, which ack(...) treats as
        # "no transaction".
        transaction_id = msg['headers'].get('transaction-id')

        # Inside a method body 'ack' is the module level function,
        # not this method.
        return ack(message_id, transaction_id)

    def error(self, msg):
        """Called to handle an error message received from the server.

        This method just logs the 'message' header (if present)
        and the body of the error message.

        returned:
            NO_RESPONSE_NEEDED, or 'error' when testing is set.

        """
        body = msg['body'].replace(NULL, '')

        brief_msg = msg['headers'].get('message', "")

        self.log.error("Received server error - message%s\n\n%s" % (brief_msg, body))

        returned = NO_RESPONSE_NEEDED
        if self.testing:
            returned = 'error'

        return returned

    def receipt(self, msg):
        """Called to handle a receipt message received from the server.

        This method just logs the 'receipt-id' header (if present)
        and the body of the receipt message.

        returned:
            NO_RESPONSE_NEEDED, or 'receipt' when testing is set.

        """
        body = msg['body'].replace(NULL, '')

        brief_msg = msg['headers'].get('receipt-id', "")

        self.log.info("Received server receipt message - receipt-id:%s\n\n%s" % (brief_msg, body))

        returned = NO_RESPONSE_NEEDED
        if self.testing:
            returned = 'receipt'

        return returned
|