summaryrefslogtreecommitdiff
path: root/python/qpid/messaging
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/messaging')
-rw-r--r--python/qpid/messaging/__init__.py35
-rw-r--r--python/qpid/messaging/address.py168
-rw-r--r--python/qpid/messaging/constants.py32
-rw-r--r--python/qpid/messaging/driver.py1041
-rw-r--r--python/qpid/messaging/endpoints.py832
-rw-r--r--python/qpid/messaging/exceptions.py67
-rw-r--r--python/qpid/messaging/message.py141
7 files changed, 2316 insertions, 0 deletions
diff --git a/python/qpid/messaging/__init__.py b/python/qpid/messaging/__init__.py
new file mode 100644
index 0000000000..f9ddda2e80
--- /dev/null
+++ b/python/qpid/messaging/__init__.py
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+A candidate high level messaging API for python.
+
+Areas that still need work:
+
+ - definition of the arguments for L{Session.sender} and L{Session.receiver}
+ - standard L{Message} properties
+ - L{Message} content encoding
+ - protocol negotiation/multiprotocol impl
+"""
+
+from qpid.datatypes import timestamp, uuid4, Serial
+from qpid.messaging.constants import *
+from qpid.messaging.endpoints import *
+from qpid.messaging.exceptions import *
+from qpid.messaging.message import *
diff --git a/python/qpid/messaging/address.py b/python/qpid/messaging/address.py
new file mode 100644
index 0000000000..bf494433e4
--- /dev/null
+++ b/python/qpid/messaging/address.py
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import re
+from qpid.lexer import Lexicon, LexError
+from qpid.parser import Parser, ParseError
+
+l = Lexicon()
+
+LBRACE = l.define("LBRACE", r"\{")
+RBRACE = l.define("RBRACE", r"\}")
+LBRACK = l.define("LBRACK", r"\[")
+RBRACK = l.define("RBRACK", r"\]")
+COLON = l.define("COLON", r":")
+SEMI = l.define("SEMI", r";")
+SLASH = l.define("SLASH", r"/")
+COMMA = l.define("COMMA", r",")
+NUMBER = l.define("NUMBER", r'[+-]?[0-9]*\.?[0-9]+')
+TRUE = l.define("TRUE", r'True')
+FALSE = l.define("FALSE", r'False')
+ID = l.define("ID", r'[a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])?')
+STRING = l.define("STRING", r""""(?:[^\\"]|\\.)*"|'(?:[^\\']|\\.)*'""")
+ESC = l.define("ESC", r"\\[^ux]|\\x[0-9a-fA-F][0-9a-fA-F]|\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F]")
+SYM = l.define("SYM", r"[.#*%@$^!+-]")
+WSPACE = l.define("WSPACE", r"[ \n\r\t]+")
+EOF = l.eof("EOF")
+
+LEXER = l.compile()
+
+def lex(st):
+ return LEXER.lex(st)
+
+def tok2str(tok):
+ if tok.type is STRING:
+ return eval(tok.value)
+ elif tok.type is ESC:
+ if tok.value[1] == "x":
+ return eval('"%s"' % tok.value)
+ elif tok.value[1] == "u":
+ return eval('u"%s"' % tok.value)
+ else:
+ return tok.value[1]
+ else:
+ return tok.value
+
+def tok2obj(tok):
+ if tok.type in (STRING, NUMBER):
+ return eval(tok.value)
+ elif tok.type == TRUE:
+ return True
+ elif tok.type == FALSE:
+ return False
+ else:
+ return tok.value
+
+def toks2str(toks):
+ if toks:
+ return "".join(map(tok2str, toks))
+ else:
+ return None
+
+class AddressParser(Parser):
+
+ def __init__(self, tokens):
+ Parser.__init__(self, [t for t in tokens if t.type is not WSPACE])
+
+ def parse(self):
+ result = self.address()
+ self.eat(EOF)
+ return result
+
+ def address(self):
+ name = toks2str(self.eat_until(SLASH, SEMI, EOF))
+
+ if name is None:
+ raise ParseError(self.next())
+
+ if self.matches(SLASH):
+ self.eat(SLASH)
+ subject = toks2str(self.eat_until(SEMI, EOF))
+ else:
+ subject = None
+
+ if self.matches(SEMI):
+ self.eat(SEMI)
+ options = self.map()
+ else:
+ options = None
+ return name, subject, options
+
+ def map(self):
+ self.eat(LBRACE)
+
+ result = {}
+ while True:
+ if self.matches(NUMBER, STRING, ID, LBRACE, LBRACK):
+ n, v = self.keyval()
+ result[n] = v
+ if self.matches(COMMA):
+ self.eat(COMMA)
+ elif self.matches(RBRACE):
+ break
+ else:
+ raise ParseError(self.next(), COMMA, RBRACE)
+ elif self.matches(RBRACE):
+ break
+ else:
+ raise ParseError(self.next(), NUMBER, STRING, ID, LBRACE, LBRACK,
+ RBRACE)
+
+ self.eat(RBRACE)
+ return result
+
+ def keyval(self):
+ key = self.value()
+ self.eat(COLON)
+ val = self.value()
+ return (key, val)
+
+ def value(self):
+ if self.matches(NUMBER, STRING, ID, TRUE, FALSE):
+ return tok2obj(self.eat())
+ elif self.matches(LBRACE):
+ return self.map()
+ elif self.matches(LBRACK):
+ return self.list()
+ else:
+ raise ParseError(self.next(), NUMBER, STRING, ID, LBRACE, LBRACK)
+
+ def list(self):
+ self.eat(LBRACK)
+
+ result = []
+
+ while True:
+ if self.matches(RBRACK):
+ break
+ else:
+ result.append(self.value())
+ if self.matches(COMMA):
+ self.eat(COMMA)
+ elif self.matches(RBRACK):
+ break
+ else:
+ raise ParseError(self.next(), COMMA, RBRACK)
+
+ self.eat(RBRACK)
+ return result
+
+def parse(addr):
+ return AddressParser(lex(addr)).parse()
+
+__all__ = ["parse", "ParseError"]
diff --git a/python/qpid/messaging/constants.py b/python/qpid/messaging/constants.py
new file mode 100644
index 0000000000..cad47bd52a
--- /dev/null
+++ b/python/qpid/messaging/constants.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+class Constant:
+
+ def __init__(self, name, value=None):
+ self.name = name
+ self.value = value
+
+ def __repr__(self):
+ return self.name
+
+AMQP_PORT = 5672
+AMQPS_PORT = 5671
+
+UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
new file mode 100644
index 0000000000..9d58616804
--- /dev/null
+++ b/python/qpid/messaging/driver.py
@@ -0,0 +1,1041 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import socket, struct, sys, time
+from logging import getLogger
+from qpid import compat
+from qpid import sasl
+from qpid.concurrency import synchronized
+from qpid.datatypes import RangedSet, Serial
+from qpid.exceptions import Timeout, VersionError
+from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
+ FrameDecoder, SegmentDecoder, OpDecoder
+from qpid.messaging import address
+from qpid.messaging.constants import UNLIMITED
+from qpid.messaging.endpoints import Pattern
+from qpid.messaging.exceptions import ConnectError
+from qpid.messaging.message import get_codec, Message
+from qpid.ops import *
+from qpid.selector import Selector
+from qpid.util import connect
+from qpid.validator import And, Context, Map, Types, Values
+from threading import Condition, Thread
+
+log = getLogger("qpid.messaging")
+rawlog = getLogger("qpid.messaging.io.raw")
+opslog = getLogger("qpid.messaging.io.ops")
+
+def addr2reply_to(addr):
+ name, subject, options = address.parse(addr)
+ return ReplyTo(name, subject)
+
+def reply_to2addr(reply_to):
+ if reply_to.routing_key is None:
+ return reply_to.exchange
+ elif reply_to.exchange in (None, ""):
+ return reply_to.routing_key
+ else:
+ return "%s/%s" % (reply_to.exchange, reply_to.routing_key)
+
+class Attachment:
+
+ def __init__(self, target):
+ self.target = target
+
+# XXX
+
+DURABLE_DEFAULT=True
+
+# XXX
+
+FILTER_DEFAULTS = {
+ "topic": Pattern("*"),
+ "amq.failover": Pattern("DUMMY")
+ }
+
+# XXX
+ppid = 0
+try:
+ ppid = os.getppid()
+except:
+ pass
+
+CLIENT_PROPERTIES = {"product": "qpid python client",
+ "version": "development",
+ "platform": os.name,
+ "qpid.client_process": os.path.basename(sys.argv[0]),
+ "qpid.client_pid": os.getpid(),
+ "qpid.client_ppid": ppid}
+
+def noop(): pass
+
+class SessionState:
+
+ def __init__(self, driver, session, name, channel):
+ self.driver = driver
+ self.session = session
+ self.name = name
+ self.channel = channel
+ self.detached = False
+ self.committing = False
+ self.aborting = False
+
+ # sender state
+ self.sent = Serial(0)
+ self.acknowledged = RangedSet()
+ self.actions = {}
+ self.min_completion = self.sent
+ self.max_completion = self.sent
+ self.results = {}
+
+ # receiver state
+ self.received = None
+ self.executed = RangedSet()
+
+ # XXX: need to periodically exchange completion/known_completion
+
+ self.destinations = {}
+
+ def write_query(self, query, handler):
+ id = self.sent
+ self.write_cmd(query, lambda: handler(self.results.pop(id)))
+
+ def write_cmd(self, cmd, action=noop):
+ if action != noop:
+ cmd.sync = True
+ if self.detached:
+ raise Exception("detached")
+ cmd.id = self.sent
+ self.sent += 1
+ self.actions[cmd.id] = action
+ self.max_completion = cmd.id
+ self.write_op(cmd)
+
+ def write_cmds(self, cmds, action=noop):
+ if cmds:
+ for cmd in cmds[:-1]:
+ self.write_cmd(cmd)
+ self.write_cmd(cmds[-1], action)
+ else:
+ action()
+
+ def write_op(self, op):
+ op.channel = self.channel
+ self.driver.write_op(op)
+
+POLICIES = Values("always", "sender", "receiver", "never")
+
+class Bindings:
+
+ def validate(self, o, ctx):
+ t = ctx.containers[1].get("type", "queue")
+ if t != "queue":
+ return "bindings are only permitted on nodes of type queue"
+
+COMMON_OPTS = {
+ "create": POLICIES,
+ "delete": POLICIES,
+ "assert": POLICIES,
+ "node-properties": Map({
+ "type": Values("queue", "topic"),
+ "durable": Types(bool),
+ "x-properties": Map({
+ "type": Types(basestring),
+ "bindings": And(Types(list), Bindings())
+ },
+ restricted=False)
+ })
+ }
+
+RECEIVE_MODES = Values("browse", "consume")
+
+SOURCE_OPTS = COMMON_OPTS.copy()
+SOURCE_OPTS.update({
+ "mode": RECEIVE_MODES
+ })
+
+TARGET_OPTS = COMMON_OPTS.copy()
+
+class LinkIn:
+
+ ADDR_NAME = "source"
+ DIR_NAME = "receiver"
+ VALIDATOR = Map(SOURCE_OPTS)
+
+ def init_link(self, sst, rcv, _rcv):
+ _rcv.destination = str(rcv.id)
+ sst.destinations[_rcv.destination] = _rcv
+ _rcv.draining = False
+
+ def do_link(self, sst, rcv, _rcv, type, subtype, action):
+ acq_mode = acquire_mode.pre_acquired
+
+ if type == "topic":
+ _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination)
+ sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
+ filter = _rcv.options.get("filter")
+ if _rcv.subject is None and filter is None:
+ f = FILTER_DEFAULTS[subtype]
+ elif _rcv.subject and filter:
+ # XXX
+ raise Exception("can't supply both subject and filter")
+ elif _rcv.subject:
+ # XXX
+ f = Pattern(_rcv.subject)
+ else:
+ f = filter
+ f._bind(sst, _rcv.name, _rcv._queue)
+ elif type == "queue":
+ _rcv._queue = _rcv.name
+ if _rcv.options.get("mode", "consume") == "browse":
+ acq_mode = acquire_mode.not_acquired
+
+ sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination,
+ acquire_mode = acq_mode))
+ sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action)
+
+ def do_unlink(self, sst, rcv, _rcv, action=noop):
+ sst.write_cmd(MessageCancel(_rcv.destination), action)
+
+ def del_link(self, sst, rcv, _rcv):
+ del sst.destinations[_rcv.destination]
+
+class LinkOut:
+
+ ADDR_NAME = "target"
+ DIR_NAME = "sender"
+ VALIDATOR = Map(TARGET_OPTS)
+
+ def init_link(self, sst, snd, _snd):
+ _snd.closing = False
+
+ def do_link(self, sst, snd, _snd, type, subtype, action):
+ if type == "topic":
+ _snd._exchange = _snd.name
+ _snd._routing_key = _snd.subject
+ elif type == "queue":
+ _snd._exchange = ""
+ _snd._routing_key = _snd.name
+ action()
+
+ def do_unlink(self, sst, snd, _snd, action=noop):
+ action()
+
+ def del_link(self, sst, snd, _snd):
+ pass
+
+class Cache:
+
+ def __init__(self, ttl):
+ self.ttl = ttl
+ self.entries = {}
+
+ def __setitem__(self, key, value):
+ self.entries[key] = time.time(), value
+
+ def __getitem__(self, key):
+ tstamp, value = self.entries[key]
+ if time.time() - tstamp >= self.ttl:
+ del self.entries[key]
+ raise KeyError(key)
+ else:
+ return value
+
+ def __delitem__(self, key):
+ del self.entries[key]
+
+# XXX
+HEADER="!4s4B"
+
+EMPTY_DP = DeliveryProperties()
+EMPTY_MP = MessageProperties()
+
+SUBJECT = "qpid.subject"
+TO = "qpid.to"
+
+class Driver:
+
+ def __init__(self, connection):
+ self.connection = connection
+ self.log_id = "%x" % id(self.connection)
+ self._lock = self.connection._lock
+
+ self._in = LinkIn()
+ self._out = LinkOut()
+
+ self._selector = Selector.default()
+ self._attempts = 0
+ self._hosts = [(self.connection.host, self.connection.port)] + \
+ self.connection.backups
+ self._host = 0
+ self._retrying = False
+
+ self.reset()
+
+ def reset(self):
+ self._opening = False
+ self._closing = False
+ self._connected = False
+ self._attachments = {}
+
+ self._channel_max = 65536
+ self._channels = 0
+ self._sessions = {}
+
+ options = self.connection.options
+
+ self.address_cache = Cache(options.get("address_ttl", 60))
+
+ self._socket = None
+ self._buf = ""
+ self._hdr = ""
+ self._op_enc = OpEncoder()
+ self._seg_enc = SegmentEncoder()
+ self._frame_enc = FrameEncoder()
+ self._frame_dec = FrameDecoder()
+ self._seg_dec = SegmentDecoder()
+ self._op_dec = OpDecoder()
+ self._timeout = None
+
+ self._sasl = sasl.Client()
+ if self.connection.username:
+ self._sasl.setAttr("username", self.connection.username)
+ if self.connection.password:
+ self._sasl.setAttr("password", self.connection.password)
+ if self.connection.host:
+ self._sasl.setAttr("host", self.connection.host)
+ self._sasl.setAttr("service", options.get("service", "qpidd"))
+ if "min_ssf" in options:
+ self._sasl.setAttr("minssf", options["min_ssf"])
+ if "max_ssf" in options:
+ self._sasl.setAttr("maxssf", options["max_ssf"])
+ self._sasl.init()
+ self._sasl_encode = False
+ self._sasl_decode = False
+
+ for ssn in self.connection.sessions.values():
+ for m in ssn.acked + ssn.unacked + ssn.incoming:
+ m._transfer_id = None
+ for snd in ssn.senders:
+ snd.linked = False
+ for rcv in ssn.receivers:
+ rcv.impending = rcv.received
+ rcv.linked = False
+
+ @synchronized
+ def wakeup(self):
+ self.dispatch()
+ self._selector.wakeup()
+
+ def start(self):
+ self._selector.register(self)
+
+ def fileno(self):
+ return self._socket.fileno()
+
+ @synchronized
+ def reading(self):
+ return self._socket is not None
+
+ @synchronized
+ def writing(self):
+ return self._socket is not None and self._buf
+
+ @synchronized
+ def timing(self):
+ return self._timeout
+
+ @synchronized
+ def readable(self):
+ error = None
+ recoverable = False
+ try:
+ data = self._socket.recv(64*1024)
+ if data:
+ rawlog.debug("READ[%s]: %r", self.log_id, data)
+ if self._sasl_decode:
+ data = self._sasl.decode(data)
+ else:
+ rawlog.debug("ABORTED[%s]: %s", self.log_id, self._socket.getpeername())
+ error = "connection aborted"
+ recoverable = True
+ except socket.error, e:
+ error = e
+ recoverable = True
+
+ if not error:
+ try:
+ if len(self._hdr) < 8:
+ r = 8 - len(self._hdr)
+ self._hdr += data[:r]
+ data = data[r:]
+
+ if len(self._hdr) == 8:
+ self.do_header(self._hdr)
+
+ self._frame_dec.write(data)
+ self._seg_dec.write(*self._frame_dec.read())
+ self._op_dec.write(*self._seg_dec.read())
+ for op in self._op_dec.read():
+ self.assign_id(op)
+ opslog.debug("RCVD[%s]: %r", self.log_id, op)
+ op.dispatch(self)
+ except VersionError, e:
+ error = e
+ except:
+ msg = compat.format_exc()
+ error = msg
+
+ if error:
+ self._error(error, recoverable)
+ else:
+ self.dispatch()
+
+ self.connection._waiter.notifyAll()
+
+ def assign_id(self, op):
+ if isinstance(op, Command):
+ sst = self.get_sst(op)
+ op.id = sst.received
+ sst.received += 1
+
+ @synchronized
+ def writeable(self):
+ try:
+ n = self._socket.send(self._buf)
+ rawlog.debug("SENT[%s]: %r", self.log_id, self._buf[:n])
+ self._buf = self._buf[n:]
+ except socket.error, e:
+ self._error(e, True)
+ self.connection._waiter.notifyAll()
+
+ @synchronized
+ def timeout(self):
+ self.dispatch()
+ self.connection._waiter.notifyAll()
+
+ def _error(self, err, recoverable):
+ if self._socket is not None:
+ self._socket.close()
+ self.reset()
+ if (recoverable and self.connection.reconnect and
+ (self.connection.reconnect_limit is None or
+ self.connection.reconnect_limit <= 0 or
+ self._attempts <= self.connection.reconnect_limit)):
+ if self._host > 0:
+ delay = 0
+ else:
+ delay = self.connection.reconnect_delay
+ self._timeout = time.time() + delay
+ log.warn("recoverable error[attempt %s]: %s" % (self._attempts, err))
+ if delay > 0:
+ log.warn("sleeping %s seconds" % delay)
+ self._retrying = True
+ else:
+ self.connection.error = (err,)
+
+ def write_op(self, op):
+ opslog.debug("SENT[%s]: %r", self.log_id, op)
+ self._op_enc.write(op)
+ self._seg_enc.write(*self._op_enc.read())
+ self._frame_enc.write(*self._seg_enc.read())
+ bytes = self._frame_enc.read()
+ if self._sasl_encode:
+ bytes = self._sasl.encode(bytes)
+ self._buf += bytes
+
+ def do_header(self, hdr):
+ cli_major = 0; cli_minor = 10
+ magic, _, _, major, minor = struct.unpack(HEADER, hdr)
+ if major != cli_major or minor != cli_minor:
+ raise VersionError("client: %s-%s, server: %s-%s" %
+ (cli_major, cli_minor, major, minor))
+
+ def do_connection_start(self, start):
+ if self.connection.mechanisms:
+ permitted = self.connection.mechanisms.split()
+ mechs = [m for m in start.mechanisms if m in permitted]
+ else:
+ mechs = start.mechanisms
+ mech, initial = self._sasl.start(" ".join(mechs))
+ self.write_op(ConnectionStartOk(client_properties=CLIENT_PROPERTIES,
+ mechanism=mech, response=initial))
+
+ def do_connection_secure(self, secure):
+ resp = self._sasl.step(secure.challenge)
+ self.write_op(ConnectionSecureOk(response=resp))
+
+ def do_connection_tune(self, tune):
+ # XXX: is heartbeat protocol specific?
+ if tune.channel_max is not None:
+ self.channel_max = tune.channel_max
+ self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat,
+ channel_max=self.channel_max))
+ self.write_op(ConnectionOpen())
+ self._sasl_encode = True
+
+ def do_connection_open_ok(self, open_ok):
+ self._connected = True
+ self._sasl_decode = True
+
+ def connection_heartbeat(self, hrt):
+ self.write_op(ConnectionHeartbeat())
+
+ def do_connection_close(self, close):
+ self.write_op(ConnectionCloseOk())
+ if close.reply_code != close_code.normal:
+ self.connection.error = (close.reply_code, close.reply_text)
+ # XXX: should we do a half shutdown on the socket here?
+ # XXX: we really need to test this, we may end up reporting a
+ # connection abort after this, if we were to do a shutdown on read
+ # and stop reading, then we wouldn't report the abort, that's
+ # probably the right thing to do
+
+ def do_connection_close_ok(self, close_ok):
+ self._socket.close()
+ self.reset()
+
+ def do_session_attached(self, atc):
+ pass
+
+ def do_session_command_point(self, cp):
+ sst = self.get_sst(cp)
+ sst.received = cp.command_id
+
+ def do_session_completed(self, sc):
+ sst = self.get_sst(sc)
+ for r in sc.commands:
+ sst.acknowledged.add(r.lower, r.upper)
+
+ if not sc.commands.empty():
+ while sst.min_completion in sc.commands:
+ if sst.actions.has_key(sst.min_completion):
+ sst.actions.pop(sst.min_completion)()
+ sst.min_completion += 1
+
+ def session_known_completed(self, kcmp):
+ sst = self.get_sst(kcmp)
+ executed = RangedSet()
+ for e in sst.executed.ranges:
+ for ke in kcmp.ranges:
+ if e.lower in ke and e.upper in ke:
+ break
+ else:
+ executed.add_range(e)
+ sst.executed = completed
+
+ def do_session_flush(self, sf):
+ sst = self.get_sst(sf)
+ if sf.expected:
+ if sst.received is None:
+ exp = None
+ else:
+ exp = RangedSet(sst.received)
+ sst.write_op(SessionExpected(exp))
+ if sf.confirmed:
+ sst.write_op(SessionConfirmed(sst.executed))
+ if sf.completed:
+ sst.write_op(SessionCompleted(sst.executed))
+
+ def do_execution_result(self, er):
+ sst = self.get_sst(er)
+ sst.results[er.command_id] = er.value
+
+ def do_execution_exception(self, ex):
+ sst = self.get_sst(ex)
+ sst.session.error = (ex,)
+
+ def dispatch(self):
+ try:
+ if self._socket is None and self.connection._connected and not self._opening:
+ self.connect()
+ elif self._socket is not None and not self.connection._connected and not self._closing:
+ self.disconnect()
+
+ if self._connected and not self._closing:
+ for ssn in self.connection.sessions.values():
+ self.attach(ssn)
+ self.process(ssn)
+ except:
+ msg = compat.format_exc()
+ self.connection.error = (msg,)
+
+ def connect(self):
+ try:
+ # XXX: should make this non blocking
+ if self._host == 0:
+ self._attempts += 1
+ host, port = self._hosts[self._host]
+ if self._retrying:
+ log.warn("trying: %s:%s", host, port)
+ self._socket = connect(host, port)
+ if self._retrying:
+ log.warn("reconnect succeeded: %s:%s", host, port)
+ self._timeout = None
+ self._attempts = 0
+ self._host = 0
+ self._retrying = False
+ self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
+ self._opening = True
+ except socket.error, e:
+ self._host = (self._host + 1) % len(self._hosts)
+ self._error(e, True)
+
+ def disconnect(self):
+ self.write_op(ConnectionClose(close_code.normal))
+ self._closing = True
+
+ def attach(self, ssn):
+ sst = self._attachments.get(ssn)
+ if sst is None and not ssn.closed:
+ for i in xrange(0, self.channel_max):
+ if not self._sessions.has_key(i):
+ ch = i
+ break
+ else:
+ raise RuntimeError("all channels used")
+ sst = SessionState(self, ssn, ssn.name, ch)
+ sst.write_op(SessionAttach(name=ssn.name))
+ sst.write_op(SessionCommandPoint(sst.sent, 0))
+ sst.outgoing_idx = 0
+ sst.acked = []
+ if ssn.transactional:
+ sst.write_cmd(TxSelect())
+ self._attachments[ssn] = sst
+ self._sessions[sst.channel] = sst
+
+ for snd in ssn.senders:
+ self.link(snd, self._out, snd.target)
+ for rcv in ssn.receivers:
+ self.link(rcv, self._in, rcv.source)
+
+ if sst is not None and ssn.closing and not sst.detached:
+ sst.detached = True
+ sst.write_op(SessionDetach(name=ssn.name))
+
+ def get_sst(self, op):
+ return self._sessions[op.channel]
+
+ def do_session_detached(self, dtc):
+ sst = self._sessions.pop(dtc.channel)
+ ssn = sst.session
+ del self._attachments[ssn]
+ ssn.closed = True
+
+ def do_session_detach(self, dtc):
+ sst = self.get_sst(dtc)
+ sst.write_op(SessionDetached(name=dtc.name))
+ self.do_session_detached(dtc)
+
+ def link(self, lnk, dir, addr):
+ sst = self._attachments.get(lnk.session)
+ _lnk = self._attachments.get(lnk)
+
+ if _lnk is None and not lnk.closing and not lnk.closed:
+ _lnk = Attachment(lnk)
+ _lnk.closing = False
+ dir.init_link(sst, lnk, _lnk)
+
+ err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir)
+ if err:
+ lnk.error = (err,)
+ lnk.closed = True
+ return
+
+ def linked():
+ lnk.linked = True
+
+ def resolved(type, subtype):
+ dir.do_link(sst, lnk, _lnk, type, subtype, linked)
+
+ self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved)
+ self._attachments[lnk] = _lnk
+
+ if lnk.linked and lnk.closing and not lnk.closed:
+ if not _lnk.closing:
+ def unlinked():
+ dir.del_link(sst, lnk, _lnk)
+ del self._attachments[lnk]
+ lnk.closed = True
+ if _lnk.options.get("delete") in ("always", dir.DIR_NAME):
+ dir.do_unlink(sst, lnk, _lnk)
+ self.delete(sst, _lnk.name, unlinked)
+ else:
+ dir.do_unlink(sst, lnk, _lnk, unlinked)
+ _lnk.closing = True
+ elif not lnk.linked and lnk.closing and not lnk.closed:
+ lnk.closed = True
+
+ def parse_address(self, lnk, dir, addr):
+ if addr is None:
+ return "%s is None" % dir.ADDR_NAME
+ else:
+ try:
+ lnk.name, lnk.subject, lnk.options = address.parse(addr)
+ # XXX: subject
+ if lnk.options is None:
+ lnk.options = {}
+ except address.LexError, e:
+ return e
+ except address.ParseError, e:
+ return e
+
+ def validate_options(self, lnk, dir):
+ ctx = Context()
+ err = dir.VALIDATOR.validate(lnk.options, ctx)
+ if err: return "error in options: %s" % err
+
+ def resolve_declare(self, sst, lnk, dir, action):
+ declare = lnk.options.get("create") in ("always", dir)
+ def do_resolved(type, subtype):
+ err = None
+ if type is None:
+ if declare:
+ err = self.declare(sst, lnk, action)
+ else:
+ err = ("no such queue: %s" % lnk.name,)
+ elif type == "queue":
+ try:
+ cmds = self.bindings(lnk)
+ sst.write_cmds(cmds, lambda: action(type, subtype))
+ except address.ParseError, e:
+ err = (e,)
+ else:
+ action(type, subtype)
+
+ if err:
+ tgt = lnk.target
+ tgt.error = err
+ del self._attachments[tgt]
+ tgt.closed = True
+ return
+ self.resolve(sst, lnk.name, do_resolved, force=declare)
+
+ def resolve(self, sst, name, action, force=False):
+ if not force:
+ try:
+ type, subtype = self.address_cache[name]
+ action(type, subtype)
+ return
+ except KeyError:
+ pass
+
+ args = []
+ def do_result(r):
+ args.append(r)
+ def do_action(r):
+ do_result(r)
+ er, qr = args
+ if er.not_found and not qr.queue:
+ type, subtype = None, None
+ elif qr.queue:
+ type, subtype = "queue", None
+ else:
+ type, subtype = "topic", er.type
+ if type is not None:
+ self.address_cache[name] = (type, subtype)
+ action(type, subtype)
+ sst.write_query(ExchangeQuery(name), do_result)
+ sst.write_query(QueueQuery(name), do_action)
+
+ def declare(self, sst, lnk, action):
+ name = lnk.name
+ props = lnk.options.get("node-properties", {})
+ durable = props.get("durable", DURABLE_DEFAULT)
+ type = props.get("type", "queue")
+ xprops = props.get("x-properties", {})
+
+ if type == "topic":
+ cmd = ExchangeDeclare(exchange=name, durable=durable)
+ elif type == "queue":
+ cmd = QueueDeclare(queue=name, durable=durable)
+ else:
+ raise ValueError(type)
+
+ for f in cmd.FIELDS:
+ if f.name != "arguments" and xprops.has_key(f.name):
+ cmd[f.name] = xprops.pop(f.name)
+ if xprops:
+ cmd.arguments = xprops
+
+ if type == "topic":
+ if cmd.type is None:
+ cmd.type = "topic"
+ subtype = cmd.type
+ else:
+ subtype = None
+
+ cmds = [cmd]
+ if type == "queue":
+ try:
+ cmds.extend(self.bindings(lnk))
+ except address.ParseError, e:
+ return (e,)
+
+ def declared():
+ self.address_cache[name] = (type, subtype)
+ action(type, subtype)
+
+ sst.write_cmds(cmds, declared)
+
+ def bindings(self, lnk):
+ props = lnk.options.get("node-properties", {})
+ xprops = props.get("x-properties", {})
+ bindings = xprops.get("bindings", [])
+ cmds = []
+ for b in bindings:
+ n, s, o = address.parse(b)
+ cmds.append(ExchangeBind(lnk.name, n, s, o))
+ return cmds
+
+ def delete(self, sst, name, action):
+ def deleted():
+ del self.address_cache[name]
+ action()
+
+ def do_delete(type, subtype):
+ if type == "topic":
+ sst.write_cmd(ExchangeDelete(name), deleted)
+ elif type == "queue":
+ sst.write_cmd(QueueDelete(name), deleted)
+ elif type is None:
+ action()
+ else:
+ raise ValueError(type)
+ self.resolve(sst, name, do_delete, force=True)
+
+ def process(self, ssn):
+ if ssn.closed or ssn.closing: return
+
+ sst = self._attachments[ssn]
+
+ while sst.outgoing_idx < len(ssn.outgoing):
+ msg = ssn.outgoing[sst.outgoing_idx]
+ snd = msg._sender
+ # XXX: should check for sender error here
+ _snd = self._attachments.get(snd)
+ if _snd and snd.linked:
+ self.send(snd, msg)
+ sst.outgoing_idx += 1
+ else:
+ break
+
+ for rcv in ssn.receivers:
+ self.process_receiver(rcv)
+
+ if ssn.acked:
+ messages = [m for m in ssn.acked if m not in sst.acked]
+ if messages:
+ # XXX: we're ignoring acks that get lost when disconnected,
+ # could we deal this via some message-id based purge?
+ ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
+ for range in ids:
+ sst.executed.add_range(range)
+ sst.write_op(SessionCompleted(sst.executed))
+ def ack_ack():
+ for m in messages:
+ ssn.acked.remove(m)
+ if not ssn.transactional:
+ sst.acked.remove(m)
+ sst.write_cmd(MessageAccept(ids), ack_ack)
+ log.debug("SACK[%s]: %s", ssn.log_id, m)
+ sst.acked.extend(messages)
+
+ if ssn.committing and not sst.committing:
+ def commit_ok():
+ del sst.acked[:]
+ ssn.committing = False
+ ssn.committed = True
+ ssn.aborting = False
+ ssn.aborted = False
+ sst.write_cmd(TxCommit(), commit_ok)
+ sst.committing = True
+
+ if ssn.aborting and not sst.aborting:
+ sst.aborting = True
+ def do_rb():
+ messages = sst.acked + ssn.unacked + ssn.incoming
+ ids = RangedSet(*[m._transfer_id for m in messages])
+ for range in ids:
+ sst.executed.add_range(range)
+ sst.write_op(SessionCompleted(sst.executed))
+ sst.write_cmd(MessageRelease(ids))
+ sst.write_cmd(TxRollback(), do_rb_ok)
+
+ def do_rb_ok():
+ del ssn.incoming[:]
+ del ssn.unacked[:]
+ del sst.acked[:]
+
+ for rcv in ssn.receivers:
+ rcv.impending = rcv.received
+ rcv.returned = rcv.received
+ # XXX: do we need to update granted here as well?
+
+ for rcv in ssn.receivers:
+ self.process_receiver(rcv)
+
+ ssn.aborting = False
+ ssn.aborted = True
+ ssn.committing = False
+ ssn.committed = False
+ sst.aborting = False
+
+ for rcv in ssn.receivers:
+ _rcv = self._attachments[rcv]
+ sst.write_cmd(MessageStop(_rcv.destination))
+ sst.write_cmd(ExecutionSync(), do_rb)
+
+ def grant(self, rcv):
+ sst = self._attachments[rcv.session]
+ _rcv = self._attachments.get(rcv)
+ if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining:
+ return
+
+ if rcv.granted is UNLIMITED:
+ if rcv.impending is UNLIMITED:
+ delta = 0
+ else:
+ delta = UNLIMITED
+ elif rcv.impending is UNLIMITED:
+ delta = -1
+ else:
+ delta = max(rcv.granted, rcv.received) - rcv.impending
+
+ if delta is UNLIMITED:
+ sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
+ sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value))
+ rcv.impending = UNLIMITED
+ elif delta > 0:
+ sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
+ sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta))
+ rcv.impending += delta
+ elif delta < 0 and not rcv.draining:
+ _rcv.draining = True
+ def do_stop():
+ rcv.impending = rcv.received
+ _rcv.draining = False
+ self.grant(rcv)
+ sst.write_cmd(MessageStop(_rcv.destination), do_stop)
+
+ if rcv.draining:
+ _rcv.draining = True
+ def do_flush():
+ rcv.impending = rcv.received
+ rcv.granted = rcv.impending
+ _rcv.draining = False
+ rcv.draining = False
+ sst.write_cmd(MessageFlush(_rcv.destination), do_flush)
+
+
+ def process_receiver(self, rcv):
+ if rcv.closed: return
+ self.grant(rcv)
+
+ def send(self, snd, msg):
+ sst = self._attachments[snd.session]
+ _snd = self._attachments[snd]
+
+ if msg.subject is None or _snd._exchange == "":
+ rk = _snd._routing_key
+ else:
+ rk = msg.subject
+
+ if msg.subject is None:
+ subject = _snd.subject
+ else:
+ subject = msg.subject
+
+ # XXX: do we need to query to figure out how to create the reply-to interoperably?
+ if msg.reply_to:
+ rt = addr2reply_to(msg.reply_to)
+ else:
+ rt = None
+ dp = DeliveryProperties(routing_key=rk)
+ mp = MessageProperties(message_id=msg.id,
+ user_id=msg.user_id,
+ reply_to=rt,
+ correlation_id=msg.correlation_id,
+ content_type=msg.content_type,
+ application_headers=msg.properties)
+ if subject is not None:
+ if mp.application_headers is None:
+ mp.application_headers = {}
+ mp.application_headers[SUBJECT] = subject
+ if msg.to is not None:
+ if mp.application_headers is None:
+ mp.application_headers = {}
+ mp.application_headers[TO] = msg.to
+ if msg.durable:
+ dp.delivery_mode = delivery_mode.persistent
+ enc, dec = get_codec(msg.content_type)
+ body = enc(msg.content)
+ def msg_acked():
+ # XXX: should we log the ack somehow too?
+ snd.acked += 1
+ m = snd.session.outgoing.pop(0)
+ sst.outgoing_idx -= 1
+ log.debug("RACK[%s]: %s", sst.session.log_id, msg)
+ assert msg == m
+ sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
+ payload=body), msg_acked)
+ log.debug("SENT[%s]: %s", sst.session.log_id, msg)
+
+ def do_message_transfer(self, xfr):
+ sst = self.get_sst(xfr)
+ ssn = sst.session
+
+ msg = self._decode(xfr)
+ rcv = sst.destinations[xfr.destination].target
+ msg._receiver = rcv
+ if rcv.impending is not UNLIMITED:
+ assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
+ rcv.received += 1
+ log.debug("RCVD[%s]: %s", ssn.log_id, msg)
+ ssn.incoming.append(msg)
+ self.connection._waiter.notifyAll()
+
+ def _decode(self, xfr):
+ dp = EMPTY_DP
+ mp = EMPTY_MP
+
+ for h in xfr.headers:
+ if isinstance(h, DeliveryProperties):
+ dp = h
+ elif isinstance(h, MessageProperties):
+ mp = h
+
+ ap = mp.application_headers
+ enc, dec = get_codec(mp.content_type)
+ content = dec(xfr.payload)
+ msg = Message(content)
+ msg.id = mp.message_id
+ if ap is not None:
+ msg.to = ap.get(TO)
+ msg.subject = ap.get(SUBJECT)
+ msg.user_id = mp.user_id
+ if mp.reply_to is not None:
+ msg.reply_to = reply_to2addr(mp.reply_to)
+ msg.correlation_id = mp.correlation_id
+ msg.durable = dp.delivery_mode == delivery_mode.persistent
+ msg.redelivered = dp.redelivered
+ msg.properties = mp.application_headers
+ msg.content_type = mp.content_type
+ msg._transfer_id = xfr.id
+ return msg
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py
new file mode 100644
index 0000000000..2337986ecb
--- /dev/null
+++ b/python/qpid/messaging/endpoints.py
@@ -0,0 +1,832 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+A candidate high level messaging API for python.
+
+Areas that still need work:
+
+ - definition of the arguments for L{Session.sender} and L{Session.receiver}
+ - standard L{Message} properties
+ - L{Message} content encoding
+ - protocol negotiation/multiprotocol impl
+"""
+
+from logging import getLogger
+from qpid.codec010 import StringCodec
+from qpid.concurrency import synchronized, Waiter, Condition
+from qpid.datatypes import Serial, uuid4
+from qpid.messaging.constants import *
+from qpid.messaging.exceptions import *
+from qpid.messaging.message import *
+from qpid.ops import PRIMITIVE
+from qpid.util import default
+from threading import Thread, RLock
+
+log = getLogger("qpid.messaging")
+
+static = staticmethod
+
+class Connection:
+
+ """
+ A Connection manages a group of L{Sessions<Session>} and connects
+ them with a remote endpoint.
+ """
+
+ @static
+ def open(host, port=None, username="guest", password="guest", **options):
+ """
+ Creates an AMQP connection and connects it to the given host and port.
+
+ @type host: str
+ @param host: the name or ip address of the remote host
+ @type port: int
+ @param port: the port number of the remote host
+ @rtype: Connection
+ @return: a connected Connection
+ """
+ conn = Connection(host, port, username, password, **options)
+ conn.connect()
+ return conn
+
+ def __init__(self, host, port=None, username="guest", password="guest", **options):
+ """
+ Creates a connection. A newly created connection must be connected
+ with the Connection.connect() method before it can be used.
+
+ @type host: str
+ @param host: the name or ip address of the remote host
+ @type port: int
+ @param port: the port number of the remote host
+ @rtype: Connection
+ @return: a disconnected Connection
+ """
+ self.host = host
+ self.port = default(port, AMQP_PORT)
+ self.username = username
+ self.password = password
+ self.mechanisms = options.get("mechanisms")
+ self.heartbeat = options.get("heartbeat")
+ self.reconnect = options.get("reconnect", False)
+ self.reconnect_delay = options.get("reconnect_delay", 3)
+ self.reconnect_limit = options.get("reconnect_limit")
+ self.backups = options.get("backups", [])
+ self.options = options
+
+ self.id = str(uuid4())
+ self.session_counter = 0
+ self.sessions = {}
+ self._connected = False
+ self._lock = RLock()
+ self._condition = Condition(self._lock)
+ self._waiter = Waiter(self._condition)
+ self._modcount = Serial(0)
+ self.error = None
+ from driver import Driver
+ self._driver = Driver(self)
+ self._driver.start()
+
+ def _wait(self, predicate, timeout=None):
+ return self._waiter.wait(predicate, timeout=timeout)
+
+ def _wakeup(self):
+ self._modcount += 1
+ self._driver.wakeup()
+
+ def _check_error(self, exc=ConnectionError):
+ if self.error:
+ raise exc(*self.error)
+
+ def _ewait(self, predicate, timeout=None, exc=ConnectionError):
+ result = self._wait(lambda: self.error or predicate(), timeout)
+ self._check_error(exc)
+ return result
+
+ @synchronized
+ def session(self, name=None, transactional=False):
+ """
+ Creates or retrieves the named session. If the name is omitted or
+ None, then a unique name is chosen based on a randomly generated
+ uuid.
+
+ @type name: str
+ @param name: the session name
+ @rtype: Session
+ @return: the named Session
+ """
+
+ if name is None:
+ name = "%s:%s" % (self.id, self.session_counter)
+ self.session_counter += 1
+ else:
+ name = "%s:%s" % (self.id, name)
+
+ if self.sessions.has_key(name):
+ return self.sessions[name]
+ else:
+ ssn = Session(self, name, transactional)
+ self.sessions[name] = ssn
+ self._wakeup()
+ return ssn
+
+ @synchronized
+ def _remove_session(self, ssn):
+ del self.sessions[ssn.name]
+
+ @synchronized
+ def connect(self):
+ """
+ Connect to the remote endpoint.
+ """
+ self._connected = True
+ self._wakeup()
+ self._ewait(lambda: self._driver._connected and not self._unlinked(),
+ exc=ConnectError)
+
+ def _unlinked(self):
+ return [l
+ for ssn in self.sessions.values()
+ for l in ssn.senders + ssn.receivers
+ if not (l.linked or l.error or l.closed)]
+
+ @synchronized
+ def disconnect(self):
+ """
+ Disconnect from the remote endpoint.
+ """
+ self._connected = False
+ self._wakeup()
+ self._ewait(lambda: not self._driver._connected)
+
+ @synchronized
+ def connected(self):
+ """
+ Return true if the connection is connected, false otherwise.
+ """
+ return self._connected
+
+ @synchronized
+ def close(self):
+ """
+ Close the connection and all sessions.
+ """
+ for ssn in self.sessions.values():
+ ssn.close()
+ self.disconnect()
+
+class Pattern:
+ """
+ The pattern filter matches the supplied wildcard pattern against a
+ message subject.
+ """
+
+ def __init__(self, value):
+ self.value = value
+
+ # XXX: this should become part of the driver
+ def _bind(self, sst, exchange, queue):
+ from qpid.ops import ExchangeBind
+ sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
+ binding_key=self.value.replace("*", "#")))
+
+class Session:
+
+ """
+ Sessions provide a linear context for sending and receiving
+ L{Messages<Message>}. L{Messages<Message>} are sent and received
+ using the L{Sender.send} and L{Receiver.fetch} methods of the
+ L{Sender} and L{Receiver} objects associated with a Session.
+
+ Each L{Sender} and L{Receiver} is created by supplying either a
+ target or source address to the L{sender} and L{receiver} methods of
+ the Session. The address is supplied via a string syntax documented
+ below.
+
+ Addresses
+ =========
+
+ An address identifies a source or target for messages. In its
+ simplest form this is just a name. In general a target address may
+ also be used as a source address, however not all source addresses
+ may be used as a target, e.g. a source might additionally have some
+ filtering criteria that would not be present in a target.
+
+ A subject may optionally be specified along with the name. When an
+ address is used as a target, any subject specified in the address is
+ used as the default subject of outgoing messages for that target.
+ When an address is used as a source, any subject specified in the
+ address is pattern matched against the subject of available messages
+ as a filter for incoming messages from that source.
+
+ The options map contains additional information about the address
+ including:
+
+ - policies for automatically creating, and deleting the node to
+ which an address refers
+
+ - policies for asserting facts about the node to which an address
+ refers
+
+ - extension points that can be used for sender/receiver
+ configuration
+
+ Mapping to AMQP 0-10
+ --------------------
+ The name is resolved to either an exchange or a queue by querying
+ the broker.
+
+ The subject is set as a property on the message. Additionally, if
+ the name refers to an exchange, the routing key is set to the
+ subject.
+
+ Syntax
+ ------
+ The following regular expressions define the tokens used to parse
+ addresses::
+ LBRACE: \\{
+ RBRACE: \\}
+ LBRACK: \\[
+ RBRACK: \\]
+ COLON: :
+ SEMI: ;
+ SLASH: /
+ COMMA: ,
+ NUMBER: [+-]?[0-9]*\\.?[0-9]+
+ ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])?
+ STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\'
+ ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F]
+ SYM: [.#*%@$^!+-]
+ WSPACE: [ \\n\\r\\t]+
+
+ The formal grammar for addresses is given below::
+ address = name [ "/" subject ] [ ";" options ]
+ name = ( part | quoted )+
+ subject = ( part | quoted | "/" )*
+ quoted = STRING / ESC
+ part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM
+ options = map
+ map = "{" ( keyval ( "," keyval )* )? "}"
+ keyval = ID ":" value
+ value = NUMBER / STRING / ID / map / list
+ list = "[" ( value ( "," value )* )? "]"
+
+ This grammar resuls in the following informal syntax::
+
+ <name> [ / <subject> ] [ ; <options> ]
+
+ Where options is::
+
+ { <key> : <value>, ... }
+
+ And values may be:
+ - numbers
+ - single, double, or non quoted strings
+ - maps (dictionaries)
+ - lists
+
+ Options
+ -------
+ The options map permits the following parameters::
+
+ <name> [ / <subject> ] ; {
+ create: <create-policy>,
+ delete: <delete-policy>,
+ assert: <assert-policy>,
+ node-properties: {
+ type: <node-type>,
+ durable: <node-durability>,
+ x-properties: {
+ bindings: ["<exchange>/<key>", ...],
+ <passthrough-key>: <passthrough-value>
+ }
+ }
+ }
+
+ The create, delete, and assert policies specify who should perfom
+ the associated action:
+
+ - I{always}: the action will always be performed
+ - I{sender}: the action will only be performed by the sender
+ - I{receiver}: the action will only be performed by the receiver
+ - I{never}: the action will never be performed (this is the default)
+
+ The node-type is one of:
+
+ - I{topic}: a topic node will default to the topic exchange,
+ x-properties may be used to specify other exchange types
+ - I{queue}: this is the default node-type
+
+ The x-properties map permits arbitrary additional keys and values to
+ be specified. These keys and values are passed through when creating
+ a node or asserting facts about an existing node. Any passthrough
+ keys and values that do not match a standard field of the underlying
+ exchange or queue declare command will be sent in the arguments map.
+
+ Examples
+ --------
+ A simple name resolves to any named node, usually a queue or a
+ topic::
+
+ my-queue-or-topic
+
+ A simple name with a subject will also resolve to a node, but the
+ presence of the subject will cause a sender using this address to
+ set the subject on outgoing messages, and receivers to filter based
+ on the subject::
+
+ my-queue-or-topic/my-subject
+
+ A subject pattern can be used and will cause filtering if used by
+ the receiver. If used for a sender, the literal value gets set as
+ the subject::
+
+ my-queue-or-topic/my-*
+
+ In all the above cases, the address is resolved to an existing node.
+ If you want the node to be auto-created, then you can do the
+ following. By default nonexistent nodes are assumed to be queues::
+
+ my-queue; {create: always}
+
+ You can customize the properties of the queue::
+
+ my-queue; {create: always, node-properties: {durable: True}}
+
+ You can create a topic instead if you want::
+
+ my-queue; {create: always, node-properties: {type: topic}}
+
+ You can assert that the address resolves to a node with particular
+ properties::
+
+ my-transient-topic; {
+ assert: always,
+ node-properties: {
+ type: topic,
+ durable: False
+ }
+ }
+ """
+
+ def __init__(self, connection, name, transactional):
+ self.connection = connection
+ self.name = name
+ self.log_id = "%x" % id(self)
+
+ self.transactional = transactional
+
+ self.committing = False
+ self.committed = True
+ self.aborting = False
+ self.aborted = False
+
+ self.next_sender_id = 0
+ self.senders = []
+ self.next_receiver_id = 0
+ self.receivers = []
+ self.outgoing = []
+ self.incoming = []
+ self.unacked = []
+ self.acked = []
+ # XXX: I hate this name.
+ self.ack_capacity = UNLIMITED
+
+ self.error = None
+ self.closing = False
+ self.closed = False
+
+ self._lock = connection._lock
+
+ def __repr__(self):
+ return "<Session %s>" % self.name
+
+ def _wait(self, predicate, timeout=None):
+ return self.connection._wait(predicate, timeout=timeout)
+
+ def _wakeup(self):
+ self.connection._wakeup()
+
+ def _check_error(self, exc=SessionError):
+ self.connection._check_error(exc)
+ if self.error:
+ raise exc(*self.error)
+
+ def _ewait(self, predicate, timeout=None, exc=SessionError):
+ result = self.connection._ewait(lambda: self.error or predicate(), timeout, exc)
+ self._check_error(exc)
+ return result
+
+ @synchronized
+ def sender(self, target, **options):
+ """
+ Creates a L{Sender} that may be used to send L{Messages<Message>}
+ to the specified target.
+
+ @type target: str
+ @param target: the target to which messages will be sent
+ @rtype: Sender
+ @return: a new Sender for the specified target
+ """
+ sender = Sender(self, self.next_sender_id, target, options)
+ self.next_sender_id += 1
+ self.senders.append(sender)
+ if not self.closed and self.connection._connected:
+ self._wakeup()
+ try:
+ sender._ewait(lambda: sender.linked)
+ except SendError, e:
+ sender.close()
+ raise e
+ return sender
+
+ @synchronized
+ def receiver(self, source, **options):
+ """
+ Creates a receiver that may be used to fetch L{Messages<Message>}
+ from the specified source.
+
+ @type source: str
+ @param source: the source of L{Messages<Message>}
+ @rtype: Receiver
+ @return: a new Receiver for the specified source
+ """
+ receiver = Receiver(self, self.next_receiver_id, source, options)
+ self.next_receiver_id += 1
+ self.receivers.append(receiver)
+ if not self.closed and self.connection._connected:
+ self._wakeup()
+ try:
+ receiver._ewait(lambda: receiver.linked)
+ except ReceiveError, e:
+ receiver.close()
+ raise e
+ return receiver
+
+ @synchronized
+ def _count(self, predicate):
+ result = 0
+ for msg in self.incoming:
+ if predicate(msg):
+ result += 1
+ return result
+
+ def _peek(self, predicate):
+ for msg in self.incoming:
+ if predicate(msg):
+ return msg
+
+ def _pop(self, predicate):
+ i = 0
+ while i < len(self.incoming):
+ msg = self.incoming[i]
+ if predicate(msg):
+ del self.incoming[i]
+ return msg
+ else:
+ i += 1
+
+ @synchronized
+ def _get(self, predicate, timeout=None):
+ if self._ewait(lambda: ((self._peek(predicate) is not None) or self.closing),
+ timeout):
+ msg = self._pop(predicate)
+ if msg is not None:
+ msg._receiver.returned += 1
+ self.unacked.append(msg)
+ log.debug("RETR[%s]: %s", self.log_id, msg)
+ return msg
+ return None
+
+ @synchronized
+ def next_receiver(self, timeout=None):
+ if self._ewait(lambda: self.incoming, timeout):
+ return self.incoming[0]._receiver
+ else:
+ raise Empty
+
+ @synchronized
+ def acknowledge(self, message=None, sync=True):
+ """
+ Acknowledge the given L{Message}. If message is None, then all
+ unacknowledged messages on the session are acknowledged.
+
+ @type message: Message
+ @param message: the message to acknowledge or None
+ @type sync: boolean
+ @param sync: if true then block until the message(s) are acknowledged
+ """
+ if message is None:
+ messages = self.unacked[:]
+ else:
+ messages = [message]
+
+ for m in messages:
+ if self.ack_capacity is not UNLIMITED:
+ if self.ack_capacity <= 0:
+ # XXX: this is currently a SendError, maybe it should be a SessionError?
+ raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
+ self._wakeup()
+ self._ewait(lambda: len(self.acked) < self.ack_capacity)
+ self.unacked.remove(m)
+ self.acked.append(m)
+
+ self._wakeup()
+ if sync:
+ self._ewait(lambda: not [m for m in messages if m in self.acked])
+
+ @synchronized
+ def commit(self):
+ """
+ Commit outstanding transactional work. This consists of all
+ message sends and receives since the prior commit or rollback.
+ """
+ if not self.transactional:
+ raise NontransactionalSession()
+ self.committing = True
+ self._wakeup()
+ self._ewait(lambda: not self.committing)
+ if self.aborted:
+ raise TransactionAborted()
+ assert self.committed
+
+ @synchronized
+ def rollback(self):
+ """
+ Rollback outstanding transactional work. This consists of all
+ message sends and receives since the prior commit or rollback.
+ """
+ if not self.transactional:
+ raise NontransactionalSession()
+ self.aborting = True
+ self._wakeup()
+ self._ewait(lambda: not self.aborting)
+ assert self.aborted
+
+ @synchronized
+ def close(self):
+ """
+ Close the session.
+ """
+ # XXX: should be able to express this condition through API calls
+ self._ewait(lambda: not self.outgoing and not self.acked)
+
+ for link in self.receivers + self.senders:
+ link.close()
+
+ self.closing = True
+ self._wakeup()
+ self._ewait(lambda: self.closed)
+ self.connection._remove_session(self)
+
+class Sender:
+
+ """
+ Sends outgoing messages.
+ """
+
+ def __init__(self, session, id, target, options):
+ self.session = session
+ self.id = id
+ self.target = target
+ self.options = options
+ self.capacity = options.get("capacity", UNLIMITED)
+ self.durable = options.get("durable")
+ self.queued = Serial(0)
+ self.acked = Serial(0)
+ self.error = None
+ self.linked = False
+ self.closing = False
+ self.closed = False
+ self._lock = self.session._lock
+
+ def _wakeup(self):
+ self.session._wakeup()
+
+ def _check_error(self, exc=SendError):
+ self.session._check_error(exc)
+ if self.error:
+ raise exc(*self.error)
+
+ def _ewait(self, predicate, timeout=None, exc=SendError):
+ result = self.session._ewait(lambda: self.error or predicate(), timeout, exc)
+ self._check_error(exc)
+ return result
+
+ @synchronized
+ def pending(self):
+ """
+ Returns the number of messages awaiting acknowledgment.
+ @rtype: int
+ @return: the number of unacknowledged messages
+ """
+ return self.queued - self.acked
+
+ @synchronized
+ def send(self, object, sync=True, timeout=None):
+ """
+ Send a message. If the object passed in is of type L{unicode},
+ L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
+ L{Message} and sent. If it is of type L{Message}, it will be sent
+ directly. If the sender capacity is not L{UNLIMITED} then send
+ will block until there is available capacity to send the message.
+ If the timeout parameter is specified, then send will throw an
+ L{InsufficientCapacity} exception if capacity does not become
+ available within the specified time.
+
+ @type object: unicode, str, list, dict, Message
+ @param object: the message or content to send
+
+ @type sync: boolean
+ @param sync: if true then block until the message is sent
+
+ @type timeout: float
+ @param timeout: the time to wait for available capacity
+ """
+
+ if not self.session.connection._connected or self.session.closing:
+ raise Disconnected()
+
+ self._ewait(lambda: self.linked)
+
+ if isinstance(object, Message):
+ message = object
+ else:
+ message = Message(object)
+
+ if message.durable is None:
+ message.durable = self.durable
+
+ if self.capacity is not UNLIMITED:
+ if self.capacity <= 0:
+ raise InsufficientCapacity("capacity = %s" % self.capacity)
+ if not self._ewait(lambda: self.pending() < self.capacity, timeout=timeout):
+ raise InsufficientCapacity("capacity = %s" % self.capacity)
+
+ # XXX: what if we send the same message to multiple senders?
+ message._sender = self
+ self.session.outgoing.append(message)
+ self.queued += 1
+
+ self._wakeup()
+
+ if sync:
+ self.sync()
+ assert message not in self.session.outgoing
+
+ @synchronized
+ def sync(self):
+ mno = self.queued
+ self._ewait(lambda: self.acked >= mno)
+
+ @synchronized
+ def close(self):
+ """
+ Close the Sender.
+ """
+ self.closing = True
+ self._wakeup()
+ try:
+ self.session._ewait(lambda: self.closed)
+ finally:
+ self.session.senders.remove(self)
+
+class Receiver(object):
+
+ """
+ Receives incoming messages from a remote source. Messages may be
+ fetched with L{fetch}.
+ """
+
+ def __init__(self, session, id, source, options):
+ self.session = session
+ self.id = id
+ self.source = source
+ self.options = options
+
+ self.granted = Serial(0)
+ self.draining = False
+ self.impending = Serial(0)
+ self.received = Serial(0)
+ self.returned = Serial(0)
+
+ self.error = None
+ self.linked = False
+ self.closing = False
+ self.closed = False
+ self._lock = self.session._lock
+ self._capacity = 0
+ self._set_capacity(options.get("capacity", 0), False)
+
+ @synchronized
+ def _set_capacity(self, c, wakeup=True):
+ if c is UNLIMITED:
+ self._capacity = c.value
+ else:
+ self._capacity = c
+ self._grant()
+ if wakeup:
+ self._wakeup()
+
+ def _get_capacity(self):
+ if self._capacity == UNLIMITED.value:
+ return UNLIMITED
+ else:
+ return self._capacity
+
+ capacity = property(_get_capacity, _set_capacity)
+
+ def _wakeup(self):
+ self.session._wakeup()
+
+ def _check_error(self, exc=ReceiveError):
+ self.session._check_error(exc)
+ if self.error:
+ raise exc(*self.error)
+
+ def _ewait(self, predicate, timeout=None, exc=ReceiveError):
+ result = self.session._ewait(lambda: self.error or predicate(), timeout, exc)
+ self._check_error(exc)
+ return result
+
+ @synchronized
+ def pending(self):
+ """
+ Returns the number of messages available to be fetched by the
+ application.
+
+ @rtype: int
+ @return: the number of available messages
+ """
+ return self.received - self.returned
+
+ def _pred(self, msg):
+ return msg._receiver == self
+
+ @synchronized
+ def fetch(self, timeout=None):
+ """
+ Fetch and return a single message. A timeout of None will block
+ forever waiting for a message to arrive, a timeout of zero will
+ return immediately if no messages are available.
+
+ @type timeout: float
+ @param timeout: the time to wait for a message to be available
+ """
+
+ self._ewait(lambda: self.linked)
+
+ if self._capacity == 0:
+ self.granted = self.returned + 1
+ self._wakeup()
+ self._ewait(lambda: self.impending >= self.granted)
+ msg = self.session._get(self._pred, timeout=timeout)
+ if msg is None:
+ self.draining = True
+ self._wakeup()
+ self._ewait(lambda: not self.draining)
+ self._grant()
+ self._wakeup()
+ msg = self.session._get(self._pred, timeout=0)
+ if msg is None:
+ raise Empty()
+ elif self._capacity not in (0, UNLIMITED.value):
+ self.granted += 1
+ self._wakeup()
+ return msg
+
+ def _grant(self):
+ if self._capacity == UNLIMITED.value:
+ self.granted = UNLIMITED
+ else:
+ self.granted = self.received + self._capacity
+
+ @synchronized
+ def close(self):
+ """
+ Close the receiver.
+ """
+ self.closing = True
+ self._wakeup()
+ try:
+ self.session._ewait(lambda: self.closed)
+ finally:
+ self.session.receivers.remove(self)
+
+__all__ = ["Connection", "Session", "Sender", "Receiver"]
diff --git a/python/qpid/messaging/exceptions.py b/python/qpid/messaging/exceptions.py
new file mode 100644
index 0000000000..5c8bdedc26
--- /dev/null
+++ b/python/qpid/messaging/exceptions.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+class ConnectionError(Exception):
+ """
+ The base class for all connection related exceptions.
+ """
+ pass
+
+class ConnectError(ConnectionError):
+ """
+ Exception raised when there is an error connecting to the remote
+ peer.
+ """
+ pass
+
+class SessionError(Exception):
+ pass
+
+class Disconnected(SessionError):
+ """
+ Exception raised when an operation is attempted that is illegal when
+ disconnected.
+ """
+ pass
+
+class NontransactionalSession(SessionError):
+ """
+ Exception raised when commit or rollback is attempted on a non
+ transactional session.
+ """
+ pass
+
+class TransactionAborted(SessionError):
+ pass
+
+class SendError(SessionError):
+ pass
+
+class InsufficientCapacity(SendError):
+ pass
+
+class ReceiveError(SessionError):
+ pass
+
+class Empty(ReceiveError):
+ """
+ Exception raised by L{Receiver.fetch} when there is no message
+ available within the alloted time.
+ """
+ pass
diff --git a/python/qpid/messaging/message.py b/python/qpid/messaging/message.py
new file mode 100644
index 0000000000..1c7c7beb81
--- /dev/null
+++ b/python/qpid/messaging/message.py
@@ -0,0 +1,141 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.codec010 import StringCodec
+from qpid.ops import PRIMITIVE
+
+def codec(name):
+ type = PRIMITIVE[name]
+
+ def encode(x):
+ sc = StringCodec()
+ sc.write_primitive(type, x)
+ return sc.encoded
+
+ def decode(x):
+ sc = StringCodec(x)
+ return sc.read_primitive(type)
+
+ return encode, decode
+
+# XXX: need to correctly parse the mime type and deal with
+# content-encoding header
+
+TYPE_MAPPINGS={
+ dict: "amqp/map",
+ list: "amqp/list",
+ unicode: "text/plain; charset=utf8",
+ unicode: "text/plain",
+ buffer: None,
+ str: None,
+ None.__class__: None
+ }
+
+TYPE_CODEC={
+ "amqp/map": codec("map"),
+ "amqp/list": codec("list"),
+ "text/plain; charset=utf8": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")),
+ "text/plain": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")),
+ "": (lambda x: x, lambda x: x),
+ None: (lambda x: x, lambda x: x)
+ }
+
+def get_type(content):
+ return TYPE_MAPPINGS[content.__class__]
+
+def get_codec(content_type):
+ return TYPE_CODEC[content_type]
+
+UNSPECIFIED = object()
+
+class Message:
+
+ """
+ A message consists of a standard set of fields, an application
+ defined set of properties, and some content.
+
+ @type id: str
+ @ivar id: the message id
+ @type user_id: str
+ @ivar user_id: the user-id of the message producer
+ @type to: str
+ @ivar to: the destination address
+ @type reply_to: str
+ @ivar reply_to: the address to send replies
+ @type correlation_id: str
+ @ivar correlation_id: a correlation-id for the message
+ @type properties: dict
+ @ivar properties: application specific message properties
+ @type content_type: str
+ @ivar content_type: the content-type of the message
+ @type content: str, unicode, buffer, dict, list
+ @ivar content: the message content
+ """
+
+ def __init__(self, content=None, content_type=UNSPECIFIED, id=None,
+ subject=None, to=None, user_id=None, reply_to=None,
+ correlation_id=None, durable=None, properties=None):
+ """
+ Construct a new message with the supplied content. The
+ content-type of the message will be automatically inferred from
+ type of the content parameter.
+
+ @type content: str, unicode, buffer, dict, list
+ @param content: the message content
+
+ @type content_type: str
+ @param content_type: the content-type of the message
+ """
+ self.id = id
+ self.subject = subject
+ self.to = to
+ self.user_id = user_id
+ self.reply_to = reply_to
+ self.correlation_id = correlation_id
+ self.durable = durable
+ self.redelivered = False
+ if properties is None:
+ self.properties = {}
+ else:
+ self.properties = properties
+ if content_type is UNSPECIFIED:
+ self.content_type = get_type(content)
+ else:
+ self.content_type = content_type
+ self.content = content
+
+ def __repr__(self):
+ args = []
+ for name in ["id", "subject", "to", "user_id", "reply_to",
+ "correlation_id"]:
+ value = self.__dict__[name]
+ if value is not None: args.append("%s=%r" % (name, value))
+ for name in ["durable", "properties"]:
+ value = self.__dict__[name]
+ if value: args.append("%s=%r" % (name, value))
+ if self.content_type != get_type(self.content):
+ args.append("content_type=%r" % self.content_type)
+ if self.content is not None:
+ if args:
+ args.append("content=%r" % self.content)
+ else:
+ args.append(repr(self.content))
+ return "Message(%s)" % ", ".join(args)
+
+__all__ = ["Message"]