From 1d2ab4dbafd09fd0ae959b48810c2ef93f8d7af4 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Thu, 18 Feb 2010 20:22:23 +0000 Subject: split messaging.py into multiple files and made qpid.messaging a package git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@911550 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/python/qpid/address.py | 168 ---- qpid/python/qpid/driver.py | 1035 ------------------------ qpid/python/qpid/messaging.py | 1013 ----------------------- qpid/python/qpid/messaging/__init__.py | 35 + qpid/python/qpid/messaging/address.py | 168 ++++ qpid/python/qpid/messaging/constants.py | 32 + qpid/python/qpid/messaging/driver.py | 1041 ++++++++++++++++++++++++ qpid/python/qpid/messaging/endpoints.py | 832 +++++++++++++++++++ qpid/python/qpid/messaging/exceptions.py | 67 ++ qpid/python/qpid/messaging/message.py | 141 ++++ qpid/python/qpid/tests/__init__.py | 32 +- qpid/python/qpid/tests/address.py | 309 ------- qpid/python/qpid/tests/messaging.py | 1079 ------------------------- qpid/python/qpid/tests/messaging/__init__.py | 106 +++ qpid/python/qpid/tests/messaging/address.py | 309 +++++++ qpid/python/qpid/tests/messaging/endpoints.py | 878 ++++++++++++++++++++ qpid/python/qpid/tests/messaging/message.py | 116 +++ 17 files changed, 3755 insertions(+), 3606 deletions(-) delete mode 100644 qpid/python/qpid/address.py delete mode 100644 qpid/python/qpid/driver.py delete mode 100644 qpid/python/qpid/messaging.py create mode 100644 qpid/python/qpid/messaging/__init__.py create mode 100644 qpid/python/qpid/messaging/address.py create mode 100644 qpid/python/qpid/messaging/constants.py create mode 100644 qpid/python/qpid/messaging/driver.py create mode 100644 qpid/python/qpid/messaging/endpoints.py create mode 100644 qpid/python/qpid/messaging/exceptions.py create mode 100644 qpid/python/qpid/messaging/message.py delete mode 100644 qpid/python/qpid/tests/address.py delete mode 100644 qpid/python/qpid/tests/messaging.py create mode 100644 qpid/python/qpid/tests/messaging/__init__.py create mode 100644 qpid/python/qpid/tests/messaging/address.py create mode 100644 qpid/python/qpid/tests/messaging/endpoints.py create mode 100644 qpid/python/qpid/tests/messaging/message.py diff --git a/qpid/python/qpid/address.py b/qpid/python/qpid/address.py deleted file mode 100644 index ab0fe8221a..0000000000 --- a/qpid/python/qpid/address.py +++ /dev/null @@ -1,168 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import re -from lexer import Lexicon, LexError -from parser import Parser, ParseError - -l = Lexicon() - -LBRACE = l.define("LBRACE", r"\{") -RBRACE = l.define("RBRACE", r"\}") -LBRACK = l.define("LBRACK", r"\[") -RBRACK = l.define("RBRACK", r"\]") -COLON = l.define("COLON", r":") -SEMI = l.define("SEMI", r";") -SLASH = l.define("SLASH", r"/") -COMMA = l.define("COMMA", r",") -NUMBER = l.define("NUMBER", r'[+-]?[0-9]*\.?[0-9]+') -TRUE = l.define("TRUE", r'True') -FALSE = l.define("FALSE", r'False') -ID = l.define("ID", r'[a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])?') -STRING = l.define("STRING", r""""(?:[^\\"]|\\.)*"|'(?:[^\\']|\\.)*'""") -ESC = l.define("ESC", r"\\[^ux]|\\x[0-9a-fA-F][0-9a-fA-F]|\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F]") -SYM = l.define("SYM", r"[.#*%@$^!+-]") -WSPACE = l.define("WSPACE", r"[ \n\r\t]+") -EOF = l.eof("EOF") - -LEXER = l.compile() - -def lex(st): - return LEXER.lex(st) - -def tok2str(tok): - if tok.type is STRING: - return eval(tok.value) - elif tok.type is ESC: - if tok.value[1] == "x": - return eval('"%s"' % tok.value) - elif tok.value[1] == "u": - return eval('u"%s"' % tok.value) - else: - return tok.value[1] - else: - return tok.value - -def tok2obj(tok): - if tok.type in (STRING, NUMBER): - return eval(tok.value) - elif tok.type == TRUE: - return True - elif tok.type == FALSE: - return False - else: - return tok.value - -def toks2str(toks): - if toks: - return "".join(map(tok2str, toks)) - else: - return None - -class AddressParser(Parser): - - def __init__(self, tokens): - Parser.__init__(self, [t for t in tokens if t.type is not WSPACE]) - - def parse(self): - result = self.address() - self.eat(EOF) - return result - - def address(self): - name = toks2str(self.eat_until(SLASH, SEMI, EOF)) - - if name is None: - raise ParseError(self.next()) - - if self.matches(SLASH): - self.eat(SLASH) - subject = toks2str(self.eat_until(SEMI, EOF)) - else: - subject = None - - if self.matches(SEMI): - self.eat(SEMI) - options = self.map() - else: - options = None - return name, subject, options - - def map(self): - self.eat(LBRACE) - - result = {} - while True: - if self.matches(NUMBER, STRING, ID, LBRACE, LBRACK): - n, v = self.keyval() - result[n] = v - if self.matches(COMMA): - self.eat(COMMA) - elif self.matches(RBRACE): - break - else: - raise ParseError(self.next(), COMMA, RBRACE) - elif self.matches(RBRACE): - break - else: - raise ParseError(self.next(), NUMBER, STRING, ID, LBRACE, LBRACK, - RBRACE) - - self.eat(RBRACE) - return result - - def keyval(self): - key = self.value() - self.eat(COLON) - val = self.value() - return (key, val) - - def value(self): - if self.matches(NUMBER, STRING, ID, TRUE, FALSE): - return tok2obj(self.eat()) - elif self.matches(LBRACE): - return self.map() - elif self.matches(LBRACK): - return self.list() - else: - raise ParseError(self.next(), NUMBER, STRING, ID, LBRACE, LBRACK) - - def list(self): - self.eat(LBRACK) - - result = [] - - while True: - if self.matches(RBRACK): - break - else: - result.append(self.value()) - if self.matches(COMMA): - self.eat(COMMA) - elif self.matches(RBRACK): - break - else: - raise ParseError(self.next(), COMMA, RBRACK) - - self.eat(RBRACK) - return result - -def parse(addr): - return AddressParser(lex(addr)).parse() - -__all__ = ["parse", "ParseError"] diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py deleted file mode 100644 index ed6fbc3b6a..0000000000 --- a/qpid/python/qpid/driver.py +++ /dev/null @@ -1,1035 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import address, compat, connection, sasl, socket, struct, sys, time -from concurrency import synchronized -from datatypes import RangedSet, Serial -from exceptions import Timeout, VersionError -from framing import OpEncoder, SegmentEncoder, FrameEncoder, FrameDecoder, SegmentDecoder, OpDecoder -from logging import getLogger -from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED -from ops import * -from selector import Selector -from threading import Condition, Thread -from util import connect -from validator import And, Context, Map, Types, Values - -log = getLogger("qpid.messaging") -rawlog = getLogger("qpid.messaging.io.raw") -opslog = getLogger("qpid.messaging.io.ops") - -def addr2reply_to(addr): - name, subject, options = address.parse(addr) - return ReplyTo(name, subject) - -def reply_to2addr(reply_to): - if reply_to.routing_key is None: - return reply_to.exchange - elif reply_to.exchange in (None, ""): - return reply_to.routing_key - else: - return "%s/%s" % (reply_to.exchange, reply_to.routing_key) - -class Attachment: - - def __init__(self, target): - self.target = target - -# XXX - -DURABLE_DEFAULT=True - -# XXX - -FILTER_DEFAULTS = { - "topic": Pattern("*"), - "amq.failover": Pattern("DUMMY") - } - -# XXX -ppid = 0 -try: - ppid = os.getppid() -except: - pass - -CLIENT_PROPERTIES = {"product": "qpid python client", - "version": "development", - "platform": os.name, - "qpid.client_process": os.path.basename(sys.argv[0]), - "qpid.client_pid": os.getpid(), - "qpid.client_ppid": ppid} - -def noop(): pass - -class SessionState: - - def __init__(self, driver, session, name, channel): - self.driver = driver - self.session = session - self.name = name - self.channel = channel - self.detached = False - self.committing = False - self.aborting = False - - # sender state - self.sent = Serial(0) - self.acknowledged = RangedSet() - self.actions = {} - self.min_completion = self.sent - self.max_completion = self.sent - self.results = {} - - # receiver state - self.received = None - self.executed = RangedSet() - - # XXX: need to periodically exchange completion/known_completion - - self.destinations = {} - - def write_query(self, query, handler): - id = self.sent - self.write_cmd(query, lambda: handler(self.results.pop(id))) - - def write_cmd(self, cmd, action=noop): - if action != noop: - cmd.sync = True - if self.detached: - raise Exception("detached") - cmd.id = self.sent - self.sent += 1 - self.actions[cmd.id] = action - self.max_completion = cmd.id - self.write_op(cmd) - - def write_cmds(self, cmds, action=noop): - if cmds: - for cmd in cmds[:-1]: - self.write_cmd(cmd) - self.write_cmd(cmds[-1], action) - else: - action() - - def write_op(self, op): - op.channel = self.channel - self.driver.write_op(op) - -POLICIES = Values("always", "sender", "receiver", "never") - -class Bindings: - - def validate(self, o, ctx): - t = ctx.containers[1].get("type", "queue") - if t != "queue": - return "bindings are only permitted on nodes of type queue" - -COMMON_OPTS = { - "create": POLICIES, - "delete": POLICIES, - "assert": POLICIES, - "node-properties": Map({ - "type": Values("queue", "topic"), - "durable": Types(bool), - "x-properties": Map({ - "type": Types(basestring), - "bindings": And(Types(list), Bindings()) - }, - restricted=False) - }) - } - -RECEIVE_MODES = Values("browse", "consume") - -SOURCE_OPTS = COMMON_OPTS.copy() -SOURCE_OPTS.update({ - "mode": RECEIVE_MODES - }) - -TARGET_OPTS = COMMON_OPTS.copy() - -class LinkIn: - - ADDR_NAME = "source" - DIR_NAME = "receiver" - VALIDATOR = Map(SOURCE_OPTS) - - def init_link(self, sst, rcv, _rcv): - _rcv.destination = str(rcv.id) - sst.destinations[_rcv.destination] = _rcv - _rcv.draining = False - - def do_link(self, sst, rcv, _rcv, type, subtype, action): - acq_mode = acquire_mode.pre_acquired - - if type == "topic": - _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination) - sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) - filter = _rcv.options.get("filter") - if _rcv.subject is None and filter is None: - f = FILTER_DEFAULTS[subtype] - elif _rcv.subject and filter: - # XXX - raise Exception("can't supply both subject and filter") - elif _rcv.subject: - # XXX - from messaging import Pattern - f = Pattern(_rcv.subject) - else: - f = filter - f._bind(sst, _rcv.name, _rcv._queue) - elif type == "queue": - _rcv._queue = _rcv.name - if _rcv.options.get("mode", "consume") == "browse": - acq_mode = acquire_mode.not_acquired - - sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination, - acquire_mode = acq_mode)) - sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action) - - def do_unlink(self, sst, rcv, _rcv, action=noop): - sst.write_cmd(MessageCancel(_rcv.destination), action) - - def del_link(self, sst, rcv, _rcv): - del sst.destinations[_rcv.destination] - -class LinkOut: - - ADDR_NAME = "target" - DIR_NAME = "sender" - VALIDATOR = Map(TARGET_OPTS) - - def init_link(self, sst, snd, _snd): - _snd.closing = False - - def do_link(self, sst, snd, _snd, type, subtype, action): - if type == "topic": - _snd._exchange = _snd.name - _snd._routing_key = _snd.subject - elif type == "queue": - _snd._exchange = "" - _snd._routing_key = _snd.name - action() - - def do_unlink(self, sst, snd, _snd, action=noop): - action() - - def del_link(self, sst, snd, _snd): - pass - -class Cache: - - def __init__(self, ttl): - self.ttl = ttl - self.entries = {} - - def __setitem__(self, key, value): - self.entries[key] = time.time(), value - - def __getitem__(self, key): - tstamp, value = self.entries[key] - if time.time() - tstamp >= self.ttl: - del self.entries[key] - raise KeyError(key) - else: - return value - - def __delitem__(self, key): - del self.entries[key] - -# XXX -HEADER="!4s4B" - -EMPTY_DP = DeliveryProperties() -EMPTY_MP = MessageProperties() - -SUBJECT = "qpid.subject" -TO = "qpid.to" - -class Driver: - - def __init__(self, connection): - self.connection = connection - self.log_id = "%x" % id(self.connection) - self._lock = self.connection._lock - - self._in = LinkIn() - self._out = LinkOut() - - self._selector = Selector.default() - self._attempts = 0 - self._hosts = [(self.connection.host, self.connection.port)] + \ - self.connection.backups - self._host = 0 - self._retrying = False - - self.reset() - - def reset(self): - self._opening = False - self._closing = False - self._connected = False - self._attachments = {} - - self._channel_max = 65536 - self._channels = 0 - self._sessions = {} - - options = self.connection.options - - self.address_cache = Cache(options.get("address_ttl", 60)) - - self._socket = None - self._buf = "" - self._hdr = "" - self._op_enc = OpEncoder() - self._seg_enc = SegmentEncoder() - self._frame_enc = FrameEncoder() - self._frame_dec = FrameDecoder() - self._seg_dec = SegmentDecoder() - self._op_dec = OpDecoder() - self._timeout = None - - self._sasl = sasl.Client() - if self.connection.username: - self._sasl.setAttr("username", self.connection.username) - if self.connection.password: - self._sasl.setAttr("password", self.connection.password) - if self.connection.host: - self._sasl.setAttr("host", self.connection.host) - self._sasl.setAttr("service", options.get("service", "qpidd")) - if "min_ssf" in options: - self._sasl.setAttr("minssf", options["min_ssf"]) - if "max_ssf" in options: - self._sasl.setAttr("maxssf", options["max_ssf"]) - self._sasl.init() - self._sasl_encode = False - self._sasl_decode = False - - for ssn in self.connection.sessions.values(): - for m in ssn.acked + ssn.unacked + ssn.incoming: - m._transfer_id = None - for snd in ssn.senders: - snd.linked = False - for rcv in ssn.receivers: - rcv.impending = rcv.received - rcv.linked = False - - @synchronized - def wakeup(self): - self.dispatch() - self._selector.wakeup() - - def start(self): - self._selector.register(self) - - def fileno(self): - return self._socket.fileno() - - @synchronized - def reading(self): - return self._socket is not None - - @synchronized - def writing(self): - return self._socket is not None and self._buf - - @synchronized - def timing(self): - return self._timeout - - @synchronized - def readable(self): - error = None - recoverable = False - try: - data = self._socket.recv(64*1024) - if data: - rawlog.debug("READ[%s]: %r", self.log_id, data) - if self._sasl_decode: - data = self._sasl.decode(data) - else: - rawlog.debug("ABORTED[%s]: %s", self.log_id, self._socket.getpeername()) - error = "connection aborted" - recoverable = True - except socket.error, e: - error = e - recoverable = True - - if not error: - try: - if len(self._hdr) < 8: - r = 8 - len(self._hdr) - self._hdr += data[:r] - data = data[r:] - - if len(self._hdr) == 8: - self.do_header(self._hdr) - - self._frame_dec.write(data) - self._seg_dec.write(*self._frame_dec.read()) - self._op_dec.write(*self._seg_dec.read()) - for op in self._op_dec.read(): - self.assign_id(op) - opslog.debug("RCVD[%s]: %r", self.log_id, op) - op.dispatch(self) - except VersionError, e: - error = e - except: - msg = compat.format_exc() - error = msg - - if error: - self._error(error, recoverable) - else: - self.dispatch() - - self.connection._waiter.notifyAll() - - def assign_id(self, op): - if isinstance(op, Command): - sst = self.get_sst(op) - op.id = sst.received - sst.received += 1 - - @synchronized - def writeable(self): - try: - n = self._socket.send(self._buf) - rawlog.debug("SENT[%s]: %r", self.log_id, self._buf[:n]) - self._buf = self._buf[n:] - except socket.error, e: - self._error(e, True) - self.connection._waiter.notifyAll() - - @synchronized - def timeout(self): - self.dispatch() - self.connection._waiter.notifyAll() - - def _error(self, err, recoverable): - if self._socket is not None: - self._socket.close() - self.reset() - if (recoverable and self.connection.reconnect and - (self.connection.reconnect_limit is None or - self.connection.reconnect_limit <= 0 or - self._attempts <= self.connection.reconnect_limit)): - if self._host > 0: - delay = 0 - else: - delay = self.connection.reconnect_delay - self._timeout = time.time() + delay - log.warn("recoverable error[attempt %s]: %s" % (self._attempts, err)) - if delay > 0: - log.warn("sleeping %s seconds" % delay) - self._retrying = True - else: - self.connection.error = (err,) - - def write_op(self, op): - opslog.debug("SENT[%s]: %r", self.log_id, op) - self._op_enc.write(op) - self._seg_enc.write(*self._op_enc.read()) - self._frame_enc.write(*self._seg_enc.read()) - bytes = self._frame_enc.read() - if self._sasl_encode: - bytes = self._sasl.encode(bytes) - self._buf += bytes - - def do_header(self, hdr): - cli_major = 0; cli_minor = 10 - magic, _, _, major, minor = struct.unpack(HEADER, hdr) - if major != cli_major or minor != cli_minor: - raise VersionError("client: %s-%s, server: %s-%s" % - (cli_major, cli_minor, major, minor)) - - def do_connection_start(self, start): - if self.connection.mechanisms: - permitted = self.connection.mechanisms.split() - mechs = [m for m in start.mechanisms if m in permitted] - else: - mechs = start.mechanisms - mech, initial = self._sasl.start(" ".join(mechs)) - self.write_op(ConnectionStartOk(client_properties=CLIENT_PROPERTIES, - mechanism=mech, response=initial)) - - def do_connection_secure(self, secure): - resp = self._sasl.step(secure.challenge) - self.write_op(ConnectionSecureOk(response=resp)) - - def do_connection_tune(self, tune): - # XXX: is heartbeat protocol specific? - if tune.channel_max is not None: - self.channel_max = tune.channel_max - self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat, - channel_max=self.channel_max)) - self.write_op(ConnectionOpen()) - self._sasl_encode = True - - def do_connection_open_ok(self, open_ok): - self._connected = True - self._sasl_decode = True - - def connection_heartbeat(self, hrt): - self.write_op(ConnectionHeartbeat()) - - def do_connection_close(self, close): - self.write_op(ConnectionCloseOk()) - if close.reply_code != close_code.normal: - self.connection.error = (close.reply_code, close.reply_text) - # XXX: should we do a half shutdown on the socket here? - # XXX: we really need to test this, we may end up reporting a - # connection abort after this, if we were to do a shutdown on read - # and stop reading, then we wouldn't report the abort, that's - # probably the right thing to do - - def do_connection_close_ok(self, close_ok): - self._socket.close() - self.reset() - - def do_session_attached(self, atc): - pass - - def do_session_command_point(self, cp): - sst = self.get_sst(cp) - sst.received = cp.command_id - - def do_session_completed(self, sc): - sst = self.get_sst(sc) - for r in sc.commands: - sst.acknowledged.add(r.lower, r.upper) - - if not sc.commands.empty(): - while sst.min_completion in sc.commands: - if sst.actions.has_key(sst.min_completion): - sst.actions.pop(sst.min_completion)() - sst.min_completion += 1 - - def session_known_completed(self, kcmp): - sst = self.get_sst(kcmp) - executed = RangedSet() - for e in sst.executed.ranges: - for ke in kcmp.ranges: - if e.lower in ke and e.upper in ke: - break - else: - executed.add_range(e) - sst.executed = completed - - def do_session_flush(self, sf): - sst = self.get_sst(sf) - if sf.expected: - if sst.received is None: - exp = None - else: - exp = RangedSet(sst.received) - sst.write_op(SessionExpected(exp)) - if sf.confirmed: - sst.write_op(SessionConfirmed(sst.executed)) - if sf.completed: - sst.write_op(SessionCompleted(sst.executed)) - - def do_execution_result(self, er): - sst = self.get_sst(er) - sst.results[er.command_id] = er.value - - def do_execution_exception(self, ex): - sst = self.get_sst(ex) - sst.session.error = (ex,) - - def dispatch(self): - try: - if self._socket is None and self.connection._connected and not self._opening: - self.connect() - elif self._socket is not None and not self.connection._connected and not self._closing: - self.disconnect() - - if self._connected and not self._closing: - for ssn in self.connection.sessions.values(): - self.attach(ssn) - self.process(ssn) - except: - msg = compat.format_exc() - self.connection.error = (msg,) - - def connect(self): - try: - # XXX: should make this non blocking - if self._host == 0: - self._attempts += 1 - host, port = self._hosts[self._host] - if self._retrying: - log.warn("trying: %s:%s", host, port) - self._socket = connect(host, port) - if self._retrying: - log.warn("reconnect succeeded: %s:%s", host, port) - self._timeout = None - self._attempts = 0 - self._host = 0 - self._retrying = False - self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10) - self._opening = True - except socket.error, e: - self._host = (self._host + 1) % len(self._hosts) - self._error(e, True) - - def disconnect(self): - self.write_op(ConnectionClose(close_code.normal)) - self._closing = True - - def attach(self, ssn): - sst = self._attachments.get(ssn) - if sst is None and not ssn.closed: - for i in xrange(0, self.channel_max): - if not self._sessions.has_key(i): - ch = i - break - else: - raise RuntimeError("all channels used") - sst = SessionState(self, ssn, ssn.name, ch) - sst.write_op(SessionAttach(name=ssn.name)) - sst.write_op(SessionCommandPoint(sst.sent, 0)) - sst.outgoing_idx = 0 - sst.acked = [] - if ssn.transactional: - sst.write_cmd(TxSelect()) - self._attachments[ssn] = sst - self._sessions[sst.channel] = sst - - for snd in ssn.senders: - self.link(snd, self._out, snd.target) - for rcv in ssn.receivers: - self.link(rcv, self._in, rcv.source) - - if sst is not None and ssn.closing and not sst.detached: - sst.detached = True - sst.write_op(SessionDetach(name=ssn.name)) - - def get_sst(self, op): - return self._sessions[op.channel] - - def do_session_detached(self, dtc): - sst = self._sessions.pop(dtc.channel) - ssn = sst.session - del self._attachments[ssn] - ssn.closed = True - - def do_session_detach(self, dtc): - sst = self.get_sst(dtc) - sst.write_op(SessionDetached(name=dtc.name)) - self.do_session_detached(dtc) - - def link(self, lnk, dir, addr): - sst = self._attachments.get(lnk.session) - _lnk = self._attachments.get(lnk) - - if _lnk is None and not lnk.closing and not lnk.closed: - _lnk = Attachment(lnk) - _lnk.closing = False - dir.init_link(sst, lnk, _lnk) - - err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir) - if err: - lnk.error = (err,) - lnk.closed = True - return - - def linked(): - lnk.linked = True - - def resolved(type, subtype): - dir.do_link(sst, lnk, _lnk, type, subtype, linked) - - self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved) - self._attachments[lnk] = _lnk - - if lnk.linked and lnk.closing and not lnk.closed: - if not _lnk.closing: - def unlinked(): - dir.del_link(sst, lnk, _lnk) - del self._attachments[lnk] - lnk.closed = True - if _lnk.options.get("delete") in ("always", dir.DIR_NAME): - dir.do_unlink(sst, lnk, _lnk) - self.delete(sst, _lnk.name, unlinked) - else: - dir.do_unlink(sst, lnk, _lnk, unlinked) - _lnk.closing = True - elif not lnk.linked and lnk.closing and not lnk.closed: - lnk.closed = True - - def parse_address(self, lnk, dir, addr): - if addr is None: - return "%s is None" % dir.ADDR_NAME - else: - try: - lnk.name, lnk.subject, lnk.options = address.parse(addr) - # XXX: subject - if lnk.options is None: - lnk.options = {} - except address.LexError, e: - return e - except address.ParseError, e: - return e - - def validate_options(self, lnk, dir): - ctx = Context() - err = dir.VALIDATOR.validate(lnk.options, ctx) - if err: return "error in options: %s" % err - - def resolve_declare(self, sst, lnk, dir, action): - declare = lnk.options.get("create") in ("always", dir) - def do_resolved(type, subtype): - err = None - if type is None: - if declare: - err = self.declare(sst, lnk, action) - else: - err = ("no such queue: %s" % lnk.name,) - elif type == "queue": - try: - cmds = self.bindings(lnk) - sst.write_cmds(cmds, lambda: action(type, subtype)) - except address.ParseError, e: - err = (e,) - else: - action(type, subtype) - - if err: - tgt = lnk.target - tgt.error = err - del self._attachments[tgt] - tgt.closed = True - return - self.resolve(sst, lnk.name, do_resolved, force=declare) - - def resolve(self, sst, name, action, force=False): - if not force: - try: - type, subtype = self.address_cache[name] - action(type, subtype) - return - except KeyError: - pass - - args = [] - def do_result(r): - args.append(r) - def do_action(r): - do_result(r) - er, qr = args - if er.not_found and not qr.queue: - type, subtype = None, None - elif qr.queue: - type, subtype = "queue", None - else: - type, subtype = "topic", er.type - if type is not None: - self.address_cache[name] = (type, subtype) - action(type, subtype) - sst.write_query(ExchangeQuery(name), do_result) - sst.write_query(QueueQuery(name), do_action) - - def declare(self, sst, lnk, action): - name = lnk.name - props = lnk.options.get("node-properties", {}) - durable = props.get("durable", DURABLE_DEFAULT) - type = props.get("type", "queue") - xprops = props.get("x-properties", {}) - - if type == "topic": - cmd = ExchangeDeclare(exchange=name, durable=durable) - elif type == "queue": - cmd = QueueDeclare(queue=name, durable=durable) - else: - raise ValueError(type) - - for f in cmd.FIELDS: - if f.name != "arguments" and xprops.has_key(f.name): - cmd[f.name] = xprops.pop(f.name) - if xprops: - cmd.arguments = xprops - - if type == "topic": - if cmd.type is None: - cmd.type = "topic" - subtype = cmd.type - else: - subtype = None - - cmds = [cmd] - if type == "queue": - try: - cmds.extend(self.bindings(lnk)) - except address.ParseError, e: - return (e,) - - def declared(): - self.address_cache[name] = (type, subtype) - action(type, subtype) - - sst.write_cmds(cmds, declared) - - def bindings(self, lnk): - props = lnk.options.get("node-properties", {}) - xprops = props.get("x-properties", {}) - bindings = xprops.get("bindings", []) - cmds = [] - for b in bindings: - n, s, o = address.parse(b) - cmds.append(ExchangeBind(lnk.name, n, s, o)) - return cmds - - def delete(self, sst, name, action): - def deleted(): - del self.address_cache[name] - action() - - def do_delete(type, subtype): - if type == "topic": - sst.write_cmd(ExchangeDelete(name), deleted) - elif type == "queue": - sst.write_cmd(QueueDelete(name), deleted) - elif type is None: - action() - else: - raise ValueError(type) - self.resolve(sst, name, do_delete, force=True) - - def process(self, ssn): - if ssn.closed or ssn.closing: return - - sst = self._attachments[ssn] - - while sst.outgoing_idx < len(ssn.outgoing): - msg = ssn.outgoing[sst.outgoing_idx] - snd = msg._sender - # XXX: should check for sender error here - _snd = self._attachments.get(snd) - if _snd and snd.linked: - self.send(snd, msg) - sst.outgoing_idx += 1 - else: - break - - for rcv in ssn.receivers: - self.process_receiver(rcv) - - if ssn.acked: - messages = [m for m in ssn.acked if m not in sst.acked] - if messages: - # XXX: we're ignoring acks that get lost when disconnected, - # could we deal this via some message-id based purge? - ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None]) - for range in ids: - sst.executed.add_range(range) - sst.write_op(SessionCompleted(sst.executed)) - def ack_ack(): - for m in messages: - ssn.acked.remove(m) - if not ssn.transactional: - sst.acked.remove(m) - sst.write_cmd(MessageAccept(ids), ack_ack) - log.debug("SACK[%s]: %s", ssn.log_id, m) - sst.acked.extend(messages) - - if ssn.committing and not sst.committing: - def commit_ok(): - del sst.acked[:] - ssn.committing = False - ssn.committed = True - ssn.aborting = False - ssn.aborted = False - sst.write_cmd(TxCommit(), commit_ok) - sst.committing = True - - if ssn.aborting and not sst.aborting: - sst.aborting = True - def do_rb(): - messages = sst.acked + ssn.unacked + ssn.incoming - ids = RangedSet(*[m._transfer_id for m in messages]) - for range in ids: - sst.executed.add_range(range) - sst.write_op(SessionCompleted(sst.executed)) - sst.write_cmd(MessageRelease(ids)) - sst.write_cmd(TxRollback(), do_rb_ok) - - def do_rb_ok(): - del ssn.incoming[:] - del ssn.unacked[:] - del sst.acked[:] - - for rcv in ssn.receivers: - rcv.impending = rcv.received - rcv.returned = rcv.received - # XXX: do we need to update granted here as well? - - for rcv in ssn.receivers: - self.process_receiver(rcv) - - ssn.aborting = False - ssn.aborted = True - ssn.committing = False - ssn.committed = False - sst.aborting = False - - for rcv in ssn.receivers: - _rcv = self._attachments[rcv] - sst.write_cmd(MessageStop(_rcv.destination)) - sst.write_cmd(ExecutionSync(), do_rb) - - def grant(self, rcv): - sst = self._attachments[rcv.session] - _rcv = self._attachments.get(rcv) - if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining: - return - - if rcv.granted is UNLIMITED: - if rcv.impending is UNLIMITED: - delta = 0 - else: - delta = UNLIMITED - elif rcv.impending is UNLIMITED: - delta = -1 - else: - delta = max(rcv.granted, rcv.received) - rcv.impending - - if delta is UNLIMITED: - sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) - sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value)) - rcv.impending = UNLIMITED - elif delta > 0: - sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) - sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta)) - rcv.impending += delta - elif delta < 0 and not rcv.draining: - _rcv.draining = True - def do_stop(): - rcv.impending = rcv.received - _rcv.draining = False - self.grant(rcv) - sst.write_cmd(MessageStop(_rcv.destination), do_stop) - - if rcv.draining: - _rcv.draining = True - def do_flush(): - rcv.impending = rcv.received - rcv.granted = rcv.impending - _rcv.draining = False - rcv.draining = False - sst.write_cmd(MessageFlush(_rcv.destination), do_flush) - - - def process_receiver(self, rcv): - if rcv.closed: return - self.grant(rcv) - - def send(self, snd, msg): - sst = self._attachments[snd.session] - _snd = self._attachments[snd] - - if msg.subject is None or _snd._exchange == "": - rk = _snd._routing_key - else: - rk = msg.subject - - if msg.subject is None: - subject = _snd.subject - else: - subject = msg.subject - - # XXX: do we need to query to figure out how to create the reply-to interoperably? - if msg.reply_to: - rt = addr2reply_to(msg.reply_to) - else: - rt = None - dp = DeliveryProperties(routing_key=rk) - mp = MessageProperties(message_id=msg.id, - user_id=msg.user_id, - reply_to=rt, - correlation_id=msg.correlation_id, - content_type=msg.content_type, - application_headers=msg.properties) - if subject is not None: - if mp.application_headers is None: - mp.application_headers = {} - mp.application_headers[SUBJECT] = subject - if msg.to is not None: - if mp.application_headers is None: - mp.application_headers = {} - mp.application_headers[TO] = msg.to - if msg.durable: - dp.delivery_mode = delivery_mode.persistent - enc, dec = get_codec(msg.content_type) - body = enc(msg.content) - def msg_acked(): - # XXX: should we log the ack somehow too? - snd.acked += 1 - m = snd.session.outgoing.pop(0) - sst.outgoing_idx -= 1 - log.debug("RACK[%s]: %s", sst.session.log_id, msg) - assert msg == m - sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp), - payload=body), msg_acked) - log.debug("SENT[%s]: %s", sst.session.log_id, msg) - - def do_message_transfer(self, xfr): - sst = self.get_sst(xfr) - ssn = sst.session - - msg = self._decode(xfr) - rcv = sst.destinations[xfr.destination].target - msg._receiver = rcv - if rcv.impending is not UNLIMITED: - assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) - rcv.received += 1 - log.debug("RCVD[%s]: %s", ssn.log_id, msg) - ssn.incoming.append(msg) - self.connection._waiter.notifyAll() - - def _decode(self, xfr): - dp = EMPTY_DP - mp = EMPTY_MP - - for h in xfr.headers: - if isinstance(h, DeliveryProperties): - dp = h - elif isinstance(h, MessageProperties): - mp = h - - ap = mp.application_headers - enc, dec = get_codec(mp.content_type) - content = dec(xfr.payload) - msg = Message(content) - msg.id = mp.message_id - if ap is not None: - msg.to = ap.get(TO) - msg.subject = ap.get(SUBJECT) - msg.user_id = mp.user_id - if mp.reply_to is not None: - msg.reply_to = reply_to2addr(mp.reply_to) - msg.correlation_id = mp.correlation_id - msg.durable = dp.delivery_mode == delivery_mode.persistent - msg.redelivered = dp.redelivered - msg.properties = mp.application_headers - msg.content_type = mp.content_type - msg._transfer_id = xfr.id - return msg diff --git a/qpid/python/qpid/messaging.py b/qpid/python/qpid/messaging.py deleted file mode 100644 index ed67879a4b..0000000000 --- a/qpid/python/qpid/messaging.py +++ /dev/null @@ -1,1013 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -A candidate high level messaging API for python. - -Areas that still need work: - - - definition of the arguments for L{Session.sender} and L{Session.receiver} - - standard L{Message} properties - - L{Message} content encoding - - protocol negotiation/multiprotocol impl -""" - -from codec010 import StringCodec -from concurrency import synchronized, Waiter, Condition -from datatypes import timestamp, uuid4, Serial -from logging import getLogger -from ops import PRIMITIVE -from threading import Thread, RLock -from util import default - -log = getLogger("qpid.messaging") - -static = staticmethod - -AMQP_PORT = 5672 -AMQPS_PORT = 5671 - -class Constant: - - def __init__(self, name, value=None): - self.name = name - self.value = value - - def __repr__(self): - return self.name - -UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL) - -class ConnectionError(Exception): - """ - The base class for all connection related exceptions. - """ - pass - -class ConnectError(ConnectionError): - """ - Exception raised when there is an error connecting to the remote - peer. - """ - pass - -class Connection: - - """ - A Connection manages a group of L{Sessions} and connects - them with a remote endpoint. - """ - - @static - def open(host, port=None, username="guest", password="guest", **options): - """ - Creates an AMQP connection and connects it to the given host and port. - - @type host: str - @param host: the name or ip address of the remote host - @type port: int - @param port: the port number of the remote host - @rtype: Connection - @return: a connected Connection - """ - conn = Connection(host, port, username, password, **options) - conn.connect() - return conn - - def __init__(self, host, port=None, username="guest", password="guest", **options): - """ - Creates a connection. A newly created connection must be connected - with the Connection.connect() method before it can be used. - - @type host: str - @param host: the name or ip address of the remote host - @type port: int - @param port: the port number of the remote host - @rtype: Connection - @return: a disconnected Connection - """ - self.host = host - self.port = default(port, AMQP_PORT) - self.username = username - self.password = password - self.mechanisms = options.get("mechanisms") - self.heartbeat = options.get("heartbeat") - self.reconnect = options.get("reconnect", False) - self.reconnect_delay = options.get("reconnect_delay", 3) - self.reconnect_limit = options.get("reconnect_limit") - self.backups = options.get("backups", []) - self.options = options - - self.id = str(uuid4()) - self.session_counter = 0 - self.sessions = {} - self._connected = False - self._lock = RLock() - self._condition = Condition(self._lock) - self._waiter = Waiter(self._condition) - self._modcount = Serial(0) - self.error = None - from driver import Driver - self._driver = Driver(self) - self._driver.start() - - def _wait(self, predicate, timeout=None): - return self._waiter.wait(predicate, timeout=timeout) - - def _wakeup(self): - self._modcount += 1 - self._driver.wakeup() - - def _check_error(self, exc=ConnectionError): - if self.error: - raise exc(*self.error) - - def _ewait(self, predicate, timeout=None, exc=ConnectionError): - result = self._wait(lambda: self.error or predicate(), timeout) - self._check_error(exc) - return result - - @synchronized - def session(self, name=None, transactional=False): - """ - Creates or retrieves the named session. If the name is omitted or - None, then a unique name is chosen based on a randomly generated - uuid. - - @type name: str - @param name: the session name - @rtype: Session - @return: the named Session - """ - - if name is None: - name = "%s:%s" % (self.id, self.session_counter) - self.session_counter += 1 - else: - name = "%s:%s" % (self.id, name) - - if self.sessions.has_key(name): - return self.sessions[name] - else: - ssn = Session(self, name, transactional) - self.sessions[name] = ssn - self._wakeup() - return ssn - - @synchronized - def _remove_session(self, ssn): - del self.sessions[ssn.name] - - @synchronized - def connect(self): - """ - Connect to the remote endpoint. - """ - self._connected = True - self._wakeup() - self._ewait(lambda: self._driver._connected and not self._unlinked(), - exc=ConnectError) - - def _unlinked(self): - return [l - for ssn in self.sessions.values() - for l in ssn.senders + ssn.receivers - if not (l.linked or l.error or l.closed)] - - @synchronized - def disconnect(self): - """ - Disconnect from the remote endpoint. - """ - self._connected = False - self._wakeup() - self._ewait(lambda: not self._driver._connected) - - @synchronized - def connected(self): - """ - Return true if the connection is connected, false otherwise. - """ - return self._connected - - @synchronized - def close(self): - """ - Close the connection and all sessions. - """ - for ssn in self.sessions.values(): - ssn.close() - self.disconnect() - -class Pattern: - """ - The pattern filter matches the supplied wildcard pattern against a - message subject. - """ - - def __init__(self, value): - self.value = value - - # XXX: this should become part of the driver - def _bind(self, sst, exchange, queue): - from qpid.ops import ExchangeBind - sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue, - binding_key=self.value.replace("*", "#"))) - -class SessionError(Exception): - pass - -class Disconnected(SessionError): - """ - Exception raised when an operation is attempted that is illegal when - disconnected. - """ - pass - -class NontransactionalSession(SessionError): - """ - Exception raised when commit or rollback is attempted on a non - transactional session. - """ - pass - -class TransactionAborted(SessionError): - pass - -class Session: - - """ - Sessions provide a linear context for sending and receiving - L{Messages}. L{Messages} are sent and received - using the L{Sender.send} and L{Receiver.fetch} methods of the - L{Sender} and L{Receiver} objects associated with a Session. - - Each L{Sender} and L{Receiver} is created by supplying either a - target or source address to the L{sender} and L{receiver} methods of - the Session. The address is supplied via a string syntax documented - below. - - Addresses - ========= - - An address identifies a source or target for messages. In its - simplest form this is just a name. In general a target address may - also be used as a source address, however not all source addresses - may be used as a target, e.g. a source might additionally have some - filtering criteria that would not be present in a target. - - A subject may optionally be specified along with the name. When an - address is used as a target, any subject specified in the address is - used as the default subject of outgoing messages for that target. - When an address is used as a source, any subject specified in the - address is pattern matched against the subject of available messages - as a filter for incoming messages from that source. - - The options map contains additional information about the address - including: - - - policies for automatically creating, and deleting the node to - which an address refers - - - policies for asserting facts about the node to which an address - refers - - - extension points that can be used for sender/receiver - configuration - - Mapping to AMQP 0-10 - -------------------- - The name is resolved to either an exchange or a queue by querying - the broker. - - The subject is set as a property on the message. Additionally, if - the name refers to an exchange, the routing key is set to the - subject. - - Syntax - ------ - The following regular expressions define the tokens used to parse - addresses:: - LBRACE: \\{ - RBRACE: \\} - LBRACK: \\[ - RBRACK: \\] - COLON: : - SEMI: ; - SLASH: / - COMMA: , - NUMBER: [+-]?[0-9]*\\.?[0-9]+ - ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])? - STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\' - ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F] - SYM: [.#*%@$^!+-] - WSPACE: [ \\n\\r\\t]+ - - The formal grammar for addresses is given below:: - address = name [ "/" subject ] [ ";" options ] - name = ( part | quoted )+ - subject = ( part | quoted | "/" )* - quoted = STRING / ESC - part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM - options = map - map = "{" ( keyval ( "," keyval )* )? "}" - keyval = ID ":" value - value = NUMBER / STRING / ID / map / list - list = "[" ( value ( "," value )* )? "]" - - This grammar resuls in the following informal syntax:: - - [ / ] [ ; ] - - Where options is:: - - { : , ... } - - And values may be: - - numbers - - single, double, or non quoted strings - - maps (dictionaries) - - lists - - Options - ------- - The options map permits the following parameters:: - - [ / ] ; { - create: , - delete: , - assert: , - node-properties: { - type: , - durable: , - x-properties: { - bindings: ["/", ...], - : - } - } - } - - The create, delete, and assert policies specify who should perfom - the associated action: - - - I{always}: the action will always be performed - - I{sender}: the action will only be performed by the sender - - I{receiver}: the action will only be performed by the receiver - - I{never}: the action will never be performed (this is the default) - - The node-type is one of: - - - I{topic}: a topic node will default to the topic exchange, - x-properties may be used to specify other exchange types - - I{queue}: this is the default node-type - - The x-properties map permits arbitrary additional keys and values to - be specified. These keys and values are passed through when creating - a node or asserting facts about an existing node. Any passthrough - keys and values that do not match a standard field of the underlying - exchange or queue declare command will be sent in the arguments map. - - Examples - -------- - A simple name resolves to any named node, usually a queue or a - topic:: - - my-queue-or-topic - - A simple name with a subject will also resolve to a node, but the - presence of the subject will cause a sender using this address to - set the subject on outgoing messages, and receivers to filter based - on the subject:: - - my-queue-or-topic/my-subject - - A subject pattern can be used and will cause filtering if used by - the receiver. If used for a sender, the literal value gets set as - the subject:: - - my-queue-or-topic/my-* - - In all the above cases, the address is resolved to an existing node. - If you want the node to be auto-created, then you can do the - following. By default nonexistent nodes are assumed to be queues:: - - my-queue; {create: always} - - You can customize the properties of the queue:: - - my-queue; {create: always, node-properties: {durable: True}} - - You can create a topic instead if you want:: - - my-queue; {create: always, node-properties: {type: topic}} - - You can assert that the address resolves to a node with particular - properties:: - - my-transient-topic; { - assert: always, - node-properties: { - type: topic, - durable: False - } - } - """ - - def __init__(self, connection, name, transactional): - self.connection = connection - self.name = name - self.log_id = "%x" % id(self) - - self.transactional = transactional - - self.committing = False - self.committed = True - self.aborting = False - self.aborted = False - - self.next_sender_id = 0 - self.senders = [] - self.next_receiver_id = 0 - self.receivers = [] - self.outgoing = [] - self.incoming = [] - self.unacked = [] - self.acked = [] - # XXX: I hate this name. - self.ack_capacity = UNLIMITED - - self.error = None - self.closing = False - self.closed = False - - self._lock = connection._lock - - def __repr__(self): - return "" % self.name - - def _wait(self, predicate, timeout=None): - return self.connection._wait(predicate, timeout=timeout) - - def _wakeup(self): - self.connection._wakeup() - - def _check_error(self, exc=SessionError): - self.connection._check_error(exc) - if self.error: - raise exc(*self.error) - - def _ewait(self, predicate, timeout=None, exc=SessionError): - result = self.connection._ewait(lambda: self.error or predicate(), timeout, exc) - self._check_error(exc) - return result - - @synchronized - def sender(self, target, **options): - """ - Creates a L{Sender} that may be used to send L{Messages} - to the specified target. - - @type target: str - @param target: the target to which messages will be sent - @rtype: Sender - @return: a new Sender for the specified target - """ - sender = Sender(self, self.next_sender_id, target, options) - self.next_sender_id += 1 - self.senders.append(sender) - if not self.closed and self.connection._connected: - self._wakeup() - try: - sender._ewait(lambda: sender.linked) - except SendError, e: - sender.close() - raise e - return sender - - @synchronized - def receiver(self, source, **options): - """ - Creates a receiver that may be used to fetch L{Messages} - from the specified source. - - @type source: str - @param source: the source of L{Messages} - @rtype: Receiver - @return: a new Receiver for the specified source - """ - receiver = Receiver(self, self.next_receiver_id, source, options) - self.next_receiver_id += 1 - self.receivers.append(receiver) - if not self.closed and self.connection._connected: - self._wakeup() - try: - receiver._ewait(lambda: receiver.linked) - except ReceiveError, e: - receiver.close() - raise e - return receiver - - @synchronized - def _count(self, predicate): - result = 0 - for msg in self.incoming: - if predicate(msg): - result += 1 - return result - - def _peek(self, predicate): - for msg in self.incoming: - if predicate(msg): - return msg - - def _pop(self, predicate): - i = 0 - while i < len(self.incoming): - msg = self.incoming[i] - if predicate(msg): - del self.incoming[i] - return msg - else: - i += 1 - - @synchronized - def _get(self, predicate, timeout=None): - if self._ewait(lambda: ((self._peek(predicate) is not None) or self.closing), - timeout): - msg = self._pop(predicate) - if msg is not None: - msg._receiver.returned += 1 - self.unacked.append(msg) - log.debug("RETR[%s]: %s", self.log_id, msg) - return msg - return None - - @synchronized - def next_receiver(self, timeout=None): - if self._ewait(lambda: self.incoming, timeout): - return self.incoming[0]._receiver - else: - raise Empty - - @synchronized - def acknowledge(self, message=None, sync=True): - """ - Acknowledge the given L{Message}. If message is None, then all - unacknowledged messages on the session are acknowledged. - - @type message: Message - @param message: the message to acknowledge or None - @type sync: boolean - @param sync: if true then block until the message(s) are acknowledged - """ - if message is None: - messages = self.unacked[:] - else: - messages = [message] - - for m in messages: - if self.ack_capacity is not UNLIMITED: - if self.ack_capacity <= 0: - # XXX: this is currently a SendError, maybe it should be a SessionError? - raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) - self._wakeup() - self._ewait(lambda: len(self.acked) < self.ack_capacity) - self.unacked.remove(m) - self.acked.append(m) - - self._wakeup() - if sync: - self._ewait(lambda: not [m for m in messages if m in self.acked]) - - @synchronized - def commit(self): - """ - Commit outstanding transactional work. This consists of all - message sends and receives since the prior commit or rollback. - """ - if not self.transactional: - raise NontransactionalSession() - self.committing = True - self._wakeup() - self._ewait(lambda: not self.committing) - if self.aborted: - raise TransactionAborted() - assert self.committed - - @synchronized - def rollback(self): - """ - Rollback outstanding transactional work. This consists of all - message sends and receives since the prior commit or rollback. - """ - if not self.transactional: - raise NontransactionalSession() - self.aborting = True - self._wakeup() - self._ewait(lambda: not self.aborting) - assert self.aborted - - @synchronized - def close(self): - """ - Close the session. - """ - # XXX: should be able to express this condition through API calls - self._ewait(lambda: not self.outgoing and not self.acked) - - for link in self.receivers + self.senders: - link.close() - - self.closing = True - self._wakeup() - self._ewait(lambda: self.closed) - self.connection._remove_session(self) - -class SendError(SessionError): - pass - -class InsufficientCapacity(SendError): - pass - -class Sender: - - """ - Sends outgoing messages. - """ - - def __init__(self, session, id, target, options): - self.session = session - self.id = id - self.target = target - self.options = options - self.capacity = options.get("capacity", UNLIMITED) - self.durable = options.get("durable") - self.queued = Serial(0) - self.acked = Serial(0) - self.error = None - self.linked = False - self.closing = False - self.closed = False - self._lock = self.session._lock - - def _wakeup(self): - self.session._wakeup() - - def _check_error(self, exc=SendError): - self.session._check_error(exc) - if self.error: - raise exc(*self.error) - - def _ewait(self, predicate, timeout=None, exc=SendError): - result = self.session._ewait(lambda: self.error or predicate(), timeout, exc) - self._check_error(exc) - return result - - @synchronized - def pending(self): - """ - Returns the number of messages awaiting acknowledgment. - @rtype: int - @return: the number of unacknowledged messages - """ - return self.queued - self.acked - - @synchronized - def send(self, object, sync=True, timeout=None): - """ - Send a message. If the object passed in is of type L{unicode}, - L{str}, L{list}, or L{dict}, it will automatically be wrapped in a - L{Message} and sent. If it is of type L{Message}, it will be sent - directly. If the sender capacity is not L{UNLIMITED} then send - will block until there is available capacity to send the message. - If the timeout parameter is specified, then send will throw an - L{InsufficientCapacity} exception if capacity does not become - available within the specified time. - - @type object: unicode, str, list, dict, Message - @param object: the message or content to send - - @type sync: boolean - @param sync: if true then block until the message is sent - - @type timeout: float - @param timeout: the time to wait for available capacity - """ - - if not self.session.connection._connected or self.session.closing: - raise Disconnected() - - self._ewait(lambda: self.linked) - - if isinstance(object, Message): - message = object - else: - message = Message(object) - - if message.durable is None: - message.durable = self.durable - - if self.capacity is not UNLIMITED: - if self.capacity <= 0: - raise InsufficientCapacity("capacity = %s" % self.capacity) - if not self._ewait(lambda: self.pending() < self.capacity, timeout=timeout): - raise InsufficientCapacity("capacity = %s" % self.capacity) - - # XXX: what if we send the same message to multiple senders? - message._sender = self - self.session.outgoing.append(message) - self.queued += 1 - - self._wakeup() - - if sync: - self.sync() - assert message not in self.session.outgoing - - @synchronized - def sync(self): - mno = self.queued - self._ewait(lambda: self.acked >= mno) - - @synchronized - def close(self): - """ - Close the Sender. - """ - self.closing = True - self._wakeup() - try: - self.session._ewait(lambda: self.closed) - finally: - self.session.senders.remove(self) - -class ReceiveError(SessionError): - pass - -class Empty(ReceiveError): - """ - Exception raised by L{Receiver.fetch} when there is no message - available within the alloted time. - """ - pass - -class Receiver(object): - - """ - Receives incoming messages from a remote source. Messages may be - fetched with L{fetch}. - """ - - def __init__(self, session, id, source, options): - self.session = session - self.id = id - self.source = source - self.options = options - - self.granted = Serial(0) - self.draining = False - self.impending = Serial(0) - self.received = Serial(0) - self.returned = Serial(0) - - self.error = None - self.linked = False - self.closing = False - self.closed = False - self._lock = self.session._lock - self._capacity = 0 - self._set_capacity(options.get("capacity", 0), False) - - @synchronized - def _set_capacity(self, c, wakeup=True): - if c is UNLIMITED: - self._capacity = c.value - else: - self._capacity = c - self._grant() - if wakeup: - self._wakeup() - - def _get_capacity(self): - if self._capacity == UNLIMITED.value: - return UNLIMITED - else: - return self._capacity - - capacity = property(_get_capacity, _set_capacity) - - def _wakeup(self): - self.session._wakeup() - - def _check_error(self, exc=ReceiveError): - self.session._check_error(exc) - if self.error: - raise exc(*self.error) - - def _ewait(self, predicate, timeout=None, exc=ReceiveError): - result = self.session._ewait(lambda: self.error or predicate(), timeout, exc) - self._check_error(exc) - return result - - @synchronized - def pending(self): - """ - Returns the number of messages available to be fetched by the - application. - - @rtype: int - @return: the number of available messages - """ - return self.received - self.returned - - def _pred(self, msg): - return msg._receiver == self - - @synchronized - def fetch(self, timeout=None): - """ - Fetch and return a single message. A timeout of None will block - forever waiting for a message to arrive, a timeout of zero will - return immediately if no messages are available. - - @type timeout: float - @param timeout: the time to wait for a message to be available - """ - - self._ewait(lambda: self.linked) - - if self._capacity == 0: - self.granted = self.returned + 1 - self._wakeup() - self._ewait(lambda: self.impending >= self.granted) - msg = self.session._get(self._pred, timeout=timeout) - if msg is None: - self.draining = True - self._wakeup() - self._ewait(lambda: not self.draining) - self._grant() - self._wakeup() - msg = self.session._get(self._pred, timeout=0) - if msg is None: - raise Empty() - elif self._capacity not in (0, UNLIMITED.value): - self.granted += 1 - self._wakeup() - return msg - - def _grant(self): - if self._capacity == UNLIMITED.value: - self.granted = UNLIMITED - else: - self.granted = self.received + self._capacity - - @synchronized - def close(self): - """ - Close the receiver. - """ - self.closing = True - self._wakeup() - try: - self.session._ewait(lambda: self.closed) - finally: - self.session.receivers.remove(self) - -def codec(name): - type = PRIMITIVE[name] - - def encode(x): - sc = StringCodec() - sc.write_primitive(type, x) - return sc.encoded - - def decode(x): - sc = StringCodec(x) - return sc.read_primitive(type) - - return encode, decode - -# XXX: need to correctly parse the mime type and deal with -# content-encoding header - -TYPE_MAPPINGS={ - dict: "amqp/map", - list: "amqp/list", - unicode: "text/plain; charset=utf8", - unicode: "text/plain", - buffer: None, - str: None, - None.__class__: None - } - -TYPE_CODEC={ - "amqp/map": codec("map"), - "amqp/list": codec("list"), - "text/plain; charset=utf8": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")), - "text/plain": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")), - "": (lambda x: x, lambda x: x), - None: (lambda x: x, lambda x: x) - } - -def get_type(content): - return TYPE_MAPPINGS[content.__class__] - -def get_codec(content_type): - return TYPE_CODEC[content_type] - -UNSPECIFIED = object() - -class Message: - - """ - A message consists of a standard set of fields, an application - defined set of properties, and some content. - - @type id: str - @ivar id: the message id - @type user_id: str - @ivar user_id: the user-id of the message producer - @type to: str - @ivar to: the destination address - @type reply_to: str - @ivar reply_to: the address to send replies - @type correlation_id: str - @ivar correlation_id: a correlation-id for the message - @type properties: dict - @ivar properties: application specific message properties - @type content_type: str - @ivar content_type: the content-type of the message - @type content: str, unicode, buffer, dict, list - @ivar content: the message content - """ - - def __init__(self, content=None, content_type=UNSPECIFIED, id=None, - subject=None, to=None, user_id=None, reply_to=None, - correlation_id=None, durable=None, properties=None): - """ - Construct a new message with the supplied content. The - content-type of the message will be automatically inferred from - type of the content parameter. - - @type content: str, unicode, buffer, dict, list - @param content: the message content - - @type content_type: str - @param content_type: the content-type of the message - """ - self.id = id - self.subject = subject - self.to = to - self.user_id = user_id - self.reply_to = reply_to - self.correlation_id = correlation_id - self.durable = durable - self.redelivered = False - if properties is None: - self.properties = {} - else: - self.properties = properties - if content_type is UNSPECIFIED: - self.content_type = get_type(content) - else: - self.content_type = content_type - self.content = content - - def __repr__(self): - args = [] - for name in ["id", "subject", "to", "user_id", "reply_to", - "correlation_id"]: - value = self.__dict__[name] - if value is not None: args.append("%s=%r" % (name, value)) - for name in ["durable", "properties"]: - value = self.__dict__[name] - if value: args.append("%s=%r" % (name, value)) - if self.content_type != get_type(self.content): - args.append("content_type=%r" % self.content_type) - if self.content is not None: - if args: - args.append("content=%r" % self.content) - else: - args.append(repr(self.content)) - return "Message(%s)" % ", ".join(args) - -__all__ = ["Connection", "Session", "Sender", "Receiver", "Pattern", "Message", - "ConnectionError", "ConnectError", "SessionError", "Disconnected", - "SendError", "InsufficientCapacity", "ReceiveError", "Empty", - "timestamp", "uuid4", "UNLIMITED", "AMQP_PORT", "AMQPS_PORT"] diff --git a/qpid/python/qpid/messaging/__init__.py b/qpid/python/qpid/messaging/__init__.py new file mode 100644 index 0000000000..f9ddda2e80 --- /dev/null +++ b/qpid/python/qpid/messaging/__init__.py @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +A candidate high level messaging API for python. + +Areas that still need work: + + - definition of the arguments for L{Session.sender} and L{Session.receiver} + - standard L{Message} properties + - L{Message} content encoding + - protocol negotiation/multiprotocol impl +""" + +from qpid.datatypes import timestamp, uuid4, Serial +from qpid.messaging.constants import * +from qpid.messaging.endpoints import * +from qpid.messaging.exceptions import * +from qpid.messaging.message import * diff --git a/qpid/python/qpid/messaging/address.py b/qpid/python/qpid/messaging/address.py new file mode 100644 index 0000000000..bf494433e4 --- /dev/null +++ b/qpid/python/qpid/messaging/address.py @@ -0,0 +1,168 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import re +from qpid.lexer import Lexicon, LexError +from qpid.parser import Parser, ParseError + +l = Lexicon() + +LBRACE = l.define("LBRACE", r"\{") +RBRACE = l.define("RBRACE", r"\}") +LBRACK = l.define("LBRACK", r"\[") +RBRACK = l.define("RBRACK", r"\]") +COLON = l.define("COLON", r":") +SEMI = l.define("SEMI", r";") +SLASH = l.define("SLASH", r"/") +COMMA = l.define("COMMA", r",") +NUMBER = l.define("NUMBER", r'[+-]?[0-9]*\.?[0-9]+') +TRUE = l.define("TRUE", r'True') +FALSE = l.define("FALSE", r'False') +ID = l.define("ID", r'[a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])?') +STRING = l.define("STRING", r""""(?:[^\\"]|\\.)*"|'(?:[^\\']|\\.)*'""") +ESC = l.define("ESC", r"\\[^ux]|\\x[0-9a-fA-F][0-9a-fA-F]|\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F]") +SYM = l.define("SYM", r"[.#*%@$^!+-]") +WSPACE = l.define("WSPACE", r"[ \n\r\t]+") +EOF = l.eof("EOF") + +LEXER = l.compile() + +def lex(st): + return LEXER.lex(st) + +def tok2str(tok): + if tok.type is STRING: + return eval(tok.value) + elif tok.type is ESC: + if tok.value[1] == "x": + return eval('"%s"' % tok.value) + elif tok.value[1] == "u": + return eval('u"%s"' % tok.value) + else: + return tok.value[1] + else: + return tok.value + +def tok2obj(tok): + if tok.type in (STRING, NUMBER): + return eval(tok.value) + elif tok.type == TRUE: + return True + elif tok.type == FALSE: + return False + else: + return tok.value + +def toks2str(toks): + if toks: + return "".join(map(tok2str, toks)) + else: + return None + +class AddressParser(Parser): + + def __init__(self, tokens): + Parser.__init__(self, [t for t in tokens if t.type is not WSPACE]) + + def parse(self): + result = self.address() + self.eat(EOF) + return result + + def address(self): + name = toks2str(self.eat_until(SLASH, SEMI, EOF)) + + if name is None: + raise ParseError(self.next()) + + if self.matches(SLASH): + self.eat(SLASH) + subject = toks2str(self.eat_until(SEMI, EOF)) + else: + subject = None + + if self.matches(SEMI): + self.eat(SEMI) + options = self.map() + else: + options = None + return name, subject, options + + def map(self): + self.eat(LBRACE) + + result = {} + while True: + if self.matches(NUMBER, STRING, ID, LBRACE, LBRACK): + n, v = self.keyval() + result[n] = v + if self.matches(COMMA): + self.eat(COMMA) + elif self.matches(RBRACE): + break + else: + raise ParseError(self.next(), COMMA, RBRACE) + elif self.matches(RBRACE): + break + else: + raise ParseError(self.next(), NUMBER, STRING, ID, LBRACE, LBRACK, + RBRACE) + + self.eat(RBRACE) + return result + + def keyval(self): + key = self.value() + self.eat(COLON) + val = self.value() + return (key, val) + + def value(self): + if self.matches(NUMBER, STRING, ID, TRUE, FALSE): + return tok2obj(self.eat()) + elif self.matches(LBRACE): + return self.map() + elif self.matches(LBRACK): + return self.list() + else: + raise ParseError(self.next(), NUMBER, STRING, ID, LBRACE, LBRACK) + + def list(self): + self.eat(LBRACK) + + result = [] + + while True: + if self.matches(RBRACK): + break + else: + result.append(self.value()) + if self.matches(COMMA): + self.eat(COMMA) + elif self.matches(RBRACK): + break + else: + raise ParseError(self.next(), COMMA, RBRACK) + + self.eat(RBRACK) + return result + +def parse(addr): + return AddressParser(lex(addr)).parse() + +__all__ = ["parse", "ParseError"] diff --git a/qpid/python/qpid/messaging/constants.py b/qpid/python/qpid/messaging/constants.py new file mode 100644 index 0000000000..cad47bd52a --- /dev/null +++ b/qpid/python/qpid/messaging/constants.py @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class Constant: + + def __init__(self, name, value=None): + self.name = name + self.value = value + + def __repr__(self): + return self.name + +AMQP_PORT = 5672 +AMQPS_PORT = 5671 + +UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL) diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py new file mode 100644 index 0000000000..9d58616804 --- /dev/null +++ b/qpid/python/qpid/messaging/driver.py @@ -0,0 +1,1041 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import socket, struct, sys, time +from logging import getLogger +from qpid import compat +from qpid import sasl +from qpid.concurrency import synchronized +from qpid.datatypes import RangedSet, Serial +from qpid.exceptions import Timeout, VersionError +from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ + FrameDecoder, SegmentDecoder, OpDecoder +from qpid.messaging import address +from qpid.messaging.constants import UNLIMITED +from qpid.messaging.endpoints import Pattern +from qpid.messaging.exceptions import ConnectError +from qpid.messaging.message import get_codec, Message +from qpid.ops import * +from qpid.selector import Selector +from qpid.util import connect +from qpid.validator import And, Context, Map, Types, Values +from threading import Condition, Thread + +log = getLogger("qpid.messaging") +rawlog = getLogger("qpid.messaging.io.raw") +opslog = getLogger("qpid.messaging.io.ops") + +def addr2reply_to(addr): + name, subject, options = address.parse(addr) + return ReplyTo(name, subject) + +def reply_to2addr(reply_to): + if reply_to.routing_key is None: + return reply_to.exchange + elif reply_to.exchange in (None, ""): + return reply_to.routing_key + else: + return "%s/%s" % (reply_to.exchange, reply_to.routing_key) + +class Attachment: + + def __init__(self, target): + self.target = target + +# XXX + +DURABLE_DEFAULT=True + +# XXX + +FILTER_DEFAULTS = { + "topic": Pattern("*"), + "amq.failover": Pattern("DUMMY") + } + +# XXX +ppid = 0 +try: + ppid = os.getppid() +except: + pass + +CLIENT_PROPERTIES = {"product": "qpid python client", + "version": "development", + "platform": os.name, + "qpid.client_process": os.path.basename(sys.argv[0]), + "qpid.client_pid": os.getpid(), + "qpid.client_ppid": ppid} + +def noop(): pass + +class SessionState: + + def __init__(self, driver, session, name, channel): + self.driver = driver + self.session = session + self.name = name + self.channel = channel + self.detached = False + self.committing = False + self.aborting = False + + # sender state + self.sent = Serial(0) + self.acknowledged = RangedSet() + self.actions = {} + self.min_completion = self.sent + self.max_completion = self.sent + self.results = {} + + # receiver state + self.received = None + self.executed = RangedSet() + + # XXX: need to periodically exchange completion/known_completion + + self.destinations = {} + + def write_query(self, query, handler): + id = self.sent + self.write_cmd(query, lambda: handler(self.results.pop(id))) + + def write_cmd(self, cmd, action=noop): + if action != noop: + cmd.sync = True + if self.detached: + raise Exception("detached") + cmd.id = self.sent + self.sent += 1 + self.actions[cmd.id] = action + self.max_completion = cmd.id + self.write_op(cmd) + + def write_cmds(self, cmds, action=noop): + if cmds: + for cmd in cmds[:-1]: + self.write_cmd(cmd) + self.write_cmd(cmds[-1], action) + else: + action() + + def write_op(self, op): + op.channel = self.channel + self.driver.write_op(op) + +POLICIES = Values("always", "sender", "receiver", "never") + +class Bindings: + + def validate(self, o, ctx): + t = ctx.containers[1].get("type", "queue") + if t != "queue": + return "bindings are only permitted on nodes of type queue" + +COMMON_OPTS = { + "create": POLICIES, + "delete": POLICIES, + "assert": POLICIES, + "node-properties": Map({ + "type": Values("queue", "topic"), + "durable": Types(bool), + "x-properties": Map({ + "type": Types(basestring), + "bindings": And(Types(list), Bindings()) + }, + restricted=False) + }) + } + +RECEIVE_MODES = Values("browse", "consume") + +SOURCE_OPTS = COMMON_OPTS.copy() +SOURCE_OPTS.update({ + "mode": RECEIVE_MODES + }) + +TARGET_OPTS = COMMON_OPTS.copy() + +class LinkIn: + + ADDR_NAME = "source" + DIR_NAME = "receiver" + VALIDATOR = Map(SOURCE_OPTS) + + def init_link(self, sst, rcv, _rcv): + _rcv.destination = str(rcv.id) + sst.destinations[_rcv.destination] = _rcv + _rcv.draining = False + + def do_link(self, sst, rcv, _rcv, type, subtype, action): + acq_mode = acquire_mode.pre_acquired + + if type == "topic": + _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination) + sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) + filter = _rcv.options.get("filter") + if _rcv.subject is None and filter is None: + f = FILTER_DEFAULTS[subtype] + elif _rcv.subject and filter: + # XXX + raise Exception("can't supply both subject and filter") + elif _rcv.subject: + # XXX + f = Pattern(_rcv.subject) + else: + f = filter + f._bind(sst, _rcv.name, _rcv._queue) + elif type == "queue": + _rcv._queue = _rcv.name + if _rcv.options.get("mode", "consume") == "browse": + acq_mode = acquire_mode.not_acquired + + sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination, + acquire_mode = acq_mode)) + sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action) + + def do_unlink(self, sst, rcv, _rcv, action=noop): + sst.write_cmd(MessageCancel(_rcv.destination), action) + + def del_link(self, sst, rcv, _rcv): + del sst.destinations[_rcv.destination] + +class LinkOut: + + ADDR_NAME = "target" + DIR_NAME = "sender" + VALIDATOR = Map(TARGET_OPTS) + + def init_link(self, sst, snd, _snd): + _snd.closing = False + + def do_link(self, sst, snd, _snd, type, subtype, action): + if type == "topic": + _snd._exchange = _snd.name + _snd._routing_key = _snd.subject + elif type == "queue": + _snd._exchange = "" + _snd._routing_key = _snd.name + action() + + def do_unlink(self, sst, snd, _snd, action=noop): + action() + + def del_link(self, sst, snd, _snd): + pass + +class Cache: + + def __init__(self, ttl): + self.ttl = ttl + self.entries = {} + + def __setitem__(self, key, value): + self.entries[key] = time.time(), value + + def __getitem__(self, key): + tstamp, value = self.entries[key] + if time.time() - tstamp >= self.ttl: + del self.entries[key] + raise KeyError(key) + else: + return value + + def __delitem__(self, key): + del self.entries[key] + +# XXX +HEADER="!4s4B" + +EMPTY_DP = DeliveryProperties() +EMPTY_MP = MessageProperties() + +SUBJECT = "qpid.subject" +TO = "qpid.to" + +class Driver: + + def __init__(self, connection): + self.connection = connection + self.log_id = "%x" % id(self.connection) + self._lock = self.connection._lock + + self._in = LinkIn() + self._out = LinkOut() + + self._selector = Selector.default() + self._attempts = 0 + self._hosts = [(self.connection.host, self.connection.port)] + \ + self.connection.backups + self._host = 0 + self._retrying = False + + self.reset() + + def reset(self): + self._opening = False + self._closing = False + self._connected = False + self._attachments = {} + + self._channel_max = 65536 + self._channels = 0 + self._sessions = {} + + options = self.connection.options + + self.address_cache = Cache(options.get("address_ttl", 60)) + + self._socket = None + self._buf = "" + self._hdr = "" + self._op_enc = OpEncoder() + self._seg_enc = SegmentEncoder() + self._frame_enc = FrameEncoder() + self._frame_dec = FrameDecoder() + self._seg_dec = SegmentDecoder() + self._op_dec = OpDecoder() + self._timeout = None + + self._sasl = sasl.Client() + if self.connection.username: + self._sasl.setAttr("username", self.connection.username) + if self.connection.password: + self._sasl.setAttr("password", self.connection.password) + if self.connection.host: + self._sasl.setAttr("host", self.connection.host) + self._sasl.setAttr("service", options.get("service", "qpidd")) + if "min_ssf" in options: + self._sasl.setAttr("minssf", options["min_ssf"]) + if "max_ssf" in options: + self._sasl.setAttr("maxssf", options["max_ssf"]) + self._sasl.init() + self._sasl_encode = False + self._sasl_decode = False + + for ssn in self.connection.sessions.values(): + for m in ssn.acked + ssn.unacked + ssn.incoming: + m._transfer_id = None + for snd in ssn.senders: + snd.linked = False + for rcv in ssn.receivers: + rcv.impending = rcv.received + rcv.linked = False + + @synchronized + def wakeup(self): + self.dispatch() + self._selector.wakeup() + + def start(self): + self._selector.register(self) + + def fileno(self): + return self._socket.fileno() + + @synchronized + def reading(self): + return self._socket is not None + + @synchronized + def writing(self): + return self._socket is not None and self._buf + + @synchronized + def timing(self): + return self._timeout + + @synchronized + def readable(self): + error = None + recoverable = False + try: + data = self._socket.recv(64*1024) + if data: + rawlog.debug("READ[%s]: %r", self.log_id, data) + if self._sasl_decode: + data = self._sasl.decode(data) + else: + rawlog.debug("ABORTED[%s]: %s", self.log_id, self._socket.getpeername()) + error = "connection aborted" + recoverable = True + except socket.error, e: + error = e + recoverable = True + + if not error: + try: + if len(self._hdr) < 8: + r = 8 - len(self._hdr) + self._hdr += data[:r] + data = data[r:] + + if len(self._hdr) == 8: + self.do_header(self._hdr) + + self._frame_dec.write(data) + self._seg_dec.write(*self._frame_dec.read()) + self._op_dec.write(*self._seg_dec.read()) + for op in self._op_dec.read(): + self.assign_id(op) + opslog.debug("RCVD[%s]: %r", self.log_id, op) + op.dispatch(self) + except VersionError, e: + error = e + except: + msg = compat.format_exc() + error = msg + + if error: + self._error(error, recoverable) + else: + self.dispatch() + + self.connection._waiter.notifyAll() + + def assign_id(self, op): + if isinstance(op, Command): + sst = self.get_sst(op) + op.id = sst.received + sst.received += 1 + + @synchronized + def writeable(self): + try: + n = self._socket.send(self._buf) + rawlog.debug("SENT[%s]: %r", self.log_id, self._buf[:n]) + self._buf = self._buf[n:] + except socket.error, e: + self._error(e, True) + self.connection._waiter.notifyAll() + + @synchronized + def timeout(self): + self.dispatch() + self.connection._waiter.notifyAll() + + def _error(self, err, recoverable): + if self._socket is not None: + self._socket.close() + self.reset() + if (recoverable and self.connection.reconnect and + (self.connection.reconnect_limit is None or + self.connection.reconnect_limit <= 0 or + self._attempts <= self.connection.reconnect_limit)): + if self._host > 0: + delay = 0 + else: + delay = self.connection.reconnect_delay + self._timeout = time.time() + delay + log.warn("recoverable error[attempt %s]: %s" % (self._attempts, err)) + if delay > 0: + log.warn("sleeping %s seconds" % delay) + self._retrying = True + else: + self.connection.error = (err,) + + def write_op(self, op): + opslog.debug("SENT[%s]: %r", self.log_id, op) + self._op_enc.write(op) + self._seg_enc.write(*self._op_enc.read()) + self._frame_enc.write(*self._seg_enc.read()) + bytes = self._frame_enc.read() + if self._sasl_encode: + bytes = self._sasl.encode(bytes) + self._buf += bytes + + def do_header(self, hdr): + cli_major = 0; cli_minor = 10 + magic, _, _, major, minor = struct.unpack(HEADER, hdr) + if major != cli_major or minor != cli_minor: + raise VersionError("client: %s-%s, server: %s-%s" % + (cli_major, cli_minor, major, minor)) + + def do_connection_start(self, start): + if self.connection.mechanisms: + permitted = self.connection.mechanisms.split() + mechs = [m for m in start.mechanisms if m in permitted] + else: + mechs = start.mechanisms + mech, initial = self._sasl.start(" ".join(mechs)) + self.write_op(ConnectionStartOk(client_properties=CLIENT_PROPERTIES, + mechanism=mech, response=initial)) + + def do_connection_secure(self, secure): + resp = self._sasl.step(secure.challenge) + self.write_op(ConnectionSecureOk(response=resp)) + + def do_connection_tune(self, tune): + # XXX: is heartbeat protocol specific? + if tune.channel_max is not None: + self.channel_max = tune.channel_max + self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat, + channel_max=self.channel_max)) + self.write_op(ConnectionOpen()) + self._sasl_encode = True + + def do_connection_open_ok(self, open_ok): + self._connected = True + self._sasl_decode = True + + def connection_heartbeat(self, hrt): + self.write_op(ConnectionHeartbeat()) + + def do_connection_close(self, close): + self.write_op(ConnectionCloseOk()) + if close.reply_code != close_code.normal: + self.connection.error = (close.reply_code, close.reply_text) + # XXX: should we do a half shutdown on the socket here? + # XXX: we really need to test this, we may end up reporting a + # connection abort after this, if we were to do a shutdown on read + # and stop reading, then we wouldn't report the abort, that's + # probably the right thing to do + + def do_connection_close_ok(self, close_ok): + self._socket.close() + self.reset() + + def do_session_attached(self, atc): + pass + + def do_session_command_point(self, cp): + sst = self.get_sst(cp) + sst.received = cp.command_id + + def do_session_completed(self, sc): + sst = self.get_sst(sc) + for r in sc.commands: + sst.acknowledged.add(r.lower, r.upper) + + if not sc.commands.empty(): + while sst.min_completion in sc.commands: + if sst.actions.has_key(sst.min_completion): + sst.actions.pop(sst.min_completion)() + sst.min_completion += 1 + + def session_known_completed(self, kcmp): + sst = self.get_sst(kcmp) + executed = RangedSet() + for e in sst.executed.ranges: + for ke in kcmp.ranges: + if e.lower in ke and e.upper in ke: + break + else: + executed.add_range(e) + sst.executed = completed + + def do_session_flush(self, sf): + sst = self.get_sst(sf) + if sf.expected: + if sst.received is None: + exp = None + else: + exp = RangedSet(sst.received) + sst.write_op(SessionExpected(exp)) + if sf.confirmed: + sst.write_op(SessionConfirmed(sst.executed)) + if sf.completed: + sst.write_op(SessionCompleted(sst.executed)) + + def do_execution_result(self, er): + sst = self.get_sst(er) + sst.results[er.command_id] = er.value + + def do_execution_exception(self, ex): + sst = self.get_sst(ex) + sst.session.error = (ex,) + + def dispatch(self): + try: + if self._socket is None and self.connection._connected and not self._opening: + self.connect() + elif self._socket is not None and not self.connection._connected and not self._closing: + self.disconnect() + + if self._connected and not self._closing: + for ssn in self.connection.sessions.values(): + self.attach(ssn) + self.process(ssn) + except: + msg = compat.format_exc() + self.connection.error = (msg,) + + def connect(self): + try: + # XXX: should make this non blocking + if self._host == 0: + self._attempts += 1 + host, port = self._hosts[self._host] + if self._retrying: + log.warn("trying: %s:%s", host, port) + self._socket = connect(host, port) + if self._retrying: + log.warn("reconnect succeeded: %s:%s", host, port) + self._timeout = None + self._attempts = 0 + self._host = 0 + self._retrying = False + self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10) + self._opening = True + except socket.error, e: + self._host = (self._host + 1) % len(self._hosts) + self._error(e, True) + + def disconnect(self): + self.write_op(ConnectionClose(close_code.normal)) + self._closing = True + + def attach(self, ssn): + sst = self._attachments.get(ssn) + if sst is None and not ssn.closed: + for i in xrange(0, self.channel_max): + if not self._sessions.has_key(i): + ch = i + break + else: + raise RuntimeError("all channels used") + sst = SessionState(self, ssn, ssn.name, ch) + sst.write_op(SessionAttach(name=ssn.name)) + sst.write_op(SessionCommandPoint(sst.sent, 0)) + sst.outgoing_idx = 0 + sst.acked = [] + if ssn.transactional: + sst.write_cmd(TxSelect()) + self._attachments[ssn] = sst + self._sessions[sst.channel] = sst + + for snd in ssn.senders: + self.link(snd, self._out, snd.target) + for rcv in ssn.receivers: + self.link(rcv, self._in, rcv.source) + + if sst is not None and ssn.closing and not sst.detached: + sst.detached = True + sst.write_op(SessionDetach(name=ssn.name)) + + def get_sst(self, op): + return self._sessions[op.channel] + + def do_session_detached(self, dtc): + sst = self._sessions.pop(dtc.channel) + ssn = sst.session + del self._attachments[ssn] + ssn.closed = True + + def do_session_detach(self, dtc): + sst = self.get_sst(dtc) + sst.write_op(SessionDetached(name=dtc.name)) + self.do_session_detached(dtc) + + def link(self, lnk, dir, addr): + sst = self._attachments.get(lnk.session) + _lnk = self._attachments.get(lnk) + + if _lnk is None and not lnk.closing and not lnk.closed: + _lnk = Attachment(lnk) + _lnk.closing = False + dir.init_link(sst, lnk, _lnk) + + err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir) + if err: + lnk.error = (err,) + lnk.closed = True + return + + def linked(): + lnk.linked = True + + def resolved(type, subtype): + dir.do_link(sst, lnk, _lnk, type, subtype, linked) + + self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved) + self._attachments[lnk] = _lnk + + if lnk.linked and lnk.closing and not lnk.closed: + if not _lnk.closing: + def unlinked(): + dir.del_link(sst, lnk, _lnk) + del self._attachments[lnk] + lnk.closed = True + if _lnk.options.get("delete") in ("always", dir.DIR_NAME): + dir.do_unlink(sst, lnk, _lnk) + self.delete(sst, _lnk.name, unlinked) + else: + dir.do_unlink(sst, lnk, _lnk, unlinked) + _lnk.closing = True + elif not lnk.linked and lnk.closing and not lnk.closed: + lnk.closed = True + + def parse_address(self, lnk, dir, addr): + if addr is None: + return "%s is None" % dir.ADDR_NAME + else: + try: + lnk.name, lnk.subject, lnk.options = address.parse(addr) + # XXX: subject + if lnk.options is None: + lnk.options = {} + except address.LexError, e: + return e + except address.ParseError, e: + return e + + def validate_options(self, lnk, dir): + ctx = Context() + err = dir.VALIDATOR.validate(lnk.options, ctx) + if err: return "error in options: %s" % err + + def resolve_declare(self, sst, lnk, dir, action): + declare = lnk.options.get("create") in ("always", dir) + def do_resolved(type, subtype): + err = None + if type is None: + if declare: + err = self.declare(sst, lnk, action) + else: + err = ("no such queue: %s" % lnk.name,) + elif type == "queue": + try: + cmds = self.bindings(lnk) + sst.write_cmds(cmds, lambda: action(type, subtype)) + except address.ParseError, e: + err = (e,) + else: + action(type, subtype) + + if err: + tgt = lnk.target + tgt.error = err + del self._attachments[tgt] + tgt.closed = True + return + self.resolve(sst, lnk.name, do_resolved, force=declare) + + def resolve(self, sst, name, action, force=False): + if not force: + try: + type, subtype = self.address_cache[name] + action(type, subtype) + return + except KeyError: + pass + + args = [] + def do_result(r): + args.append(r) + def do_action(r): + do_result(r) + er, qr = args + if er.not_found and not qr.queue: + type, subtype = None, None + elif qr.queue: + type, subtype = "queue", None + else: + type, subtype = "topic", er.type + if type is not None: + self.address_cache[name] = (type, subtype) + action(type, subtype) + sst.write_query(ExchangeQuery(name), do_result) + sst.write_query(QueueQuery(name), do_action) + + def declare(self, sst, lnk, action): + name = lnk.name + props = lnk.options.get("node-properties", {}) + durable = props.get("durable", DURABLE_DEFAULT) + type = props.get("type", "queue") + xprops = props.get("x-properties", {}) + + if type == "topic": + cmd = ExchangeDeclare(exchange=name, durable=durable) + elif type == "queue": + cmd = QueueDeclare(queue=name, durable=durable) + else: + raise ValueError(type) + + for f in cmd.FIELDS: + if f.name != "arguments" and xprops.has_key(f.name): + cmd[f.name] = xprops.pop(f.name) + if xprops: + cmd.arguments = xprops + + if type == "topic": + if cmd.type is None: + cmd.type = "topic" + subtype = cmd.type + else: + subtype = None + + cmds = [cmd] + if type == "queue": + try: + cmds.extend(self.bindings(lnk)) + except address.ParseError, e: + return (e,) + + def declared(): + self.address_cache[name] = (type, subtype) + action(type, subtype) + + sst.write_cmds(cmds, declared) + + def bindings(self, lnk): + props = lnk.options.get("node-properties", {}) + xprops = props.get("x-properties", {}) + bindings = xprops.get("bindings", []) + cmds = [] + for b in bindings: + n, s, o = address.parse(b) + cmds.append(ExchangeBind(lnk.name, n, s, o)) + return cmds + + def delete(self, sst, name, action): + def deleted(): + del self.address_cache[name] + action() + + def do_delete(type, subtype): + if type == "topic": + sst.write_cmd(ExchangeDelete(name), deleted) + elif type == "queue": + sst.write_cmd(QueueDelete(name), deleted) + elif type is None: + action() + else: + raise ValueError(type) + self.resolve(sst, name, do_delete, force=True) + + def process(self, ssn): + if ssn.closed or ssn.closing: return + + sst = self._attachments[ssn] + + while sst.outgoing_idx < len(ssn.outgoing): + msg = ssn.outgoing[sst.outgoing_idx] + snd = msg._sender + # XXX: should check for sender error here + _snd = self._attachments.get(snd) + if _snd and snd.linked: + self.send(snd, msg) + sst.outgoing_idx += 1 + else: + break + + for rcv in ssn.receivers: + self.process_receiver(rcv) + + if ssn.acked: + messages = [m for m in ssn.acked if m not in sst.acked] + if messages: + # XXX: we're ignoring acks that get lost when disconnected, + # could we deal this via some message-id based purge? + ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None]) + for range in ids: + sst.executed.add_range(range) + sst.write_op(SessionCompleted(sst.executed)) + def ack_ack(): + for m in messages: + ssn.acked.remove(m) + if not ssn.transactional: + sst.acked.remove(m) + sst.write_cmd(MessageAccept(ids), ack_ack) + log.debug("SACK[%s]: %s", ssn.log_id, m) + sst.acked.extend(messages) + + if ssn.committing and not sst.committing: + def commit_ok(): + del sst.acked[:] + ssn.committing = False + ssn.committed = True + ssn.aborting = False + ssn.aborted = False + sst.write_cmd(TxCommit(), commit_ok) + sst.committing = True + + if ssn.aborting and not sst.aborting: + sst.aborting = True + def do_rb(): + messages = sst.acked + ssn.unacked + ssn.incoming + ids = RangedSet(*[m._transfer_id for m in messages]) + for range in ids: + sst.executed.add_range(range) + sst.write_op(SessionCompleted(sst.executed)) + sst.write_cmd(MessageRelease(ids)) + sst.write_cmd(TxRollback(), do_rb_ok) + + def do_rb_ok(): + del ssn.incoming[:] + del ssn.unacked[:] + del sst.acked[:] + + for rcv in ssn.receivers: + rcv.impending = rcv.received + rcv.returned = rcv.received + # XXX: do we need to update granted here as well? + + for rcv in ssn.receivers: + self.process_receiver(rcv) + + ssn.aborting = False + ssn.aborted = True + ssn.committing = False + ssn.committed = False + sst.aborting = False + + for rcv in ssn.receivers: + _rcv = self._attachments[rcv] + sst.write_cmd(MessageStop(_rcv.destination)) + sst.write_cmd(ExecutionSync(), do_rb) + + def grant(self, rcv): + sst = self._attachments[rcv.session] + _rcv = self._attachments.get(rcv) + if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining: + return + + if rcv.granted is UNLIMITED: + if rcv.impending is UNLIMITED: + delta = 0 + else: + delta = UNLIMITED + elif rcv.impending is UNLIMITED: + delta = -1 + else: + delta = max(rcv.granted, rcv.received) - rcv.impending + + if delta is UNLIMITED: + sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) + sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value)) + rcv.impending = UNLIMITED + elif delta > 0: + sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) + sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta)) + rcv.impending += delta + elif delta < 0 and not rcv.draining: + _rcv.draining = True + def do_stop(): + rcv.impending = rcv.received + _rcv.draining = False + self.grant(rcv) + sst.write_cmd(MessageStop(_rcv.destination), do_stop) + + if rcv.draining: + _rcv.draining = True + def do_flush(): + rcv.impending = rcv.received + rcv.granted = rcv.impending + _rcv.draining = False + rcv.draining = False + sst.write_cmd(MessageFlush(_rcv.destination), do_flush) + + + def process_receiver(self, rcv): + if rcv.closed: return + self.grant(rcv) + + def send(self, snd, msg): + sst = self._attachments[snd.session] + _snd = self._attachments[snd] + + if msg.subject is None or _snd._exchange == "": + rk = _snd._routing_key + else: + rk = msg.subject + + if msg.subject is None: + subject = _snd.subject + else: + subject = msg.subject + + # XXX: do we need to query to figure out how to create the reply-to interoperably? + if msg.reply_to: + rt = addr2reply_to(msg.reply_to) + else: + rt = None + dp = DeliveryProperties(routing_key=rk) + mp = MessageProperties(message_id=msg.id, + user_id=msg.user_id, + reply_to=rt, + correlation_id=msg.correlation_id, + content_type=msg.content_type, + application_headers=msg.properties) + if subject is not None: + if mp.application_headers is None: + mp.application_headers = {} + mp.application_headers[SUBJECT] = subject + if msg.to is not None: + if mp.application_headers is None: + mp.application_headers = {} + mp.application_headers[TO] = msg.to + if msg.durable: + dp.delivery_mode = delivery_mode.persistent + enc, dec = get_codec(msg.content_type) + body = enc(msg.content) + def msg_acked(): + # XXX: should we log the ack somehow too? + snd.acked += 1 + m = snd.session.outgoing.pop(0) + sst.outgoing_idx -= 1 + log.debug("RACK[%s]: %s", sst.session.log_id, msg) + assert msg == m + sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp), + payload=body), msg_acked) + log.debug("SENT[%s]: %s", sst.session.log_id, msg) + + def do_message_transfer(self, xfr): + sst = self.get_sst(xfr) + ssn = sst.session + + msg = self._decode(xfr) + rcv = sst.destinations[xfr.destination].target + msg._receiver = rcv + if rcv.impending is not UNLIMITED: + assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) + rcv.received += 1 + log.debug("RCVD[%s]: %s", ssn.log_id, msg) + ssn.incoming.append(msg) + self.connection._waiter.notifyAll() + + def _decode(self, xfr): + dp = EMPTY_DP + mp = EMPTY_MP + + for h in xfr.headers: + if isinstance(h, DeliveryProperties): + dp = h + elif isinstance(h, MessageProperties): + mp = h + + ap = mp.application_headers + enc, dec = get_codec(mp.content_type) + content = dec(xfr.payload) + msg = Message(content) + msg.id = mp.message_id + if ap is not None: + msg.to = ap.get(TO) + msg.subject = ap.get(SUBJECT) + msg.user_id = mp.user_id + if mp.reply_to is not None: + msg.reply_to = reply_to2addr(mp.reply_to) + msg.correlation_id = mp.correlation_id + msg.durable = dp.delivery_mode == delivery_mode.persistent + msg.redelivered = dp.redelivered + msg.properties = mp.application_headers + msg.content_type = mp.content_type + msg._transfer_id = xfr.id + return msg diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py new file mode 100644 index 0000000000..2337986ecb --- /dev/null +++ b/qpid/python/qpid/messaging/endpoints.py @@ -0,0 +1,832 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +A candidate high level messaging API for python. + +Areas that still need work: + + - definition of the arguments for L{Session.sender} and L{Session.receiver} + - standard L{Message} properties + - L{Message} content encoding + - protocol negotiation/multiprotocol impl +""" + +from logging import getLogger +from qpid.codec010 import StringCodec +from qpid.concurrency import synchronized, Waiter, Condition +from qpid.datatypes import Serial, uuid4 +from qpid.messaging.constants import * +from qpid.messaging.exceptions import * +from qpid.messaging.message import * +from qpid.ops import PRIMITIVE +from qpid.util import default +from threading import Thread, RLock + +log = getLogger("qpid.messaging") + +static = staticmethod + +class Connection: + + """ + A Connection manages a group of L{Sessions} and connects + them with a remote endpoint. + """ + + @static + def open(host, port=None, username="guest", password="guest", **options): + """ + Creates an AMQP connection and connects it to the given host and port. + + @type host: str + @param host: the name or ip address of the remote host + @type port: int + @param port: the port number of the remote host + @rtype: Connection + @return: a connected Connection + """ + conn = Connection(host, port, username, password, **options) + conn.connect() + return conn + + def __init__(self, host, port=None, username="guest", password="guest", **options): + """ + Creates a connection. A newly created connection must be connected + with the Connection.connect() method before it can be used. + + @type host: str + @param host: the name or ip address of the remote host + @type port: int + @param port: the port number of the remote host + @rtype: Connection + @return: a disconnected Connection + """ + self.host = host + self.port = default(port, AMQP_PORT) + self.username = username + self.password = password + self.mechanisms = options.get("mechanisms") + self.heartbeat = options.get("heartbeat") + self.reconnect = options.get("reconnect", False) + self.reconnect_delay = options.get("reconnect_delay", 3) + self.reconnect_limit = options.get("reconnect_limit") + self.backups = options.get("backups", []) + self.options = options + + self.id = str(uuid4()) + self.session_counter = 0 + self.sessions = {} + self._connected = False + self._lock = RLock() + self._condition = Condition(self._lock) + self._waiter = Waiter(self._condition) + self._modcount = Serial(0) + self.error = None + from driver import Driver + self._driver = Driver(self) + self._driver.start() + + def _wait(self, predicate, timeout=None): + return self._waiter.wait(predicate, timeout=timeout) + + def _wakeup(self): + self._modcount += 1 + self._driver.wakeup() + + def _check_error(self, exc=ConnectionError): + if self.error: + raise exc(*self.error) + + def _ewait(self, predicate, timeout=None, exc=ConnectionError): + result = self._wait(lambda: self.error or predicate(), timeout) + self._check_error(exc) + return result + + @synchronized + def session(self, name=None, transactional=False): + """ + Creates or retrieves the named session. If the name is omitted or + None, then a unique name is chosen based on a randomly generated + uuid. + + @type name: str + @param name: the session name + @rtype: Session + @return: the named Session + """ + + if name is None: + name = "%s:%s" % (self.id, self.session_counter) + self.session_counter += 1 + else: + name = "%s:%s" % (self.id, name) + + if self.sessions.has_key(name): + return self.sessions[name] + else: + ssn = Session(self, name, transactional) + self.sessions[name] = ssn + self._wakeup() + return ssn + + @synchronized + def _remove_session(self, ssn): + del self.sessions[ssn.name] + + @synchronized + def connect(self): + """ + Connect to the remote endpoint. + """ + self._connected = True + self._wakeup() + self._ewait(lambda: self._driver._connected and not self._unlinked(), + exc=ConnectError) + + def _unlinked(self): + return [l + for ssn in self.sessions.values() + for l in ssn.senders + ssn.receivers + if not (l.linked or l.error or l.closed)] + + @synchronized + def disconnect(self): + """ + Disconnect from the remote endpoint. + """ + self._connected = False + self._wakeup() + self._ewait(lambda: not self._driver._connected) + + @synchronized + def connected(self): + """ + Return true if the connection is connected, false otherwise. + """ + return self._connected + + @synchronized + def close(self): + """ + Close the connection and all sessions. + """ + for ssn in self.sessions.values(): + ssn.close() + self.disconnect() + +class Pattern: + """ + The pattern filter matches the supplied wildcard pattern against a + message subject. + """ + + def __init__(self, value): + self.value = value + + # XXX: this should become part of the driver + def _bind(self, sst, exchange, queue): + from qpid.ops import ExchangeBind + sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue, + binding_key=self.value.replace("*", "#"))) + +class Session: + + """ + Sessions provide a linear context for sending and receiving + L{Messages}. L{Messages} are sent and received + using the L{Sender.send} and L{Receiver.fetch} methods of the + L{Sender} and L{Receiver} objects associated with a Session. + + Each L{Sender} and L{Receiver} is created by supplying either a + target or source address to the L{sender} and L{receiver} methods of + the Session. The address is supplied via a string syntax documented + below. + + Addresses + ========= + + An address identifies a source or target for messages. In its + simplest form this is just a name. In general a target address may + also be used as a source address, however not all source addresses + may be used as a target, e.g. a source might additionally have some + filtering criteria that would not be present in a target. + + A subject may optionally be specified along with the name. When an + address is used as a target, any subject specified in the address is + used as the default subject of outgoing messages for that target. + When an address is used as a source, any subject specified in the + address is pattern matched against the subject of available messages + as a filter for incoming messages from that source. + + The options map contains additional information about the address + including: + + - policies for automatically creating, and deleting the node to + which an address refers + + - policies for asserting facts about the node to which an address + refers + + - extension points that can be used for sender/receiver + configuration + + Mapping to AMQP 0-10 + -------------------- + The name is resolved to either an exchange or a queue by querying + the broker. + + The subject is set as a property on the message. Additionally, if + the name refers to an exchange, the routing key is set to the + subject. + + Syntax + ------ + The following regular expressions define the tokens used to parse + addresses:: + LBRACE: \\{ + RBRACE: \\} + LBRACK: \\[ + RBRACK: \\] + COLON: : + SEMI: ; + SLASH: / + COMMA: , + NUMBER: [+-]?[0-9]*\\.?[0-9]+ + ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])? + STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\' + ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F] + SYM: [.#*%@$^!+-] + WSPACE: [ \\n\\r\\t]+ + + The formal grammar for addresses is given below:: + address = name [ "/" subject ] [ ";" options ] + name = ( part | quoted )+ + subject = ( part | quoted | "/" )* + quoted = STRING / ESC + part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM + options = map + map = "{" ( keyval ( "," keyval )* )? "}" + keyval = ID ":" value + value = NUMBER / STRING / ID / map / list + list = "[" ( value ( "," value )* )? "]" + + This grammar resuls in the following informal syntax:: + + [ / ] [ ; ] + + Where options is:: + + { : , ... } + + And values may be: + - numbers + - single, double, or non quoted strings + - maps (dictionaries) + - lists + + Options + ------- + The options map permits the following parameters:: + + [ / ] ; { + create: , + delete: , + assert: , + node-properties: { + type: , + durable: , + x-properties: { + bindings: ["/", ...], + : + } + } + } + + The create, delete, and assert policies specify who should perfom + the associated action: + + - I{always}: the action will always be performed + - I{sender}: the action will only be performed by the sender + - I{receiver}: the action will only be performed by the receiver + - I{never}: the action will never be performed (this is the default) + + The node-type is one of: + + - I{topic}: a topic node will default to the topic exchange, + x-properties may be used to specify other exchange types + - I{queue}: this is the default node-type + + The x-properties map permits arbitrary additional keys and values to + be specified. These keys and values are passed through when creating + a node or asserting facts about an existing node. Any passthrough + keys and values that do not match a standard field of the underlying + exchange or queue declare command will be sent in the arguments map. + + Examples + -------- + A simple name resolves to any named node, usually a queue or a + topic:: + + my-queue-or-topic + + A simple name with a subject will also resolve to a node, but the + presence of the subject will cause a sender using this address to + set the subject on outgoing messages, and receivers to filter based + on the subject:: + + my-queue-or-topic/my-subject + + A subject pattern can be used and will cause filtering if used by + the receiver. If used for a sender, the literal value gets set as + the subject:: + + my-queue-or-topic/my-* + + In all the above cases, the address is resolved to an existing node. + If you want the node to be auto-created, then you can do the + following. By default nonexistent nodes are assumed to be queues:: + + my-queue; {create: always} + + You can customize the properties of the queue:: + + my-queue; {create: always, node-properties: {durable: True}} + + You can create a topic instead if you want:: + + my-queue; {create: always, node-properties: {type: topic}} + + You can assert that the address resolves to a node with particular + properties:: + + my-transient-topic; { + assert: always, + node-properties: { + type: topic, + durable: False + } + } + """ + + def __init__(self, connection, name, transactional): + self.connection = connection + self.name = name + self.log_id = "%x" % id(self) + + self.transactional = transactional + + self.committing = False + self.committed = True + self.aborting = False + self.aborted = False + + self.next_sender_id = 0 + self.senders = [] + self.next_receiver_id = 0 + self.receivers = [] + self.outgoing = [] + self.incoming = [] + self.unacked = [] + self.acked = [] + # XXX: I hate this name. + self.ack_capacity = UNLIMITED + + self.error = None + self.closing = False + self.closed = False + + self._lock = connection._lock + + def __repr__(self): + return "" % self.name + + def _wait(self, predicate, timeout=None): + return self.connection._wait(predicate, timeout=timeout) + + def _wakeup(self): + self.connection._wakeup() + + def _check_error(self, exc=SessionError): + self.connection._check_error(exc) + if self.error: + raise exc(*self.error) + + def _ewait(self, predicate, timeout=None, exc=SessionError): + result = self.connection._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result + + @synchronized + def sender(self, target, **options): + """ + Creates a L{Sender} that may be used to send L{Messages} + to the specified target. + + @type target: str + @param target: the target to which messages will be sent + @rtype: Sender + @return: a new Sender for the specified target + """ + sender = Sender(self, self.next_sender_id, target, options) + self.next_sender_id += 1 + self.senders.append(sender) + if not self.closed and self.connection._connected: + self._wakeup() + try: + sender._ewait(lambda: sender.linked) + except SendError, e: + sender.close() + raise e + return sender + + @synchronized + def receiver(self, source, **options): + """ + Creates a receiver that may be used to fetch L{Messages} + from the specified source. + + @type source: str + @param source: the source of L{Messages} + @rtype: Receiver + @return: a new Receiver for the specified source + """ + receiver = Receiver(self, self.next_receiver_id, source, options) + self.next_receiver_id += 1 + self.receivers.append(receiver) + if not self.closed and self.connection._connected: + self._wakeup() + try: + receiver._ewait(lambda: receiver.linked) + except ReceiveError, e: + receiver.close() + raise e + return receiver + + @synchronized + def _count(self, predicate): + result = 0 + for msg in self.incoming: + if predicate(msg): + result += 1 + return result + + def _peek(self, predicate): + for msg in self.incoming: + if predicate(msg): + return msg + + def _pop(self, predicate): + i = 0 + while i < len(self.incoming): + msg = self.incoming[i] + if predicate(msg): + del self.incoming[i] + return msg + else: + i += 1 + + @synchronized + def _get(self, predicate, timeout=None): + if self._ewait(lambda: ((self._peek(predicate) is not None) or self.closing), + timeout): + msg = self._pop(predicate) + if msg is not None: + msg._receiver.returned += 1 + self.unacked.append(msg) + log.debug("RETR[%s]: %s", self.log_id, msg) + return msg + return None + + @synchronized + def next_receiver(self, timeout=None): + if self._ewait(lambda: self.incoming, timeout): + return self.incoming[0]._receiver + else: + raise Empty + + @synchronized + def acknowledge(self, message=None, sync=True): + """ + Acknowledge the given L{Message}. If message is None, then all + unacknowledged messages on the session are acknowledged. + + @type message: Message + @param message: the message to acknowledge or None + @type sync: boolean + @param sync: if true then block until the message(s) are acknowledged + """ + if message is None: + messages = self.unacked[:] + else: + messages = [message] + + for m in messages: + if self.ack_capacity is not UNLIMITED: + if self.ack_capacity <= 0: + # XXX: this is currently a SendError, maybe it should be a SessionError? + raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) + self._wakeup() + self._ewait(lambda: len(self.acked) < self.ack_capacity) + self.unacked.remove(m) + self.acked.append(m) + + self._wakeup() + if sync: + self._ewait(lambda: not [m for m in messages if m in self.acked]) + + @synchronized + def commit(self): + """ + Commit outstanding transactional work. This consists of all + message sends and receives since the prior commit or rollback. + """ + if not self.transactional: + raise NontransactionalSession() + self.committing = True + self._wakeup() + self._ewait(lambda: not self.committing) + if self.aborted: + raise TransactionAborted() + assert self.committed + + @synchronized + def rollback(self): + """ + Rollback outstanding transactional work. This consists of all + message sends and receives since the prior commit or rollback. + """ + if not self.transactional: + raise NontransactionalSession() + self.aborting = True + self._wakeup() + self._ewait(lambda: not self.aborting) + assert self.aborted + + @synchronized + def close(self): + """ + Close the session. + """ + # XXX: should be able to express this condition through API calls + self._ewait(lambda: not self.outgoing and not self.acked) + + for link in self.receivers + self.senders: + link.close() + + self.closing = True + self._wakeup() + self._ewait(lambda: self.closed) + self.connection._remove_session(self) + +class Sender: + + """ + Sends outgoing messages. + """ + + def __init__(self, session, id, target, options): + self.session = session + self.id = id + self.target = target + self.options = options + self.capacity = options.get("capacity", UNLIMITED) + self.durable = options.get("durable") + self.queued = Serial(0) + self.acked = Serial(0) + self.error = None + self.linked = False + self.closing = False + self.closed = False + self._lock = self.session._lock + + def _wakeup(self): + self.session._wakeup() + + def _check_error(self, exc=SendError): + self.session._check_error(exc) + if self.error: + raise exc(*self.error) + + def _ewait(self, predicate, timeout=None, exc=SendError): + result = self.session._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result + + @synchronized + def pending(self): + """ + Returns the number of messages awaiting acknowledgment. + @rtype: int + @return: the number of unacknowledged messages + """ + return self.queued - self.acked + + @synchronized + def send(self, object, sync=True, timeout=None): + """ + Send a message. If the object passed in is of type L{unicode}, + L{str}, L{list}, or L{dict}, it will automatically be wrapped in a + L{Message} and sent. If it is of type L{Message}, it will be sent + directly. If the sender capacity is not L{UNLIMITED} then send + will block until there is available capacity to send the message. + If the timeout parameter is specified, then send will throw an + L{InsufficientCapacity} exception if capacity does not become + available within the specified time. + + @type object: unicode, str, list, dict, Message + @param object: the message or content to send + + @type sync: boolean + @param sync: if true then block until the message is sent + + @type timeout: float + @param timeout: the time to wait for available capacity + """ + + if not self.session.connection._connected or self.session.closing: + raise Disconnected() + + self._ewait(lambda: self.linked) + + if isinstance(object, Message): + message = object + else: + message = Message(object) + + if message.durable is None: + message.durable = self.durable + + if self.capacity is not UNLIMITED: + if self.capacity <= 0: + raise InsufficientCapacity("capacity = %s" % self.capacity) + if not self._ewait(lambda: self.pending() < self.capacity, timeout=timeout): + raise InsufficientCapacity("capacity = %s" % self.capacity) + + # XXX: what if we send the same message to multiple senders? + message._sender = self + self.session.outgoing.append(message) + self.queued += 1 + + self._wakeup() + + if sync: + self.sync() + assert message not in self.session.outgoing + + @synchronized + def sync(self): + mno = self.queued + self._ewait(lambda: self.acked >= mno) + + @synchronized + def close(self): + """ + Close the Sender. + """ + self.closing = True + self._wakeup() + try: + self.session._ewait(lambda: self.closed) + finally: + self.session.senders.remove(self) + +class Receiver(object): + + """ + Receives incoming messages from a remote source. Messages may be + fetched with L{fetch}. + """ + + def __init__(self, session, id, source, options): + self.session = session + self.id = id + self.source = source + self.options = options + + self.granted = Serial(0) + self.draining = False + self.impending = Serial(0) + self.received = Serial(0) + self.returned = Serial(0) + + self.error = None + self.linked = False + self.closing = False + self.closed = False + self._lock = self.session._lock + self._capacity = 0 + self._set_capacity(options.get("capacity", 0), False) + + @synchronized + def _set_capacity(self, c, wakeup=True): + if c is UNLIMITED: + self._capacity = c.value + else: + self._capacity = c + self._grant() + if wakeup: + self._wakeup() + + def _get_capacity(self): + if self._capacity == UNLIMITED.value: + return UNLIMITED + else: + return self._capacity + + capacity = property(_get_capacity, _set_capacity) + + def _wakeup(self): + self.session._wakeup() + + def _check_error(self, exc=ReceiveError): + self.session._check_error(exc) + if self.error: + raise exc(*self.error) + + def _ewait(self, predicate, timeout=None, exc=ReceiveError): + result = self.session._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result + + @synchronized + def pending(self): + """ + Returns the number of messages available to be fetched by the + application. + + @rtype: int + @return: the number of available messages + """ + return self.received - self.returned + + def _pred(self, msg): + return msg._receiver == self + + @synchronized + def fetch(self, timeout=None): + """ + Fetch and return a single message. A timeout of None will block + forever waiting for a message to arrive, a timeout of zero will + return immediately if no messages are available. + + @type timeout: float + @param timeout: the time to wait for a message to be available + """ + + self._ewait(lambda: self.linked) + + if self._capacity == 0: + self.granted = self.returned + 1 + self._wakeup() + self._ewait(lambda: self.impending >= self.granted) + msg = self.session._get(self._pred, timeout=timeout) + if msg is None: + self.draining = True + self._wakeup() + self._ewait(lambda: not self.draining) + self._grant() + self._wakeup() + msg = self.session._get(self._pred, timeout=0) + if msg is None: + raise Empty() + elif self._capacity not in (0, UNLIMITED.value): + self.granted += 1 + self._wakeup() + return msg + + def _grant(self): + if self._capacity == UNLIMITED.value: + self.granted = UNLIMITED + else: + self.granted = self.received + self._capacity + + @synchronized + def close(self): + """ + Close the receiver. + """ + self.closing = True + self._wakeup() + try: + self.session._ewait(lambda: self.closed) + finally: + self.session.receivers.remove(self) + +__all__ = ["Connection", "Session", "Sender", "Receiver"] diff --git a/qpid/python/qpid/messaging/exceptions.py b/qpid/python/qpid/messaging/exceptions.py new file mode 100644 index 0000000000..5c8bdedc26 --- /dev/null +++ b/qpid/python/qpid/messaging/exceptions.py @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class ConnectionError(Exception): + """ + The base class for all connection related exceptions. + """ + pass + +class ConnectError(ConnectionError): + """ + Exception raised when there is an error connecting to the remote + peer. + """ + pass + +class SessionError(Exception): + pass + +class Disconnected(SessionError): + """ + Exception raised when an operation is attempted that is illegal when + disconnected. + """ + pass + +class NontransactionalSession(SessionError): + """ + Exception raised when commit or rollback is attempted on a non + transactional session. + """ + pass + +class TransactionAborted(SessionError): + pass + +class SendError(SessionError): + pass + +class InsufficientCapacity(SendError): + pass + +class ReceiveError(SessionError): + pass + +class Empty(ReceiveError): + """ + Exception raised by L{Receiver.fetch} when there is no message + available within the alloted time. + """ + pass diff --git a/qpid/python/qpid/messaging/message.py b/qpid/python/qpid/messaging/message.py new file mode 100644 index 0000000000..1c7c7beb81 --- /dev/null +++ b/qpid/python/qpid/messaging/message.py @@ -0,0 +1,141 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.codec010 import StringCodec +from qpid.ops import PRIMITIVE + +def codec(name): + type = PRIMITIVE[name] + + def encode(x): + sc = StringCodec() + sc.write_primitive(type, x) + return sc.encoded + + def decode(x): + sc = StringCodec(x) + return sc.read_primitive(type) + + return encode, decode + +# XXX: need to correctly parse the mime type and deal with +# content-encoding header + +TYPE_MAPPINGS={ + dict: "amqp/map", + list: "amqp/list", + unicode: "text/plain; charset=utf8", + unicode: "text/plain", + buffer: None, + str: None, + None.__class__: None + } + +TYPE_CODEC={ + "amqp/map": codec("map"), + "amqp/list": codec("list"), + "text/plain; charset=utf8": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")), + "text/plain": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")), + "": (lambda x: x, lambda x: x), + None: (lambda x: x, lambda x: x) + } + +def get_type(content): + return TYPE_MAPPINGS[content.__class__] + +def get_codec(content_type): + return TYPE_CODEC[content_type] + +UNSPECIFIED = object() + +class Message: + + """ + A message consists of a standard set of fields, an application + defined set of properties, and some content. + + @type id: str + @ivar id: the message id + @type user_id: str + @ivar user_id: the user-id of the message producer + @type to: str + @ivar to: the destination address + @type reply_to: str + @ivar reply_to: the address to send replies + @type correlation_id: str + @ivar correlation_id: a correlation-id for the message + @type properties: dict + @ivar properties: application specific message properties + @type content_type: str + @ivar content_type: the content-type of the message + @type content: str, unicode, buffer, dict, list + @ivar content: the message content + """ + + def __init__(self, content=None, content_type=UNSPECIFIED, id=None, + subject=None, to=None, user_id=None, reply_to=None, + correlation_id=None, durable=None, properties=None): + """ + Construct a new message with the supplied content. The + content-type of the message will be automatically inferred from + type of the content parameter. + + @type content: str, unicode, buffer, dict, list + @param content: the message content + + @type content_type: str + @param content_type: the content-type of the message + """ + self.id = id + self.subject = subject + self.to = to + self.user_id = user_id + self.reply_to = reply_to + self.correlation_id = correlation_id + self.durable = durable + self.redelivered = False + if properties is None: + self.properties = {} + else: + self.properties = properties + if content_type is UNSPECIFIED: + self.content_type = get_type(content) + else: + self.content_type = content_type + self.content = content + + def __repr__(self): + args = [] + for name in ["id", "subject", "to", "user_id", "reply_to", + "correlation_id"]: + value = self.__dict__[name] + if value is not None: args.append("%s=%r" % (name, value)) + for name in ["durable", "properties"]: + value = self.__dict__[name] + if value: args.append("%s=%r" % (name, value)) + if self.content_type != get_type(self.content): + args.append("content_type=%r" % self.content_type) + if self.content is not None: + if args: + args.append("content=%r" % self.content) + else: + args.append(repr(self.content)) + return "Message(%s)" % ", ".join(args) + +__all__ = ["Message"] diff --git a/qpid/python/qpid/tests/__init__.py b/qpid/python/qpid/tests/__init__.py index 039214ca42..101a0c3759 100644 --- a/qpid/python/qpid/tests/__init__.py +++ b/qpid/python/qpid/tests/__init__.py @@ -26,7 +26,35 @@ class Test: self.config = config # API Tests -import address, framing, mimetype, messaging +import qpid.tests.framing +import qpid.tests.mimetype +import qpid.tests.messaging # Legacy Tests -import codec, queue, datatypes, connection, spec010, codec010 +import qpid.tests.codec +import qpid.tests.queue +import qpid.tests.datatypes +import qpid.tests.connection +import qpid.tests.spec010 +import qpid.tests.codec010 + +class TestTestsXXX(Test): + + def testFoo(self): + print "this test has output" + + def testBar(self): + print "this test "*8 + print "has"*10 + print "a"*75 + print "lot of"*10 + print "output"*10 + + def testQux(self): + import sys + sys.stdout.write("this test has output with no newline") + + def testQuxFail(self): + import sys + sys.stdout.write("this test has output with no newline") + fdsa diff --git a/qpid/python/qpid/tests/address.py b/qpid/python/qpid/tests/address.py deleted file mode 100644 index 7e6c6a5ee5..0000000000 --- a/qpid/python/qpid/tests/address.py +++ /dev/null @@ -1,309 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - - -from qpid.tests import Test -from qpid.address import lex, parse, ParseError, EOF, ID, NUMBER, SYM, WSPACE, \ - LEXER -from qpid.lexer import Token -from qpid.harness import Skipped -from parser import ParserBase - -def indent(st): - return " " + st.replace("\n", "\n ") - -def pprint_address(name, subject, options): - return "NAME: %s\nSUBJECT: %s\nOPTIONS: %s" % \ - (pprint(name), pprint(subject), pprint(options)) - -def pprint(o): - if isinstance(o, dict): - return pprint_map(o) - elif isinstance(o, list): - return pprint_list(o) - elif isinstance(o, basestring): - return pprint_string(o) - else: - return repr(o) - -def pprint_map(m): - items = ["%s: %s" % (pprint(k), pprint(v)) for k, v in m.items()] - items.sort() - return pprint_items("{", items, "}") - -def pprint_list(l): - return pprint_items("[", [pprint(x) for x in l], "]") - -def pprint_items(start, items, end): - if items: - return "%s\n%s\n%s" % (start, ",\n".join([indent(i) for i in items]), end) - else: - return "%s%s" % (start, end) - -def pprint_string(s): - result = "'" - for c in s: - if c == "'": - result += "\\'" - elif c == "\n": - result += "\\n" - elif ord(c) >= 0x80: - result += "\\u%04x" % ord(c) - else: - result += c - result += "'" - return result - -class AddressTests(ParserBase, Test): - - EXCLUDE = (WSPACE, EOF) - - def fields(self, line, n): - result = line.split(":", n - 1) - result.extend([None]*(n - len(result))) - return result - - def call(self, parser, mode, input): - try: - from subprocess import Popen, PIPE, STDOUT - po = Popen([parser, mode], stdin=PIPE, stdout=PIPE, stderr=STDOUT) - except ImportError, e: - raise Skipped("%s" % e) - except OSError, e: - raise Skipped("%s: %s" % (e, parser)) - out, _ = po.communicate(input=input) - return out - - def parser(self): - return self.config.defines.get("address.parser") - - def do_lex(self, st): - parser = self.parser() - if parser: - out = self.call(parser, "lex", st) - lines = out.split("\n") - toks = [] - for line in lines: - if line.strip(): - name, position, value = self.fields(line, 3) - toks.append(Token(LEXER.type(name), value, position, st)) - return toks - else: - return lex(st) - - def do_parse(self, st): - return parse(st) - - def valid(self, addr, name=None, subject=None, options=None): - parser = self.parser() - if parser: - got = self.call(parser, "parse", addr) - expected = "%s\n" % pprint_address(name, subject, options) - assert expected == got, "expected\n%s\ngot\n%s" % (expected, got) - else: - ParserBase.valid(self, addr, (name, subject, options)) - - def invalid(self, addr, error=None): - parser = self.parser() - if parser: - got = self.call(parser, "parse", addr) - expected = "ERROR: %s\n" % error - assert expected == got, "expected %r, got %r" % (expected, got) - else: - ParserBase.invalid(self, addr, error) - - def testDashInId1(self): - self.lex("foo-bar", ID) - - def testDashInId2(self): - self.lex("foo-3", ID) - - def testDashAlone1(self): - self.lex("foo - bar", ID, SYM, ID) - - def testDashAlone2(self): - self.lex("foo - 3", ID, SYM, NUMBER) - - def testLeadingDash(self): - self.lex("-foo", SYM, ID) - - def testTrailingDash(self): - self.lex("foo-", ID, SYM) - - def testNegativeNum(self): - self.lex("-3", NUMBER) - - def testHash(self): - self.valid("foo/bar.#", "foo", "bar.#") - - def testStar(self): - self.valid("foo/bar.*", "foo", "bar.*") - - def testColon(self): - self.valid("foo.bar/baz.qux:moo:arf", "foo.bar", "baz.qux:moo:arf") - - def testOptions(self): - self.valid("foo.bar/baz.qux:moo:arf; {key: value}", - "foo.bar", "baz.qux:moo:arf", {"key": "value"}) - - def testOptionsTrailingComma(self): - self.valid("name/subject; {key: value,}", "name", "subject", - {"key": "value"}) - - def testSemiSubject(self): - self.valid("foo.bar/'baz.qux;moo:arf'; {key: value}", - "foo.bar", "baz.qux;moo:arf", {"key": "value"}) - - def testCommaSubject(self): - self.valid("foo.bar/baz.qux.{moo,arf}", "foo.bar", "baz.qux.{moo,arf}") - - def testCommaSubjectOptions(self): - self.valid("foo.bar/baz.qux.{moo,arf}; {key: value}", "foo.bar", - "baz.qux.{moo,arf}", {"key": "value"}) - - def testUnbalanced(self): - self.valid("foo.bar/baz.qux.{moo,arf; {key: value}", "foo.bar", - "baz.qux.{moo,arf", {"key": "value"}) - - def testSlashQuote(self): - self.valid("foo.bar\\/baz.qux.{moo,arf; {key: value}", - "foo.bar/baz.qux.{moo,arf", - None, {"key": "value"}) - - def testSlashHexEsc1(self): - self.valid("foo.bar\\x00baz.qux.{moo,arf; {key: value}", - "foo.bar\x00baz.qux.{moo,arf", - None, {"key": "value"}) - - def testSlashHexEsc2(self): - self.valid("foo.bar\\xffbaz.qux.{moo,arf; {key: value}", - "foo.bar\xffbaz.qux.{moo,arf", - None, {"key": "value"}) - - def testSlashHexEsc3(self): - self.valid("foo.bar\\xFFbaz.qux.{moo,arf; {key: value}", - "foo.bar\xFFbaz.qux.{moo,arf", - None, {"key": "value"}) - - def testSlashUnicode1(self): - self.valid("foo.bar\\u1234baz.qux.{moo,arf; {key: value}", - u"foo.bar\u1234baz.qux.{moo,arf", None, {"key": "value"}) - - def testSlashUnicode2(self): - self.valid("foo.bar\\u0000baz.qux.{moo,arf; {key: value}", - u"foo.bar\u0000baz.qux.{moo,arf", None, {"key": "value"}) - - def testSlashUnicode3(self): - self.valid("foo.bar\\uffffbaz.qux.{moo,arf; {key: value}", - u"foo.bar\uffffbaz.qux.{moo,arf", None, {"key": "value"}) - - def testSlashUnicode4(self): - self.valid("foo.bar\\uFFFFbaz.qux.{moo,arf; {key: value}", - u"foo.bar\uFFFFbaz.qux.{moo,arf", None, {"key": "value"}) - - def testNoName(self): - self.invalid("; {key: value}", - "unexpected token SEMI(;) line:1,0:; {key: value}") - - def testEmpty(self): - self.invalid("", "unexpected token EOF line:1,0:") - - def testNoNameSlash(self): - self.invalid("/asdf; {key: value}", - "unexpected token SLASH(/) line:1,0:/asdf; {key: value}") - - def testBadOptions1(self): - self.invalid("name/subject; {", - "expecting (NUMBER, STRING, ID, LBRACE, LBRACK, RBRACE), " - "got EOF line:1,15:name/subject; {") - - def testBadOptions2(self): - self.invalid("name/subject; { 3", - "expecting COLON, got EOF " - "line:1,17:name/subject; { 3") - - def testBadOptions3(self): - self.invalid("name/subject; { key:", - "expecting (NUMBER, STRING, ID, LBRACE, LBRACK), got EOF " - "line:1,20:name/subject; { key:") - - def testBadOptions4(self): - self.invalid("name/subject; { key: value", - "expecting (COMMA, RBRACE), got EOF " - "line:1,26:name/subject; { key: value") - - def testBadOptions5(self): - self.invalid("name/subject; { key: value asdf", - "expecting (COMMA, RBRACE), got ID(asdf) " - "line:1,27:name/subject; { key: value asdf") - - def testBadOptions6(self): - self.invalid("name/subject; { key: value,", - "expecting (NUMBER, STRING, ID, LBRACE, LBRACK, RBRACE), got EOF " - "line:1,27:name/subject; { key: value,") - - def testBadOptions7(self): - self.invalid("name/subject; { key: value } asdf", - "expecting EOF, got ID(asdf) " - "line:1,29:name/subject; { key: value } asdf") - - def testList1(self): - self.valid("name/subject; { key: [] }", "name", "subject", {"key": []}) - - def testList2(self): - self.valid("name/subject; { key: ['one'] }", "name", "subject", {"key": ['one']}) - - def testList3(self): - self.valid("name/subject; { key: [1, 2, 3] }", "name", "subject", - {"key": [1, 2, 3]}) - - def testList4(self): - self.valid("name/subject; { key: [1, [2, 3], 4] }", "name", "subject", - {"key": [1, [2, 3], 4]}) - - def testBadList1(self): - self.invalid("name/subject; { key: [ }", "expecting (NUMBER, STRING, ID, LBRACE, LBRACK), " - "got RBRACE(}) line:1,23:name/subject; { key: [ }") - - def testBadList2(self): - self.invalid("name/subject; { key: [ 1 }", "expecting (COMMA, RBRACK), " - "got RBRACE(}) line:1,25:name/subject; { key: [ 1 }") - - def testBadList3(self): - self.invalid("name/subject; { key: [ 1 2 }", "expecting (COMMA, RBRACK), " - "got NUMBER(2) line:1,25:name/subject; { key: [ 1 2 }") - - def testBadList4(self): - self.invalid("name/subject; { key: [ 1 2 ] }", "expecting (COMMA, RBRACK), " - "got NUMBER(2) line:1,25:name/subject; { key: [ 1 2 ] }") - - def testMap1(self): - self.valid("name/subject; { 'key': value }", - "name", "subject", {"key": "value"}) - - def testMap2(self): - self.valid("name/subject; { 1: value }", "name", "subject", {1: "value"}) - - def testMap3(self): - self.valid('name/subject; { "foo.bar": value }', - "name", "subject", {"foo.bar": "value"}) - - def testBoolean(self): - self.valid("name/subject; { true: True, false: False }", - "name", "subject", {"true": True, "false": False}) diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py deleted file mode 100644 index 125f1b7157..0000000000 --- a/qpid/python/qpid/tests/messaging.py +++ /dev/null @@ -1,1079 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# setup, usage, teardown, errors(sync), errors(async), stress, soak, -# boundary-conditions, config - -import time -from qpid import compat -from qpid.tests import Test -from qpid.harness import Skipped -from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \ - InsufficientCapacity, Message, ReceiveError, SendError, SessionError, \ - UNLIMITED, uuid4 -from Queue import Queue, Empty as QueueEmpty - -class Base(Test): - - def setup_connection(self): - return None - - def setup_session(self): - return None - - def setup_sender(self): - return None - - def setup_receiver(self): - return None - - def setup(self): - self.test_id = uuid4() - self.broker = self.config.broker - try: - self.conn = self.setup_connection() - except ConnectError, e: - raise Skipped(e) - self.ssn = self.setup_session() - self.snd = self.setup_sender() - if self.snd is not None: - self.snd.durable = self.durable() - self.rcv = self.setup_receiver() - - def teardown(self): - if self.conn is not None and self.conn.connected(): - self.conn.close() - - def content(self, base, count = None): - if count is None: - return "%s[%s]" % (base, self.test_id) - else: - return "%s[%s, %s]" % (base, count, self.test_id) - - def ping(self, ssn): - PING_Q = 'ping-queue; {create: always, delete: always}' - # send a message - sender = ssn.sender(PING_Q, durable=self.durable()) - content = self.content("ping") - sender.send(content) - receiver = ssn.receiver(PING_Q) - msg = receiver.fetch(0) - ssn.acknowledge() - assert msg.content == content, "expected %r, got %r" % (content, msg.content) - - def drain(self, rcv, limit=None, timeout=0, expected=None): - contents = [] - try: - while limit is None or len(contents) < limit: - contents.append(rcv.fetch(timeout=timeout).content) - except Empty: - pass - if expected is not None: - assert expected == contents, "expected %s, got %s" % (expected, contents) - return contents - - def assertEmpty(self, rcv): - contents = self.drain(rcv) - assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents) - - def assertPending(self, rcv, expected): - p = rcv.pending() - assert p == expected, "expected %s, got %s" % (expected, p) - - def sleep(self): - time.sleep(self.delay()) - - def delay(self): - return float(self.config.defines.get("delay", "2")) - - def get_bool(self, name): - return self.config.defines.get(name, "false").lower() in ("true", "yes", "1") - - def durable(self): - return self.get_bool("durable") - - def reconnect(self): - return self.get_bool("reconnect") - -class SetupTests(Base): - - def testOpen(self): - # XXX: need to flesh out URL support/syntax - self.conn = Connection.open(self.broker.host, self.broker.port, - reconnect=self.reconnect()) - self.ping(self.conn.session()) - - def testConnect(self): - # XXX: need to flesh out URL support/syntax - self.conn = Connection(self.broker.host, self.broker.port, - reconnect=self.reconnect()) - self.conn.connect() - self.ping(self.conn.session()) - - def testConnectError(self): - try: - self.conn = Connection.open("localhost", 0) - assert False, "connect succeeded" - except ConnectError, e: - # XXX: should verify that e includes appropriate diagnostic info - pass - -class ConnectionTests(Base): - - def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port, - reconnect=self.reconnect()) - - def testSessionAnon(self): - ssn1 = self.conn.session() - ssn2 = self.conn.session() - self.ping(ssn1) - self.ping(ssn2) - assert ssn1 is not ssn2 - - def testSessionNamed(self): - ssn1 = self.conn.session("one") - ssn2 = self.conn.session("two") - self.ping(ssn1) - self.ping(ssn2) - assert ssn1 is not ssn2 - assert ssn1 is self.conn.session("one") - assert ssn2 is self.conn.session("two") - - def testDisconnect(self): - ssn = self.conn.session() - self.ping(ssn) - self.conn.disconnect() - try: - self.ping(ssn) - assert False, "ping succeeded" - except Disconnected: - # this is the expected failure when pinging on a disconnected - # connection - pass - self.conn.connect() - self.ping(ssn) - - def testClose(self): - self.conn.close() - assert not self.conn.connected() - -ACK_QC = 'test-ack-queue; {create: always}' -ACK_QD = 'test-ack-queue; {delete: always}' - -class SessionTests(Base): - - def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port, - reconnect=self.reconnect()) - - def setup_session(self): - return self.conn.session() - - def testSender(self): - snd = self.ssn.sender('test-snd-queue; {create: sender, delete: receiver}', - durable=self.durable()) - snd2 = self.ssn.sender(snd.target, durable=self.durable()) - assert snd is not snd2 - snd2.close() - - content = self.content("testSender") - snd.send(content) - rcv = self.ssn.receiver(snd.target) - msg = rcv.fetch(0) - assert msg.content == content - self.ssn.acknowledge(msg) - - def testReceiver(self): - rcv = self.ssn.receiver('test-rcv-queue; {create: always}') - rcv2 = self.ssn.receiver(rcv.source) - assert rcv is not rcv2 - rcv2.close() - - content = self.content("testReceiver") - snd = self.ssn.sender(rcv.source, durable=self.durable()) - snd.send(content) - msg = rcv.fetch(0) - assert msg.content == content - self.ssn.acknowledge(msg) - snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}') - - def testDisconnectedReceiver(self): - self.conn.disconnect() - rcv = self.ssn.receiver("test-dis-rcv-queue; {create: always, delete: always}") - m = self.content("testDisconnectedReceiver") - self.conn.connect() - snd = self.ssn.sender("test-dis-rcv-queue") - snd.send(m) - self.drain(rcv, expected=[m]) - - def testNextReceiver(self): - ADDR = 'test-next-rcv-queue; {create: always, delete: always}' - rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED) - rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED) - rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED) - - snd = self.ssn.sender(ADDR) - - msgs = [] - for i in range(10): - content = self.content("testNextReceiver", i) - snd.send(content) - msgs.append(content) - - fetched = [] - try: - while True: - rcv = self.ssn.next_receiver(timeout=self.delay()) - assert rcv in (rcv1, rcv2, rcv3) - assert rcv.pending() > 0 - fetched.append(rcv.fetch().content) - except Empty: - pass - assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched) - self.ssn.acknowledge() - #we set the capacity to 0 to prevent the deletion of the queue - - #triggered the deletion policy when the first receiver is closed - - #resulting in session exceptions being issued for the remaining - #active subscriptions: - for r in [rcv1, rcv2, rcv3]: - r.capacity = 0 - - # XXX, we need a convenient way to assert that required queues are - # empty on setup, and possibly also to drain queues on teardown - def ackTest(self, acker, ack_capacity=None): - # send a bunch of messages - snd = self.ssn.sender(ACK_QC, durable=self.durable()) - contents = [self.content("ackTest", i) for i in range(15)] - for c in contents: - snd.send(c) - - # drain the queue, verify the messages are there and then close - # without acking - rcv = self.ssn.receiver(ACK_QC) - self.drain(rcv, expected=contents) - self.ssn.close() - - # drain the queue again, verify that they are all the messages - # were requeued, and ack this time before closing - self.ssn = self.conn.session() - if ack_capacity is not None: - self.ssn.ack_capacity = ack_capacity - rcv = self.ssn.receiver(ACK_QC) - self.drain(rcv, expected=contents) - acker(self.ssn) - self.ssn.close() - - # drain the queue a final time and verify that the messages were - # dequeued - self.ssn = self.conn.session() - rcv = self.ssn.receiver(ACK_QD) - self.assertEmpty(rcv) - - def testAcknowledge(self): - self.ackTest(lambda ssn: ssn.acknowledge()) - - def testAcknowledgeAsync(self): - self.ackTest(lambda ssn: ssn.acknowledge(sync=False)) - - def testAcknowledgeAsyncAckCap0(self): - try: - try: - self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0) - assert False, "acknowledge shouldn't succeed with ack_capacity of zero" - except InsufficientCapacity: - pass - finally: - self.ssn.ack_capacity = UNLIMITED - self.drain(self.ssn.receiver(ACK_QD)) - self.ssn.acknowledge() - - def testAcknowledgeAsyncAckCap1(self): - self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1) - - def testAcknowledgeAsyncAckCap5(self): - self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5) - - def testAcknowledgeAsyncAckCapUNLIMITED(self): - self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED) - - def send(self, ssn, queue, base, count=1): - snd = ssn.sender(queue, durable=self.durable()) - contents = [] - for i in range(count): - c = self.content(base, i) - snd.send(c) - contents.append(c) - snd.close() - return contents - - def txTest(self, commit): - TX_Q = 'test-tx-queue; {create: sender, delete: receiver}' - TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}' - txssn = self.conn.session(transactional=True) - contents = self.send(self.ssn, TX_Q, "txTest", 3) - txrcv = txssn.receiver(TX_Q) - txsnd = txssn.sender(TX_Q_COPY, durable=self.durable()) - rcv = self.ssn.receiver(txrcv.source) - copy_rcv = self.ssn.receiver(txsnd.target) - self.assertEmpty(copy_rcv) - for i in range(3): - m = txrcv.fetch(0) - txsnd.send(m) - self.assertEmpty(copy_rcv) - txssn.acknowledge() - if commit: - txssn.commit() - self.assertEmpty(rcv) - assert contents == self.drain(copy_rcv) - else: - txssn.rollback() - assert contents == self.drain(rcv) - self.assertEmpty(copy_rcv) - self.ssn.acknowledge() - - def testCommit(self): - self.txTest(True) - - def testRollback(self): - self.txTest(False) - - def txTestSend(self, commit): - TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}' - txssn = self.conn.session(transactional=True) - contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3) - rcv = self.ssn.receiver(TX_SEND_Q) - self.assertEmpty(rcv) - - if commit: - txssn.commit() - assert contents == self.drain(rcv) - self.ssn.acknowledge() - else: - txssn.rollback() - self.assertEmpty(rcv) - txssn.commit() - self.assertEmpty(rcv) - - def testCommitSend(self): - self.txTestSend(True) - - def testRollbackSend(self): - self.txTestSend(False) - - def txTestAck(self, commit): - TX_ACK_QC = 'test-tx-ack-queue; {create: always}' - TX_ACK_QD = 'test-tx-ack-queue; {delete: always}' - txssn = self.conn.session(transactional=True) - txrcv = txssn.receiver(TX_ACK_QC) - self.assertEmpty(txrcv) - contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3) - assert contents == self.drain(txrcv) - - if commit: - txssn.acknowledge() - else: - txssn.rollback() - drained = self.drain(txrcv) - assert contents == drained, "expected %s, got %s" % (contents, drained) - txssn.acknowledge() - txssn.rollback() - assert contents == self.drain(txrcv) - txssn.commit() # commit without ack - self.assertEmpty(txrcv) - - txssn.close() - - txssn = self.conn.session(transactional=True) - txrcv = txssn.receiver(TX_ACK_QC) - assert contents == self.drain(txrcv) - txssn.acknowledge() - txssn.commit() - rcv = self.ssn.receiver(TX_ACK_QD) - self.assertEmpty(rcv) - txssn.close() - self.assertEmpty(rcv) - - def testCommitAck(self): - self.txTestAck(True) - - def testRollbackAck(self): - self.txTestAck(False) - - def testClose(self): - self.ssn.close() - try: - self.ping(self.ssn) - assert False, "ping succeeded" - except Disconnected: - pass - -RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}' - -class ReceiverTests(Base): - - def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port, - reconnect=self.reconnect()) - - def setup_session(self): - return self.conn.session() - - def setup_sender(self): - return self.ssn.sender(RECEIVER_Q) - - def setup_receiver(self): - return self.ssn.receiver(RECEIVER_Q) - - def send(self, base, count = None): - content = self.content(base, count) - self.snd.send(content) - return content - - def testFetch(self): - try: - msg = self.rcv.fetch(0) - assert False, "unexpected message: %s" % msg - except Empty: - pass - try: - start = time.time() - msg = self.rcv.fetch(self.delay()) - assert False, "unexpected message: %s" % msg - except Empty: - elapsed = time.time() - start - assert elapsed >= self.delay() - - one = self.send("testFetch", 1) - two = self.send("testFetch", 2) - three = self.send("testFetch", 3) - msg = self.rcv.fetch(0) - assert msg.content == one - msg = self.rcv.fetch(self.delay()) - assert msg.content == two - msg = self.rcv.fetch() - assert msg.content == three - self.ssn.acknowledge() - - def testCapacityIncrease(self): - content = self.send("testCapacityIncrease") - self.sleep() - assert self.rcv.pending() == 0 - self.rcv.capacity = UNLIMITED - self.sleep() - assert self.rcv.pending() == 1 - msg = self.rcv.fetch(0) - assert msg.content == content - assert self.rcv.pending() == 0 - self.ssn.acknowledge() - - def testCapacityDecrease(self): - self.rcv.capacity = UNLIMITED - one = self.send("testCapacityDecrease", 1) - self.sleep() - assert self.rcv.pending() == 1 - msg = self.rcv.fetch(0) - assert msg.content == one - - self.rcv.capacity = 0 - - two = self.send("testCapacityDecrease", 2) - self.sleep() - assert self.rcv.pending() == 0 - msg = self.rcv.fetch(0) - assert msg.content == two - - self.ssn.acknowledge() - - def testCapacity(self): - self.rcv.capacity = 5 - self.assertPending(self.rcv, 0) - - for i in range(15): - self.send("testCapacity", i) - self.sleep() - self.assertPending(self.rcv, 5) - - self.drain(self.rcv, limit = 5) - self.sleep() - self.assertPending(self.rcv, 5) - - drained = self.drain(self.rcv) - assert len(drained) == 10, "%s, %s" % (len(drained), drained) - self.assertPending(self.rcv, 0) - - self.ssn.acknowledge() - - def testCapacityUNLIMITED(self): - self.rcv.capacity = UNLIMITED - self.assertPending(self.rcv, 0) - - for i in range(10): - self.send("testCapacityUNLIMITED", i) - self.sleep() - self.assertPending(self.rcv, 10) - - self.drain(self.rcv) - self.assertPending(self.rcv, 0) - - self.ssn.acknowledge() - - def testPending(self): - self.rcv.capacity = UNLIMITED - assert self.rcv.pending() == 0 - - for i in range(3): - self.send("testPending", i) - self.sleep() - assert self.rcv.pending() == 3 - - for i in range(3, 10): - self.send("testPending", i) - self.sleep() - assert self.rcv.pending() == 10 - - self.drain(self.rcv, limit=3) - assert self.rcv.pending() == 7 - - self.drain(self.rcv) - assert self.rcv.pending() == 0 - - self.ssn.acknowledge() - - def testDoubleClose(self): - m1 = self.content("testDoubleClose", 1) - m2 = self.content("testDoubleClose", 2) - - snd = self.ssn.sender("""test-double-close; { - create: always, - delete: sender, - node-properties: { - type: topic - } -} -""") - r1 = self.ssn.receiver(snd.target) - r2 = self.ssn.receiver(snd.target) - snd.send(m1) - self.drain(r1, expected=[m1]) - self.drain(r2, expected=[m1]) - r1.close() - snd.send(m2) - self.drain(r2, expected=[m2]) - r2.close() - - # XXX: need testClose - - def testMode(self): - msgs = [self.content("testMode", 1), - self.content("testMode", 2), - self.content("testMode", 3)] - - for m in msgs: - self.snd.send(m) - - rb = self.ssn.receiver('test-receiver-queue; {mode: browse}') - rc = self.ssn.receiver('test-receiver-queue; {mode: consume}') - self.drain(rb, expected=msgs) - self.drain(rc, expected=msgs) - rb2 = self.ssn.receiver(rb.source) - self.assertEmpty(rb2) - self.drain(self.rcv, expected=[]) - -class AddressTests(Base): - - def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port, - reconnect=self.reconnect()) - - def setup_session(self): - return self.conn.session() - - def badOption(self, options, error): - try: - self.ssn.sender("test-bad-options-snd; %s" % options) - assert False - except SendError, e: - assert "error in options: %s" % error == str(e), e - - try: - self.ssn.receiver("test-bad-options-rcv; %s" % options) - assert False - except ReceiveError, e: - assert "error in options: %s" % error == str(e), e - - def testIllegalKey(self): - self.badOption("{create: always, node-properties: " - "{this-property-does-not-exist: 3}}", - "node-properties: this-property-does-not-exist: " - "illegal key") - - def testWrongValue(self): - self.badOption("{create: asdf}", "create: asdf not in " - "('always', 'sender', 'receiver', 'never')") - - def testWrongType1(self): - self.badOption("{node-properties: asdf}", - "node-properties: asdf is not a map") - - def testWrongType2(self): - self.badOption("{node-properties: {durable: []}}", - "node-properties: durable: [] is not a bool") - - def testNonQueueBindings(self): - self.badOption("{node-properties: {type: topic, x-properties: " - "{bindings: []}}}", - "node-properties: x-properties: bindings: " - "bindings are only permitted on nodes of type queue") - - def testCreateQueue(self): - snd = self.ssn.sender("test-create-queue; {create: always, delete: always, " - "node-properties: {type: queue, durable: False, " - "x-properties: {auto_delete: true}}}") - content = self.content("testCreateQueue") - snd.send(content) - rcv = self.ssn.receiver("test-create-queue") - self.drain(rcv, expected=[content]) - - def createExchangeTest(self, props=""): - addr = """test-create-exchange; { - create: always, - delete: always, - node-properties: { - type: topic, - durable: False, - x-properties: {auto_delete: true, %s} - } - }""" % props - snd = self.ssn.sender(addr) - snd.send("ping") - rcv1 = self.ssn.receiver("test-create-exchange/first") - rcv2 = self.ssn.receiver("test-create-exchange/first") - rcv3 = self.ssn.receiver("test-create-exchange/second") - for r in (rcv1, rcv2, rcv3): - try: - r.fetch(0) - assert False - except Empty: - pass - msg1 = Message(self.content("testCreateExchange", 1), subject="first") - msg2 = Message(self.content("testCreateExchange", 2), subject="second") - snd.send(msg1) - snd.send(msg2) - self.drain(rcv1, expected=[msg1.content]) - self.drain(rcv2, expected=[msg1.content]) - self.drain(rcv3, expected=[msg2.content]) - - def testCreateExchange(self): - self.createExchangeTest() - - def testCreateExchangeDirect(self): - self.createExchangeTest("type: direct") - - def testCreateExchangeTopic(self): - self.createExchangeTest("type: topic") - - def testDeleteBySender(self): - snd = self.ssn.sender("test-delete; {create: always}") - snd.send("ping") - snd.close() - snd = self.ssn.sender("test-delete; {delete: always}") - snd.send("ping") - snd.close() - try: - self.ssn.sender("test-delete") - except SendError, e: - assert "no such queue" in str(e) - - def testDeleteByReceiver(self): - rcv = self.ssn.receiver("test-delete; {create: always, delete: always}") - try: - rcv.fetch(0) - except Empty: - pass - rcv.close() - - try: - self.ssn.receiver("test-delete") - assert False - except ReceiveError, e: - assert "no such queue" in str(e) - - def testDeleteSpecial(self): - snd = self.ssn.sender("amq.topic; {delete: always}") - snd.send("asdf") - try: - snd.close() - except SessionError, e: - assert "Cannot delete default exchange" in str(e) - # XXX: need to figure out close after error - self.conn._remove_session(self.ssn) - - def testBindings(self): - snd = self.ssn.sender(""" -test-bindings-queue; { - create: always, - delete: always, - node-properties: { - x-properties: { - bindings: ["amq.topic/a.#", "amq.direct/b", "amq.topic/c.*"] - } - } -} -""") - snd.send("one") - snd_a = self.ssn.sender("amq.topic/a.foo") - snd_b = self.ssn.sender("amq.direct/b") - snd_c = self.ssn.sender("amq.topic/c.bar") - snd_a.send("two") - snd_b.send("three") - snd_c.send("four") - rcv = self.ssn.receiver("test-bindings-queue") - self.drain(rcv, expected=["one", "two", "three", "four"]) - - def testBindingsAdditive(self): - m1 = self.content("testBindingsAdditive", 1) - m2 = self.content("testBindingsAdditive", 2) - m3 = self.content("testBindingsAdditive", 3) - m4 = self.content("testBindingsAdditive", 4) - - snd = self.ssn.sender(""" -test-bindings-additive-queue; { - create: always, - delete: always, - node-properties: { - x-properties: { - bindings: ["amq.topic/a"] - } - } -} -""") - - snd_a = self.ssn.sender("amq.topic/a") - snd_b = self.ssn.sender("amq.topic/b") - - snd_a.send(m1) - snd_b.send(m2) - - rcv = self.ssn.receiver("test-bindings-additive-queue") - self.drain(rcv, expected=[m1]) - - new_snd = self.ssn.sender(""" -test-bindings-additive-queue; { - node-properties: { - x-properties: { - bindings: ["amq.topic/b"] - } - } -} -""") - - new_snd.send(m3) - snd_b.send(m4) - self.drain(rcv, expected=[m3, m4]) - - def testSubjectOverride(self): - snd = self.ssn.sender("amq.topic/a") - rcv_a = self.ssn.receiver("amq.topic/a") - rcv_b = self.ssn.receiver("amq.topic/b") - m1 = self.content("testSubjectOverride", 1) - m2 = self.content("testSubjectOverride", 2) - snd.send(m1) - snd.send(Message(subject="b", content=m2)) - self.drain(rcv_a, expected=[m1]) - self.drain(rcv_b, expected=[m2]) - - def testSubjectDefault(self): - m1 = self.content("testSubjectDefault", 1) - m2 = self.content("testSubjectDefault", 2) - snd = self.ssn.sender("amq.topic/a") - rcv = self.ssn.receiver("amq.topic") - snd.send(m1) - snd.send(Message(subject="b", content=m2)) - e1 = rcv.fetch(timeout=0) - e2 = rcv.fetch(timeout=0) - assert e1.subject == "a", "subject: %s" % e1.subject - assert e2.subject == "b", "subject: %s" % e2.subject - self.assertEmpty(rcv) - -NOSUCH_Q = "this-queue-should-not-exist" -UNPARSEABLE_ADDR = "name/subject; {bad options" -UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3" - -class AddressErrorTests(Base): - - def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port, - reconnect=self.reconnect()) - - def setup_session(self): - return self.conn.session() - - def senderErrorTest(self, addr, exc, check=lambda e: True): - try: - self.ssn.sender(addr, durable=self.durable()) - assert False, "sender creation succeeded" - except exc, e: - assert check(e), "unexpected error: %s" % compat.format_exc(e) - - def receiverErrorTest(self, addr, exc, check=lambda e: True): - try: - self.ssn.receiver(addr) - assert False, "receiver creation succeeded" - except exc, e: - assert check(e), "unexpected error: %s" % compat.format_exc(e) - - def testNoneTarget(self): - # XXX: should have specific exception for this - self.senderErrorTest(None, SendError) - - def testNoneSource(self): - # XXX: should have specific exception for this - self.receiverErrorTest(None, ReceiveError) - - def testNoTarget(self): - # XXX: should have specific exception for this - self.senderErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e)) - - def testNoSource(self): - # XXX: should have specific exception for this - self.receiverErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e)) - - def testUnparseableTarget(self): - # XXX: should have specific exception for this - self.senderErrorTest(UNPARSEABLE_ADDR, SendError, - lambda e: "expecting COLON" in str(e)) - - def testUnparseableSource(self): - # XXX: should have specific exception for this - self.receiverErrorTest(UNPARSEABLE_ADDR, ReceiveError, - lambda e: "expecting COLON" in str(e)) - - def testUnlexableTarget(self): - # XXX: should have specific exception for this - self.senderErrorTest(UNLEXABLE_ADDR, SendError, - lambda e: "unrecognized characters" in str(e)) - - def testUnlexableSource(self): - # XXX: should have specific exception for this - self.receiverErrorTest(UNLEXABLE_ADDR, ReceiveError, - lambda e: "unrecognized characters" in str(e)) - - def testInvalidMode(self): - # XXX: should have specific exception for this - self.receiverErrorTest('name; {mode: "this-is-a-bad-receiver-mode"}', - ReceiveError, - lambda e: "not in ('browse', 'consume')" in str(e)) - -SENDER_Q = 'test-sender-q; {create: always, delete: always}' - -class SenderTests(Base): - - def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port, - reconnect=self.reconnect()) - - def setup_session(self): - return self.conn.session() - - def setup_sender(self): - return self.ssn.sender(SENDER_Q) - - def setup_receiver(self): - return self.ssn.receiver(SENDER_Q) - - def checkContent(self, content): - self.snd.send(content) - msg = self.rcv.fetch(0) - assert msg.content == content - - out = Message(content) - self.snd.send(out) - echo = self.rcv.fetch(0) - assert out.content == echo.content - assert echo.content == msg.content - self.ssn.acknowledge() - - def testSendString(self): - self.checkContent(self.content("testSendString")) - - def testSendList(self): - self.checkContent(["testSendList", 1, 3.14, self.test_id]) - - def testSendMap(self): - self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14}) - - def asyncTest(self, capacity): - self.snd.capacity = capacity - msgs = [self.content("asyncTest", i) for i in range(15)] - for m in msgs: - self.snd.send(m, sync=False) - drained = self.drain(self.rcv, timeout=self.delay()) - assert msgs == drained, "expected %s, got %s" % (msgs, drained) - self.ssn.acknowledge() - - def testSendAsyncCapacity0(self): - try: - self.asyncTest(0) - assert False, "send shouldn't succeed with zero capacity" - except InsufficientCapacity: - # this is expected - pass - - def testSendAsyncCapacity1(self): - self.asyncTest(1) - - def testSendAsyncCapacity5(self): - self.asyncTest(5) - - def testSendAsyncCapacityUNLIMITED(self): - self.asyncTest(UNLIMITED) - - def testCapacityTimeout(self): - self.snd.capacity = 1 - msgs = [] - caught = False - while len(msgs) < 100: - m = self.content("testCapacity", len(msgs)) - try: - self.snd.send(m, sync=False, timeout=0) - msgs.append(m) - except InsufficientCapacity: - caught = True - break - self.snd.sync() - self.drain(self.rcv, expected=msgs) - self.ssn.acknowledge() - assert caught, "did not exceed capacity" - -class MessageTests(Base): - - def testCreateString(self): - m = Message("string") - assert m.content == "string" - assert m.content_type is None - - def testCreateUnicode(self): - m = Message(u"unicode") - assert m.content == u"unicode" - assert m.content_type == "text/plain" - - def testCreateMap(self): - m = Message({}) - assert m.content == {} - assert m.content_type == "amqp/map" - - def testCreateList(self): - m = Message([]) - assert m.content == [] - assert m.content_type == "amqp/list" - - def testContentTypeOverride(self): - m = Message() - m.content_type = "text/html; charset=utf8" - m.content = u"" - assert m.content_type == "text/html; charset=utf8" - -ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}' - -class MessageEchoTests(Base): - - def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port, - reconnect=self.reconnect()) - - def setup_session(self): - return self.conn.session() - - def setup_sender(self): - return self.ssn.sender(ECHO_Q) - - def setup_receiver(self): - return self.ssn.receiver(ECHO_Q) - - def check(self, msg): - self.snd.send(msg) - echo = self.rcv.fetch(0) - - assert msg.id == echo.id - assert msg.subject == echo.subject - assert msg.user_id == echo.user_id - assert msg.to == echo.to - assert msg.reply_to == echo.reply_to - assert msg.correlation_id == echo.correlation_id - assert msg.properties == echo.properties - assert msg.content_type == echo.content_type - assert msg.content == echo.content, "%s, %s" % (msg, echo) - - self.ssn.acknowledge(echo) - - def testStringContent(self): - self.check(Message("string")) - - def testUnicodeContent(self): - self.check(Message(u"unicode")) - - - TEST_MAP = {"key1": "string", - "key2": u"unicode", - "key3": 3, - "key4": -3, - "key5": 3.14, - "key6": -3.14, - "key7": ["one", 2, 3.14], - "key8": [], - "key9": {"sub-key0": 3}} - - def testMapContent(self): - self.check(Message(MessageEchoTests.TEST_MAP)) - - def testListContent(self): - self.check(Message([])) - self.check(Message([1, 2, 3])) - self.check(Message(["one", 2, 3.14, {"four": 4}])) - - def testProperties(self): - msg = Message() - msg.to = "to-address" - msg.subject = "subject" - msg.correlation_id = str(self.test_id) - msg.properties = MessageEchoTests.TEST_MAP - msg.reply_to = "reply-address" - self.check(msg) - -class TestTestsXXX(Test): - - def testFoo(self): - print "this test has output" - - def testBar(self): - print "this test "*8 - print "has"*10 - print "a"*75 - print "lot of"*10 - print "output"*10 - - def testQux(self): - import sys - sys.stdout.write("this test has output with no newline") - - def testQuxFail(self): - import sys - sys.stdout.write("this test has output with no newline") - fdsa diff --git a/qpid/python/qpid/tests/messaging/__init__.py b/qpid/python/qpid/tests/messaging/__init__.py new file mode 100644 index 0000000000..6785614f41 --- /dev/null +++ b/qpid/python/qpid/tests/messaging/__init__.py @@ -0,0 +1,106 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import time +from qpid.messaging import * +from qpid.tests import Test + +class Base(Test): + + def setup_connection(self): + return None + + def setup_session(self): + return None + + def setup_sender(self): + return None + + def setup_receiver(self): + return None + + def setup(self): + self.test_id = uuid4() + self.broker = self.config.broker + try: + self.conn = self.setup_connection() + except ConnectError, e: + raise Skipped(e) + self.ssn = self.setup_session() + self.snd = self.setup_sender() + if self.snd is not None: + self.snd.durable = self.durable() + self.rcv = self.setup_receiver() + + def teardown(self): + if self.conn is not None and self.conn.connected(): + self.conn.close() + + def content(self, base, count = None): + if count is None: + return "%s[%s]" % (base, self.test_id) + else: + return "%s[%s, %s]" % (base, count, self.test_id) + + def ping(self, ssn): + PING_Q = 'ping-queue; {create: always, delete: always}' + # send a message + sender = ssn.sender(PING_Q, durable=self.durable()) + content = self.content("ping") + sender.send(content) + receiver = ssn.receiver(PING_Q) + msg = receiver.fetch(0) + ssn.acknowledge() + assert msg.content == content, "expected %r, got %r" % (content, msg.content) + + def drain(self, rcv, limit=None, timeout=0, expected=None): + contents = [] + try: + while limit is None or len(contents) < limit: + contents.append(rcv.fetch(timeout=timeout).content) + except Empty: + pass + if expected is not None: + assert expected == contents, "expected %s, got %s" % (expected, contents) + return contents + + def assertEmpty(self, rcv): + contents = self.drain(rcv) + assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents) + + def assertPending(self, rcv, expected): + p = rcv.pending() + assert p == expected, "expected %s, got %s" % (expected, p) + + def sleep(self): + time.sleep(self.delay()) + + def delay(self): + return float(self.config.defines.get("delay", "2")) + + def get_bool(self, name): + return self.config.defines.get(name, "false").lower() in ("true", "yes", "1") + + def durable(self): + return self.get_bool("durable") + + def reconnect(self): + return self.get_bool("reconnect") + +import address, endpoints, message diff --git a/qpid/python/qpid/tests/messaging/address.py b/qpid/python/qpid/tests/messaging/address.py new file mode 100644 index 0000000000..7adbc0c6f7 --- /dev/null +++ b/qpid/python/qpid/tests/messaging/address.py @@ -0,0 +1,309 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +from qpid.tests import Test +from qpid.messaging.address import lex, parse, ParseError, EOF, ID, NUMBER, SYM, WSPACE, \ + LEXER +from qpid.lexer import Token +from qpid.harness import Skipped +from qpid.tests.parser import ParserBase + +def indent(st): + return " " + st.replace("\n", "\n ") + +def pprint_address(name, subject, options): + return "NAME: %s\nSUBJECT: %s\nOPTIONS: %s" % \ + (pprint(name), pprint(subject), pprint(options)) + +def pprint(o): + if isinstance(o, dict): + return pprint_map(o) + elif isinstance(o, list): + return pprint_list(o) + elif isinstance(o, basestring): + return pprint_string(o) + else: + return repr(o) + +def pprint_map(m): + items = ["%s: %s" % (pprint(k), pprint(v)) for k, v in m.items()] + items.sort() + return pprint_items("{", items, "}") + +def pprint_list(l): + return pprint_items("[", [pprint(x) for x in l], "]") + +def pprint_items(start, items, end): + if items: + return "%s\n%s\n%s" % (start, ",\n".join([indent(i) for i in items]), end) + else: + return "%s%s" % (start, end) + +def pprint_string(s): + result = "'" + for c in s: + if c == "'": + result += "\\'" + elif c == "\n": + result += "\\n" + elif ord(c) >= 0x80: + result += "\\u%04x" % ord(c) + else: + result += c + result += "'" + return result + +class AddressTests(ParserBase, Test): + + EXCLUDE = (WSPACE, EOF) + + def fields(self, line, n): + result = line.split(":", n - 1) + result.extend([None]*(n - len(result))) + return result + + def call(self, parser, mode, input): + try: + from subprocess import Popen, PIPE, STDOUT + po = Popen([parser, mode], stdin=PIPE, stdout=PIPE, stderr=STDOUT) + except ImportError, e: + raise Skipped("%s" % e) + except OSError, e: + raise Skipped("%s: %s" % (e, parser)) + out, _ = po.communicate(input=input) + return out + + def parser(self): + return self.config.defines.get("address.parser") + + def do_lex(self, st): + parser = self.parser() + if parser: + out = self.call(parser, "lex", st) + lines = out.split("\n") + toks = [] + for line in lines: + if line.strip(): + name, position, value = self.fields(line, 3) + toks.append(Token(LEXER.type(name), value, position, st)) + return toks + else: + return lex(st) + + def do_parse(self, st): + return parse(st) + + def valid(self, addr, name=None, subject=None, options=None): + parser = self.parser() + if parser: + got = self.call(parser, "parse", addr) + expected = "%s\n" % pprint_address(name, subject, options) + assert expected == got, "expected\n%s\ngot\n%s" % (expected, got) + else: + ParserBase.valid(self, addr, (name, subject, options)) + + def invalid(self, addr, error=None): + parser = self.parser() + if parser: + got = self.call(parser, "parse", addr) + expected = "ERROR: %s\n" % error + assert expected == got, "expected %r, got %r" % (expected, got) + else: + ParserBase.invalid(self, addr, error) + + def testDashInId1(self): + self.lex("foo-bar", ID) + + def testDashInId2(self): + self.lex("foo-3", ID) + + def testDashAlone1(self): + self.lex("foo - bar", ID, SYM, ID) + + def testDashAlone2(self): + self.lex("foo - 3", ID, SYM, NUMBER) + + def testLeadingDash(self): + self.lex("-foo", SYM, ID) + + def testTrailingDash(self): + self.lex("foo-", ID, SYM) + + def testNegativeNum(self): + self.lex("-3", NUMBER) + + def testHash(self): + self.valid("foo/bar.#", "foo", "bar.#") + + def testStar(self): + self.valid("foo/bar.*", "foo", "bar.*") + + def testColon(self): + self.valid("foo.bar/baz.qux:moo:arf", "foo.bar", "baz.qux:moo:arf") + + def testOptions(self): + self.valid("foo.bar/baz.qux:moo:arf; {key: value}", + "foo.bar", "baz.qux:moo:arf", {"key": "value"}) + + def testOptionsTrailingComma(self): + self.valid("name/subject; {key: value,}", "name", "subject", + {"key": "value"}) + + def testSemiSubject(self): + self.valid("foo.bar/'baz.qux;moo:arf'; {key: value}", + "foo.bar", "baz.qux;moo:arf", {"key": "value"}) + + def testCommaSubject(self): + self.valid("foo.bar/baz.qux.{moo,arf}", "foo.bar", "baz.qux.{moo,arf}") + + def testCommaSubjectOptions(self): + self.valid("foo.bar/baz.qux.{moo,arf}; {key: value}", "foo.bar", + "baz.qux.{moo,arf}", {"key": "value"}) + + def testUnbalanced(self): + self.valid("foo.bar/baz.qux.{moo,arf; {key: value}", "foo.bar", + "baz.qux.{moo,arf", {"key": "value"}) + + def testSlashQuote(self): + self.valid("foo.bar\\/baz.qux.{moo,arf; {key: value}", + "foo.bar/baz.qux.{moo,arf", + None, {"key": "value"}) + + def testSlashHexEsc1(self): + self.valid("foo.bar\\x00baz.qux.{moo,arf; {key: value}", + "foo.bar\x00baz.qux.{moo,arf", + None, {"key": "value"}) + + def testSlashHexEsc2(self): + self.valid("foo.bar\\xffbaz.qux.{moo,arf; {key: value}", + "foo.bar\xffbaz.qux.{moo,arf", + None, {"key": "value"}) + + def testSlashHexEsc3(self): + self.valid("foo.bar\\xFFbaz.qux.{moo,arf; {key: value}", + "foo.bar\xFFbaz.qux.{moo,arf", + None, {"key": "value"}) + + def testSlashUnicode1(self): + self.valid("foo.bar\\u1234baz.qux.{moo,arf; {key: value}", + u"foo.bar\u1234baz.qux.{moo,arf", None, {"key": "value"}) + + def testSlashUnicode2(self): + self.valid("foo.bar\\u0000baz.qux.{moo,arf; {key: value}", + u"foo.bar\u0000baz.qux.{moo,arf", None, {"key": "value"}) + + def testSlashUnicode3(self): + self.valid("foo.bar\\uffffbaz.qux.{moo,arf; {key: value}", + u"foo.bar\uffffbaz.qux.{moo,arf", None, {"key": "value"}) + + def testSlashUnicode4(self): + self.valid("foo.bar\\uFFFFbaz.qux.{moo,arf; {key: value}", + u"foo.bar\uFFFFbaz.qux.{moo,arf", None, {"key": "value"}) + + def testNoName(self): + self.invalid("; {key: value}", + "unexpected token SEMI(;) line:1,0:; {key: value}") + + def testEmpty(self): + self.invalid("", "unexpected token EOF line:1,0:") + + def testNoNameSlash(self): + self.invalid("/asdf; {key: value}", + "unexpected token SLASH(/) line:1,0:/asdf; {key: value}") + + def testBadOptions1(self): + self.invalid("name/subject; {", + "expecting (NUMBER, STRING, ID, LBRACE, LBRACK, RBRACE), " + "got EOF line:1,15:name/subject; {") + + def testBadOptions2(self): + self.invalid("name/subject; { 3", + "expecting COLON, got EOF " + "line:1,17:name/subject; { 3") + + def testBadOptions3(self): + self.invalid("name/subject; { key:", + "expecting (NUMBER, STRING, ID, LBRACE, LBRACK), got EOF " + "line:1,20:name/subject; { key:") + + def testBadOptions4(self): + self.invalid("name/subject; { key: value", + "expecting (COMMA, RBRACE), got EOF " + "line:1,26:name/subject; { key: value") + + def testBadOptions5(self): + self.invalid("name/subject; { key: value asdf", + "expecting (COMMA, RBRACE), got ID(asdf) " + "line:1,27:name/subject; { key: value asdf") + + def testBadOptions6(self): + self.invalid("name/subject; { key: value,", + "expecting (NUMBER, STRING, ID, LBRACE, LBRACK, RBRACE), got EOF " + "line:1,27:name/subject; { key: value,") + + def testBadOptions7(self): + self.invalid("name/subject; { key: value } asdf", + "expecting EOF, got ID(asdf) " + "line:1,29:name/subject; { key: value } asdf") + + def testList1(self): + self.valid("name/subject; { key: [] }", "name", "subject", {"key": []}) + + def testList2(self): + self.valid("name/subject; { key: ['one'] }", "name", "subject", {"key": ['one']}) + + def testList3(self): + self.valid("name/subject; { key: [1, 2, 3] }", "name", "subject", + {"key": [1, 2, 3]}) + + def testList4(self): + self.valid("name/subject; { key: [1, [2, 3], 4] }", "name", "subject", + {"key": [1, [2, 3], 4]}) + + def testBadList1(self): + self.invalid("name/subject; { key: [ }", "expecting (NUMBER, STRING, ID, LBRACE, LBRACK), " + "got RBRACE(}) line:1,23:name/subject; { key: [ }") + + def testBadList2(self): + self.invalid("name/subject; { key: [ 1 }", "expecting (COMMA, RBRACK), " + "got RBRACE(}) line:1,25:name/subject; { key: [ 1 }") + + def testBadList3(self): + self.invalid("name/subject; { key: [ 1 2 }", "expecting (COMMA, RBRACK), " + "got NUMBER(2) line:1,25:name/subject; { key: [ 1 2 }") + + def testBadList4(self): + self.invalid("name/subject; { key: [ 1 2 ] }", "expecting (COMMA, RBRACK), " + "got NUMBER(2) line:1,25:name/subject; { key: [ 1 2 ] }") + + def testMap1(self): + self.valid("name/subject; { 'key': value }", + "name", "subject", {"key": "value"}) + + def testMap2(self): + self.valid("name/subject; { 1: value }", "name", "subject", {1: "value"}) + + def testMap3(self): + self.valid('name/subject; { "foo.bar": value }', + "name", "subject", {"foo.bar": "value"}) + + def testBoolean(self): + self.valid("name/subject; { true: True, false: False }", + "name", "subject", {"true": True, "false": False}) diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py new file mode 100644 index 0000000000..2e70f13f3a --- /dev/null +++ b/qpid/python/qpid/tests/messaging/endpoints.py @@ -0,0 +1,878 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# setup, usage, teardown, errors(sync), errors(async), stress, soak, +# boundary-conditions, config + +import time +from qpid import compat +from qpid.harness import Skipped +from qpid.messaging import * +from qpid.tests.messaging import Base + +class SetupTests(Base): + + def testOpen(self): + # XXX: need to flesh out URL support/syntax + self.conn = Connection.open(self.broker.host, self.broker.port, + reconnect=self.reconnect()) + self.ping(self.conn.session()) + + def testConnect(self): + # XXX: need to flesh out URL support/syntax + self.conn = Connection(self.broker.host, self.broker.port, + reconnect=self.reconnect()) + self.conn.connect() + self.ping(self.conn.session()) + + def testConnectError(self): + try: + self.conn = Connection.open("localhost", 0) + assert False, "connect succeeded" + except ConnectError, e: + # XXX: should verify that e includes appropriate diagnostic info + pass + +class ConnectionTests(Base): + + def setup_connection(self): + return Connection.open(self.broker.host, self.broker.port, + reconnect=self.reconnect()) + + def testSessionAnon(self): + ssn1 = self.conn.session() + ssn2 = self.conn.session() + self.ping(ssn1) + self.ping(ssn2) + assert ssn1 is not ssn2 + + def testSessionNamed(self): + ssn1 = self.conn.session("one") + ssn2 = self.conn.session("two") + self.ping(ssn1) + self.ping(ssn2) + assert ssn1 is not ssn2 + assert ssn1 is self.conn.session("one") + assert ssn2 is self.conn.session("two") + + def testDisconnect(self): + ssn = self.conn.session() + self.ping(ssn) + self.conn.disconnect() + try: + self.ping(ssn) + assert False, "ping succeeded" + except Disconnected: + # this is the expected failure when pinging on a disconnected + # connection + pass + self.conn.connect() + self.ping(ssn) + + def testClose(self): + self.conn.close() + assert not self.conn.connected() + +ACK_QC = 'test-ack-queue; {create: always}' +ACK_QD = 'test-ack-queue; {delete: always}' + +class SessionTests(Base): + + def setup_connection(self): + return Connection.open(self.broker.host, self.broker.port, + reconnect=self.reconnect()) + + def setup_session(self): + return self.conn.session() + + def testSender(self): + snd = self.ssn.sender('test-snd-queue; {create: sender, delete: receiver}', + durable=self.durable()) + snd2 = self.ssn.sender(snd.target, durable=self.durable()) + assert snd is not snd2 + snd2.close() + + content = self.content("testSender") + snd.send(content) + rcv = self.ssn.receiver(snd.target) + msg = rcv.fetch(0) + assert msg.content == content + self.ssn.acknowledge(msg) + + def testReceiver(self): + rcv = self.ssn.receiver('test-rcv-queue; {create: always}') + rcv2 = self.ssn.receiver(rcv.source) + assert rcv is not rcv2 + rcv2.close() + + content = self.content("testReceiver") + snd = self.ssn.sender(rcv.source, durable=self.durable()) + snd.send(content) + msg = rcv.fetch(0) + assert msg.content == content + self.ssn.acknowledge(msg) + snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}') + + def testDisconnectedReceiver(self): + self.conn.disconnect() + rcv = self.ssn.receiver("test-dis-rcv-queue; {create: always, delete: always}") + m = self.content("testDisconnectedReceiver") + self.conn.connect() + snd = self.ssn.sender("test-dis-rcv-queue") + snd.send(m) + self.drain(rcv, expected=[m]) + + def testNextReceiver(self): + ADDR = 'test-next-rcv-queue; {create: always, delete: always}' + rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED) + rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED) + rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED) + + snd = self.ssn.sender(ADDR) + + msgs = [] + for i in range(10): + content = self.content("testNextReceiver", i) + snd.send(content) + msgs.append(content) + + fetched = [] + try: + while True: + rcv = self.ssn.next_receiver(timeout=self.delay()) + assert rcv in (rcv1, rcv2, rcv3) + assert rcv.pending() > 0 + fetched.append(rcv.fetch().content) + except Empty: + pass + assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched) + self.ssn.acknowledge() + #we set the capacity to 0 to prevent the deletion of the queue - + #triggered the deletion policy when the first receiver is closed - + #resulting in session exceptions being issued for the remaining + #active subscriptions: + for r in [rcv1, rcv2, rcv3]: + r.capacity = 0 + + # XXX, we need a convenient way to assert that required queues are + # empty on setup, and possibly also to drain queues on teardown + def ackTest(self, acker, ack_capacity=None): + # send a bunch of messages + snd = self.ssn.sender(ACK_QC, durable=self.durable()) + contents = [self.content("ackTest", i) for i in range(15)] + for c in contents: + snd.send(c) + + # drain the queue, verify the messages are there and then close + # without acking + rcv = self.ssn.receiver(ACK_QC) + self.drain(rcv, expected=contents) + self.ssn.close() + + # drain the queue again, verify that they are all the messages + # were requeued, and ack this time before closing + self.ssn = self.conn.session() + if ack_capacity is not None: + self.ssn.ack_capacity = ack_capacity + rcv = self.ssn.receiver(ACK_QC) + self.drain(rcv, expected=contents) + acker(self.ssn) + self.ssn.close() + + # drain the queue a final time and verify that the messages were + # dequeued + self.ssn = self.conn.session() + rcv = self.ssn.receiver(ACK_QD) + self.assertEmpty(rcv) + + def testAcknowledge(self): + self.ackTest(lambda ssn: ssn.acknowledge()) + + def testAcknowledgeAsync(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False)) + + def testAcknowledgeAsyncAckCap0(self): + try: + try: + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0) + assert False, "acknowledge shouldn't succeed with ack_capacity of zero" + except InsufficientCapacity: + pass + finally: + self.ssn.ack_capacity = UNLIMITED + self.drain(self.ssn.receiver(ACK_QD)) + self.ssn.acknowledge() + + def testAcknowledgeAsyncAckCap1(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1) + + def testAcknowledgeAsyncAckCap5(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5) + + def testAcknowledgeAsyncAckCapUNLIMITED(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED) + + def send(self, ssn, queue, base, count=1): + snd = ssn.sender(queue, durable=self.durable()) + contents = [] + for i in range(count): + c = self.content(base, i) + snd.send(c) + contents.append(c) + snd.close() + return contents + + def txTest(self, commit): + TX_Q = 'test-tx-queue; {create: sender, delete: receiver}' + TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}' + txssn = self.conn.session(transactional=True) + contents = self.send(self.ssn, TX_Q, "txTest", 3) + txrcv = txssn.receiver(TX_Q) + txsnd = txssn.sender(TX_Q_COPY, durable=self.durable()) + rcv = self.ssn.receiver(txrcv.source) + copy_rcv = self.ssn.receiver(txsnd.target) + self.assertEmpty(copy_rcv) + for i in range(3): + m = txrcv.fetch(0) + txsnd.send(m) + self.assertEmpty(copy_rcv) + txssn.acknowledge() + if commit: + txssn.commit() + self.assertEmpty(rcv) + assert contents == self.drain(copy_rcv) + else: + txssn.rollback() + assert contents == self.drain(rcv) + self.assertEmpty(copy_rcv) + self.ssn.acknowledge() + + def testCommit(self): + self.txTest(True) + + def testRollback(self): + self.txTest(False) + + def txTestSend(self, commit): + TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}' + txssn = self.conn.session(transactional=True) + contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3) + rcv = self.ssn.receiver(TX_SEND_Q) + self.assertEmpty(rcv) + + if commit: + txssn.commit() + assert contents == self.drain(rcv) + self.ssn.acknowledge() + else: + txssn.rollback() + self.assertEmpty(rcv) + txssn.commit() + self.assertEmpty(rcv) + + def testCommitSend(self): + self.txTestSend(True) + + def testRollbackSend(self): + self.txTestSend(False) + + def txTestAck(self, commit): + TX_ACK_QC = 'test-tx-ack-queue; {create: always}' + TX_ACK_QD = 'test-tx-ack-queue; {delete: always}' + txssn = self.conn.session(transactional=True) + txrcv = txssn.receiver(TX_ACK_QC) + self.assertEmpty(txrcv) + contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3) + assert contents == self.drain(txrcv) + + if commit: + txssn.acknowledge() + else: + txssn.rollback() + drained = self.drain(txrcv) + assert contents == drained, "expected %s, got %s" % (contents, drained) + txssn.acknowledge() + txssn.rollback() + assert contents == self.drain(txrcv) + txssn.commit() # commit without ack + self.assertEmpty(txrcv) + + txssn.close() + + txssn = self.conn.session(transactional=True) + txrcv = txssn.receiver(TX_ACK_QC) + assert contents == self.drain(txrcv) + txssn.acknowledge() + txssn.commit() + rcv = self.ssn.receiver(TX_ACK_QD) + self.assertEmpty(rcv) + txssn.close() + self.assertEmpty(rcv) + + def testCommitAck(self): + self.txTestAck(True) + + def testRollbackAck(self): + self.txTestAck(False) + + def testClose(self): + self.ssn.close() + try: + self.ping(self.ssn) + assert False, "ping succeeded" + except Disconnected: + pass + +RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}' + +class ReceiverTests(Base): + + def setup_connection(self): + return Connection.open(self.broker.host, self.broker.port, + reconnect=self.reconnect()) + + def setup_session(self): + return self.conn.session() + + def setup_sender(self): + return self.ssn.sender(RECEIVER_Q) + + def setup_receiver(self): + return self.ssn.receiver(RECEIVER_Q) + + def send(self, base, count = None): + content = self.content(base, count) + self.snd.send(content) + return content + + def testFetch(self): + try: + msg = self.rcv.fetch(0) + assert False, "unexpected message: %s" % msg + except Empty: + pass + try: + start = time.time() + msg = self.rcv.fetch(self.delay()) + assert False, "unexpected message: %s" % msg + except Empty: + elapsed = time.time() - start + assert elapsed >= self.delay() + + one = self.send("testFetch", 1) + two = self.send("testFetch", 2) + three = self.send("testFetch", 3) + msg = self.rcv.fetch(0) + assert msg.content == one + msg = self.rcv.fetch(self.delay()) + assert msg.content == two + msg = self.rcv.fetch() + assert msg.content == three + self.ssn.acknowledge() + + def testCapacityIncrease(self): + content = self.send("testCapacityIncrease") + self.sleep() + assert self.rcv.pending() == 0 + self.rcv.capacity = UNLIMITED + self.sleep() + assert self.rcv.pending() == 1 + msg = self.rcv.fetch(0) + assert msg.content == content + assert self.rcv.pending() == 0 + self.ssn.acknowledge() + + def testCapacityDecrease(self): + self.rcv.capacity = UNLIMITED + one = self.send("testCapacityDecrease", 1) + self.sleep() + assert self.rcv.pending() == 1 + msg = self.rcv.fetch(0) + assert msg.content == one + + self.rcv.capacity = 0 + + two = self.send("testCapacityDecrease", 2) + self.sleep() + assert self.rcv.pending() == 0 + msg = self.rcv.fetch(0) + assert msg.content == two + + self.ssn.acknowledge() + + def testCapacity(self): + self.rcv.capacity = 5 + self.assertPending(self.rcv, 0) + + for i in range(15): + self.send("testCapacity", i) + self.sleep() + self.assertPending(self.rcv, 5) + + self.drain(self.rcv, limit = 5) + self.sleep() + self.assertPending(self.rcv, 5) + + drained = self.drain(self.rcv) + assert len(drained) == 10, "%s, %s" % (len(drained), drained) + self.assertPending(self.rcv, 0) + + self.ssn.acknowledge() + + def testCapacityUNLIMITED(self): + self.rcv.capacity = UNLIMITED + self.assertPending(self.rcv, 0) + + for i in range(10): + self.send("testCapacityUNLIMITED", i) + self.sleep() + self.assertPending(self.rcv, 10) + + self.drain(self.rcv) + self.assertPending(self.rcv, 0) + + self.ssn.acknowledge() + + def testPending(self): + self.rcv.capacity = UNLIMITED + assert self.rcv.pending() == 0 + + for i in range(3): + self.send("testPending", i) + self.sleep() + assert self.rcv.pending() == 3 + + for i in range(3, 10): + self.send("testPending", i) + self.sleep() + assert self.rcv.pending() == 10 + + self.drain(self.rcv, limit=3) + assert self.rcv.pending() == 7 + + self.drain(self.rcv) + assert self.rcv.pending() == 0 + + self.ssn.acknowledge() + + def testDoubleClose(self): + m1 = self.content("testDoubleClose", 1) + m2 = self.content("testDoubleClose", 2) + + snd = self.ssn.sender("""test-double-close; { + create: always, + delete: sender, + node-properties: { + type: topic + } +} +""") + r1 = self.ssn.receiver(snd.target) + r2 = self.ssn.receiver(snd.target) + snd.send(m1) + self.drain(r1, expected=[m1]) + self.drain(r2, expected=[m1]) + r1.close() + snd.send(m2) + self.drain(r2, expected=[m2]) + r2.close() + + # XXX: need testClose + + def testMode(self): + msgs = [self.content("testMode", 1), + self.content("testMode", 2), + self.content("testMode", 3)] + + for m in msgs: + self.snd.send(m) + + rb = self.ssn.receiver('test-receiver-queue; {mode: browse}') + rc = self.ssn.receiver('test-receiver-queue; {mode: consume}') + self.drain(rb, expected=msgs) + self.drain(rc, expected=msgs) + rb2 = self.ssn.receiver(rb.source) + self.assertEmpty(rb2) + self.drain(self.rcv, expected=[]) + +class AddressTests(Base): + + def setup_connection(self): + return Connection.open(self.broker.host, self.broker.port, + reconnect=self.reconnect()) + + def setup_session(self): + return self.conn.session() + + def badOption(self, options, error): + try: + self.ssn.sender("test-bad-options-snd; %s" % options) + assert False + except SendError, e: + assert "error in options: %s" % error == str(e), e + + try: + self.ssn.receiver("test-bad-options-rcv; %s" % options) + assert False + except ReceiveError, e: + assert "error in options: %s" % error == str(e), e + + def testIllegalKey(self): + self.badOption("{create: always, node-properties: " + "{this-property-does-not-exist: 3}}", + "node-properties: this-property-does-not-exist: " + "illegal key") + + def testWrongValue(self): + self.badOption("{create: asdf}", "create: asdf not in " + "('always', 'sender', 'receiver', 'never')") + + def testWrongType1(self): + self.badOption("{node-properties: asdf}", + "node-properties: asdf is not a map") + + def testWrongType2(self): + self.badOption("{node-properties: {durable: []}}", + "node-properties: durable: [] is not a bool") + + def testNonQueueBindings(self): + self.badOption("{node-properties: {type: topic, x-properties: " + "{bindings: []}}}", + "node-properties: x-properties: bindings: " + "bindings are only permitted on nodes of type queue") + + def testCreateQueue(self): + snd = self.ssn.sender("test-create-queue; {create: always, delete: always, " + "node-properties: {type: queue, durable: False, " + "x-properties: {auto_delete: true}}}") + content = self.content("testCreateQueue") + snd.send(content) + rcv = self.ssn.receiver("test-create-queue") + self.drain(rcv, expected=[content]) + + def createExchangeTest(self, props=""): + addr = """test-create-exchange; { + create: always, + delete: always, + node-properties: { + type: topic, + durable: False, + x-properties: {auto_delete: true, %s} + } + }""" % props + snd = self.ssn.sender(addr) + snd.send("ping") + rcv1 = self.ssn.receiver("test-create-exchange/first") + rcv2 = self.ssn.receiver("test-create-exchange/first") + rcv3 = self.ssn.receiver("test-create-exchange/second") + for r in (rcv1, rcv2, rcv3): + try: + r.fetch(0) + assert False + except Empty: + pass + msg1 = Message(self.content("testCreateExchange", 1), subject="first") + msg2 = Message(self.content("testCreateExchange", 2), subject="second") + snd.send(msg1) + snd.send(msg2) + self.drain(rcv1, expected=[msg1.content]) + self.drain(rcv2, expected=[msg1.content]) + self.drain(rcv3, expected=[msg2.content]) + + def testCreateExchange(self): + self.createExchangeTest() + + def testCreateExchangeDirect(self): + self.createExchangeTest("type: direct") + + def testCreateExchangeTopic(self): + self.createExchangeTest("type: topic") + + def testDeleteBySender(self): + snd = self.ssn.sender("test-delete; {create: always}") + snd.send("ping") + snd.close() + snd = self.ssn.sender("test-delete; {delete: always}") + snd.send("ping") + snd.close() + try: + self.ssn.sender("test-delete") + except SendError, e: + assert "no such queue" in str(e) + + def testDeleteByReceiver(self): + rcv = self.ssn.receiver("test-delete; {create: always, delete: always}") + try: + rcv.fetch(0) + except Empty: + pass + rcv.close() + + try: + self.ssn.receiver("test-delete") + assert False + except ReceiveError, e: + assert "no such queue" in str(e) + + def testDeleteSpecial(self): + snd = self.ssn.sender("amq.topic; {delete: always}") + snd.send("asdf") + try: + snd.close() + except SessionError, e: + assert "Cannot delete default exchange" in str(e) + # XXX: need to figure out close after error + self.conn._remove_session(self.ssn) + + def testBindings(self): + snd = self.ssn.sender(""" +test-bindings-queue; { + create: always, + delete: always, + node-properties: { + x-properties: { + bindings: ["amq.topic/a.#", "amq.direct/b", "amq.topic/c.*"] + } + } +} +""") + snd.send("one") + snd_a = self.ssn.sender("amq.topic/a.foo") + snd_b = self.ssn.sender("amq.direct/b") + snd_c = self.ssn.sender("amq.topic/c.bar") + snd_a.send("two") + snd_b.send("three") + snd_c.send("four") + rcv = self.ssn.receiver("test-bindings-queue") + self.drain(rcv, expected=["one", "two", "three", "four"]) + + def testBindingsAdditive(self): + m1 = self.content("testBindingsAdditive", 1) + m2 = self.content("testBindingsAdditive", 2) + m3 = self.content("testBindingsAdditive", 3) + m4 = self.content("testBindingsAdditive", 4) + + snd = self.ssn.sender(""" +test-bindings-additive-queue; { + create: always, + delete: always, + node-properties: { + x-properties: { + bindings: ["amq.topic/a"] + } + } +} +""") + + snd_a = self.ssn.sender("amq.topic/a") + snd_b = self.ssn.sender("amq.topic/b") + + snd_a.send(m1) + snd_b.send(m2) + + rcv = self.ssn.receiver("test-bindings-additive-queue") + self.drain(rcv, expected=[m1]) + + new_snd = self.ssn.sender(""" +test-bindings-additive-queue; { + node-properties: { + x-properties: { + bindings: ["amq.topic/b"] + } + } +} +""") + + new_snd.send(m3) + snd_b.send(m4) + self.drain(rcv, expected=[m3, m4]) + + def testSubjectOverride(self): + snd = self.ssn.sender("amq.topic/a") + rcv_a = self.ssn.receiver("amq.topic/a") + rcv_b = self.ssn.receiver("amq.topic/b") + m1 = self.content("testSubjectOverride", 1) + m2 = self.content("testSubjectOverride", 2) + snd.send(m1) + snd.send(Message(subject="b", content=m2)) + self.drain(rcv_a, expected=[m1]) + self.drain(rcv_b, expected=[m2]) + + def testSubjectDefault(self): + m1 = self.content("testSubjectDefault", 1) + m2 = self.content("testSubjectDefault", 2) + snd = self.ssn.sender("amq.topic/a") + rcv = self.ssn.receiver("amq.topic") + snd.send(m1) + snd.send(Message(subject="b", content=m2)) + e1 = rcv.fetch(timeout=0) + e2 = rcv.fetch(timeout=0) + assert e1.subject == "a", "subject: %s" % e1.subject + assert e2.subject == "b", "subject: %s" % e2.subject + self.assertEmpty(rcv) + +NOSUCH_Q = "this-queue-should-not-exist" +UNPARSEABLE_ADDR = "name/subject; {bad options" +UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3" + +class AddressErrorTests(Base): + + def setup_connection(self): + return Connection.open(self.broker.host, self.broker.port, + reconnect=self.reconnect()) + + def setup_session(self): + return self.conn.session() + + def senderErrorTest(self, addr, exc, check=lambda e: True): + try: + self.ssn.sender(addr, durable=self.durable()) + assert False, "sender creation succeeded" + except exc, e: + assert check(e), "unexpected error: %s" % compat.format_exc(e) + + def receiverErrorTest(self, addr, exc, check=lambda e: True): + try: + self.ssn.receiver(addr) + assert False, "receiver creation succeeded" + except exc, e: + assert check(e), "unexpected error: %s" % compat.format_exc(e) + + def testNoneTarget(self): + # XXX: should have specific exception for this + self.senderErrorTest(None, SendError) + + def testNoneSource(self): + # XXX: should have specific exception for this + self.receiverErrorTest(None, ReceiveError) + + def testNoTarget(self): + # XXX: should have specific exception for this + self.senderErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e)) + + def testNoSource(self): + # XXX: should have specific exception for this + self.receiverErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e)) + + def testUnparseableTarget(self): + # XXX: should have specific exception for this + self.senderErrorTest(UNPARSEABLE_ADDR, SendError, + lambda e: "expecting COLON" in str(e)) + + def testUnparseableSource(self): + # XXX: should have specific exception for this + self.receiverErrorTest(UNPARSEABLE_ADDR, ReceiveError, + lambda e: "expecting COLON" in str(e)) + + def testUnlexableTarget(self): + # XXX: should have specific exception for this + self.senderErrorTest(UNLEXABLE_ADDR, SendError, + lambda e: "unrecognized characters" in str(e)) + + def testUnlexableSource(self): + # XXX: should have specific exception for this + self.receiverErrorTest(UNLEXABLE_ADDR, ReceiveError, + lambda e: "unrecognized characters" in str(e)) + + def testInvalidMode(self): + # XXX: should have specific exception for this + self.receiverErrorTest('name; {mode: "this-is-a-bad-receiver-mode"}', + ReceiveError, + lambda e: "not in ('browse', 'consume')" in str(e)) + +SENDER_Q = 'test-sender-q; {create: always, delete: always}' + +class SenderTests(Base): + + def setup_connection(self): + return Connection.open(self.broker.host, self.broker.port, + reconnect=self.reconnect()) + + def setup_session(self): + return self.conn.session() + + def setup_sender(self): + return self.ssn.sender(SENDER_Q) + + def setup_receiver(self): + return self.ssn.receiver(SENDER_Q) + + def checkContent(self, content): + self.snd.send(content) + msg = self.rcv.fetch(0) + assert msg.content == content + + out = Message(content) + self.snd.send(out) + echo = self.rcv.fetch(0) + assert out.content == echo.content + assert echo.content == msg.content + self.ssn.acknowledge() + + def testSendString(self): + self.checkContent(self.content("testSendString")) + + def testSendList(self): + self.checkContent(["testSendList", 1, 3.14, self.test_id]) + + def testSendMap(self): + self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14}) + + def asyncTest(self, capacity): + self.snd.capacity = capacity + msgs = [self.content("asyncTest", i) for i in range(15)] + for m in msgs: + self.snd.send(m, sync=False) + drained = self.drain(self.rcv, timeout=self.delay()) + assert msgs == drained, "expected %s, got %s" % (msgs, drained) + self.ssn.acknowledge() + + def testSendAsyncCapacity0(self): + try: + self.asyncTest(0) + assert False, "send shouldn't succeed with zero capacity" + except InsufficientCapacity: + # this is expected + pass + + def testSendAsyncCapacity1(self): + self.asyncTest(1) + + def testSendAsyncCapacity5(self): + self.asyncTest(5) + + def testSendAsyncCapacityUNLIMITED(self): + self.asyncTest(UNLIMITED) + + def testCapacityTimeout(self): + self.snd.capacity = 1 + msgs = [] + caught = False + while len(msgs) < 100: + m = self.content("testCapacity", len(msgs)) + try: + self.snd.send(m, sync=False, timeout=0) + msgs.append(m) + except InsufficientCapacity: + caught = True + break + self.snd.sync() + self.drain(self.rcv, expected=msgs) + self.ssn.acknowledge() + assert caught, "did not exceed capacity" diff --git a/qpid/python/qpid/tests/messaging/message.py b/qpid/python/qpid/tests/messaging/message.py new file mode 100644 index 0000000000..ef2ec1aac4 --- /dev/null +++ b/qpid/python/qpid/tests/messaging/message.py @@ -0,0 +1,116 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.messaging import * +from qpid.tests.messaging import Base + +class MessageTests(Base): + + def testCreateString(self): + m = Message("string") + assert m.content == "string" + assert m.content_type is None + + def testCreateUnicode(self): + m = Message(u"unicode") + assert m.content == u"unicode" + assert m.content_type == "text/plain" + + def testCreateMap(self): + m = Message({}) + assert m.content == {} + assert m.content_type == "amqp/map" + + def testCreateList(self): + m = Message([]) + assert m.content == [] + assert m.content_type == "amqp/list" + + def testContentTypeOverride(self): + m = Message() + m.content_type = "text/html; charset=utf8" + m.content = u"" + assert m.content_type == "text/html; charset=utf8" + +ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}' + +class MessageEchoTests(Base): + + def setup_connection(self): + return Connection.open(self.broker.host, self.broker.port, + reconnect=self.reconnect()) + + def setup_session(self): + return self.conn.session() + + def setup_sender(self): + return self.ssn.sender(ECHO_Q) + + def setup_receiver(self): + return self.ssn.receiver(ECHO_Q) + + def check(self, msg): + self.snd.send(msg) + echo = self.rcv.fetch(0) + + assert msg.id == echo.id + assert msg.subject == echo.subject + assert msg.user_id == echo.user_id + assert msg.to == echo.to + assert msg.reply_to == echo.reply_to + assert msg.correlation_id == echo.correlation_id + assert msg.properties == echo.properties + assert msg.content_type == echo.content_type + assert msg.content == echo.content, "%s, %s" % (msg, echo) + + self.ssn.acknowledge(echo) + + def testStringContent(self): + self.check(Message("string")) + + def testUnicodeContent(self): + self.check(Message(u"unicode")) + + + TEST_MAP = {"key1": "string", + "key2": u"unicode", + "key3": 3, + "key4": -3, + "key5": 3.14, + "key6": -3.14, + "key7": ["one", 2, 3.14], + "key8": [], + "key9": {"sub-key0": 3}} + + def testMapContent(self): + self.check(Message(MessageEchoTests.TEST_MAP)) + + def testListContent(self): + self.check(Message([])) + self.check(Message([1, 2, 3])) + self.check(Message(["one", 2, 3.14, {"four": 4}])) + + def testProperties(self): + msg = Message() + msg.to = "to-address" + msg.subject = "subject" + msg.correlation_id = str(self.test_id) + msg.properties = MessageEchoTests.TEST_MAP + msg.reply_to = "reply-address" + self.check(msg) -- cgit v1.2.1