diff options
Diffstat (limited to 'qpid/python')
-rw-r--r-- | qpid/python/Makefile | 98 | ||||
-rwxr-xr-x | qpid/python/commands/qpid-config | 8 | ||||
-rwxr-xr-x | qpid/python/preppy | 67 | ||||
-rw-r--r-- | qpid/python/qpid/compat.py | 7 | ||||
-rw-r--r-- | qpid/python/qpid/concurrency.py | 65 | ||||
-rw-r--r-- | qpid/python/qpid/datatypes.py | 11 | ||||
-rw-r--r-- | qpid/python/qpid/driver.py | 444 | ||||
-rw-r--r-- | qpid/python/qpid/messaging.py | 587 | ||||
-rw-r--r-- | qpid/python/qpid/session.py | 14 | ||||
-rw-r--r-- | qpid/python/qpid/tests/messaging.py | 124 | ||||
-rw-r--r-- | qpid/python/qpid/util.py | 6 | ||||
-rw-r--r-- | qpid/python/qpid_config.py | 8 | ||||
-rw-r--r-- | qpid/python/tests/datatypes.py | 13 | ||||
-rw-r--r-- | qpid/python/tests_0-10/alternate_exchange.py | 56 | ||||
-rw-r--r-- | qpid/python/tests_0-10/management.py | 22 | ||||
-rw-r--r-- | qpid/python/tests_0-9/queue.py | 261 |
16 files changed, 1410 insertions, 381 deletions
diff --git a/qpid/python/Makefile b/qpid/python/Makefile new file mode 100644 index 0000000000..31547c8f57 --- /dev/null +++ b/qpid/python/Makefile @@ -0,0 +1,98 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +PREFIX=/usr/local +EXEC_PREFIX=$(PREFIX)/bin +DATA_DIR=$(PREFIX)/share + +PYTHON_LIB=$(shell python -c "from distutils.sysconfig import get_python_lib; print get_python_lib(prefix='$(PREFIX)')") +PYTHON_VERSION=$(shell python -c "from distutils.sysconfig import get_python_version; print get_python_version()") + +ddfirst=$(shell ddir=$(DATA_DIR) && echo $${ddir:0:1}) +ifeq ($(ddfirst),/) +AMQP_SPEC_DIR=$(DATA_DIR)/amqp +else +AMQP_SPEC_DIR=$(PWD)/$(DATA_DIR)/amqp +endif + +DIRS=qmf qpid mllib models examples tests tests_0-8 tests_0-9 tests_0-10 +SRCS=$(shell find $(DIRS) -name "*.py") qpid_config.py +BUILD=build +TARGETS=$(SRCS:%.py=$(BUILD)/%.py) + +PYCC=python -c "import compileall, sys; compileall.compile_dir(sys.argv[1])" + +all: build + +$(BUILD)/%.py: %.py + @mkdir -p $(shell dirname $@) + ./preppy $(PYTHON_VERSION) < $< > $@ + +build: $(TARGETS) + +.PHONY: doc + +doc: + @mkdir -p $(BUILD) + epydoc qpid.messaging -o $(BUILD)/doc --no-private --no-sourcecode --include-log + +install: build + install -d $(PYTHON_LIB) + + install -d $(PYTHON_LIB)/mllib + install -pm 0644 LICENSE.txt NOTICE.txt $(BUILD)/mllib/*.* $(PYTHON_LIB)/mllib + $(PYCC) $(PYTHON_LIB)/mllib + + install -d $(PYTHON_LIB)/qpid + install -pm 0644 LICENSE.txt NOTICE.txt README.txt $(BUILD)/qpid/*.* $(PYTHON_LIB)/qpid + TDIR=$(shell mktemp -d) && \ + sed s@AMQP_SPEC_DIR=.*@AMQP_SPEC_DIR='"$(AMQP_SPEC_DIR)"'@ \ + $(BUILD)/qpid_config.py > $${TDIR}/qpid_config.py && \ + install -pm 0644 $${TDIR}/qpid_config.py $(PYTHON_LIB) && \ + rm -rf $${TDIR} + + install -d $(PYTHON_LIB)/qpid/tests + install -pm 0644 $(BUILD)/qpid/tests/*.* $(PYTHON_LIB)/qpid/tests + $(PYCC) $(PYTHON_LIB)/qpid + + install -d $(PYTHON_LIB)/qmf + install -pm 0644 LICENSE.txt NOTICE.txt qmf/*.* $(PYTHON_LIB)/qmf + $(PYCC) $(PYTHON_LIB)/qmf + + install -d $(PYTHON_LIB)/tests + install -pm 0644 $(BUILD)/tests/*.* $(PYTHON_LIB)/tests + $(PYCC) $(PYTHON_LIB)/tests + + install -d $(PYTHON_LIB)/tests_0-8 + install -pm 0644 $(BUILD)/tests_0-8/*.* $(PYTHON_LIB)/tests_0-8 + $(PYCC) $(PYTHON_LIB)/tests_0-8 + + install -d $(PYTHON_LIB)/tests_0-9 + install -pm 0644 $(BUILD)/tests_0-9/*.* $(PYTHON_LIB)/tests_0-9 + $(PYCC) $(PYTHON_LIB)/tests_0-9 + + install -d $(PYTHON_LIB)/tests_0-10 + install -pm 0644 $(BUILD)/tests_0-10/*.* $(PYTHON_LIB)/tests_0-10 + $(PYCC) $(PYTHON_LIB)/tests_0-10 + + install -d $(EXEC_PREFIX) + install -pm 0755 qpid-python-test commands/* $(EXEC_PREFIX) + +clean: + rm -rf $(BUILD) diff --git a/qpid/python/commands/qpid-config b/qpid/python/commands/qpid-config index c4ea5c5f2d..4cf9505c58 100755 --- a/qpid/python/commands/qpid-config +++ b/qpid/python/commands/qpid-config @@ -110,6 +110,12 @@ def Usage (): print " --force-if-not-empty Force delete of queue even if it's not empty" print " --force-if-used Force delete of queue even if it's currently used" print + print "Add Exchange <type> values:" + print " direct Direct exchange for point-to-point communication" + print " fanout Fanout exchange for broadcast communication" + print " topic Topic exchange that routes messages using binding keys with wildcards" + print " headers Headers exchange that matches header fields against the binding keys" + print print "Add Exchange Options:" print " --alternate-exchange [name of the alternate exchange]" print " In the event that a message cannot be routed, this is the name of the exchange to" @@ -190,6 +196,8 @@ class BrokerManager: if ex.durable: print "--durable", if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence", if IVE in args and args[IVE] == 1: print "--ive", + if ex.altExchange: + print "--alternate-exchange=%s" % ex._altExchange_.name, print def ExchangeListRecurse (self, filter): diff --git a/qpid/python/preppy b/qpid/python/preppy new file mode 100755 index 0000000000..22893dad03 --- /dev/null +++ b/qpid/python/preppy @@ -0,0 +1,67 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os, re, sys + +ann = re.compile(r"([ \t]*)@([_a-zA-Z][_a-zA-Z0-9]*)([ \t\n\r]+def[ \t]+)([_a-zA-Z][_a-zA-Z0-9]*)") +line = re.compile(r"\n([ \t]*)[^ \t\n#]+") + +if len(sys.argv) == 2: + major, minor = [int(p) for p in sys.argv[1].split(".")] +elif len(sys.argv) == 1: + major, minor = sys.version_info[0:2] +else: + print "usage: %s [ version ] < input.py > output.py" % sys.argv[0] + sys.exit(-1) + +if major <= 2 and minor <= 3: + def process(input): + output = "" + pos = 0 + while True: + m = ann.search(input, pos) + if m: + indent, decorator, idef, function = m.groups() + output += input[pos:m.start()] + output += "%s#@%s%s%s" % (indent, decorator, idef, function) + pos = m.end() + + subst = "\n%s%s = %s(%s)\n" % (indent, function, decorator, function) + npos = pos + while True: + n = line.search(input, npos) + if not n: + input += subst + break + if len(n.group(1)) <= len(indent): + idx = n.start() + input = input[:idx] + subst + input[idx:] + break + npos = n.end() + else: + break + + output += input[pos:] + return output +else: + def process(input): + return input + +sys.stdout.write(process(sys.stdin.read())) diff --git a/qpid/python/qpid/compat.py b/qpid/python/qpid/compat.py index 26f60fb8aa..49273193df 100644 --- a/qpid/python/qpid/compat.py +++ b/qpid/python/qpid/compat.py @@ -26,3 +26,10 @@ try: from socket import SHUT_RDWR except ImportError: SHUT_RDWR = 2 + +try: + from traceback import format_exc +except ImportError: + import sys, traceback + def format_exc(): + return "".join(traceback.format_exception(*sys.exc_info())) diff --git a/qpid/python/qpid/concurrency.py b/qpid/python/qpid/concurrency.py new file mode 100644 index 0000000000..00cdb6b953 --- /dev/null +++ b/qpid/python/qpid/concurrency.py @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import inspect, time + +def synchronized(meth): + args, vargs, kwargs, defs = inspect.getargspec(meth) + scope = {} + scope["meth"] = meth + exec """ +def %s%s: + %s + %s._lock.acquire() + try: + return meth%s + finally: + %s._lock.release() +""" % (meth.__name__, inspect.formatargspec(args, vargs, kwargs, defs), + repr(inspect.getdoc(meth)), args[0], + inspect.formatargspec(args, vargs, kwargs, defs, + formatvalue=lambda x: ""), + args[0]) in scope + return scope[meth.__name__] + +class Waiter(object): + + def __init__(self, condition): + self.condition = condition + + def wait(self, predicate, timeout=None): + passed = 0 + start = time.time() + while not predicate(): + if timeout is None: + # using the timed wait prevents keyboard interrupts from being + # blocked while waiting + self.condition.wait(3) + elif passed < timeout: + self.condition.wait(timeout - passed) + else: + return False + passed = time.time() - start + return True + + def notify(self): + self.condition.notify() + + def notifyAll(self): + self.condition.notifyAll() diff --git a/qpid/python/qpid/datatypes.py b/qpid/python/qpid/datatypes.py index bba3f5b9ab..f832ddae34 100644 --- a/qpid/python/qpid/datatypes.py +++ b/qpid/python/qpid/datatypes.py @@ -132,7 +132,7 @@ class Serial: return hash(self.value) def __cmp__(self, other): - if other is None: + if other.__class__ not in (int, long, Serial): return 1 other = serial(other) @@ -150,7 +150,10 @@ class Serial: return Serial(self.value + other) def __sub__(self, other): - return Serial(self.value - other) + if isinstance(other, Serial): + return self.value - other.value + else: + return Serial(self.value - other) def __repr__(self): return "serial(%s)" % self.value @@ -169,7 +172,7 @@ class Range: def __contains__(self, n): return self.lower <= n and n <= self.upper - + def __iter__(self): i = self.lower while i <= self.upper: @@ -230,7 +233,7 @@ class RangedSet: def add(self, lower, upper = None): self.add_range(Range(lower, upper)) - + def __iter__(self): return iter(self.ranges) diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py new file mode 100644 index 0000000000..2e07c82a0d --- /dev/null +++ b/qpid/python/qpid/driver.py @@ -0,0 +1,444 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import compat, connection, socket, sys, time +from concurrency import synchronized +from datatypes import RangedSet, Message as Message010 +from exceptions import Timeout +from logging import getLogger +from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED +from ops import delivery_mode +from session import Client, INCOMPLETE, SessionDetached +from threading import Condition, Thread +from util import connect + +log = getLogger("qpid.messaging") + +def parse_addr(address): + parts = address.split("/", 1) + if len(parts) == 1: + return parts[0], None + else: + return parts[0], parts[i1] + +def reply_to2addr(reply_to): + if reply_to.routing_key is None: + return reply_to.exchange + elif reply_to.exchange in (None, ""): + return reply_to.routing_key + else: + return "%s/%s" % (reply_to.exchange, reply_to.routing_key) + +class Attachment: + + def __init__(self, target): + self.target = target + +DURABLE_DEFAULT=True + +FILTER_DEFAULTS = { + "topic": Pattern("*") + } + +def delegate(handler, session): + class Delegate(Client): + + def message_transfer(self, cmd): + return handler._message_transfer(session, cmd) + return Delegate + +class Driver: + + def __init__(self, connection): + self.connection = connection + self._lock = self.connection._lock + self._wakeup_cond = Condition() + self._socket = None + self._conn = None + self._connected = False + self._attachments = {} + self._modcount = self.connection._modcount + self.thread = Thread(target=self.run) + self.thread.setDaemon(True) + # XXX: need to figure out how to join on this thread + + def wakeup(self): + self._wakeup_cond.acquire() + try: + self._wakeup_cond.notifyAll() + finally: + self._wakeup_cond.release() + + def start(self): + self.thread.start() + + def run(self): + while True: + self._wakeup_cond.acquire() + try: + if self.connection._modcount <= self._modcount: + self._wakeup_cond.wait(10) + finally: + self._wakeup_cond.release() + self.dispatch(self.connection._modcount) + + @synchronized + def dispatch(self, modcount): + try: + if self._conn is None and self.connection._connected: + self.connect() + elif self._conn is not None and not self.connection._connected: + self.disconnect() + + if self._conn is not None: + for ssn in self.connection.sessions.values(): + self.attach(ssn) + self.process(ssn) + + exi = None + except: + exi = sys.exc_info() + + if exi: + msg = compat.format_exc() + recoverable = ["aborted", "Connection refused", "SessionDetached", "Connection reset by peer", + "Bad file descriptor", "start timed out", "Broken pipe"] + for r in recoverable: + if self.connection.reconnect and r in msg: + print "waiting to retry" + self.reset() + time.sleep(3) + print "retrying..." + return + else: + self.connection.error = (msg,) + + self._modcount = modcount + self.connection._waiter.notifyAll() + + def connect(self): + if self._conn is not None: + return + try: + self._socket = connect(self.connection.host, self.connection.port) + except socket.error, e: + raise ConnectError(e) + self._conn = connection.Connection(self._socket) + try: + self._conn.start(timeout=10) + self._connected = True + except connection.VersionError, e: + raise ConnectError(e) + except Timeout: + print "start timed out" + raise ConnectError("start timed out") + + def disconnect(self): + self._conn.close() + self.reset() + + def reset(self): + self._conn = None + self._connected = False + self._attachments.clear() + for ssn in self.connection.sessions.values(): + for m in ssn.acked + ssn.unacked + ssn.incoming: + m._transfer_id = None + for rcv in ssn.receivers: + rcv.impending = rcv.received + + def connected(self): + return self._conn is not None + + def attach(self, ssn): + _ssn = self._attachments.get(ssn) + if _ssn is None: + _ssn = self._conn.session(ssn.name, delegate=delegate(self, ssn)) + _ssn.auto_sync = False + _ssn.invoke_lock = self._lock + _ssn.lock = self._lock + _ssn.condition = self.connection._condition + if ssn.transactional: + # XXX: adding an attribute to qpid.session.Session + _ssn.acked = [] + _ssn.tx_select() + self._attachments[ssn] = _ssn + + for snd in ssn.senders: + self.link_out(snd) + for rcv in ssn.receivers: + self.link_in(rcv) + + if ssn.closing: + _ssn.close() + del self._attachments[ssn] + ssn.closed = True + + def _exchange_query(self, ssn, address): + # XXX: auto sync hack is to avoid deadlock on future + result = ssn.exchange_query(name=address, sync=True) + ssn.sync() + return result.get() + + def link_out(self, snd): + _ssn = self._attachments[snd.session] + _snd = self._attachments.get(snd) + if _snd is None: + _snd = Attachment(snd) + node, _snd._subject = parse_addr(snd.target) + result = self._exchange_query(_ssn, node) + if result.not_found: + # XXX: should check 'create' option + _ssn.queue_declare(queue=node, durable=DURABLE_DEFAULT, sync=True) + _ssn.sync() + _snd._exchange = "" + _snd._routing_key = node + else: + _snd._exchange = node + _snd._routing_key = _snd._subject + self._attachments[snd] = _snd + + if snd.closed: + del self._attachments[snd] + return None + else: + return _snd + + def link_in(self, rcv): + _ssn = self._attachments[rcv.session] + _rcv = self._attachments.get(rcv) + if _rcv is None: + _rcv = Attachment(rcv) + result = self._exchange_query(_ssn, rcv.source) + if result.not_found: + _rcv._queue = rcv.source + # XXX: should check 'create' option + _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT) + else: + _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination) + _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True) + if rcv.filter is None: + f = FILTER_DEFAULTS[result.type] + else: + f = rcv.filter + f._bind(_ssn, rcv.source, _rcv._queue) + _ssn.message_subscribe(queue=_rcv._queue, destination=rcv.destination) + _ssn.message_set_flow_mode(rcv.destination, _ssn.flow_mode.credit, sync=True) + self._attachments[rcv] = _rcv + # XXX: need to kill syncs + _ssn.sync() + + if rcv.closing: + _ssn.message_cancel(rcv.destination, sync=True) + # XXX: need to kill syncs + _ssn.sync() + del self._attachments[rcv] + rcv.closed = True + return None + else: + return _rcv + + def process(self, ssn): + if ssn.closing: return + + _ssn = self._attachments[ssn] + + while ssn.outgoing: + msg = ssn.outgoing[0] + snd = msg._sender + self.send(snd, msg) + ssn.outgoing.pop(0) + + for rcv in ssn.receivers: + self.process_receiver(rcv) + + if ssn.acked: + messages = ssn.acked[:] + ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None]) + for range in ids: + _ssn.receiver._completed.add_range(range) + ch = _ssn.channel + if ch is None: + raise SessionDetached() + ch.session_completed(_ssn.receiver._completed) + _ssn.message_accept(ids, sync=True) + # XXX: really need to make this async so that we don't give up the lock + _ssn.sync() + + # XXX: we're ignoring acks that get lost when disconnected + for m in messages: + ssn.acked.remove(m) + if ssn.transactional: + _ssn.acked.append(m) + + if ssn.committing: + _ssn.tx_commit(sync=True) + # XXX: need to kill syncs + _ssn.sync() + del _ssn.acked[:] + ssn.committing = False + ssn.committed = True + ssn.aborting = False + ssn.aborted = False + + if ssn.aborting: + for rcv in ssn.receivers: + _ssn.message_stop(rcv.destination) + _ssn.sync() + + messages = _ssn.acked + ssn.unacked + ssn.incoming + ids = RangedSet(*[m._transfer_id for m in messages]) + for range in ids: + _ssn.receiver._completed.add_range(range) + _ssn.channel.session_completed(_ssn.receiver._completed) + _ssn.message_release(ids) + _ssn.tx_rollback(sync=True) + _ssn.sync() + + del ssn.incoming[:] + del ssn.unacked[:] + del _ssn.acked[:] + + for rcv in ssn.receivers: + rcv.impending = rcv.received + rcv.returned = rcv.received + # XXX: do we need to update granted here as well? + + for rcv in ssn.receivers: + self.process_receiver(rcv) + + ssn.aborting = False + ssn.aborted = True + ssn.committing = False + ssn.committed = False + + def grant(self, rcv): + _ssn = self._attachments[rcv.session] + _rcv = self.link_in(rcv) + + if rcv.granted is UNLIMITED: + if rcv.impending is UNLIMITED: + delta = 0 + else: + delta = UNLIMITED + elif rcv.impending is UNLIMITED: + delta = -1 + else: + delta = max(rcv.granted, rcv.received) - rcv.impending + + if delta is UNLIMITED: + _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value) + _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, UNLIMITED.value) + rcv.impending = UNLIMITED + elif delta > 0: + _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value) + _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, delta) + rcv.impending += delta + elif delta < 0: + if rcv.drain: + _ssn.message_flush(rcv.destination, sync=True) + else: + _ssn.message_stop(rcv.destination, sync=True) + # XXX: need to kill syncs + _ssn.sync() + rcv.impending = rcv.received + self.grant(rcv) + + def process_receiver(self, rcv): + if rcv.closed: return + self.grant(rcv) + + def send(self, snd, msg): + _ssn = self._attachments[snd.session] + _snd = self.link_out(snd) + + # XXX: what if subject is specified for a normal queue? + if _snd._routing_key is None: + rk = msg.subject + else: + rk = _snd._routing_key + # XXX: do we need to query to figure out how to create the reply-to interoperably? + if msg.reply_to: + rt = _ssn.reply_to(*parse_addr(msg.reply_to)) + else: + rt = None + dp = _ssn.delivery_properties(routing_key=rk) + mp = _ssn.message_properties(message_id=msg.id, + user_id=msg.user_id, + reply_to=rt, + correlation_id=msg.correlation_id, + content_type=msg.content_type, + application_headers=msg.properties) + if msg.subject is not None: + if mp.application_headers is None: + mp.application_headers = {} + mp.application_headers["subject"] = msg.subject + if msg.to is not None: + if mp.application_headers is None: + mp.application_headers = {} + mp.application_headers["to"] = msg.to + if msg.durable: + dp.delivery_mode = delivery_mode.persistent + enc, dec = get_codec(msg.content_type) + body = enc(msg.content) + _ssn.message_transfer(destination=_snd._exchange, + message=Message010(dp, mp, body), + sync=True) + log.debug("SENT [%s] %s", snd.session, msg) + # XXX: really need to make this async so that we don't give up the lock + _ssn.sync() + # XXX: should we log the ack somehow too? + snd.acked += 1 + + @synchronized + def _message_transfer(self, ssn, cmd): + m = Message010(cmd.payload) + m.headers = cmd.headers + m.id = cmd.id + msg = self._decode(m) + rcv = ssn.receivers[int(cmd.destination)] + msg._receiver = rcv + if rcv.impending is not UNLIMITED: + assert rcv.received < rcv.impending + rcv.received += 1 + log.debug("RECV [%s] %s", ssn, msg) + ssn.incoming.append(msg) + self.connection._waiter.notifyAll() + return INCOMPLETE + + def _decode(self, message): + dp = message.get("delivery_properties") + mp = message.get("message_properties") + ap = mp.application_headers + enc, dec = get_codec(mp.content_type) + content = dec(message.body) + msg = Message(content) + msg.id = mp.message_id + if ap is not None: + msg.to = ap.get("to") + msg.subject = ap.get("subject") + msg.user_id = mp.user_id + if mp.reply_to is not None: + msg.reply_to = reply_to2addr(mp.reply_to) + msg.correlation_id = mp.correlation_id + msg.durable = dp.delivery_mode == delivery_mode.persistent + msg.properties = mp.application_headers + msg.content_type = mp.content_type + msg._transfer_id = message.id + return msg diff --git a/qpid/python/qpid/messaging.py b/qpid/python/qpid/messaging.py index 9b3fecbf9b..d755aa5054 100644 --- a/qpid/python/qpid/messaging.py +++ b/qpid/python/qpid/messaging.py @@ -6,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -30,63 +30,18 @@ Areas that still need work: - protocol negotiation/multiprotocol impl """ -import connection, time, socket, sys, traceback from codec010 import StringCodec -from datatypes import timestamp, uuid4, RangedSet, Message as Message010 +from concurrency import synchronized, Waiter +from datatypes import timestamp, uuid4, Serial from logging import getLogger from ops import PRIMITIVE -from session import Client, INCOMPLETE from threading import Thread, RLock, Condition -from util import connect +from util import default log = getLogger("qpid.messaging") static = staticmethod -def synchronized(meth): - def sync_wrapper(self, *args, **kwargs): - self.lock() - try: - return meth(self, *args, **kwargs) - finally: - self.unlock() - return sync_wrapper - -class Lockable(object): - - def lock(self): - self._lock.acquire() - - def unlock(self): - self._lock.release() - - def wait(self, predicate, timeout=None): - passed = 0 - start = time.time() - while not predicate(): - if timeout is None: - # using the timed wait prevents keyboard interrupts from being - # blocked while waiting - self._condition.wait(3) - elif passed < timeout: - self._condition.wait(timeout - passed) - else: - return False - passed = time.time() - start - return True - - def notify(self): - self._condition.notify() - - def notifyAll(self): - self._condition.notifyAll() - -def default(value, default): - if value is None: - return default - else: - return value - AMQP_PORT = 5672 AMQPS_PORT = 5671 @@ -101,10 +56,20 @@ class Constant: UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL) -class ConnectError(Exception): +class ConnectionError(Exception): + """ + The base class for all connection related exceptions. + """ pass -class Connection(Lockable): +class ConnectError(ConnectionError): + """ + Exception raised when there is an error connecting to the remote + peer. + """ + pass + +class Connection: """ A Connection manages a group of L{Sessions<Session>} and connects @@ -142,12 +107,35 @@ class Connection(Lockable): self.host = host self.port = default(port, AMQP_PORT) self.started = False - self._conn = None self.id = str(uuid4()) self.session_counter = 0 self.sessions = {} + self.reconnect = False + self._connected = False self._lock = RLock() self._condition = Condition(self._lock) + self._waiter = Waiter(self._condition) + self._modcount = Serial(0) + self.error = None + from driver import Driver + self._driver = Driver(self) + self._driver.start() + + def _wait(self, predicate, timeout=None): + return self._waiter.wait(predicate, timeout=timeout) + + def _wakeup(self): + self._modcount += 1 + self._driver.wakeup() + + def _check_error(self, exc=ConnectionError): + if self.error: + raise exc(*self.error) + + def _ewait(self, predicate, timeout=None, exc=ConnectionError): + result = self._wait(lambda: self.error or predicate(), timeout) + self._check_error(exc) + return result @synchronized def session(self, name=None, transactional=False): @@ -173,8 +161,7 @@ class Connection(Lockable): else: ssn = Session(self, name, self.started, transactional=transactional) self.sessions[name] = ssn - if self._conn is not None: - ssn._attach() + self._wakeup() return ssn @synchronized @@ -186,38 +173,25 @@ class Connection(Lockable): """ Connect to the remote endpoint. """ - if self._conn is not None: - return - try: - self._socket = connect(self.host, self.port) - except socket.error, e: - raise ConnectError(e) - self._conn = connection.Connection(self._socket) - try: - self._conn.start() - except connection.VersionError, e: - raise ConnectError(e) - - for ssn in self.sessions.values(): - ssn._attach() + self._connected = True + self._wakeup() + self._ewait(lambda: self._driver._connected, exc=ConnectError) @synchronized def disconnect(self): """ Disconnect from the remote endpoint. """ - if self._conn is not None: - self._conn.close() - self._conn = None - for ssn in self.sessions.values(): - ssn._disconnected() + self._connected = False + self._wakeup() + self._ewait(lambda: not self._driver._connected) @synchronized def connected(self): """ Return true if the connection is connected, false otherwise. """ - return self._conn is not None + return self._connected @synchronized def start(self): @@ -255,22 +229,32 @@ class Pattern: def __init__(self, value): self.value = value + # XXX: this should become part of the driver def _bind(self, ssn, exchange, queue): ssn.exchange_bind(exchange=exchange, queue=queue, binding_key=self.value.replace("*", "#")) -FILTER_DEFAULTS = { - "topic": Pattern("*") - } +class SessionError(Exception): + pass + +class Disconnected(SessionError): + """ + Exception raised when an operation is attempted that is illegal when + disconnected. + """ + pass -def delegate(session): - class Delegate(Client): +class NontransactionalSession(SessionError): + """ + Exception raised when commit or rollback is attempted on a non + transactional session. + """ + pass - def message_transfer(self, cmd): - session._message_transfer(cmd) - return Delegate +class TransactionAborted(SessionError): + pass -class Session(Lockable): +class Session: """ Sessions provide a linear context for sending and receiving @@ -281,18 +265,28 @@ class Session(Lockable): self.connection = connection self.name = name self.started = started + self.transactional = transactional - self._ssn = None + + self.committing = False + self.committed = True + self.aborting = False + self.aborted = False + self.senders = [] self.receivers = [] - self.closing = False + self.outgoing = [] self.incoming = [] - self.closed = False self.unacked = [] - if self.transactional: - self.acked = [] - self._lock = RLock() - self._condition = Condition(self._lock) + self.acked = [] + # XXX: I hate this name. + self.ack_capacity = UNLIMITED + + self.closing = False + self.closed = False + + self._lock = connection._lock + self.running = True self.thread = Thread(target = self.run) self.thread.setDaemon(True) self.thread.start() @@ -300,60 +294,17 @@ class Session(Lockable): def __repr__(self): return "<Session %s>" % self.name - def _attach(self): - self._ssn = self.connection._conn.session(self.name, delegate=delegate(self)) - self._ssn.auto_sync = False - self._ssn.invoke_lock = self._lock - self._ssn.lock = self._lock - self._ssn.condition = self._condition - if self.transactional: - self._ssn.tx_select() - for link in self.senders + self.receivers: - link._link() - - def _disconnected(self): - self._ssn = None - for link in self.senders + self.receivers: - link._disconnected() + def _wait(self, predicate, timeout=None): + return self.connection._wait(predicate, timeout=timeout) - @synchronized - def _message_transfer(self, cmd): - m = Message010(cmd.payload) - m.headers = cmd.headers - m.id = cmd.id - msg = self._decode(m) - rcv = self.receivers[int(cmd.destination)] - msg._receiver = rcv - log.debug("RECV [%s] %s", self, msg) - self.incoming.append(msg) - self.notifyAll() - return INCOMPLETE - - def _decode(self, message): - dp = message.get("delivery_properties") - mp = message.get("message_properties") - ap = mp.application_headers - enc, dec = get_codec(mp.content_type) - content = dec(message.body) - msg = Message(content) - msg.id = mp.message_id - if ap is not None: - msg.to = ap.get("to") - msg.subject = ap.get("subject") - msg.user_id = mp.user_id - if mp.reply_to is not None: - msg.reply_to = reply_to2addr(mp.reply_to) - msg.correlation_id = mp.correlation_id - msg.properties = mp.application_headers - msg.content_type = mp.content_type - msg._transfer_id = message.id - return msg + def _wakeup(self): + self.connection._wakeup() - def _exchange_query(self, address): - # XXX: auto sync hack is to avoid deadlock on future - result = self._ssn.exchange_query(name=address, sync=True) - self._ssn.sync() - return result.get() + def _check_error(self, exc=SessionError): + self.connection._check_error(exc) + + def _ewait(self, predicate, timeout=None, exc=SessionError): + return self.connection._ewait(predicate, timeout, exc) @synchronized def sender(self, target): @@ -368,8 +319,11 @@ class Session(Lockable): """ sender = Sender(self, len(self.senders), target) self.senders.append(sender) - if self._ssn is not None: - sender._link() + self._wakeup() + # XXX: because of the lack of waiting here we can end up getting + # into the driver loop with messages sent for senders that haven't + # been linked yet, something similar can probably happen for + # receivers return sender @synchronized @@ -386,8 +340,7 @@ class Session(Lockable): receiver = Receiver(self, len(self.receivers), source, filter, self.started) self.receivers.append(receiver) - if self._ssn is not None: - receiver._link() + self._wakeup() return receiver @synchronized @@ -415,43 +368,45 @@ class Session(Lockable): @synchronized def _get(self, predicate, timeout=None): - if self.wait(lambda: ((self._peek(predicate) is not None) or self.closing), - timeout): + if self._wait(lambda: ((self._peek(predicate) is not None) or self.closing), + timeout): msg = self._pop(predicate) if msg is not None: + msg._receiver.returned += 1 self.unacked.append(msg) log.debug("RETR [%s] %s", self, msg) return msg return None @synchronized - def acknowledge(self, message=None): + def acknowledge(self, message=None, sync=True): """ Acknowledge the given L{Message}. If message is None, then all unacknowledged messages on the session are acknowledged. @type message: Message @param message: the message to acknowledge or None + @type sync: boolean + @param sync: if true then block until the message(s) are acknowledged """ if message is None: messages = self.unacked[:] else: messages = [message] - ids = RangedSet(*[m._transfer_id for m in messages]) - for range in ids: - self._ssn.receiver._completed.add_range(range) - self._ssn.channel.session_completed(self._ssn.receiver._completed) - self._ssn.message_accept(ids, sync=True) - self._ssn.sync() - for m in messages: - try: - self.unacked.remove(m) - except ValueError: - pass - if self.transactional: - self.acked.append(m) + if self.ack_capacity is not UNLIMITED: + if self.ack_capacity <= 0: + # XXX: this is currently a SendError, maybe it should be a SessionError? + raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) + self._wakeup() + self._ewait(lambda: len(self.acked) < self.ack_capacity) + self.unacked.remove(m) + self.acked.append(m) + + self._wakeup() + if sync: + self._ewait(lambda: not [m for m in messages if m in self.acked]) @synchronized def commit(self): @@ -461,11 +416,12 @@ class Session(Lockable): """ if not self.transactional: raise NontransactionalSession() - if self._ssn is None: - raise Disconnected() - self._ssn.tx_commit(sync=True) - del self.acked[:] - self._ssn.sync() + self.committing = True + self._wakeup() + self._ewait(lambda: not self.committing) + if self.aborted: + raise TransactionAborted() + assert self.committed @synchronized def rollback(self): @@ -475,21 +431,10 @@ class Session(Lockable): """ if not self.transactional: raise NontransactionalSession() - if self._ssn is None: - raise Disconnected() - - ids = RangedSet(*[m._transfer_id for m in self.acked + self.unacked + self.incoming]) - for range in ids: - self._ssn.receiver._completed.add_range(range) - self._ssn.channel.session_completed(self._ssn.receiver._completed) - self._ssn.message_release(ids) - self._ssn.tx_rollback(sync=True) - - del self.incoming[:] - del self.unacked[:] - del self.acked[:] - - self._ssn.sync() + self.aborting = True + self._wakeup() + self._ewait(lambda: not self.aborting) + assert self.aborted @synchronized def start(self): @@ -508,7 +453,7 @@ class Session(Lockable): for rcv in self.receivers: rcv.stop() # TODO: think about stopping individual receivers in listen mode - self.wait(lambda: self._peek(self._pred) is None) + self._wait(lambda: self._peek(self._pred) is None) self.started = False def _pred(self, m): @@ -516,6 +461,7 @@ class Session(Lockable): @synchronized def run(self): + self.running = True try: while True: msg = self._get(self._pred) @@ -524,10 +470,10 @@ class Session(Lockable): else: msg._receiver.listener(msg) if self._peek(self._pred) is None: - self.notifyAll() + self.connection._waiter.notifyAll() finally: - self.closed = True - self.notifyAll() + self.running = False + self.connection._waiter.notifyAll() @synchronized def close(self): @@ -538,45 +484,22 @@ class Session(Lockable): link.close() self.closing = True - self.notifyAll() - self.wait(lambda: self.closed) + self._wakeup() + self._ewait(lambda: self.closed and not self.running) while self.thread.isAlive(): self.thread.join(3) self.thread = None - self._ssn.close() - self._ssn = None + # XXX: should be able to express this condition through API calls + self._ewait(lambda: not self.outgoing and not self.acked) self.connection._remove_session(self) -def parse_addr(address): - parts = address.split("/", 1) - if len(parts) == 1: - return parts[0], None - else: - return parts[0], parts[i1] - -def reply_to2addr(reply_to): - if reply_to.routing_key is None: - return reply_to.exchange - elif reply_to.exchange in (None, ""): - return reply_to.routing_key - else: - return "%s/%s" % (reply_to.exchange, reply_to.routing_key) - -class Disconnected(Exception): - """ - Exception raised when an operation is attempted that is illegal when - disconnected. - """ +class SendError(SessionError): pass -class NontransactionalSession(Exception): - """ - Exception raised when commit or rollback is attempted on a non - transactional session. - """ +class InsufficientCapacity(SendError): pass -class Sender(Lockable): +class Sender: """ Sends outgoing messages. @@ -586,100 +509,99 @@ class Sender(Lockable): self.session = session self.index = index self.target = target + self.capacity = UNLIMITED + self.queued = Serial(0) + self.acked = Serial(0) self.closed = False - self._ssn = None - self._exchange = None - self._routing_key = None - self._subject = None self._lock = self.session._lock - self._condition = self.session._condition - - def _link(self): - self._ssn = self.session._ssn - node, self._subject = parse_addr(self.target) - result = self.session._exchange_query(node) - if result.not_found: - # XXX: should check 'create' option - self._ssn.queue_declare(queue=node, durable=False, sync=True) - self._ssn.sync() - self._exchange = "" - self._routing_key = node - else: - self._exchange = node - self._routing_key = self._subject - def _disconnected(self): - self._ssn = None + def _wakeup(self): + self.session._wakeup() + + def _check_error(self, exc=SendError): + self.session._check_error(exc) + + def _ewait(self, predicate, timeout=None, exc=SendError): + return self.session._ewait(predicate, timeout, exc) @synchronized - def send(self, object): + def pending(self): + """ + Returns the number of messages awaiting acknowledgment. + @rtype: int + @return: the number of unacknowledged messages + """ + return self.queued - self.acked + + @synchronized + def send(self, object, sync=True, timeout=None): """ Send a message. If the object passed in is of type L{unicode}, L{str}, L{list}, or L{dict}, it will automatically be wrapped in a L{Message} and sent. If it is of type L{Message}, it will be sent - directly. + directly. If the sender capacity is not L{UNLIMITED} then send + will block until there is available capacity to send the message. + If the timeout parameter is specified, then send will throw an + L{InsufficientCapacity} exception if capacity does not become + available within the specified time. @type object: unicode, str, list, dict, Message @param object: the message or content to send + + @type sync: boolean + @param sync: if true then block until the message is sent + + @type timeout: float + @param timeout: the time to wait for available capacity """ - if self._ssn is None: + if not self.session.connection._connected or self.session.closing: raise Disconnected() if isinstance(object, Message): message = object else: message = Message(object) - # XXX: what if subject is specified for a normal queue? - if self._routing_key is None: - rk = message.subject - else: - rk = self._routing_key - # XXX: do we need to query to figure out how to create the reply-to interoperably? - if message.reply_to: - rt = self._ssn.reply_to(*parse_addr(message.reply_to)) - else: - rt = None - dp = self._ssn.delivery_properties(routing_key=rk) - mp = self._ssn.message_properties(message_id=message.id, - user_id=message.user_id, - reply_to=rt, - correlation_id=message.correlation_id, - content_type=message.content_type, - application_headers=message.properties) - if message.subject is not None: - if mp.application_headers is None: - mp.application_headers = {} - mp.application_headers["subject"] = message.subject - if message.to is not None: - if mp.application_headers is None: - mp.application_headers = {} - mp.application_headers["to"] = message.to - enc, dec = get_codec(message.content_type) - body = enc(message.content) - self._ssn.message_transfer(destination=self._exchange, - message=Message010(dp, mp, body), - sync=True) - log.debug("SENT [%s] %s", self.session, message) - self._ssn.sync() + + if self.capacity is not UNLIMITED: + if self.capacity <= 0: + raise InsufficientCapacity("capacity = %s" % self.capacity) + if not self._ewait(lambda: self.pending() < self.capacity, timeout=timeout): + raise InsufficientCapacity("capacity = %s" % self.capacity) + + # XXX: what if we send the same message to multiple senders? + message._sender = self + self.session.outgoing.append(message) + self.queued += 1 + mno = self.queued + + self._wakeup() + + if sync: + self._ewait(lambda: self.acked >= mno) + assert message not in self.session.outgoing @synchronized def close(self): """ Close the Sender. """ + # XXX: should make driver do something here if not self.closed: self.session.senders.remove(self) self.closed = True -class Empty(Exception): +class ReceiveError(SessionError): + pass + +class Empty(ReceiveError): """ Exception raised by L{Receiver.fetch} when there is no message available within the alloted time. """ pass -class Receiver(Lockable): +class Receiver: """ Receives incoming messages from a remote source. Messages may be @@ -693,43 +615,39 @@ class Receiver(Lockable): self.destination = str(self.index) self.source = source self.filter = filter + self.started = started self.capacity = UNLIMITED + self.granted = Serial(0) + self.drain = False + self.impending = Serial(0) + self.received = Serial(0) + self.returned = Serial(0) + + self.closing = False self.closed = False self.listener = None - self._ssn = None - self._queue = None self._lock = self.session._lock - self._condition = self.session._condition - - def _link(self): - self._ssn = self.session._ssn - result = self.session._exchange_query(self.source) - if result.not_found: - self._queue = self.source - # XXX: should check 'create' option - self._ssn.queue_declare(queue=self._queue, durable=False) - else: - self._queue = "%s.%s" % (self.session.name, self.destination) - self._ssn.queue_declare(queue=self._queue, durable=False, exclusive=True, auto_delete=True) - if self.filter is None: - f = FILTER_DEFAULTS[result.type] - else: - f = self.filter - f._bind(self._ssn, self.source, self._queue) - self._ssn.message_subscribe(queue=self._queue, destination=self.destination, - sync=True) - self._ssn.message_set_flow_mode(self.destination, self._ssn.flow_mode.credit) - self._ssn.sync() - if self.started: - self._start() - def _disconnected(self): - self._ssn = None + def _wakeup(self): + self.session._wakeup() + + def _check_error(self, exc=ReceiveError): + self.session._check_error(exc) + + def _ewait(self, predicate, timeout=None, exc=ReceiveError): + return self.session._ewait(predicate, timeout, exc) @synchronized def pending(self): - return self.session._count(self._pred) + """ + Returns the number of messages available to be fetched by the + application. + + @rtype: int + @return: the number of available messages + """ + return self.received - self.returned def _capacity(self): if not self.started: @@ -762,23 +680,36 @@ class Receiver(Lockable): @type timeout: float @param timeout: the time to wait for a message to be available """ - if self.capacity is not UNLIMITED or not self.started: - self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, - UNLIMITED.value) - self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 1) + if self._capacity() == 0: + self.granted = self.returned + 1 + self._wakeup() + self._ewait(lambda: self.impending >= self.granted) msg = self.session._get(self._pred, timeout=timeout) if msg is None: - self._ssn.message_flush(self.destination) - self._start() - self._ssn.sync() + self.drain = True + self.granted = self.received + self._wakeup() + self._ewait(lambda: self.impending == self.received) + self.drain = False + self._grant() + self._wakeup() msg = self.session._get(self._pred, timeout=0) if msg is None: raise Empty() + elif self._capacity() not in (0, UNLIMITED.value): + self.granted += 1 + self._wakeup() return msg - def _start(self): - self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, UNLIMITED.value) - self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, self._capacity()) + def _grant(self): + if self.started: + if self.capacity is UNLIMITED: + self.granted = UNLIMITED + else: + self.granted = self.received + self._capacity() + else: + self.granted = self.received + @synchronized def start(self): @@ -786,34 +717,31 @@ class Receiver(Lockable): Start incoming message delivery for this receiver. """ self.started = True - if self._ssn is not None: - self._start() - - def _stop(self): - self._ssn.message_stop(self.destination) + self._grant() + self._wakeup() @synchronized def stop(self): """ Stop incoming message delivery for this receiver. """ - if self._ssn is not None: - self._stop() self.started = False + self._grant() + self._wakeup() + self._ewait(lambda: self.impending == self.received) @synchronized def close(self): """ Close the receiver. """ - if not self.closed: - self.closed = True - self._ssn.message_cancel(self.destination, sync=True) - self._ssn.sync() + self.closing = True + self._wakeup() + try: + self._ewait(lambda: self.closed) + finally: self.session.receivers.remove(self) - - def codec(name): type = PRIMITIVE[name] @@ -889,6 +817,7 @@ class Message: self.to = None self.reply_to = None self.correlation_id = None + self.durable = False self.properties = {} self.content_type = get_type(content) self.content = content @@ -896,5 +825,7 @@ class Message: def __repr__(self): return "Message(%r)" % self.content -__all__ = ["Connection", "Pattern", "Session", "Sender", "Receiver", "Message", - "Empty", "timestamp", "uuid4"] +__all__ = ["Connection", "Session", "Sender", "Receiver", "Pattern", "Message", + "ConnectionError", "ConnectError", "SessionError", "Disconnected", + "SendError", "InsufficientCapacity", "ReceiveError", "Empty", + "timestamp", "uuid4", "UNLIMITED", "AMQP_PORT", "AMQPS_PORT"] diff --git a/qpid/python/qpid/session.py b/qpid/python/qpid/session.py index 4413a22899..2f1bd81bd4 100644 --- a/qpid/python/qpid/session.py +++ b/qpid/python/qpid/session.py @@ -146,7 +146,8 @@ class Session(command_invoker()): if self._closing: raise SessionClosed() - if self.channel == None: + ch = self.channel + if ch == None: raise SessionDetached() if op == MessageTransfer: @@ -162,14 +163,12 @@ class Session(command_invoker()): cmd = op(*args, **kwargs) cmd.sync = self.auto_sync or cmd.sync self.need_sync = not cmd.sync - cmd.channel = self.channel.id + cmd.channel = ch.id if op.RESULT: result = Future(exception=SessionException) self.results[self.sender.next_id] = result - log.debug("SENDING %s", cmd) - self.send(cmd) log.debug("SENT %s", cmd) @@ -245,13 +244,16 @@ class Sender: self._completed = RangedSet() def send(self, cmd): + ch = self.session.channel + if ch is None: + raise SessionDetached() cmd.id = self.next_id self.next_id += 1 if self.session.send_id: self.session.send_id = False - self.session.channel.session_command_point(cmd.id, 0) + ch.session_command_point(cmd.id, 0) self.commands.append(cmd) - self.session.channel.connection.write_op(cmd) + ch.connection.write_op(cmd) def completed(self, commands): idx = 0 diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py index 7706ebbabe..7623c1f93b 100644 --- a/qpid/python/qpid/tests/messaging.py +++ b/qpid/python/qpid/tests/messaging.py @@ -23,7 +23,8 @@ import time from qpid.tests import Test from qpid.harness import Skipped -from qpid.messaging import Connection, ConnectError, Disconnected, Empty, Message, UNLIMITED, uuid4 +from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \ + InsufficientCapacity, Message, UNLIMITED, uuid4 from Queue import Queue, Empty as QueueEmpty class Base(Test): @@ -71,19 +72,25 @@ class Base(Test): ssn.acknowledge() assert msg.content == content, "expected %r, got %r" % (content, msg.content) - def drain(self, rcv, limit=None): + def drain(self, rcv, limit=None, timeout=0, expected=None): contents = [] try: while limit is None or len(contents) < limit: - contents.append(rcv.fetch(0).content) + contents.append(rcv.fetch(timeout=timeout).content) except Empty: pass + if expected is not None: + assert expected == contents, "expected %s, got %s" % (expected, contents) return contents def assertEmpty(self, rcv): contents = self.drain(rcv) assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents) + def assertPending(self, rcv, expected): + p = rcv.pending() + assert p == expected, "expected %s, got %s" % (expected, p) + def sleep(self): time.sleep(self.delay()) @@ -107,7 +114,8 @@ class SetupTests(Base): try: self.conn = Connection.open("localhost", 0) assert False, "connect succeeded" - except ConnectError: + except ConnectError, e: + # XXX: should verify that e includes appropriate diagnostic info pass class ConnectionTests(Base): @@ -219,26 +227,27 @@ class SessionTests(Base): # XXX, we need a convenient way to assert that required queues are # empty on setup, and possibly also to drain queues on teardown - def testAcknowledge(self): + def ackTest(self, acker, ack_capacity=None): # send a bunch of messages snd = self.ssn.sender("test-ack-queue") - tid = "a" - contents = ["testAcknowledge[%s, %s]" % (i, tid) for i in range(10)] + contents = [self.content("ackTest", i) for i in range(15)] for c in contents: snd.send(c) # drain the queue, verify the messages are there and then close # without acking rcv = self.ssn.receiver(snd.target) - assert contents == self.drain(rcv) + self.drain(rcv, expected=contents) self.ssn.close() # drain the queue again, verify that they are all the messages # were requeued, and ack this time before closing self.ssn = self.conn.session() + if ack_capacity is not None: + self.ssn.ack_capacity = ack_capacity rcv = self.ssn.receiver("test-ack-queue") - assert contents == self.drain(rcv) - self.ssn.acknowledge() + self.drain(rcv, expected=contents) + acker(self.ssn) self.ssn.close() # drain the queue a final time and verify that the messages were @@ -247,6 +256,33 @@ class SessionTests(Base): rcv = self.ssn.receiver("test-ack-queue") self.assertEmpty(rcv) + def testAcknowledge(self): + self.ackTest(lambda ssn: ssn.acknowledge()) + + def testAcknowledgeAsync(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False)) + + def testAcknowledgeAsyncAckCap0(self): + try: + try: + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0) + assert False, "acknowledge shouldn't succeed with ack_capacity of zero" + except InsufficientCapacity: + pass + finally: + self.ssn.ack_capacity = UNLIMITED + self.drain(self.ssn.receiver("test-ack-queue")) + self.ssn.acknowledge() + + def testAcknowledgeAsyncAckCap1(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1) + + def testAcknowledgeAsyncAckCap5(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5) + + def testAcknowledgeAsyncAckCapUNLIMITED(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED) + def send(self, ssn, queue, base, count=1): snd = ssn.sender(queue) contents = [] @@ -319,7 +355,8 @@ class SessionTests(Base): txssn.acknowledge() else: txssn.rollback() - assert contents == self.drain(txrcv) + drained = self.drain(txrcv) + assert contents == drained, "expected %s, got %s" % (contents, drained) txssn.acknowledge() txssn.rollback() assert contents == self.drain(txrcv) @@ -401,9 +438,9 @@ class ReceiverTests(Base): elapsed = time.time() - start assert elapsed >= self.delay() - one = self.send("testListen", 1) - two = self.send("testListen", 2) - three = self.send("testListen", 3) + one = self.send("testFetch", 1) + two = self.send("testFetch", 2) + three = self.send("testFetch", 3) msg = self.rcv.fetch(0) assert msg.content == one msg = self.rcv.fetch(self.delay()) @@ -467,34 +504,35 @@ class ReceiverTests(Base): def testCapacity(self): self.rcv.capacity = 5 self.rcv.start() - assert self.rcv.pending() == 0 + self.assertPending(self.rcv, 0) for i in range(15): self.send("testCapacity", i) self.sleep() - assert self.rcv.pending() == 5 + self.assertPending(self.rcv, 5) self.drain(self.rcv, limit = 5) self.sleep() - assert self.rcv.pending() == 5 + self.assertPending(self.rcv, 5) - self.drain(self.rcv) - assert self.rcv.pending() == 0 + drained = self.drain(self.rcv) + assert len(drained) == 10 + self.assertPending(self.rcv, 0) self.ssn.acknowledge() def testCapacityUNLIMITED(self): self.rcv.capacity = UNLIMITED self.rcv.start() - assert self.rcv.pending() == 0 + self.assertPending(self.rcv, 0) for i in range(10): self.send("testCapacityUNLIMITED", i) self.sleep() - assert self.rcv.pending() == 10 + self.assertPending(self.rcv, 10) self.drain(self.rcv) - assert self.rcv.pending() == 0 + self.assertPending(self.rcv, 0) self.ssn.acknowledge() @@ -535,6 +573,48 @@ class SenderTests(Base): def testSendMap(self): self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14}) + def asyncTest(self, capacity): + self.snd.capacity = capacity + msgs = [self.content("asyncTest", i) for i in range(15)] + for m in msgs: + self.snd.send(m, sync=False) + drained = self.drain(self.rcv, timeout=self.delay()) + assert msgs == drained, "expected %s, got %s" % (msgs, drained) + self.ssn.acknowledge() + + def testSendAsyncCapacity0(self): + try: + self.asyncTest(0) + assert False, "send shouldn't succeed with zero capacity" + except InsufficientCapacity: + # this is expected + pass + + def testSendAsyncCapacity1(self): + self.asyncTest(1) + + def testSendAsyncCapacity5(self): + self.asyncTest(5) + + def testSendAsyncCapacityUNLIMITED(self): + self.asyncTest(UNLIMITED) + + def testCapacityTimeout(self): + self.snd.capacity = 1 + msgs = [] + caught = False + while len(msgs) < 100: + m = self.content("testCapacity", len(msgs)) + try: + self.snd.send(m, sync=False, timeout=0) + msgs.append(m) + except InsufficientCapacity: + caught = True + break + self.drain(self.rcv, expected=msgs) + self.ssn.acknowledge() + assert caught, "did not exceed capacity" + class MessageTests(Base): def testCreateString(self): diff --git a/qpid/python/qpid/util.py b/qpid/python/qpid/util.py index c46716b88f..3409d777f9 100644 --- a/qpid/python/qpid/util.py +++ b/qpid/python/qpid/util.py @@ -134,3 +134,9 @@ class URL: if self.port: s += ":%s" % self.port return s + +def default(value, default): + if value is None: + return default + else: + return value diff --git a/qpid/python/qpid_config.py b/qpid/python/qpid_config.py index 3cf6b69b7e..d740a53dfe 100644 --- a/qpid/python/qpid_config.py +++ b/qpid/python/qpid_config.py @@ -19,7 +19,7 @@ import os -qpid_home = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) -amqp_spec = os.path.join(qpid_home, "specs", "amqp.0-10-qpid-errata.xml") -amqp_spec_0_8 = os.path.join(qpid_home, "specs", "amqp.0-8.xml") -amqp_spec_0_9 = os.path.join(qpid_home, "specs", "amqp.0-9.xml") +AMQP_SPEC_DIR=os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "specs") +amqp_spec = os.path.join(AMQP_SPEC_DIR, "amqp.0-10-qpid-errata.xml") +amqp_spec_0_8 = os.path.join(AMQP_SPEC_DIR, "amqp.0-8.xml") +amqp_spec_0_9 = os.path.join(AMQP_SPEC_DIR, "amqp.0-9.xml") diff --git a/qpid/python/tests/datatypes.py b/qpid/python/tests/datatypes.py index 1a60bb4107..b00e5e78f8 100644 --- a/qpid/python/tests/datatypes.py +++ b/qpid/python/tests/datatypes.py @@ -54,6 +54,19 @@ class SerialTest(TestCase): d[serial(0)] = "zero" assert d[0] == "zero" + def testAdd(self): + assert serial(2) + 2 == serial(4) + assert serial(2) + 2 == 4 + + def testSub(self): + delta = serial(4) - serial(2) + assert isinstance(delta, int) or isinstance(delta, long) + assert delta == 2 + + delta = serial(4) - 2 + assert isinstance(delta, Serial) + assert delta == serial(2) + class RangedSetTest(TestCase): def check(self, ranges): diff --git a/qpid/python/tests_0-10/alternate_exchange.py b/qpid/python/tests_0-10/alternate_exchange.py index 3b75145907..4d8617eb8e 100644 --- a/qpid/python/tests_0-10/alternate_exchange.py +++ b/qpid/python/tests_0-10/alternate_exchange.py @@ -141,7 +141,61 @@ class AlternateExchangeTests(TestBase010): session.exchange_delete(exchange="e") session.exchange_delete(exchange="alternate") self.assertEquals(530, e.args[0].error_code) - + + + def test_modify_existing_exchange_alternate(self): + """ + Ensure that attempting to modify an exhange to change + the alternate throws an exception + """ + session = self.session + session.exchange_declare(exchange="alt1", type="direct") + session.exchange_declare(exchange="alt2", type="direct") + session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt1") + try: + # attempt to change the alternate on an already existing exchange + session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt2") + self.fail("Expected changing an alternate on an existing exchange to fail") + except SessionException, e: + self.assertEquals(530, e.args[0].error_code) + session = self.conn.session("alternate", 2) + session.exchange_delete(exchange="onealternate") + session.exchange_delete(exchange="alt2") + session.exchange_delete(exchange="alt1") + + + def test_add_alternate_to_exchange(self): + """ + Ensure that attempting to modify an exhange by adding + an alternate throws an exception + """ + session = self.session + session.exchange_declare(exchange="alt1", type="direct") + session.exchange_declare(exchange="noalternate", type="fanout") + try: + # attempt to add an alternate on an already existing exchange + session.exchange_declare(exchange="noalternate", type="fanout", alternate_exchange="alt1") + self.fail("Expected adding an alternate on an existing exchange to fail") + except SessionException, e: + self.assertEquals(530, e.args[0].error_code) + session = self.conn.session("alternate", 2) + session.exchange_delete(exchange="noalternate") + session.exchange_delete(exchange="alt1") + + + def test_del_alternate_to_exchange(self): + """ + Ensure that attempting to modify an exhange by declaring + it again without an alternate does nothing + """ + session = self.session + session.exchange_declare(exchange="alt1", type="direct") + session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt1") + # attempt to re-declare without an alternate - silently ignore + session.exchange_declare(exchange="onealternate", type="fanout" ) + session.exchange_delete(exchange="onealternate") + session.exchange_delete(exchange="alt1") + def assertEmpty(self, queue): try: diff --git a/qpid/python/tests_0-10/management.py b/qpid/python/tests_0-10/management.py index 51c2a687cb..53573f309e 100644 --- a/qpid/python/tests_0-10/management.py +++ b/qpid/python/tests_0-10/management.py @@ -295,3 +295,25 @@ class ManagementTest (TestBase010): sleep(1) self.assertEqual(handler.check(), "pass") + def test_connection_close(self): + """ + Test management method for closing connection + """ + self.startQmf() + conn = self.connect() + session = conn.session("my-named-session") + + #using qmf find named session and close the corresponding connection: + qmf_ssn_object = self.qmf.getObjects(_class="session", name="my-named-session")[0] + qmf_ssn_object._connectionRef_.close() + + #check that connection is closed + try: + conn.session("another-session") + self.fail("Expected failure from closed connection") + except: None + + #make sure that the named session has been closed and the name can be re-used + conn = self.connect() + session = conn.session("my-named-session") + session.queue_declare(queue="whatever", exclusive=True, auto_delete=True) diff --git a/qpid/python/tests_0-9/queue.py b/qpid/python/tests_0-9/queue.py index de1153307c..e7fe0b3ed4 100644 --- a/qpid/python/tests_0-9/queue.py +++ b/qpid/python/tests_0-9/queue.py @@ -6,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,11 +19,137 @@ from qpid.client import Client, Closed from qpid.queue import Empty from qpid.content import Content -from qpid.testlib import TestBase +from qpid.testlib import testrunner, TestBase class QueueTests(TestBase): """Tests for 'methods' on the amqp queue 'class'""" + def test_purge(self): + """ + Test that the purge method removes messages from the queue + """ + channel = self.channel + #setup, declare a queue and add some messages to it: + channel.exchange_declare(exchange="test-exchange", type="direct") + channel.queue_declare(queue="test-queue", exclusive=True) + channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + channel.message_transfer(destination="test-exchange", routing_key="key", body="one") + channel.message_transfer(destination="test-exchange", routing_key="key", body="two") + channel.message_transfer(destination="test-exchange", routing_key="key", body="three") + + #check that the queue now reports 3 messages: + reply = channel.queue_declare(queue="test-queue") + self.assertEqual(3, reply.message_count) + + #now do the purge, then test that three messages are purged and the count drops to 0 + reply = channel.queue_purge(queue="test-queue"); + self.assertEqual(3, reply.message_count) + reply = channel.queue_declare(queue="test-queue") + self.assertEqual(0, reply.message_count) + + #send a further message and consume it, ensuring that the other messages are really gone + channel.message_transfer(destination="test-exchange", routing_key="key", body="four") + channel.message_consume(queue="test-queue", destination="tag", no_ack=True) + queue = self.client.queue("tag") + msg = queue.get(timeout=1) + self.assertEqual("four", msg.body) + + #check error conditions (use new channels): + channel = self.client.channel(2) + channel.channel_open() + try: + #queue specified but doesn't exist: + channel.queue_purge(queue="invalid-queue") + self.fail("Expected failure when purging non-existent queue") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + channel = self.client.channel(3) + channel.channel_open() + try: + #queue not specified and none previously declared for channel: + channel.queue_purge() + self.fail("Expected failure when purging unspecified queue") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + + #cleanup + other = self.connect() + channel = other.channel(1) + channel.channel_open() + channel.exchange_delete(exchange="test-exchange") + + def test_declare_exclusive(self): + """ + Test that the exclusive field is honoured in queue.declare + """ + # TestBase.setUp has already opened channel(1) + c1 = self.channel + # Here we open a second separate connection: + other = self.connect() + c2 = other.channel(1) + c2.channel_open() + + #declare an exclusive queue: + c1.queue_declare(queue="exclusive-queue", exclusive="True") + try: + #other connection should not be allowed to declare this: + c2.queue_declare(queue="exclusive-queue", exclusive="True") + self.fail("Expected second exclusive queue_declare to raise a channel exception") + except Closed, e: + self.assertChannelException(405, e.args[0]) + + + def test_declare_passive(self): + """ + Test that the passive field is honoured in queue.declare + """ + channel = self.channel + #declare an exclusive queue: + channel.queue_declare(queue="passive-queue-1", exclusive="True") + channel.queue_declare(queue="passive-queue-1", passive="True") + try: + #other connection should not be allowed to declare this: + channel.queue_declare(queue="passive-queue-2", passive="True") + self.fail("Expected passive declaration of non-existant queue to raise a channel exception") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + + def test_bind(self): + """ + Test various permutations of the queue.bind method + """ + channel = self.channel + channel.queue_declare(queue="queue-1", exclusive="True") + + #straightforward case, both exchange & queue exist so no errors expected: + channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1") + + #bind the default queue for the channel (i.e. last one declared): + channel.queue_bind(exchange="amq.direct", routing_key="key2") + + #use the queue name where neither routing key nor queue are specified: + channel.queue_bind(exchange="amq.direct") + + #try and bind to non-existant exchange + try: + channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1") + self.fail("Expected bind to non-existant exchange to fail") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + #need to reopen a channel: + channel = self.client.channel(2) + channel.channel_open() + + #try and bind non-existant queue: + try: + channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1") + self.fail("Expected bind of non-existant queue to fail") + except Closed, e: + self.assertChannelException(404, e.args[0]) + def test_unbind_direct(self): self.unbind_test(exchange="amq.direct", routing_key="key") @@ -39,12 +165,12 @@ class QueueTests(TestBase): def unbind_test(self, exchange, routing_key="", args=None, headers={}): #bind two queues and consume from them channel = self.channel - + channel.queue_declare(queue="queue-1", exclusive="True") channel.queue_declare(queue="queue-2", exclusive="True") - channel.basic_consume(queue="queue-1", consumer_tag="queue-1", no_ack=True) - channel.basic_consume(queue="queue-2", consumer_tag="queue-2", no_ack=True) + channel.message_consume(queue="queue-1", destination="queue-1", no_ack=True) + channel.message_consume(queue="queue-2", destination="queue-2", no_ack=True) queue1 = self.client.queue("queue-1") queue2 = self.client.queue("queue-2") @@ -53,29 +179,130 @@ class QueueTests(TestBase): channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args) #send a message that will match both bindings - channel.basic_publish(exchange=exchange, routing_key=routing_key, - content=Content("one", properties={"headers": headers})) - + channel.message_transfer(destination=exchange, routing_key=routing_key, application_headers=headers, body="one") + #unbind first queue channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) - + #send another message - channel.basic_publish(exchange=exchange, routing_key=routing_key, - content=Content("two", properties={"headers": headers})) + channel.message_transfer(destination=exchange, routing_key=routing_key, application_headers=headers, body="two") #check one queue has both messages and the other has only one - self.assertEquals("one", queue1.get(timeout=1).content.body) + self.assertEquals("one", queue1.get(timeout=1).body) try: msg = queue1.get(timeout=1) self.fail("Got extra message: %s" % msg.body) except Empty: pass - self.assertEquals("one", queue2.get(timeout=1).content.body) - self.assertEquals("two", queue2.get(timeout=1).content.body) + self.assertEquals("one", queue2.get(timeout=1).body) + self.assertEquals("two", queue2.get(timeout=1).body) try: msg = queue2.get(timeout=1) self.fail("Got extra message: " + msg) - except Empty: pass + except Empty: pass + + + def test_delete_simple(self): + """ + Test core queue deletion behaviour + """ + channel = self.channel + + #straight-forward case: + channel.queue_declare(queue="delete-me") + channel.message_transfer(routing_key="delete-me", body="a") + channel.message_transfer(routing_key="delete-me", body="b") + channel.message_transfer(routing_key="delete-me", body="c") + reply = channel.queue_delete(queue="delete-me") + self.assertEqual(3, reply.message_count) + #check that it has gone be declaring passively + try: + channel.queue_declare(queue="delete-me", passive="True") + self.fail("Queue has not been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + #check attempted deletion of non-existant queue is handled correctly: + channel = self.client.channel(2) + channel.channel_open() + try: + channel.queue_delete(queue="i-dont-exist", if_empty="True") + self.fail("Expected delete of non-existant queue to fail") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + + + def test_delete_ifempty(self): + """ + Test that if_empty field of queue_delete is honoured + """ + channel = self.channel + + #create a queue and add a message to it (use default binding): + channel.queue_declare(queue="delete-me-2") + channel.queue_declare(queue="delete-me-2", passive="True") + channel.message_transfer(routing_key="delete-me-2", body="message") + + #try to delete, but only if empty: + try: + channel.queue_delete(queue="delete-me-2", if_empty="True") + self.fail("Expected delete if_empty to fail for non-empty queue") + except Closed, e: + self.assertChannelException(406, e.args[0]) + + #need new channel now: + channel = self.client.channel(2) + channel.channel_open() + + #empty queue: + channel.message_consume(destination="consumer_tag", queue="delete-me-2", no_ack=True) + queue = self.client.queue("consumer_tag") + msg = queue.get(timeout=1) + self.assertEqual("message", msg.body) + channel.message_cancel(destination="consumer_tag") + + #retry deletion on empty queue: + channel.queue_delete(queue="delete-me-2", if_empty="True") + + #check that it has gone by declaring passively: + try: + channel.queue_declare(queue="delete-me-2", passive="True") + self.fail("Queue has not been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + def test_delete_ifunused(self): + """ + Test that if_unused field of queue_delete is honoured + """ + channel = self.channel + + #create a queue and register a consumer: + channel.queue_declare(queue="delete-me-3") + channel.queue_declare(queue="delete-me-3", passive="True") + channel.message_consume(destination="consumer_tag", queue="delete-me-3", no_ack=True) + + #need new channel now: + channel2 = self.client.channel(2) + channel2.channel_open() + #try to delete, but only if empty: + try: + channel2.queue_delete(queue="delete-me-3", if_unused="True") + self.fail("Expected delete if_unused to fail for queue with existing consumer") + except Closed, e: + self.assertChannelException(406, e.args[0]) + + + channel.message_cancel(destination="consumer_tag") + channel.queue_delete(queue="delete-me-3", if_unused="True") + #check that it has gone by declaring passively: + try: + channel.queue_declare(queue="delete-me-3", passive="True") + self.fail("Queue has not been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) + def test_autodelete_shared(self): """ @@ -109,3 +336,5 @@ class QueueTests(TestBase): self.fail("Expected queue to have been deleted") except Closed, e: self.assertChannelException(404, e.args[0]) + + |