diff options
Diffstat (limited to 'python/qpid/messaging')
-rw-r--r-- | python/qpid/messaging/__init__.py | 35 | ||||
-rw-r--r-- | python/qpid/messaging/address.py | 168 | ||||
-rw-r--r-- | python/qpid/messaging/constants.py | 32 | ||||
-rw-r--r-- | python/qpid/messaging/driver.py | 1041 | ||||
-rw-r--r-- | python/qpid/messaging/endpoints.py | 832 | ||||
-rw-r--r-- | python/qpid/messaging/exceptions.py | 67 | ||||
-rw-r--r-- | python/qpid/messaging/message.py | 141 |
7 files changed, 2316 insertions, 0 deletions
diff --git a/python/qpid/messaging/__init__.py b/python/qpid/messaging/__init__.py new file mode 100644 index 0000000000..f9ddda2e80 --- /dev/null +++ b/python/qpid/messaging/__init__.py @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +A candidate high level messaging API for python. + +Areas that still need work: + + - definition of the arguments for L{Session.sender} and L{Session.receiver} + - standard L{Message} properties + - L{Message} content encoding + - protocol negotiation/multiprotocol impl +""" + +from qpid.datatypes import timestamp, uuid4, Serial +from qpid.messaging.constants import * +from qpid.messaging.endpoints import * +from qpid.messaging.exceptions import * +from qpid.messaging.message import * diff --git a/python/qpid/messaging/address.py b/python/qpid/messaging/address.py new file mode 100644 index 0000000000..bf494433e4 --- /dev/null +++ b/python/qpid/messaging/address.py @@ -0,0 +1,168 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import re +from qpid.lexer import Lexicon, LexError +from qpid.parser import Parser, ParseError + +l = Lexicon() + +LBRACE = l.define("LBRACE", r"\{") +RBRACE = l.define("RBRACE", r"\}") +LBRACK = l.define("LBRACK", r"\[") +RBRACK = l.define("RBRACK", r"\]") +COLON = l.define("COLON", r":") +SEMI = l.define("SEMI", r";") +SLASH = l.define("SLASH", r"/") +COMMA = l.define("COMMA", r",") +NUMBER = l.define("NUMBER", r'[+-]?[0-9]*\.?[0-9]+') +TRUE = l.define("TRUE", r'True') +FALSE = l.define("FALSE", r'False') +ID = l.define("ID", r'[a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])?') +STRING = l.define("STRING", r""""(?:[^\\"]|\\.)*"|'(?:[^\\']|\\.)*'""") +ESC = l.define("ESC", r"\\[^ux]|\\x[0-9a-fA-F][0-9a-fA-F]|\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F]") +SYM = l.define("SYM", r"[.#*%@$^!+-]") +WSPACE = l.define("WSPACE", r"[ \n\r\t]+") +EOF = l.eof("EOF") + +LEXER = l.compile() + +def lex(st): + return LEXER.lex(st) + +def tok2str(tok): + if tok.type is STRING: + return eval(tok.value) + elif tok.type is ESC: + if tok.value[1] == "x": + return eval('"%s"' % tok.value) + elif tok.value[1] == "u": + return eval('u"%s"' % tok.value) + else: + return tok.value[1] + else: + return tok.value + +def tok2obj(tok): + if tok.type in (STRING, NUMBER): + return eval(tok.value) + elif tok.type == TRUE: + return True + elif tok.type == FALSE: + return False + else: + return tok.value + +def toks2str(toks): + if toks: + return "".join(map(tok2str, toks)) + else: + return None + +class AddressParser(Parser): + + def __init__(self, tokens): + Parser.__init__(self, [t for t in tokens if t.type is not WSPACE]) + + def parse(self): + result = self.address() + self.eat(EOF) + return result + + def address(self): + name = toks2str(self.eat_until(SLASH, SEMI, EOF)) + + if name is None: + raise ParseError(self.next()) + + if self.matches(SLASH): + self.eat(SLASH) + subject = toks2str(self.eat_until(SEMI, EOF)) + else: + subject = None + + if self.matches(SEMI): + self.eat(SEMI) + options = self.map() + else: + options = None + return name, subject, options + + def map(self): + self.eat(LBRACE) + + result = {} + while True: + if self.matches(NUMBER, STRING, ID, LBRACE, LBRACK): + n, v = self.keyval() + result[n] = v + if self.matches(COMMA): + self.eat(COMMA) + elif self.matches(RBRACE): + break + else: + raise ParseError(self.next(), COMMA, RBRACE) + elif self.matches(RBRACE): + break + else: + raise ParseError(self.next(), NUMBER, STRING, ID, LBRACE, LBRACK, + RBRACE) + + self.eat(RBRACE) + return result + + def keyval(self): + key = self.value() + self.eat(COLON) + val = self.value() + return (key, val) + + def value(self): + if self.matches(NUMBER, STRING, ID, TRUE, FALSE): + return tok2obj(self.eat()) + elif self.matches(LBRACE): + return self.map() + elif self.matches(LBRACK): + return self.list() + else: + raise ParseError(self.next(), NUMBER, STRING, ID, LBRACE, LBRACK) + + def list(self): + self.eat(LBRACK) + + result = [] + + while True: + if self.matches(RBRACK): + break + else: + result.append(self.value()) + if self.matches(COMMA): + self.eat(COMMA) + elif self.matches(RBRACK): + break + else: + raise ParseError(self.next(), COMMA, RBRACK) + + self.eat(RBRACK) + return result + +def parse(addr): + return AddressParser(lex(addr)).parse() + +__all__ = ["parse", "ParseError"] diff --git a/python/qpid/messaging/constants.py b/python/qpid/messaging/constants.py new file mode 100644 index 0000000000..cad47bd52a --- /dev/null +++ b/python/qpid/messaging/constants.py @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class Constant: + + def __init__(self, name, value=None): + self.name = name + self.value = value + + def __repr__(self): + return self.name + +AMQP_PORT = 5672 +AMQPS_PORT = 5671 + +UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL) diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py new file mode 100644 index 0000000000..9d58616804 --- /dev/null +++ b/python/qpid/messaging/driver.py @@ -0,0 +1,1041 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import socket, struct, sys, time +from logging import getLogger +from qpid import compat +from qpid import sasl +from qpid.concurrency import synchronized +from qpid.datatypes import RangedSet, Serial +from qpid.exceptions import Timeout, VersionError +from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ + FrameDecoder, SegmentDecoder, OpDecoder +from qpid.messaging import address +from qpid.messaging.constants import UNLIMITED +from qpid.messaging.endpoints import Pattern +from qpid.messaging.exceptions import ConnectError +from qpid.messaging.message import get_codec, Message +from qpid.ops import * +from qpid.selector import Selector +from qpid.util import connect +from qpid.validator import And, Context, Map, Types, Values +from threading import Condition, Thread + +log = getLogger("qpid.messaging") +rawlog = getLogger("qpid.messaging.io.raw") +opslog = getLogger("qpid.messaging.io.ops") + +def addr2reply_to(addr): + name, subject, options = address.parse(addr) + return ReplyTo(name, subject) + +def reply_to2addr(reply_to): + if reply_to.routing_key is None: + return reply_to.exchange + elif reply_to.exchange in (None, ""): + return reply_to.routing_key + else: + return "%s/%s" % (reply_to.exchange, reply_to.routing_key) + +class Attachment: + + def __init__(self, target): + self.target = target + +# XXX + +DURABLE_DEFAULT=True + +# XXX + +FILTER_DEFAULTS = { + "topic": Pattern("*"), + "amq.failover": Pattern("DUMMY") + } + +# XXX +ppid = 0 +try: + ppid = os.getppid() +except: + pass + +CLIENT_PROPERTIES = {"product": "qpid python client", + "version": "development", + "platform": os.name, + "qpid.client_process": os.path.basename(sys.argv[0]), + "qpid.client_pid": os.getpid(), + "qpid.client_ppid": ppid} + +def noop(): pass + +class SessionState: + + def __init__(self, driver, session, name, channel): + self.driver = driver + self.session = session + self.name = name + self.channel = channel + self.detached = False + self.committing = False + self.aborting = False + + # sender state + self.sent = Serial(0) + self.acknowledged = RangedSet() + self.actions = {} + self.min_completion = self.sent + self.max_completion = self.sent + self.results = {} + + # receiver state + self.received = None + self.executed = RangedSet() + + # XXX: need to periodically exchange completion/known_completion + + self.destinations = {} + + def write_query(self, query, handler): + id = self.sent + self.write_cmd(query, lambda: handler(self.results.pop(id))) + + def write_cmd(self, cmd, action=noop): + if action != noop: + cmd.sync = True + if self.detached: + raise Exception("detached") + cmd.id = self.sent + self.sent += 1 + self.actions[cmd.id] = action + self.max_completion = cmd.id + self.write_op(cmd) + + def write_cmds(self, cmds, action=noop): + if cmds: + for cmd in cmds[:-1]: + self.write_cmd(cmd) + self.write_cmd(cmds[-1], action) + else: + action() + + def write_op(self, op): + op.channel = self.channel + self.driver.write_op(op) + +POLICIES = Values("always", "sender", "receiver", "never") + +class Bindings: + + def validate(self, o, ctx): + t = ctx.containers[1].get("type", "queue") + if t != "queue": + return "bindings are only permitted on nodes of type queue" + +COMMON_OPTS = { + "create": POLICIES, + "delete": POLICIES, + "assert": POLICIES, + "node-properties": Map({ + "type": Values("queue", "topic"), + "durable": Types(bool), + "x-properties": Map({ + "type": Types(basestring), + "bindings": And(Types(list), Bindings()) + }, + restricted=False) + }) + } + +RECEIVE_MODES = Values("browse", "consume") + +SOURCE_OPTS = COMMON_OPTS.copy() +SOURCE_OPTS.update({ + "mode": RECEIVE_MODES + }) + +TARGET_OPTS = COMMON_OPTS.copy() + +class LinkIn: + + ADDR_NAME = "source" + DIR_NAME = "receiver" + VALIDATOR = Map(SOURCE_OPTS) + + def init_link(self, sst, rcv, _rcv): + _rcv.destination = str(rcv.id) + sst.destinations[_rcv.destination] = _rcv + _rcv.draining = False + + def do_link(self, sst, rcv, _rcv, type, subtype, action): + acq_mode = acquire_mode.pre_acquired + + if type == "topic": + _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination) + sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) + filter = _rcv.options.get("filter") + if _rcv.subject is None and filter is None: + f = FILTER_DEFAULTS[subtype] + elif _rcv.subject and filter: + # XXX + raise Exception("can't supply both subject and filter") + elif _rcv.subject: + # XXX + f = Pattern(_rcv.subject) + else: + f = filter + f._bind(sst, _rcv.name, _rcv._queue) + elif type == "queue": + _rcv._queue = _rcv.name + if _rcv.options.get("mode", "consume") == "browse": + acq_mode = acquire_mode.not_acquired + + sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination, + acquire_mode = acq_mode)) + sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action) + + def do_unlink(self, sst, rcv, _rcv, action=noop): + sst.write_cmd(MessageCancel(_rcv.destination), action) + + def del_link(self, sst, rcv, _rcv): + del sst.destinations[_rcv.destination] + +class LinkOut: + + ADDR_NAME = "target" + DIR_NAME = "sender" + VALIDATOR = Map(TARGET_OPTS) + + def init_link(self, sst, snd, _snd): + _snd.closing = False + + def do_link(self, sst, snd, _snd, type, subtype, action): + if type == "topic": + _snd._exchange = _snd.name + _snd._routing_key = _snd.subject + elif type == "queue": + _snd._exchange = "" + _snd._routing_key = _snd.name + action() + + def do_unlink(self, sst, snd, _snd, action=noop): + action() + + def del_link(self, sst, snd, _snd): + pass + +class Cache: + + def __init__(self, ttl): + self.ttl = ttl + self.entries = {} + + def __setitem__(self, key, value): + self.entries[key] = time.time(), value + + def __getitem__(self, key): + tstamp, value = self.entries[key] + if time.time() - tstamp >= self.ttl: + del self.entries[key] + raise KeyError(key) + else: + return value + + def __delitem__(self, key): + del self.entries[key] + +# XXX +HEADER="!4s4B" + +EMPTY_DP = DeliveryProperties() +EMPTY_MP = MessageProperties() + +SUBJECT = "qpid.subject" +TO = "qpid.to" + +class Driver: + + def __init__(self, connection): + self.connection = connection + self.log_id = "%x" % id(self.connection) + self._lock = self.connection._lock + + self._in = LinkIn() + self._out = LinkOut() + + self._selector = Selector.default() + self._attempts = 0 + self._hosts = [(self.connection.host, self.connection.port)] + \ + self.connection.backups + self._host = 0 + self._retrying = False + + self.reset() + + def reset(self): + self._opening = False + self._closing = False + self._connected = False + self._attachments = {} + + self._channel_max = 65536 + self._channels = 0 + self._sessions = {} + + options = self.connection.options + + self.address_cache = Cache(options.get("address_ttl", 60)) + + self._socket = None + self._buf = "" + self._hdr = "" + self._op_enc = OpEncoder() + self._seg_enc = SegmentEncoder() + self._frame_enc = FrameEncoder() + self._frame_dec = FrameDecoder() + self._seg_dec = SegmentDecoder() + self._op_dec = OpDecoder() + self._timeout = None + + self._sasl = sasl.Client() + if self.connection.username: + self._sasl.setAttr("username", self.connection.username) + if self.connection.password: + self._sasl.setAttr("password", self.connection.password) + if self.connection.host: + self._sasl.setAttr("host", self.connection.host) + self._sasl.setAttr("service", options.get("service", "qpidd")) + if "min_ssf" in options: + self._sasl.setAttr("minssf", options["min_ssf"]) + if "max_ssf" in options: + self._sasl.setAttr("maxssf", options["max_ssf"]) + self._sasl.init() + self._sasl_encode = False + self._sasl_decode = False + + for ssn in self.connection.sessions.values(): + for m in ssn.acked + ssn.unacked + ssn.incoming: + m._transfer_id = None + for snd in ssn.senders: + snd.linked = False + for rcv in ssn.receivers: + rcv.impending = rcv.received + rcv.linked = False + + @synchronized + def wakeup(self): + self.dispatch() + self._selector.wakeup() + + def start(self): + self._selector.register(self) + + def fileno(self): + return self._socket.fileno() + + @synchronized + def reading(self): + return self._socket is not None + + @synchronized + def writing(self): + return self._socket is not None and self._buf + + @synchronized + def timing(self): + return self._timeout + + @synchronized + def readable(self): + error = None + recoverable = False + try: + data = self._socket.recv(64*1024) + if data: + rawlog.debug("READ[%s]: %r", self.log_id, data) + if self._sasl_decode: + data = self._sasl.decode(data) + else: + rawlog.debug("ABORTED[%s]: %s", self.log_id, self._socket.getpeername()) + error = "connection aborted" + recoverable = True + except socket.error, e: + error = e + recoverable = True + + if not error: + try: + if len(self._hdr) < 8: + r = 8 - len(self._hdr) + self._hdr += data[:r] + data = data[r:] + + if len(self._hdr) == 8: + self.do_header(self._hdr) + + self._frame_dec.write(data) + self._seg_dec.write(*self._frame_dec.read()) + self._op_dec.write(*self._seg_dec.read()) + for op in self._op_dec.read(): + self.assign_id(op) + opslog.debug("RCVD[%s]: %r", self.log_id, op) + op.dispatch(self) + except VersionError, e: + error = e + except: + msg = compat.format_exc() + error = msg + + if error: + self._error(error, recoverable) + else: + self.dispatch() + + self.connection._waiter.notifyAll() + + def assign_id(self, op): + if isinstance(op, Command): + sst = self.get_sst(op) + op.id = sst.received + sst.received += 1 + + @synchronized + def writeable(self): + try: + n = self._socket.send(self._buf) + rawlog.debug("SENT[%s]: %r", self.log_id, self._buf[:n]) + self._buf = self._buf[n:] + except socket.error, e: + self._error(e, True) + self.connection._waiter.notifyAll() + + @synchronized + def timeout(self): + self.dispatch() + self.connection._waiter.notifyAll() + + def _error(self, err, recoverable): + if self._socket is not None: + self._socket.close() + self.reset() + if (recoverable and self.connection.reconnect and + (self.connection.reconnect_limit is None or + self.connection.reconnect_limit <= 0 or + self._attempts <= self.connection.reconnect_limit)): + if self._host > 0: + delay = 0 + else: + delay = self.connection.reconnect_delay + self._timeout = time.time() + delay + log.warn("recoverable error[attempt %s]: %s" % (self._attempts, err)) + if delay > 0: + log.warn("sleeping %s seconds" % delay) + self._retrying = True + else: + self.connection.error = (err,) + + def write_op(self, op): + opslog.debug("SENT[%s]: %r", self.log_id, op) + self._op_enc.write(op) + self._seg_enc.write(*self._op_enc.read()) + self._frame_enc.write(*self._seg_enc.read()) + bytes = self._frame_enc.read() + if self._sasl_encode: + bytes = self._sasl.encode(bytes) + self._buf += bytes + + def do_header(self, hdr): + cli_major = 0; cli_minor = 10 + magic, _, _, major, minor = struct.unpack(HEADER, hdr) + if major != cli_major or minor != cli_minor: + raise VersionError("client: %s-%s, server: %s-%s" % + (cli_major, cli_minor, major, minor)) + + def do_connection_start(self, start): + if self.connection.mechanisms: + permitted = self.connection.mechanisms.split() + mechs = [m for m in start.mechanisms if m in permitted] + else: + mechs = start.mechanisms + mech, initial = self._sasl.start(" ".join(mechs)) + self.write_op(ConnectionStartOk(client_properties=CLIENT_PROPERTIES, + mechanism=mech, response=initial)) + + def do_connection_secure(self, secure): + resp = self._sasl.step(secure.challenge) + self.write_op(ConnectionSecureOk(response=resp)) + + def do_connection_tune(self, tune): + # XXX: is heartbeat protocol specific? + if tune.channel_max is not None: + self.channel_max = tune.channel_max + self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat, + channel_max=self.channel_max)) + self.write_op(ConnectionOpen()) + self._sasl_encode = True + + def do_connection_open_ok(self, open_ok): + self._connected = True + self._sasl_decode = True + + def connection_heartbeat(self, hrt): + self.write_op(ConnectionHeartbeat()) + + def do_connection_close(self, close): + self.write_op(ConnectionCloseOk()) + if close.reply_code != close_code.normal: + self.connection.error = (close.reply_code, close.reply_text) + # XXX: should we do a half shutdown on the socket here? + # XXX: we really need to test this, we may end up reporting a + # connection abort after this, if we were to do a shutdown on read + # and stop reading, then we wouldn't report the abort, that's + # probably the right thing to do + + def do_connection_close_ok(self, close_ok): + self._socket.close() + self.reset() + + def do_session_attached(self, atc): + pass + + def do_session_command_point(self, cp): + sst = self.get_sst(cp) + sst.received = cp.command_id + + def do_session_completed(self, sc): + sst = self.get_sst(sc) + for r in sc.commands: + sst.acknowledged.add(r.lower, r.upper) + + if not sc.commands.empty(): + while sst.min_completion in sc.commands: + if sst.actions.has_key(sst.min_completion): + sst.actions.pop(sst.min_completion)() + sst.min_completion += 1 + + def session_known_completed(self, kcmp): + sst = self.get_sst(kcmp) + executed = RangedSet() + for e in sst.executed.ranges: + for ke in kcmp.ranges: + if e.lower in ke and e.upper in ke: + break + else: + executed.add_range(e) + sst.executed = completed + + def do_session_flush(self, sf): + sst = self.get_sst(sf) + if sf.expected: + if sst.received is None: + exp = None + else: + exp = RangedSet(sst.received) + sst.write_op(SessionExpected(exp)) + if sf.confirmed: + sst.write_op(SessionConfirmed(sst.executed)) + if sf.completed: + sst.write_op(SessionCompleted(sst.executed)) + + def do_execution_result(self, er): + sst = self.get_sst(er) + sst.results[er.command_id] = er.value + + def do_execution_exception(self, ex): + sst = self.get_sst(ex) + sst.session.error = (ex,) + + def dispatch(self): + try: + if self._socket is None and self.connection._connected and not self._opening: + self.connect() + elif self._socket is not None and not self.connection._connected and not self._closing: + self.disconnect() + + if self._connected and not self._closing: + for ssn in self.connection.sessions.values(): + self.attach(ssn) + self.process(ssn) + except: + msg = compat.format_exc() + self.connection.error = (msg,) + + def connect(self): + try: + # XXX: should make this non blocking + if self._host == 0: + self._attempts += 1 + host, port = self._hosts[self._host] + if self._retrying: + log.warn("trying: %s:%s", host, port) + self._socket = connect(host, port) + if self._retrying: + log.warn("reconnect succeeded: %s:%s", host, port) + self._timeout = None + self._attempts = 0 + self._host = 0 + self._retrying = False + self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10) + self._opening = True + except socket.error, e: + self._host = (self._host + 1) % len(self._hosts) + self._error(e, True) + + def disconnect(self): + self.write_op(ConnectionClose(close_code.normal)) + self._closing = True + + def attach(self, ssn): + sst = self._attachments.get(ssn) + if sst is None and not ssn.closed: + for i in xrange(0, self.channel_max): + if not self._sessions.has_key(i): + ch = i + break + else: + raise RuntimeError("all channels used") + sst = SessionState(self, ssn, ssn.name, ch) + sst.write_op(SessionAttach(name=ssn.name)) + sst.write_op(SessionCommandPoint(sst.sent, 0)) + sst.outgoing_idx = 0 + sst.acked = [] + if ssn.transactional: + sst.write_cmd(TxSelect()) + self._attachments[ssn] = sst + self._sessions[sst.channel] = sst + + for snd in ssn.senders: + self.link(snd, self._out, snd.target) + for rcv in ssn.receivers: + self.link(rcv, self._in, rcv.source) + + if sst is not None and ssn.closing and not sst.detached: + sst.detached = True + sst.write_op(SessionDetach(name=ssn.name)) + + def get_sst(self, op): + return self._sessions[op.channel] + + def do_session_detached(self, dtc): + sst = self._sessions.pop(dtc.channel) + ssn = sst.session + del self._attachments[ssn] + ssn.closed = True + + def do_session_detach(self, dtc): + sst = self.get_sst(dtc) + sst.write_op(SessionDetached(name=dtc.name)) + self.do_session_detached(dtc) + + def link(self, lnk, dir, addr): + sst = self._attachments.get(lnk.session) + _lnk = self._attachments.get(lnk) + + if _lnk is None and not lnk.closing and not lnk.closed: + _lnk = Attachment(lnk) + _lnk.closing = False + dir.init_link(sst, lnk, _lnk) + + err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir) + if err: + lnk.error = (err,) + lnk.closed = True + return + + def linked(): + lnk.linked = True + + def resolved(type, subtype): + dir.do_link(sst, lnk, _lnk, type, subtype, linked) + + self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved) + self._attachments[lnk] = _lnk + + if lnk.linked and lnk.closing and not lnk.closed: + if not _lnk.closing: + def unlinked(): + dir.del_link(sst, lnk, _lnk) + del self._attachments[lnk] + lnk.closed = True + if _lnk.options.get("delete") in ("always", dir.DIR_NAME): + dir.do_unlink(sst, lnk, _lnk) + self.delete(sst, _lnk.name, unlinked) + else: + dir.do_unlink(sst, lnk, _lnk, unlinked) + _lnk.closing = True + elif not lnk.linked and lnk.closing and not lnk.closed: + lnk.closed = True + + def parse_address(self, lnk, dir, addr): + if addr is None: + return "%s is None" % dir.ADDR_NAME + else: + try: + lnk.name, lnk.subject, lnk.options = address.parse(addr) + # XXX: subject + if lnk.options is None: + lnk.options = {} + except address.LexError, e: + return e + except address.ParseError, e: + return e + + def validate_options(self, lnk, dir): + ctx = Context() + err = dir.VALIDATOR.validate(lnk.options, ctx) + if err: return "error in options: %s" % err + + def resolve_declare(self, sst, lnk, dir, action): + declare = lnk.options.get("create") in ("always", dir) + def do_resolved(type, subtype): + err = None + if type is None: + if declare: + err = self.declare(sst, lnk, action) + else: + err = ("no such queue: %s" % lnk.name,) + elif type == "queue": + try: + cmds = self.bindings(lnk) + sst.write_cmds(cmds, lambda: action(type, subtype)) + except address.ParseError, e: + err = (e,) + else: + action(type, subtype) + + if err: + tgt = lnk.target + tgt.error = err + del self._attachments[tgt] + tgt.closed = True + return + self.resolve(sst, lnk.name, do_resolved, force=declare) + + def resolve(self, sst, name, action, force=False): + if not force: + try: + type, subtype = self.address_cache[name] + action(type, subtype) + return + except KeyError: + pass + + args = [] + def do_result(r): + args.append(r) + def do_action(r): + do_result(r) + er, qr = args + if er.not_found and not qr.queue: + type, subtype = None, None + elif qr.queue: + type, subtype = "queue", None + else: + type, subtype = "topic", er.type + if type is not None: + self.address_cache[name] = (type, subtype) + action(type, subtype) + sst.write_query(ExchangeQuery(name), do_result) + sst.write_query(QueueQuery(name), do_action) + + def declare(self, sst, lnk, action): + name = lnk.name + props = lnk.options.get("node-properties", {}) + durable = props.get("durable", DURABLE_DEFAULT) + type = props.get("type", "queue") + xprops = props.get("x-properties", {}) + + if type == "topic": + cmd = ExchangeDeclare(exchange=name, durable=durable) + elif type == "queue": + cmd = QueueDeclare(queue=name, durable=durable) + else: + raise ValueError(type) + + for f in cmd.FIELDS: + if f.name != "arguments" and xprops.has_key(f.name): + cmd[f.name] = xprops.pop(f.name) + if xprops: + cmd.arguments = xprops + + if type == "topic": + if cmd.type is None: + cmd.type = "topic" + subtype = cmd.type + else: + subtype = None + + cmds = [cmd] + if type == "queue": + try: + cmds.extend(self.bindings(lnk)) + except address.ParseError, e: + return (e,) + + def declared(): + self.address_cache[name] = (type, subtype) + action(type, subtype) + + sst.write_cmds(cmds, declared) + + def bindings(self, lnk): + props = lnk.options.get("node-properties", {}) + xprops = props.get("x-properties", {}) + bindings = xprops.get("bindings", []) + cmds = [] + for b in bindings: + n, s, o = address.parse(b) + cmds.append(ExchangeBind(lnk.name, n, s, o)) + return cmds + + def delete(self, sst, name, action): + def deleted(): + del self.address_cache[name] + action() + + def do_delete(type, subtype): + if type == "topic": + sst.write_cmd(ExchangeDelete(name), deleted) + elif type == "queue": + sst.write_cmd(QueueDelete(name), deleted) + elif type is None: + action() + else: + raise ValueError(type) + self.resolve(sst, name, do_delete, force=True) + + def process(self, ssn): + if ssn.closed or ssn.closing: return + + sst = self._attachments[ssn] + + while sst.outgoing_idx < len(ssn.outgoing): + msg = ssn.outgoing[sst.outgoing_idx] + snd = msg._sender + # XXX: should check for sender error here + _snd = self._attachments.get(snd) + if _snd and snd.linked: + self.send(snd, msg) + sst.outgoing_idx += 1 + else: + break + + for rcv in ssn.receivers: + self.process_receiver(rcv) + + if ssn.acked: + messages = [m for m in ssn.acked if m not in sst.acked] + if messages: + # XXX: we're ignoring acks that get lost when disconnected, + # could we deal this via some message-id based purge? + ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None]) + for range in ids: + sst.executed.add_range(range) + sst.write_op(SessionCompleted(sst.executed)) + def ack_ack(): + for m in messages: + ssn.acked.remove(m) + if not ssn.transactional: + sst.acked.remove(m) + sst.write_cmd(MessageAccept(ids), ack_ack) + log.debug("SACK[%s]: %s", ssn.log_id, m) + sst.acked.extend(messages) + + if ssn.committing and not sst.committing: + def commit_ok(): + del sst.acked[:] + ssn.committing = False + ssn.committed = True + ssn.aborting = False + ssn.aborted = False + sst.write_cmd(TxCommit(), commit_ok) + sst.committing = True + + if ssn.aborting and not sst.aborting: + sst.aborting = True + def do_rb(): + messages = sst.acked + ssn.unacked + ssn.incoming + ids = RangedSet(*[m._transfer_id for m in messages]) + for range in ids: + sst.executed.add_range(range) + sst.write_op(SessionCompleted(sst.executed)) + sst.write_cmd(MessageRelease(ids)) + sst.write_cmd(TxRollback(), do_rb_ok) + + def do_rb_ok(): + del ssn.incoming[:] + del ssn.unacked[:] + del sst.acked[:] + + for rcv in ssn.receivers: + rcv.impending = rcv.received + rcv.returned = rcv.received + # XXX: do we need to update granted here as well? + + for rcv in ssn.receivers: + self.process_receiver(rcv) + + ssn.aborting = False + ssn.aborted = True + ssn.committing = False + ssn.committed = False + sst.aborting = False + + for rcv in ssn.receivers: + _rcv = self._attachments[rcv] + sst.write_cmd(MessageStop(_rcv.destination)) + sst.write_cmd(ExecutionSync(), do_rb) + + def grant(self, rcv): + sst = self._attachments[rcv.session] + _rcv = self._attachments.get(rcv) + if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining: + return + + if rcv.granted is UNLIMITED: + if rcv.impending is UNLIMITED: + delta = 0 + else: + delta = UNLIMITED + elif rcv.impending is UNLIMITED: + delta = -1 + else: + delta = max(rcv.granted, rcv.received) - rcv.impending + + if delta is UNLIMITED: + sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) + sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value)) + rcv.impending = UNLIMITED + elif delta > 0: + sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) + sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta)) + rcv.impending += delta + elif delta < 0 and not rcv.draining: + _rcv.draining = True + def do_stop(): + rcv.impending = rcv.received + _rcv.draining = False + self.grant(rcv) + sst.write_cmd(MessageStop(_rcv.destination), do_stop) + + if rcv.draining: + _rcv.draining = True + def do_flush(): + rcv.impending = rcv.received + rcv.granted = rcv.impending + _rcv.draining = False + rcv.draining = False + sst.write_cmd(MessageFlush(_rcv.destination), do_flush) + + + def process_receiver(self, rcv): + if rcv.closed: return + self.grant(rcv) + + def send(self, snd, msg): + sst = self._attachments[snd.session] + _snd = self._attachments[snd] + + if msg.subject is None or _snd._exchange == "": + rk = _snd._routing_key + else: + rk = msg.subject + + if msg.subject is None: + subject = _snd.subject + else: + subject = msg.subject + + # XXX: do we need to query to figure out how to create the reply-to interoperably? + if msg.reply_to: + rt = addr2reply_to(msg.reply_to) + else: + rt = None + dp = DeliveryProperties(routing_key=rk) + mp = MessageProperties(message_id=msg.id, + user_id=msg.user_id, + reply_to=rt, + correlation_id=msg.correlation_id, + content_type=msg.content_type, + application_headers=msg.properties) + if subject is not None: + if mp.application_headers is None: + mp.application_headers = {} + mp.application_headers[SUBJECT] = subject + if msg.to is not None: + if mp.application_headers is None: + mp.application_headers = {} + mp.application_headers[TO] = msg.to + if msg.durable: + dp.delivery_mode = delivery_mode.persistent + enc, dec = get_codec(msg.content_type) + body = enc(msg.content) + def msg_acked(): + # XXX: should we log the ack somehow too? + snd.acked += 1 + m = snd.session.outgoing.pop(0) + sst.outgoing_idx -= 1 + log.debug("RACK[%s]: %s", sst.session.log_id, msg) + assert msg == m + sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp), + payload=body), msg_acked) + log.debug("SENT[%s]: %s", sst.session.log_id, msg) + + def do_message_transfer(self, xfr): + sst = self.get_sst(xfr) + ssn = sst.session + + msg = self._decode(xfr) + rcv = sst.destinations[xfr.destination].target + msg._receiver = rcv + if rcv.impending is not UNLIMITED: + assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) + rcv.received += 1 + log.debug("RCVD[%s]: %s", ssn.log_id, msg) + ssn.incoming.append(msg) + self.connection._waiter.notifyAll() + + def _decode(self, xfr): + dp = EMPTY_DP + mp = EMPTY_MP + + for h in xfr.headers: + if isinstance(h, DeliveryProperties): + dp = h + elif isinstance(h, MessageProperties): + mp = h + + ap = mp.application_headers + enc, dec = get_codec(mp.content_type) + content = dec(xfr.payload) + msg = Message(content) + msg.id = mp.message_id + if ap is not None: + msg.to = ap.get(TO) + msg.subject = ap.get(SUBJECT) + msg.user_id = mp.user_id + if mp.reply_to is not None: + msg.reply_to = reply_to2addr(mp.reply_to) + msg.correlation_id = mp.correlation_id + msg.durable = dp.delivery_mode == delivery_mode.persistent + msg.redelivered = dp.redelivered + msg.properties = mp.application_headers + msg.content_type = mp.content_type + msg._transfer_id = xfr.id + return msg diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py new file mode 100644 index 0000000000..2337986ecb --- /dev/null +++ b/python/qpid/messaging/endpoints.py @@ -0,0 +1,832 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +A candidate high level messaging API for python. + +Areas that still need work: + + - definition of the arguments for L{Session.sender} and L{Session.receiver} + - standard L{Message} properties + - L{Message} content encoding + - protocol negotiation/multiprotocol impl +""" + +from logging import getLogger +from qpid.codec010 import StringCodec +from qpid.concurrency import synchronized, Waiter, Condition +from qpid.datatypes import Serial, uuid4 +from qpid.messaging.constants import * +from qpid.messaging.exceptions import * +from qpid.messaging.message import * +from qpid.ops import PRIMITIVE +from qpid.util import default +from threading import Thread, RLock + +log = getLogger("qpid.messaging") + +static = staticmethod + +class Connection: + + """ + A Connection manages a group of L{Sessions<Session>} and connects + them with a remote endpoint. + """ + + @static + def open(host, port=None, username="guest", password="guest", **options): + """ + Creates an AMQP connection and connects it to the given host and port. + + @type host: str + @param host: the name or ip address of the remote host + @type port: int + @param port: the port number of the remote host + @rtype: Connection + @return: a connected Connection + """ + conn = Connection(host, port, username, password, **options) + conn.connect() + return conn + + def __init__(self, host, port=None, username="guest", password="guest", **options): + """ + Creates a connection. A newly created connection must be connected + with the Connection.connect() method before it can be used. + + @type host: str + @param host: the name or ip address of the remote host + @type port: int + @param port: the port number of the remote host + @rtype: Connection + @return: a disconnected Connection + """ + self.host = host + self.port = default(port, AMQP_PORT) + self.username = username + self.password = password + self.mechanisms = options.get("mechanisms") + self.heartbeat = options.get("heartbeat") + self.reconnect = options.get("reconnect", False) + self.reconnect_delay = options.get("reconnect_delay", 3) + self.reconnect_limit = options.get("reconnect_limit") + self.backups = options.get("backups", []) + self.options = options + + self.id = str(uuid4()) + self.session_counter = 0 + self.sessions = {} + self._connected = False + self._lock = RLock() + self._condition = Condition(self._lock) + self._waiter = Waiter(self._condition) + self._modcount = Serial(0) + self.error = None + from driver import Driver + self._driver = Driver(self) + self._driver.start() + + def _wait(self, predicate, timeout=None): + return self._waiter.wait(predicate, timeout=timeout) + + def _wakeup(self): + self._modcount += 1 + self._driver.wakeup() + + def _check_error(self, exc=ConnectionError): + if self.error: + raise exc(*self.error) + + def _ewait(self, predicate, timeout=None, exc=ConnectionError): + result = self._wait(lambda: self.error or predicate(), timeout) + self._check_error(exc) + return result + + @synchronized + def session(self, name=None, transactional=False): + """ + Creates or retrieves the named session. If the name is omitted or + None, then a unique name is chosen based on a randomly generated + uuid. + + @type name: str + @param name: the session name + @rtype: Session + @return: the named Session + """ + + if name is None: + name = "%s:%s" % (self.id, self.session_counter) + self.session_counter += 1 + else: + name = "%s:%s" % (self.id, name) + + if self.sessions.has_key(name): + return self.sessions[name] + else: + ssn = Session(self, name, transactional) + self.sessions[name] = ssn + self._wakeup() + return ssn + + @synchronized + def _remove_session(self, ssn): + del self.sessions[ssn.name] + + @synchronized + def connect(self): + """ + Connect to the remote endpoint. + """ + self._connected = True + self._wakeup() + self._ewait(lambda: self._driver._connected and not self._unlinked(), + exc=ConnectError) + + def _unlinked(self): + return [l + for ssn in self.sessions.values() + for l in ssn.senders + ssn.receivers + if not (l.linked or l.error or l.closed)] + + @synchronized + def disconnect(self): + """ + Disconnect from the remote endpoint. + """ + self._connected = False + self._wakeup() + self._ewait(lambda: not self._driver._connected) + + @synchronized + def connected(self): + """ + Return true if the connection is connected, false otherwise. + """ + return self._connected + + @synchronized + def close(self): + """ + Close the connection and all sessions. + """ + for ssn in self.sessions.values(): + ssn.close() + self.disconnect() + +class Pattern: + """ + The pattern filter matches the supplied wildcard pattern against a + message subject. + """ + + def __init__(self, value): + self.value = value + + # XXX: this should become part of the driver + def _bind(self, sst, exchange, queue): + from qpid.ops import ExchangeBind + sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue, + binding_key=self.value.replace("*", "#"))) + +class Session: + + """ + Sessions provide a linear context for sending and receiving + L{Messages<Message>}. L{Messages<Message>} are sent and received + using the L{Sender.send} and L{Receiver.fetch} methods of the + L{Sender} and L{Receiver} objects associated with a Session. + + Each L{Sender} and L{Receiver} is created by supplying either a + target or source address to the L{sender} and L{receiver} methods of + the Session. The address is supplied via a string syntax documented + below. + + Addresses + ========= + + An address identifies a source or target for messages. In its + simplest form this is just a name. In general a target address may + also be used as a source address, however not all source addresses + may be used as a target, e.g. a source might additionally have some + filtering criteria that would not be present in a target. + + A subject may optionally be specified along with the name. When an + address is used as a target, any subject specified in the address is + used as the default subject of outgoing messages for that target. + When an address is used as a source, any subject specified in the + address is pattern matched against the subject of available messages + as a filter for incoming messages from that source. + + The options map contains additional information about the address + including: + + - policies for automatically creating, and deleting the node to + which an address refers + + - policies for asserting facts about the node to which an address + refers + + - extension points that can be used for sender/receiver + configuration + + Mapping to AMQP 0-10 + -------------------- + The name is resolved to either an exchange or a queue by querying + the broker. + + The subject is set as a property on the message. Additionally, if + the name refers to an exchange, the routing key is set to the + subject. + + Syntax + ------ + The following regular expressions define the tokens used to parse + addresses:: + LBRACE: \\{ + RBRACE: \\} + LBRACK: \\[ + RBRACK: \\] + COLON: : + SEMI: ; + SLASH: / + COMMA: , + NUMBER: [+-]?[0-9]*\\.?[0-9]+ + ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])? + STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\' + ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F] + SYM: [.#*%@$^!+-] + WSPACE: [ \\n\\r\\t]+ + + The formal grammar for addresses is given below:: + address = name [ "/" subject ] [ ";" options ] + name = ( part | quoted )+ + subject = ( part | quoted | "/" )* + quoted = STRING / ESC + part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM + options = map + map = "{" ( keyval ( "," keyval )* )? "}" + keyval = ID ":" value + value = NUMBER / STRING / ID / map / list + list = "[" ( value ( "," value )* )? "]" + + This grammar resuls in the following informal syntax:: + + <name> [ / <subject> ] [ ; <options> ] + + Where options is:: + + { <key> : <value>, ... } + + And values may be: + - numbers + - single, double, or non quoted strings + - maps (dictionaries) + - lists + + Options + ------- + The options map permits the following parameters:: + + <name> [ / <subject> ] ; { + create: <create-policy>, + delete: <delete-policy>, + assert: <assert-policy>, + node-properties: { + type: <node-type>, + durable: <node-durability>, + x-properties: { + bindings: ["<exchange>/<key>", ...], + <passthrough-key>: <passthrough-value> + } + } + } + + The create, delete, and assert policies specify who should perfom + the associated action: + + - I{always}: the action will always be performed + - I{sender}: the action will only be performed by the sender + - I{receiver}: the action will only be performed by the receiver + - I{never}: the action will never be performed (this is the default) + + The node-type is one of: + + - I{topic}: a topic node will default to the topic exchange, + x-properties may be used to specify other exchange types + - I{queue}: this is the default node-type + + The x-properties map permits arbitrary additional keys and values to + be specified. These keys and values are passed through when creating + a node or asserting facts about an existing node. Any passthrough + keys and values that do not match a standard field of the underlying + exchange or queue declare command will be sent in the arguments map. + + Examples + -------- + A simple name resolves to any named node, usually a queue or a + topic:: + + my-queue-or-topic + + A simple name with a subject will also resolve to a node, but the + presence of the subject will cause a sender using this address to + set the subject on outgoing messages, and receivers to filter based + on the subject:: + + my-queue-or-topic/my-subject + + A subject pattern can be used and will cause filtering if used by + the receiver. If used for a sender, the literal value gets set as + the subject:: + + my-queue-or-topic/my-* + + In all the above cases, the address is resolved to an existing node. + If you want the node to be auto-created, then you can do the + following. By default nonexistent nodes are assumed to be queues:: + + my-queue; {create: always} + + You can customize the properties of the queue:: + + my-queue; {create: always, node-properties: {durable: True}} + + You can create a topic instead if you want:: + + my-queue; {create: always, node-properties: {type: topic}} + + You can assert that the address resolves to a node with particular + properties:: + + my-transient-topic; { + assert: always, + node-properties: { + type: topic, + durable: False + } + } + """ + + def __init__(self, connection, name, transactional): + self.connection = connection + self.name = name + self.log_id = "%x" % id(self) + + self.transactional = transactional + + self.committing = False + self.committed = True + self.aborting = False + self.aborted = False + + self.next_sender_id = 0 + self.senders = [] + self.next_receiver_id = 0 + self.receivers = [] + self.outgoing = [] + self.incoming = [] + self.unacked = [] + self.acked = [] + # XXX: I hate this name. + self.ack_capacity = UNLIMITED + + self.error = None + self.closing = False + self.closed = False + + self._lock = connection._lock + + def __repr__(self): + return "<Session %s>" % self.name + + def _wait(self, predicate, timeout=None): + return self.connection._wait(predicate, timeout=timeout) + + def _wakeup(self): + self.connection._wakeup() + + def _check_error(self, exc=SessionError): + self.connection._check_error(exc) + if self.error: + raise exc(*self.error) + + def _ewait(self, predicate, timeout=None, exc=SessionError): + result = self.connection._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result + + @synchronized + def sender(self, target, **options): + """ + Creates a L{Sender} that may be used to send L{Messages<Message>} + to the specified target. + + @type target: str + @param target: the target to which messages will be sent + @rtype: Sender + @return: a new Sender for the specified target + """ + sender = Sender(self, self.next_sender_id, target, options) + self.next_sender_id += 1 + self.senders.append(sender) + if not self.closed and self.connection._connected: + self._wakeup() + try: + sender._ewait(lambda: sender.linked) + except SendError, e: + sender.close() + raise e + return sender + + @synchronized + def receiver(self, source, **options): + """ + Creates a receiver that may be used to fetch L{Messages<Message>} + from the specified source. + + @type source: str + @param source: the source of L{Messages<Message>} + @rtype: Receiver + @return: a new Receiver for the specified source + """ + receiver = Receiver(self, self.next_receiver_id, source, options) + self.next_receiver_id += 1 + self.receivers.append(receiver) + if not self.closed and self.connection._connected: + self._wakeup() + try: + receiver._ewait(lambda: receiver.linked) + except ReceiveError, e: + receiver.close() + raise e + return receiver + + @synchronized + def _count(self, predicate): + result = 0 + for msg in self.incoming: + if predicate(msg): + result += 1 + return result + + def _peek(self, predicate): + for msg in self.incoming: + if predicate(msg): + return msg + + def _pop(self, predicate): + i = 0 + while i < len(self.incoming): + msg = self.incoming[i] + if predicate(msg): + del self.incoming[i] + return msg + else: + i += 1 + + @synchronized + def _get(self, predicate, timeout=None): + if self._ewait(lambda: ((self._peek(predicate) is not None) or self.closing), + timeout): + msg = self._pop(predicate) + if msg is not None: + msg._receiver.returned += 1 + self.unacked.append(msg) + log.debug("RETR[%s]: %s", self.log_id, msg) + return msg + return None + + @synchronized + def next_receiver(self, timeout=None): + if self._ewait(lambda: self.incoming, timeout): + return self.incoming[0]._receiver + else: + raise Empty + + @synchronized + def acknowledge(self, message=None, sync=True): + """ + Acknowledge the given L{Message}. If message is None, then all + unacknowledged messages on the session are acknowledged. + + @type message: Message + @param message: the message to acknowledge or None + @type sync: boolean + @param sync: if true then block until the message(s) are acknowledged + """ + if message is None: + messages = self.unacked[:] + else: + messages = [message] + + for m in messages: + if self.ack_capacity is not UNLIMITED: + if self.ack_capacity <= 0: + # XXX: this is currently a SendError, maybe it should be a SessionError? + raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) + self._wakeup() + self._ewait(lambda: len(self.acked) < self.ack_capacity) + self.unacked.remove(m) + self.acked.append(m) + + self._wakeup() + if sync: + self._ewait(lambda: not [m for m in messages if m in self.acked]) + + @synchronized + def commit(self): + """ + Commit outstanding transactional work. This consists of all + message sends and receives since the prior commit or rollback. + """ + if not self.transactional: + raise NontransactionalSession() + self.committing = True + self._wakeup() + self._ewait(lambda: not self.committing) + if self.aborted: + raise TransactionAborted() + assert self.committed + + @synchronized + def rollback(self): + """ + Rollback outstanding transactional work. This consists of all + message sends and receives since the prior commit or rollback. + """ + if not self.transactional: + raise NontransactionalSession() + self.aborting = True + self._wakeup() + self._ewait(lambda: not self.aborting) + assert self.aborted + + @synchronized + def close(self): + """ + Close the session. + """ + # XXX: should be able to express this condition through API calls + self._ewait(lambda: not self.outgoing and not self.acked) + + for link in self.receivers + self.senders: + link.close() + + self.closing = True + self._wakeup() + self._ewait(lambda: self.closed) + self.connection._remove_session(self) + +class Sender: + + """ + Sends outgoing messages. + """ + + def __init__(self, session, id, target, options): + self.session = session + self.id = id + self.target = target + self.options = options + self.capacity = options.get("capacity", UNLIMITED) + self.durable = options.get("durable") + self.queued = Serial(0) + self.acked = Serial(0) + self.error = None + self.linked = False + self.closing = False + self.closed = False + self._lock = self.session._lock + + def _wakeup(self): + self.session._wakeup() + + def _check_error(self, exc=SendError): + self.session._check_error(exc) + if self.error: + raise exc(*self.error) + + def _ewait(self, predicate, timeout=None, exc=SendError): + result = self.session._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result + + @synchronized + def pending(self): + """ + Returns the number of messages awaiting acknowledgment. + @rtype: int + @return: the number of unacknowledged messages + """ + return self.queued - self.acked + + @synchronized + def send(self, object, sync=True, timeout=None): + """ + Send a message. If the object passed in is of type L{unicode}, + L{str}, L{list}, or L{dict}, it will automatically be wrapped in a + L{Message} and sent. If it is of type L{Message}, it will be sent + directly. If the sender capacity is not L{UNLIMITED} then send + will block until there is available capacity to send the message. + If the timeout parameter is specified, then send will throw an + L{InsufficientCapacity} exception if capacity does not become + available within the specified time. + + @type object: unicode, str, list, dict, Message + @param object: the message or content to send + + @type sync: boolean + @param sync: if true then block until the message is sent + + @type timeout: float + @param timeout: the time to wait for available capacity + """ + + if not self.session.connection._connected or self.session.closing: + raise Disconnected() + + self._ewait(lambda: self.linked) + + if isinstance(object, Message): + message = object + else: + message = Message(object) + + if message.durable is None: + message.durable = self.durable + + if self.capacity is not UNLIMITED: + if self.capacity <= 0: + raise InsufficientCapacity("capacity = %s" % self.capacity) + if not self._ewait(lambda: self.pending() < self.capacity, timeout=timeout): + raise InsufficientCapacity("capacity = %s" % self.capacity) + + # XXX: what if we send the same message to multiple senders? + message._sender = self + self.session.outgoing.append(message) + self.queued += 1 + + self._wakeup() + + if sync: + self.sync() + assert message not in self.session.outgoing + + @synchronized + def sync(self): + mno = self.queued + self._ewait(lambda: self.acked >= mno) + + @synchronized + def close(self): + """ + Close the Sender. + """ + self.closing = True + self._wakeup() + try: + self.session._ewait(lambda: self.closed) + finally: + self.session.senders.remove(self) + +class Receiver(object): + + """ + Receives incoming messages from a remote source. Messages may be + fetched with L{fetch}. + """ + + def __init__(self, session, id, source, options): + self.session = session + self.id = id + self.source = source + self.options = options + + self.granted = Serial(0) + self.draining = False + self.impending = Serial(0) + self.received = Serial(0) + self.returned = Serial(0) + + self.error = None + self.linked = False + self.closing = False + self.closed = False + self._lock = self.session._lock + self._capacity = 0 + self._set_capacity(options.get("capacity", 0), False) + + @synchronized + def _set_capacity(self, c, wakeup=True): + if c is UNLIMITED: + self._capacity = c.value + else: + self._capacity = c + self._grant() + if wakeup: + self._wakeup() + + def _get_capacity(self): + if self._capacity == UNLIMITED.value: + return UNLIMITED + else: + return self._capacity + + capacity = property(_get_capacity, _set_capacity) + + def _wakeup(self): + self.session._wakeup() + + def _check_error(self, exc=ReceiveError): + self.session._check_error(exc) + if self.error: + raise exc(*self.error) + + def _ewait(self, predicate, timeout=None, exc=ReceiveError): + result = self.session._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result + + @synchronized + def pending(self): + """ + Returns the number of messages available to be fetched by the + application. + + @rtype: int + @return: the number of available messages + """ + return self.received - self.returned + + def _pred(self, msg): + return msg._receiver == self + + @synchronized + def fetch(self, timeout=None): + """ + Fetch and return a single message. A timeout of None will block + forever waiting for a message to arrive, a timeout of zero will + return immediately if no messages are available. + + @type timeout: float + @param timeout: the time to wait for a message to be available + """ + + self._ewait(lambda: self.linked) + + if self._capacity == 0: + self.granted = self.returned + 1 + self._wakeup() + self._ewait(lambda: self.impending >= self.granted) + msg = self.session._get(self._pred, timeout=timeout) + if msg is None: + self.draining = True + self._wakeup() + self._ewait(lambda: not self.draining) + self._grant() + self._wakeup() + msg = self.session._get(self._pred, timeout=0) + if msg is None: + raise Empty() + elif self._capacity not in (0, UNLIMITED.value): + self.granted += 1 + self._wakeup() + return msg + + def _grant(self): + if self._capacity == UNLIMITED.value: + self.granted = UNLIMITED + else: + self.granted = self.received + self._capacity + + @synchronized + def close(self): + """ + Close the receiver. + """ + self.closing = True + self._wakeup() + try: + self.session._ewait(lambda: self.closed) + finally: + self.session.receivers.remove(self) + +__all__ = ["Connection", "Session", "Sender", "Receiver"] diff --git a/python/qpid/messaging/exceptions.py b/python/qpid/messaging/exceptions.py new file mode 100644 index 0000000000..5c8bdedc26 --- /dev/null +++ b/python/qpid/messaging/exceptions.py @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class ConnectionError(Exception): + """ + The base class for all connection related exceptions. + """ + pass + +class ConnectError(ConnectionError): + """ + Exception raised when there is an error connecting to the remote + peer. + """ + pass + +class SessionError(Exception): + pass + +class Disconnected(SessionError): + """ + Exception raised when an operation is attempted that is illegal when + disconnected. + """ + pass + +class NontransactionalSession(SessionError): + """ + Exception raised when commit or rollback is attempted on a non + transactional session. + """ + pass + +class TransactionAborted(SessionError): + pass + +class SendError(SessionError): + pass + +class InsufficientCapacity(SendError): + pass + +class ReceiveError(SessionError): + pass + +class Empty(ReceiveError): + """ + Exception raised by L{Receiver.fetch} when there is no message + available within the alloted time. + """ + pass diff --git a/python/qpid/messaging/message.py b/python/qpid/messaging/message.py new file mode 100644 index 0000000000..1c7c7beb81 --- /dev/null +++ b/python/qpid/messaging/message.py @@ -0,0 +1,141 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.codec010 import StringCodec +from qpid.ops import PRIMITIVE + +def codec(name): + type = PRIMITIVE[name] + + def encode(x): + sc = StringCodec() + sc.write_primitive(type, x) + return sc.encoded + + def decode(x): + sc = StringCodec(x) + return sc.read_primitive(type) + + return encode, decode + +# XXX: need to correctly parse the mime type and deal with +# content-encoding header + +TYPE_MAPPINGS={ + dict: "amqp/map", + list: "amqp/list", + unicode: "text/plain; charset=utf8", + unicode: "text/plain", + buffer: None, + str: None, + None.__class__: None + } + +TYPE_CODEC={ + "amqp/map": codec("map"), + "amqp/list": codec("list"), + "text/plain; charset=utf8": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")), + "text/plain": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")), + "": (lambda x: x, lambda x: x), + None: (lambda x: x, lambda x: x) + } + +def get_type(content): + return TYPE_MAPPINGS[content.__class__] + +def get_codec(content_type): + return TYPE_CODEC[content_type] + +UNSPECIFIED = object() + +class Message: + + """ + A message consists of a standard set of fields, an application + defined set of properties, and some content. + + @type id: str + @ivar id: the message id + @type user_id: str + @ivar user_id: the user-id of the message producer + @type to: str + @ivar to: the destination address + @type reply_to: str + @ivar reply_to: the address to send replies + @type correlation_id: str + @ivar correlation_id: a correlation-id for the message + @type properties: dict + @ivar properties: application specific message properties + @type content_type: str + @ivar content_type: the content-type of the message + @type content: str, unicode, buffer, dict, list + @ivar content: the message content + """ + + def __init__(self, content=None, content_type=UNSPECIFIED, id=None, + subject=None, to=None, user_id=None, reply_to=None, + correlation_id=None, durable=None, properties=None): + """ + Construct a new message with the supplied content. The + content-type of the message will be automatically inferred from + type of the content parameter. + + @type content: str, unicode, buffer, dict, list + @param content: the message content + + @type content_type: str + @param content_type: the content-type of the message + """ + self.id = id + self.subject = subject + self.to = to + self.user_id = user_id + self.reply_to = reply_to + self.correlation_id = correlation_id + self.durable = durable + self.redelivered = False + if properties is None: + self.properties = {} + else: + self.properties = properties + if content_type is UNSPECIFIED: + self.content_type = get_type(content) + else: + self.content_type = content_type + self.content = content + + def __repr__(self): + args = [] + for name in ["id", "subject", "to", "user_id", "reply_to", + "correlation_id"]: + value = self.__dict__[name] + if value is not None: args.append("%s=%r" % (name, value)) + for name in ["durable", "properties"]: + value = self.__dict__[name] + if value: args.append("%s=%r" % (name, value)) + if self.content_type != get_type(self.content): + args.append("content_type=%r" % self.content_type) + if self.content is not None: + if args: + args.append("content=%r" % self.content) + else: + args.append(repr(self.content)) + return "Message(%s)" % ", ".join(args) + +__all__ = ["Message"] |