summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python')
-rw-r--r--qpid/python/Makefile2
-rwxr-xr-xqpid/python/examples/api/drain62
-rwxr-xr-xqpid/python/examples/api/ping76
-rwxr-xr-xqpid/python/qpid-python-test42
-rw-r--r--qpid/python/qpid/address.py171
-rw-r--r--qpid/python/qpid/compat.py11
-rw-r--r--qpid/python/qpid/driver.py864
-rw-r--r--qpid/python/qpid/messaging.py148
-rw-r--r--qpid/python/qpid/ops.py6
-rw-r--r--qpid/python/qpid/selector.py156
-rw-r--r--qpid/python/qpid/tests/messaging.py174
11 files changed, 1346 insertions, 366 deletions
diff --git a/qpid/python/Makefile b/qpid/python/Makefile
index 31547c8f57..ff4a9af4f1 100644
--- a/qpid/python/Makefile
+++ b/qpid/python/Makefile
@@ -36,7 +36,7 @@ SRCS=$(shell find $(DIRS) -name "*.py") qpid_config.py
BUILD=build
TARGETS=$(SRCS:%.py=$(BUILD)/%.py)
-PYCC=python -c "import compileall, sys; compileall.compile_dir(sys.argv[1])"
+PYCC=python -O -c "import compileall; compileall.main()"
all: build
diff --git a/qpid/python/examples/api/drain b/qpid/python/examples/api/drain
new file mode 100755
index 0000000000..485985f16d
--- /dev/null
+++ b/qpid/python/examples/api/drain
@@ -0,0 +1,62 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse
+from qpid.messaging import *
+from qpid.util import URL
+
+parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...",
+ description="Drain messages from the supplied address.")
+parser.add_option("-b", "--broker", default="localhost",
+ help="connect to specified BROKER (default %default)")
+parser.add_option("-t", "--timeout", type=float, default=0,
+ help="timeout in seconds to wait before exiting (default %default)")
+parser.add_option("-f", "--forever", action="store_true",
+ help="ignore timeout and wait forever")
+
+opts, args = parser.parse_args()
+
+url = URL(opts.broker)
+if args:
+ addr = args.pop(0)
+else:
+ parser.error("address is required")
+if opts.forever:
+ timeout = None
+else:
+ timeout = opts.timeout
+
+# XXX: should make URL default the port for us
+conn = Connection.open(url.host, url.port or AMQP_PORT,
+ username=url.user, password=url.password)
+ssn = conn.session()
+rcv = ssn.receiver(addr)
+
+while True:
+ try:
+ print rcv.fetch(timeout=timeout)
+ ssn.acknowledge()
+ except Empty:
+ break
+ except ReceiveError, e:
+ print e
+ break
+
+conn.close()
diff --git a/qpid/python/examples/api/ping b/qpid/python/examples/api/ping
new file mode 100755
index 0000000000..59b367cca6
--- /dev/null
+++ b/qpid/python/examples/api/ping
@@ -0,0 +1,76 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, time
+from qpid.messaging import *
+from qpid.util import URL
+
+parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]",
+ description="Drain messages from the supplied address.")
+parser.add_option("-b", "--broker", default="localhost",
+ help="connect to specified BROKER (default %default)")
+parser.add_option("-c", "--count", type=int, default=1,
+ help="stop after count messages have been sent, zero disables (default %default)")
+parser.add_option("-t", "--timeout", type=float, default=None,
+ help="exit after the specified time")
+parser.add_option("-m", "--map", action="store_true",
+ help="interpret content as map")
+parser.add_option("-i", "--id", help="use the supplied id instead of generating one")
+
+opts, args = parser.parse_args()
+
+url = URL(opts.broker)
+if opts.id is None:
+ ping_id = str(uuid4())
+else:
+ ping_id = opts.id
+if args:
+ addr = args.pop(0)
+else:
+ parser.error("address is required")
+if args:
+ content = " ".join(args)
+ if opts.map:
+ content = eval(content)
+else:
+ content = None
+
+# XXX: should make URL default the port for us
+conn = Connection.open(url.host, url.port or AMQP_PORT,
+ username=url.user, password=url.password)
+ssn = conn.session()
+snd = ssn.sender(addr)
+
+count = 0
+start = time.time()
+while (opts.count == 0 or count < opts.count) and \
+ (opts.timeout is None or time.time() - start < opts.timeout):
+ msg = Message(content)
+ msg.properties["ping-id"] = "%s:%s" % (ping_id, count)
+
+ try:
+ snd.send(msg)
+ count += 1
+ print msg
+ except SendError, e:
+ print e
+ break
+
+conn.close()
diff --git a/qpid/python/qpid-python-test b/qpid/python/qpid-python-test
index 528acaa124..b569020368 100755
--- a/qpid/python/qpid-python-test
+++ b/qpid/python/qpid-python-test
@@ -20,7 +20,7 @@
# TODO: summarize, test harness preconditions (e.g. broker is alive)
-import fcntl, logging, optparse, os, struct, sys, termios, traceback, types
+import logging, optparse, os, struct, sys, traceback, types
from fnmatch import fnmatchcase as match
from getopt import GetoptError
from logging import getLogger, StreamHandler, Formatter, Filter, \
@@ -126,27 +126,33 @@ def is_included(path):
def is_smart():
return sys.stdout.isatty() and os.environ.get("TERM", "dumb") != "dumb"
-def width():
- if is_smart():
- s = struct.pack("HHHH", 0, 0, 0, 0)
- fd_stdout = sys.stdout.fileno()
- x = fcntl.ioctl(fd_stdout, termios.TIOCGWINSZ, s)
- rows, cols, xpx, ypx = struct.unpack("HHHH", x)
- return cols
- else:
- try:
- return int(os.environ.get("COLUMNS", "80"))
- except ValueError:
- return 80
+try:
+ import fcntl, termios
-WIDTH = width()
+ def width():
+ if is_smart():
+ s = struct.pack("HHHH", 0, 0, 0, 0)
+ fd_stdout = sys.stdout.fileno()
+ x = fcntl.ioctl(fd_stdout, termios.TIOCGWINSZ, s)
+ rows, cols, xpx, ypx = struct.unpack("HHHH", x)
+ return cols
+ else:
+ try:
+ return int(os.environ.get("COLUMNS", "80"))
+ except ValueError:
+ return 80
-def resize(sig, frm):
- global WIDTH
WIDTH = width()
-import signal
-signal.signal(signal.SIGWINCH, resize)
+ def resize(sig, frm):
+ global WIDTH
+ WIDTH = width()
+
+ import signal
+ signal.signal(signal.SIGWINCH, resize)
+
+except ImportError:
+ WIDTH = 80
def vt100_attrs(*attrs):
return "\x1B[%sm" % ";".join(map(str, attrs))
diff --git a/qpid/python/qpid/address.py b/qpid/python/qpid/address.py
new file mode 100644
index 0000000000..5976d4889b
--- /dev/null
+++ b/qpid/python/qpid/address.py
@@ -0,0 +1,171 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import re
+
+TYPES = []
+
+class Type:
+
+ def __init__(self, name, pattern=None):
+ self.name = name
+ self.pattern = pattern
+ if self.pattern:
+ TYPES.append(self)
+
+ def __repr__(self):
+ return self.name
+
+LBRACE = Type("LBRACE", r"\{")
+RBRACE = Type("RBRACE", r"\}")
+COLON = Type("COLON", r":")
+COMMA = Type("COMMA", r",")
+SLASH = Type("SLASH", r"/")
+ID = Type("ID", r'[a-zA-Z_][a-zA-Z0-9_.-]*')
+NUMBER = Type("NUMBER", r'[+-]?[0-9]*\.?[0-9]+')
+STRING = Type("STRING", r""""(?:[^\\"]|\\.)*"|'(?:[^\\']|\\.)*'""")
+WSPACE = Type("WSPACE", r"[ \n\r\t]+")
+EOF = Type("EOF")
+
+class Token:
+
+ def __init__(self, type, value):
+ self.type = type
+ self.value = value
+
+ def __repr__(self):
+ return "%s: %r" % (self.type, self.value)
+
+joined = "|".join(["(%s)" % t.pattern for t in TYPES])
+LEXER = re.compile(joined)
+
+class LexError(Exception):
+ pass
+
+def line_info(st, pos):
+ idx = 0
+ lineno = 1
+ column = 0
+ line_pos = 0
+ while idx < pos:
+ if st[idx] == "\n":
+ lineno += 1
+ column = 0
+ line_pos = idx
+ column += 1
+ idx += 1
+
+ end = st.find("\n", line_pos)
+ if end < 0:
+ end = len(st)
+ line = st[line_pos:end]
+
+ return line, lineno, column
+
+def lex(st):
+ pos = 0
+ while pos < len(st):
+ m = LEXER.match(st, pos)
+ if m is None:
+ line, ln, col = line_info(st, pos)
+ raise LexError("unrecognized character in <string>:%s,%s: %s" % (ln, col, line))
+ else:
+ idx = m.lastindex
+ t = Token(TYPES[idx - 1], m.group(idx))
+ yield t
+ pos = m.end()
+ yield Token(EOF, None)
+
+class ParseError(Exception): pass
+
+class Parser:
+
+ def __init__(self, tokens):
+ self.tokens = [t for t in tokens if t.type is not WSPACE]
+ self.idx = 0
+
+ def next(self):
+ return self.tokens[self.idx]
+
+ def matches(self, *types):
+ return self.next().type in types
+
+ def eat(self, *types):
+ if types and not self.matches(*types):
+ raise ParseError("expecting %s -- got %s" % (", ".join(map(str, types)), self.next()))
+ else:
+ t = self.next()
+ self.idx += 1
+ return t
+
+ def parse(self):
+ result = self.address()
+ self.eat(EOF)
+ return result
+
+ def address(self):
+ name = self.eat(ID).value
+ subject = None
+ options = None
+ if self.matches(SLASH):
+ self.eat(SLASH)
+ if self.matches(ID):
+ subject = self.eat(ID).value
+ else:
+ subject = ""
+ elif self.matches(LBRACE):
+ options = self.map()
+ return name, subject, options
+
+ def map(self):
+ self.eat(LBRACE)
+ result = {}
+ while True:
+ if self.matches(RBRACE):
+ self.eat(RBRACE)
+ break
+ else:
+ if self.matches(ID):
+ n, v = self.nameval()
+ result[n] = v
+ elif self.matches(COMMA):
+ self.eat(COMMA)
+ else:
+ raise ParseError("expecting (ID, COMMA), got %s" % self.next())
+ return result
+
+ def nameval(self):
+ name = self.eat(ID).value
+ self.eat(COLON)
+ val = self.value()
+ return (name, val)
+
+ def value(self):
+ if self.matches(NUMBER, STRING):
+ return eval(self.eat().value)
+ elif self.matches(ID):
+ return self.eat().value
+ elif self.matches(LBRACE):
+ return self.map()
+ else:
+ raise ParseError("expecting (NUMBER, STRING, LBRACE) got %s" % self.next())
+
+def parse(addr):
+ return Parser(lex(addr)).parse()
+
+__all__ = ["parse"]
diff --git a/qpid/python/qpid/compat.py b/qpid/python/qpid/compat.py
index 49273193df..53ab757e89 100644
--- a/qpid/python/qpid/compat.py
+++ b/qpid/python/qpid/compat.py
@@ -17,6 +17,8 @@
# under the License.
#
+import sys
+
try:
set = set
except NameError:
@@ -30,6 +32,13 @@ except ImportError:
try:
from traceback import format_exc
except ImportError:
- import sys, traceback
+ import traceback
def format_exc():
return "".join(traceback.format_exception(*sys.exc_info()))
+
+if tuple(sys.version_info[0:2]) < (2, 4):
+ from select import select as old_select
+ def select(rlist, wlist, xlist, timeout=None):
+ return old_select(list(rlist), list(wlist), list(xlist), timeout)
+else:
+ from select import select
diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py
index 2e07c82a0d..7c293fe146 100644
--- a/qpid/python/qpid/driver.py
+++ b/qpid/python/qpid/driver.py
@@ -17,25 +17,23 @@
# under the License.
#
-import compat, connection, socket, sys, time
+import address, compat, connection, socket, struct, sys, time
from concurrency import synchronized
-from datatypes import RangedSet, Message as Message010
-from exceptions import Timeout
+from datatypes import RangedSet, Serial
+from exceptions import Timeout, VersionError
+from framing import OpEncoder, SegmentEncoder, FrameEncoder, FrameDecoder, SegmentDecoder, OpDecoder
from logging import getLogger
from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED
-from ops import delivery_mode
-from session import Client, INCOMPLETE, SessionDetached
+from ops import *
+from selector import Selector
from threading import Condition, Thread
from util import connect
log = getLogger("qpid.messaging")
-def parse_addr(address):
- parts = address.split("/", 1)
- if len(parts) == 1:
- return parts[0], None
- else:
- return parts[0], parts[i1]
+def addr2reply_to(addr):
+ name, subject, options = address.parse(addr)
+ return ReplyTo(name, subject)
def reply_to2addr(reply_to):
if reply_to.routing_key is None:
@@ -50,287 +48,617 @@ class Attachment:
def __init__(self, target):
self.target = target
+# XXX
+
DURABLE_DEFAULT=True
+# XXX
+
FILTER_DEFAULTS = {
"topic": Pattern("*")
}
-def delegate(handler, session):
- class Delegate(Client):
-
- def message_transfer(self, cmd):
- return handler._message_transfer(session, cmd)
- return Delegate
+# XXX
+
+CLIENT_PROPERTIES = {"product": "qpid python client",
+ "version": "development",
+ "platform": os.name,
+ "qpid.client_process": os.path.basename(sys.argv[0]),
+ "qpid.client_pid": os.getpid(),
+ "qpid.client_ppid": os.getppid()}
+
+def noop(): pass
+
+class SessionState:
+
+ def __init__(self, driver, session, name, channel):
+ self.driver = driver
+ self.session = session
+ self.name = name
+ self.channel = channel
+ self.detached = False
+ self.committing = False
+ self.aborting = False
+
+ # sender state
+ self.sent = Serial(0)
+ self.acknowledged = RangedSet()
+ self.completions = {}
+ self.min_completion = self.sent
+ self.max_completion = self.sent
+ self.results = {}
+
+ # receiver state
+ self.received = None
+ self.executed = RangedSet()
+
+ # XXX: need to periodically exchange completion/known_completion
+
+ def write_query(self, query, handler):
+ id = self.sent
+ query.sync = True
+ self.write_cmd(query, lambda: handler(self.results.pop(id)))
+
+ def write_cmd(self, cmd, completion=noop):
+ if self.detached:
+ raise Exception("detached")
+ cmd.id = self.sent
+ self.sent += 1
+ self.completions[cmd.id] = completion
+ self.max_completion = cmd.id
+ self.write_op(cmd)
+
+ def write_op(self, op):
+ op.channel = self.channel
+ self.driver.write_op(op)
+
+# XXX
+HEADER="!4s4B"
+
+EMPTY_DP = DeliveryProperties()
+EMPTY_MP = MessageProperties()
class Driver:
def __init__(self, connection):
self.connection = connection
self._lock = self.connection._lock
- self._wakeup_cond = Condition()
- self._socket = None
- self._conn = None
+
+ self._selector = Selector.default()
+ self.reset()
+
+ def reset(self):
+ self._opening = False
+ self._closing = False
self._connected = False
self._attachments = {}
- self._modcount = self.connection._modcount
- self.thread = Thread(target=self.run)
- self.thread.setDaemon(True)
- # XXX: need to figure out how to join on this thread
+ self._channel_max = 65536
+ self._channels = 0
+ self._sessions = {}
+
+ self._socket = None
+ self._buf = ""
+ self._hdr = ""
+ self._op_enc = OpEncoder()
+ self._seg_enc = SegmentEncoder()
+ self._frame_enc = FrameEncoder()
+ self._frame_dec = FrameDecoder()
+ self._seg_dec = SegmentDecoder()
+ self._op_dec = OpDecoder()
+ self._timeout = None
+
+ for ssn in self.connection.sessions.values():
+ for m in ssn.acked + ssn.unacked + ssn.incoming:
+ m._transfer_id = None
+ for snd in ssn.senders:
+ snd.linked = False
+ for rcv in ssn.receivers:
+ rcv.impending = rcv.received
+ rcv.linked = False
+
+ @synchronized
def wakeup(self):
- self._wakeup_cond.acquire()
- try:
- self._wakeup_cond.notifyAll()
- finally:
- self._wakeup_cond.release()
+ self.dispatch()
+ self._selector.wakeup()
def start(self):
- self.thread.start()
+ self._selector.register(self)
+
+ def fileno(self):
+ return self._socket.fileno()
- def run(self):
- while True:
- self._wakeup_cond.acquire()
+ @synchronized
+ def reading(self):
+ return self._socket is not None
+
+ @synchronized
+ def writing(self):
+ return self._socket is not None and self._buf
+
+ @synchronized
+ def timing(self):
+ return self._timeout
+
+ @synchronized
+ def readable(self):
+ error = None
+ recoverable = False
+ try:
+ data = self._socket.recv(64*1024)
+ if data:
+ log.debug("READ: %r", data)
+ else:
+ log.debug("ABORTED: %s", self._socket.getpeername())
+ error = "connection aborted"
+ recoverable = True
+ except socket.error, e:
+ error = e
+ recoverable = True
+
+ if not error:
try:
- if self.connection._modcount <= self._modcount:
- self._wakeup_cond.wait(10)
- finally:
- self._wakeup_cond.release()
- self.dispatch(self.connection._modcount)
+ if len(self._hdr) < 8:
+ r = 8 - len(self._hdr)
+ self._hdr += data[:r]
+ data = data[r:]
+
+ if len(self._hdr) == 8:
+ self.do_header(self._hdr)
+
+ self._frame_dec.write(data)
+ self._seg_dec.write(*self._frame_dec.read())
+ self._op_dec.write(*self._seg_dec.read())
+ for op in self._op_dec.read():
+ self.assign_id(op)
+ log.debug("RCVD: %r", op)
+ op.dispatch(self)
+ except VersionError, e:
+ error = e
+ except:
+ msg = compat.format_exc()
+ error = msg
+
+ if error:
+ self._error(error, recoverable)
+ else:
+ self.dispatch()
+
+ self.connection._waiter.notifyAll()
+
+ def assign_id(self, op):
+ if isinstance(op, Command):
+ sst = self.get_sst(op)
+ op.id = sst.received
+ sst.received += 1
@synchronized
- def dispatch(self, modcount):
+ def writeable(self):
try:
- if self._conn is None and self.connection._connected:
+ n = self._socket.send(self._buf)
+ log.debug("SENT: %r", self._buf[:n])
+ self._buf = self._buf[n:]
+ except socket.error, e:
+ self._error(e, True)
+ self.connection._waiter.notifyAll()
+
+ @synchronized
+ def timeout(self):
+ log.warn("retrying ...")
+ self.dispatch()
+ self.connection._waiter.notifyAll()
+
+ def _error(self, err, recoverable):
+ if self._socket is not None:
+ self._socket.close()
+ self.reset()
+ if recoverable and self.connection.reconnect:
+ self._timeout = time.time() + 3
+ log.warn("recoverable error: %s" % err)
+ log.warn("sleeping 3 seconds")
+ else:
+ self.connection.error = (err,)
+
+ def write_op(self, op):
+ log.debug("SENT: %r", op)
+ self._op_enc.write(op)
+ self._seg_enc.write(*self._op_enc.read())
+ self._frame_enc.write(*self._seg_enc.read())
+ self._buf += self._frame_enc.read()
+
+ def do_header(self, hdr):
+ cli_major = 0; cli_minor = 10
+ magic, _, _, major, minor = struct.unpack(HEADER, hdr)
+ if major != cli_major or minor != cli_minor:
+ raise VersionError("client: %s-%s, server: %s-%s" %
+ (cli_major, cli_minor, major, minor))
+
+ def do_connection_start(self, start):
+ # XXX: should we use some sort of callback for this?
+ r = "\0%s\0%s" % (self.connection.username, self.connection.password)
+ m = self.connection.mechanism
+ self.write_op(ConnectionStartOk(client_properties=CLIENT_PROPERTIES,
+ mechanism=m, response=r))
+
+ def do_connection_tune(self, tune):
+ # XXX: is heartbeat protocol specific?
+ if tune.channel_max is not None:
+ self.channel_max = tune.channel_max
+ self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat,
+ channel_max=self.channel_max))
+ self.write_op(ConnectionOpen())
+
+ def do_connection_open_ok(self, open_ok):
+ self._connected = True
+
+ def connection_heartbeat(self, hrt):
+ self.write_op(ConnectionHeartbeat())
+
+ def do_connection_close(self, close):
+ self.write_op(ConnectionCloseOk())
+ if close.reply_code != close_code.normal:
+ self.connection.error = (close.reply_code, close.reply_text)
+ # XXX: should we do a half shutdown on the socket here?
+ # XXX: we really need to test this, we may end up reporting a
+ # connection abort after this, if we were to do a shutdown on read
+ # and stop reading, then we wouldn't report the abort, that's
+ # probably the right thing to do
+
+ def do_connection_close_ok(self, close_ok):
+ self._socket.close()
+ self.reset()
+
+ def do_session_attached(self, atc):
+ pass
+
+ def do_session_command_point(self, cp):
+ sst = self.get_sst(cp)
+ sst.received = cp.command_id
+
+ def do_session_completed(self, sc):
+ sst = self.get_sst(sc)
+ for r in sc.commands:
+ sst.acknowledged.add(r.lower, r.upper)
+
+ if not sc.commands.empty():
+ while sst.min_completion in sc.commands:
+ if sst.completions.has_key(sst.min_completion):
+ sst.completions.pop(sst.min_completion)()
+ sst.min_completion += 1
+
+ def session_known_completed(self, kcmp):
+ sst = self.get_sst(kcmp)
+ executed = RangedSet()
+ for e in sst.executed.ranges:
+ for ke in kcmp.ranges:
+ if e.lower in ke and e.upper in ke:
+ break
+ else:
+ executed.add_range(e)
+ sst.executed = completed
+
+ def do_session_flush(self, sf):
+ sst = self.get_sst(sf)
+ if sf.expected:
+ if sst.received is None:
+ exp = None
+ else:
+ exp = RangedSet(sst.received)
+ sst.write_op(SessionExpected(exp))
+ if sf.confirmed:
+ sst.write_op(SessionConfirmed(sst.executed))
+ if sf.completed:
+ sst.write_op(SessionCompleted(sst.executed))
+
+ def do_execution_result(self, er):
+ sst = self.get_sst(er)
+ sst.results[er.command_id] = er.value
+
+ def do_execution_exception(self, ex):
+ sst = self.get_sst(ex)
+ sst.session.error = (ex,)
+
+ def dispatch(self):
+ try:
+ if self._socket is None and self.connection._connected and not self._opening:
self.connect()
- elif self._conn is not None and not self.connection._connected:
+ elif self._socket is not None and not self.connection._connected and not self._closing:
self.disconnect()
- if self._conn is not None:
+ if self._connected and not self._closing:
for ssn in self.connection.sessions.values():
self.attach(ssn)
self.process(ssn)
-
- exi = None
except:
- exi = sys.exc_info()
-
- if exi:
msg = compat.format_exc()
- recoverable = ["aborted", "Connection refused", "SessionDetached", "Connection reset by peer",
- "Bad file descriptor", "start timed out", "Broken pipe"]
- for r in recoverable:
- if self.connection.reconnect and r in msg:
- print "waiting to retry"
- self.reset()
- time.sleep(3)
- print "retrying..."
- return
- else:
- self.connection.error = (msg,)
-
- self._modcount = modcount
- self.connection._waiter.notifyAll()
+ self.connection.error = (msg,)
def connect(self):
- if self._conn is not None:
- return
try:
+ # XXX: should make this non blocking
self._socket = connect(self.connection.host, self.connection.port)
+ self._timeout = None
except socket.error, e:
- raise ConnectError(e)
- self._conn = connection.Connection(self._socket)
- try:
- self._conn.start(timeout=10)
- self._connected = True
- except connection.VersionError, e:
- raise ConnectError(e)
- except Timeout:
- print "start timed out"
- raise ConnectError("start timed out")
+ if self.connection.reconnect:
+ self._error(e, True)
+ return
+ else:
+ raise e
+ self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
+ self._opening = True
def disconnect(self):
- self._conn.close()
- self.reset()
-
- def reset(self):
- self._conn = None
- self._connected = False
- self._attachments.clear()
- for ssn in self.connection.sessions.values():
- for m in ssn.acked + ssn.unacked + ssn.incoming:
- m._transfer_id = None
- for rcv in ssn.receivers:
- rcv.impending = rcv.received
-
- def connected(self):
- return self._conn is not None
+ self.write_op(ConnectionClose(close_code.normal))
+ self._closing = True
def attach(self, ssn):
- _ssn = self._attachments.get(ssn)
- if _ssn is None:
- _ssn = self._conn.session(ssn.name, delegate=delegate(self, ssn))
- _ssn.auto_sync = False
- _ssn.invoke_lock = self._lock
- _ssn.lock = self._lock
- _ssn.condition = self.connection._condition
+ sst = self._attachments.get(ssn)
+ if sst is None and not ssn.closed:
+ for i in xrange(0, self.channel_max):
+ if not self._sessions.has_key(i):
+ ch = i
+ break
+ else:
+ raise RuntimeError("all channels used")
+ sst = SessionState(self, ssn, ssn.name, ch)
+ sst.write_op(SessionAttach(name=ssn.name))
+ sst.write_op(SessionCommandPoint(sst.sent, 0))
+ sst.outgoing_idx = 0
+ sst.acked = []
if ssn.transactional:
- # XXX: adding an attribute to qpid.session.Session
- _ssn.acked = []
- _ssn.tx_select()
- self._attachments[ssn] = _ssn
+ sst.write_cmd(TxSelect())
+ self._attachments[ssn] = sst
+ self._sessions[sst.channel] = sst
for snd in ssn.senders:
self.link_out(snd)
for rcv in ssn.receivers:
self.link_in(rcv)
- if ssn.closing:
- _ssn.close()
- del self._attachments[ssn]
- ssn.closed = True
+ if sst is not None and ssn.closing and not sst.detached:
+ sst.detached = True
+ sst.write_op(SessionDetach(name=ssn.name))
+
+ def get_sst(self, op):
+ return self._sessions[op.channel]
- def _exchange_query(self, ssn, address):
- # XXX: auto sync hack is to avoid deadlock on future
- result = ssn.exchange_query(name=address, sync=True)
- ssn.sync()
- return result.get()
+ def do_session_detached(self, dtc):
+ sst = self._sessions.pop(dtc.channel)
+ ssn = sst.session
+ del self._attachments[ssn]
+ ssn.closed = True
+
+ def do_session_detach(self, dtc):
+ sst = self.get_sst(dtc)
+ sst.write_op(SessionDetached(name=dtc.name))
+ self.do_session_detached(dtc)
def link_out(self, snd):
- _ssn = self._attachments[snd.session]
+ sst = self._attachments.get(snd.session)
_snd = self._attachments.get(snd)
- if _snd is None:
+ if _snd is None and not snd.closing and not snd.closed:
_snd = Attachment(snd)
- node, _snd._subject = parse_addr(snd.target)
- result = self._exchange_query(_ssn, node)
- if result.not_found:
- # XXX: should check 'create' option
- _ssn.queue_declare(queue=node, durable=DURABLE_DEFAULT, sync=True)
- _ssn.sync()
- _snd._exchange = ""
- _snd._routing_key = node
- else:
- _snd._exchange = node
- _snd._routing_key = _snd._subject
+
+ try:
+ _snd.name, _snd.subject, _snd.options = address.parse(snd.target)
+ except address.LexError, e:
+ snd.error = e
+ snd.closed = True
+ return
+ except address.ParseError, e:
+ snd.error = e
+ snd.closed = True
+ return
+
+ # XXX: subject
+ if _snd.options is None:
+ _snd.options = {}
+
+ def do_link():
+ snd.linked = True
+
+ def do_queue_q(result):
+ if sst.detached:
+ return
+
+ if result.queue:
+ _snd._exchange = ""
+ _snd._routing_key = _snd.name
+ do_link()
+ else:
+ snd.error = ("no such queue: %s" % _snd.name,)
+ del self._attachments[snd]
+ snd.closed = True
+
+ def do_exchange_q(result):
+ if sst.detached:
+ return
+
+ if result.not_found:
+ if _snd.options.get("create") in ("always", "receiver"):
+ sst.write_cmd(QueueDeclare(queue=_snd.name, durable=DURABLE_DEFAULT))
+ _snd._exchange = ""
+ _snd._routing_key = _snd.name
+ else:
+ sst.write_query(QueueQuery(queue=_snd.name), do_queue_q)
+ return
+ else:
+ _snd._exchange = _snd.name
+ _snd._routing_key = _snd.subject
+ do_link()
+
+ sst.write_query(ExchangeQuery(name=_snd.name), do_exchange_q)
self._attachments[snd] = _snd
- if snd.closed:
+ if snd.closing and not snd.closed:
del self._attachments[snd]
- return None
- else:
- return _snd
+ snd.closed = True
def link_in(self, rcv):
- _ssn = self._attachments[rcv.session]
+ sst = self._attachments.get(rcv.session)
_rcv = self._attachments.get(rcv)
- if _rcv is None:
+ if _rcv is None and not rcv.closing and not rcv.closed:
_rcv = Attachment(rcv)
- result = self._exchange_query(_ssn, rcv.source)
- if result.not_found:
- _rcv._queue = rcv.source
- # XXX: should check 'create' option
- _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT)
- else:
- _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
- _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)
- if rcv.filter is None:
- f = FILTER_DEFAULTS[result.type]
+ _rcv.canceled = False
+ _rcv.draining = False
+
+ try:
+ _rcv.name, _rcv.subject, _rcv.options = address.parse(rcv.source)
+ except address.LexError, e:
+ rcv.error = e
+ rcv.closed = True
+ return
+ except address.ParseError, e:
+ rcv.error = e
+ rcv.closed = True
+ return
+
+ # XXX: subject
+ if _rcv.options is None:
+ _rcv.options = {}
+
+ def do_link():
+ sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
+ sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
+ rcv.linked = True
+
+ def do_queue_q(result):
+ if sst.detached:
+ return
+ if result.queue:
+ _rcv._queue = _rcv.name
+ do_link()
+ else:
+ rcv.error = ("no such queue: %s" % _rcv.name,)
+ del self._attachments[rcv]
+ rcv.closed = True
+
+ def do_exchange_q(result):
+ if sst.detached:
+ return
+ if result.not_found:
+ if _rcv.options.get("create") in ("always", "receiver"):
+ _rcv._queue = _rcv.name
+ sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT))
+ else:
+ sst.write_query(QueueQuery(queue=_rcv.name), do_queue_q)
+ return
else:
- f = rcv.filter
- f._bind(_ssn, rcv.source, _rcv._queue)
- _ssn.message_subscribe(queue=_rcv._queue, destination=rcv.destination)
- _ssn.message_set_flow_mode(rcv.destination, _ssn.flow_mode.credit, sync=True)
+ _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
+ sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
+ filter = _rcv.options.get("filter")
+ if _rcv.subject is None and filter is None:
+ f = FILTER_DEFAULTS[result.type]
+ elif _rcv.subject and filter:
+ # XXX
+ raise Exception("can't supply both subject and filter")
+ elif _rcv.subject:
+ # XXX
+ from messaging import Pattern
+ f = Pattern(_rcv.subject)
+ else:
+ f = filter
+ f._bind(sst, _rcv.name, _rcv._queue)
+ do_link()
+ sst.write_query(ExchangeQuery(name=_rcv.name), do_exchange_q)
self._attachments[rcv] = _rcv
- # XXX: need to kill syncs
- _ssn.sync()
-
- if rcv.closing:
- _ssn.message_cancel(rcv.destination, sync=True)
- # XXX: need to kill syncs
- _ssn.sync()
- del self._attachments[rcv]
- rcv.closed = True
- return None
- else:
- return _rcv
+
+ if rcv.closing and not rcv.closed:
+ if rcv.linked:
+ if not _rcv.canceled:
+ def close_rcv():
+ del self._attachments[rcv]
+ rcv.closed = True
+ sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv)
+ _rcv.canceled = True
+ else:
+ rcv.closed = True
def process(self, ssn):
if ssn.closing: return
- _ssn = self._attachments[ssn]
+ sst = self._attachments[ssn]
- while ssn.outgoing:
- msg = ssn.outgoing[0]
+ while sst.outgoing_idx < len(ssn.outgoing):
+ msg = ssn.outgoing[sst.outgoing_idx]
snd = msg._sender
- self.send(snd, msg)
- ssn.outgoing.pop(0)
+ # XXX: should check for sender error here
+ _snd = self._attachments.get(snd)
+ if _snd and snd.linked:
+ self.send(snd, msg)
+ sst.outgoing_idx += 1
+ else:
+ break
for rcv in ssn.receivers:
self.process_receiver(rcv)
if ssn.acked:
- messages = ssn.acked[:]
- ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
- for range in ids:
- _ssn.receiver._completed.add_range(range)
- ch = _ssn.channel
- if ch is None:
- raise SessionDetached()
- ch.session_completed(_ssn.receiver._completed)
- _ssn.message_accept(ids, sync=True)
- # XXX: really need to make this async so that we don't give up the lock
- _ssn.sync()
-
- # XXX: we're ignoring acks that get lost when disconnected
- for m in messages:
- ssn.acked.remove(m)
- if ssn.transactional:
- _ssn.acked.append(m)
-
- if ssn.committing:
- _ssn.tx_commit(sync=True)
- # XXX: need to kill syncs
- _ssn.sync()
- del _ssn.acked[:]
- ssn.committing = False
- ssn.committed = True
- ssn.aborting = False
- ssn.aborted = False
-
- if ssn.aborting:
- for rcv in ssn.receivers:
- _ssn.message_stop(rcv.destination)
- _ssn.sync()
-
- messages = _ssn.acked + ssn.unacked + ssn.incoming
- ids = RangedSet(*[m._transfer_id for m in messages])
- for range in ids:
- _ssn.receiver._completed.add_range(range)
- _ssn.channel.session_completed(_ssn.receiver._completed)
- _ssn.message_release(ids)
- _ssn.tx_rollback(sync=True)
- _ssn.sync()
-
- del ssn.incoming[:]
- del ssn.unacked[:]
- del _ssn.acked[:]
+ messages = [m for m in ssn.acked if m not in sst.acked]
+ if messages:
+ # XXX: we're ignoring acks that get lost when disconnected,
+ # could we deal this via some message-id based purge?
+ ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
+ for range in ids:
+ sst.executed.add_range(range)
+ sst.write_op(SessionCompleted(sst.executed))
+ def ack_ack():
+ for m in messages:
+ ssn.acked.remove(m)
+ if not ssn.transactional:
+ sst.acked.remove(m)
+ sst.write_cmd(MessageAccept(ids, sync=True), ack_ack)
+ sst.acked.extend(messages)
+
+ if ssn.committing and not sst.committing:
+ def commit_ok():
+ del sst.acked[:]
+ ssn.committing = False
+ ssn.committed = True
+ ssn.aborting = False
+ ssn.aborted = False
+ sst.write_cmd(TxCommit(sync=True), commit_ok)
+ sst.committing = True
+
+ if ssn.aborting and not sst.aborting:
+ sst.aborting = True
+ def do_rb():
+ messages = sst.acked + ssn.unacked + ssn.incoming
+ ids = RangedSet(*[m._transfer_id for m in messages])
+ for range in ids:
+ sst.executed.add_range(range)
+ sst.write_op(SessionCompleted(sst.executed))
+ sst.write_cmd(MessageRelease(ids))
+ sst.write_cmd(TxRollback(sync=True), do_rb_ok)
+
+ def do_rb_ok():
+ del ssn.incoming[:]
+ del ssn.unacked[:]
+ del sst.acked[:]
+
+ for rcv in ssn.receivers:
+ rcv.impending = rcv.received
+ rcv.returned = rcv.received
+ # XXX: do we need to update granted here as well?
+
+ for rcv in ssn.receivers:
+ self.process_receiver(rcv)
+
+ ssn.aborting = False
+ ssn.aborted = True
+ ssn.committing = False
+ ssn.committed = False
+ sst.aborting = False
for rcv in ssn.receivers:
- rcv.impending = rcv.received
- rcv.returned = rcv.received
- # XXX: do we need to update granted here as well?
-
- for rcv in ssn.receivers:
- self.process_receiver(rcv)
-
- ssn.aborting = False
- ssn.aborted = True
- ssn.committing = False
- ssn.committed = False
+ sst.write_cmd(MessageStop(rcv.destination))
+ sst.write_cmd(ExecutionSync(sync=True), do_rb)
def grant(self, rcv):
- _ssn = self._attachments[rcv.session]
- _rcv = self.link_in(rcv)
+ sst = self._attachments[rcv.session]
+ _rcv = self._attachments.get(rcv)
+ if _rcv is None or not rcv.linked or _rcv.canceled or _rcv.draining:
+ return
if rcv.granted is UNLIMITED:
if rcv.impending is UNLIMITED:
@@ -343,30 +671,37 @@ class Driver:
delta = max(rcv.granted, rcv.received) - rcv.impending
if delta is UNLIMITED:
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, UNLIMITED.value)
+ sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value))
+ sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, UNLIMITED.value))
rcv.impending = UNLIMITED
elif delta > 0:
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, delta)
+ sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value))
+ sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, delta))
rcv.impending += delta
- elif delta < 0:
- if rcv.drain:
- _ssn.message_flush(rcv.destination, sync=True)
- else:
- _ssn.message_stop(rcv.destination, sync=True)
- # XXX: need to kill syncs
- _ssn.sync()
- rcv.impending = rcv.received
- self.grant(rcv)
+ elif delta < 0 and not rcv.draining:
+ _rcv.draining = True
+ def do_stop():
+ rcv.impending = rcv.received
+ _rcv.draining = False
+ self.grant(rcv)
+ sst.write_cmd(MessageStop(rcv.destination, sync=True), do_stop)
+
+ if rcv.draining:
+ def do_flush():
+ rcv.impending = rcv.received
+ rcv.granted = rcv.impending
+ _rcv.draining = False
+ rcv.draining = False
+ sst.write_cmd(MessageFlush(rcv.destination, sync=True), do_flush)
+
def process_receiver(self, rcv):
if rcv.closed: return
self.grant(rcv)
def send(self, snd, msg):
- _ssn = self._attachments[snd.session]
- _snd = self.link_out(snd)
+ sst = self._attachments[snd.session]
+ _snd = self._attachments[snd]
# XXX: what if subject is specified for a normal queue?
if _snd._routing_key is None:
@@ -375,16 +710,16 @@ class Driver:
rk = _snd._routing_key
# XXX: do we need to query to figure out how to create the reply-to interoperably?
if msg.reply_to:
- rt = _ssn.reply_to(*parse_addr(msg.reply_to))
+ rt = addr2reply_to(msg.reply_to)
else:
rt = None
- dp = _ssn.delivery_properties(routing_key=rk)
- mp = _ssn.message_properties(message_id=msg.id,
- user_id=msg.user_id,
- reply_to=rt,
- correlation_id=msg.correlation_id,
- content_type=msg.content_type,
- application_headers=msg.properties)
+ dp = DeliveryProperties(routing_key=rk)
+ mp = MessageProperties(message_id=msg.id,
+ user_id=msg.user_id,
+ reply_to=rt,
+ correlation_id=msg.correlation_id,
+ content_type=msg.content_type,
+ application_headers=msg.properties)
if msg.subject is not None:
if mp.application_headers is None:
mp.application_headers = {}
@@ -397,37 +732,42 @@ class Driver:
dp.delivery_mode = delivery_mode.persistent
enc, dec = get_codec(msg.content_type)
body = enc(msg.content)
- _ssn.message_transfer(destination=_snd._exchange,
- message=Message010(dp, mp, body),
- sync=True)
- log.debug("SENT [%s] %s", snd.session, msg)
- # XXX: really need to make this async so that we don't give up the lock
- _ssn.sync()
- # XXX: should we log the ack somehow too?
- snd.acked += 1
-
- @synchronized
- def _message_transfer(self, ssn, cmd):
- m = Message010(cmd.payload)
- m.headers = cmd.headers
- m.id = cmd.id
- msg = self._decode(m)
- rcv = ssn.receivers[int(cmd.destination)]
+ def msg_acked():
+ # XXX: should we log the ack somehow too?
+ snd.acked += 1
+ m = snd.session.outgoing.pop(0)
+ sst.outgoing_idx -= 1
+ assert msg == m
+ sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
+ payload=body, sync=True), msg_acked)
+
+ def do_message_transfer(self, xfr):
+ sst = self.get_sst(xfr)
+ ssn = sst.session
+
+ msg = self._decode(xfr)
+ rcv = ssn.receivers[int(xfr.destination)]
msg._receiver = rcv
if rcv.impending is not UNLIMITED:
- assert rcv.received < rcv.impending
+ assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
rcv.received += 1
log.debug("RECV [%s] %s", ssn, msg)
ssn.incoming.append(msg)
self.connection._waiter.notifyAll()
- return INCOMPLETE
- def _decode(self, message):
- dp = message.get("delivery_properties")
- mp = message.get("message_properties")
+ def _decode(self, xfr):
+ dp = EMPTY_DP
+ mp = EMPTY_MP
+
+ for h in xfr.headers:
+ if isinstance(h, DeliveryProperties):
+ dp = h
+ elif isinstance(h, MessageProperties):
+ mp = h
+
ap = mp.application_headers
enc, dec = get_codec(mp.content_type)
- content = dec(message.body)
+ content = dec(xfr.payload)
msg = Message(content)
msg.id = mp.message_id
if ap is not None:
@@ -440,5 +780,5 @@ class Driver:
msg.durable = dp.delivery_mode == delivery_mode.persistent
msg.properties = mp.application_headers
msg.content_type = mp.content_type
- msg._transfer_id = message.id
+ msg._transfer_id = xfr.id
return msg
diff --git a/qpid/python/qpid/messaging.py b/qpid/python/qpid/messaging.py
index d755aa5054..3e3c8f36cb 100644
--- a/qpid/python/qpid/messaging.py
+++ b/qpid/python/qpid/messaging.py
@@ -77,7 +77,8 @@ class Connection:
"""
@static
- def open(host, port=None):
+ def open(host, port=None, username="guest", password="guest",
+ mechanism="PLAIN", heartbeat=None, **options):
"""
Creates an AMQP connection and connects it to the given host and port.
@@ -88,11 +89,12 @@ class Connection:
@rtype: Connection
@return: a connected Connection
"""
- conn = Connection(host, port)
+ conn = Connection(host, port, username, password, mechanism, heartbeat, **options)
conn.connect()
return conn
- def __init__(self, host, port=None):
+ def __init__(self, host, port=None, username="guest", password="guest",
+ mechanism="PLAIN", heartbeat=None, **options):
"""
Creates a connection. A newly created connection must be connected
with the Connection.connect() method before it can be started.
@@ -106,11 +108,16 @@ class Connection:
"""
self.host = host
self.port = default(port, AMQP_PORT)
+ self.username = username
+ self.password = password
+ self.mechanism = mechanism
+ self.heartbeat = heartbeat
+
self.started = False
self.id = str(uuid4())
self.session_counter = 0
self.sessions = {}
- self.reconnect = False
+ self.reconnect = options.get("reconnect", False)
self._connected = False
self._lock = RLock()
self._condition = Condition(self._lock)
@@ -230,9 +237,10 @@ class Pattern:
self.value = value
# XXX: this should become part of the driver
- def _bind(self, ssn, exchange, queue):
- ssn.exchange_bind(exchange=exchange, queue=queue,
- binding_key=self.value.replace("*", "#"))
+ def _bind(self, sst, exchange, queue):
+ from qpid.ops import ExchangeBind
+ sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
+ binding_key=self.value.replace("*", "#")))
class SessionError(Exception):
pass
@@ -282,6 +290,7 @@ class Session:
# XXX: I hate this name.
self.ack_capacity = UNLIMITED
+ self.error = None
self.closing = False
self.closed = False
@@ -302,12 +311,16 @@ class Session:
def _check_error(self, exc=SessionError):
self.connection._check_error(exc)
+ if self.error:
+ raise exc(*self.error)
def _ewait(self, predicate, timeout=None, exc=SessionError):
- return self.connection._ewait(predicate, timeout, exc)
+ result = self.connection._ewait(lambda: self.error or predicate(), timeout, exc)
+ self._check_error(exc)
+ return result
@synchronized
- def sender(self, target):
+ def sender(self, target, **options):
"""
Creates a L{Sender} that may be used to send L{Messages<Message>}
to the specified target.
@@ -317,7 +330,7 @@ class Session:
@rtype: Sender
@return: a new Sender for the specified target
"""
- sender = Sender(self, len(self.senders), target)
+ sender = Sender(self, len(self.senders), target, options)
self.senders.append(sender)
self._wakeup()
# XXX: because of the lack of waiting here we can end up getting
@@ -327,7 +340,7 @@ class Session:
return sender
@synchronized
- def receiver(self, source, filter=None):
+ def receiver(self, source, **options):
"""
Creates a receiver that may be used to actively fetch or to listen
for the arrival of L{Messages<Message>} from the specified source.
@@ -337,7 +350,7 @@ class Session:
@rtype: Receiver
@return: a new Receiver for the specified source
"""
- receiver = Receiver(self, len(self.receivers), source, filter,
+ receiver = Receiver(self, len(self.receivers), source, options,
self.started)
self.receivers.append(receiver)
self._wakeup()
@@ -368,8 +381,8 @@ class Session:
@synchronized
def _get(self, predicate, timeout=None):
- if self._wait(lambda: ((self._peek(predicate) is not None) or self.closing),
- timeout):
+ if self._ewait(lambda: ((self._peek(predicate) is not None) or self.closing),
+ timeout):
msg = self._pop(predicate)
if msg is not None:
msg._receiver.returned += 1
@@ -505,13 +518,18 @@ class Sender:
Sends outgoing messages.
"""
- def __init__(self, session, index, target):
+ def __init__(self, session, index, target, options):
self.session = session
self.index = index
self.target = target
- self.capacity = UNLIMITED
+ self.options = options
+ self.capacity = options.get("capacity", UNLIMITED)
+ self.durable = options.get("durable")
self.queued = Serial(0)
self.acked = Serial(0)
+ self.error = None
+ self.linked = False
+ self.closing = False
self.closed = False
self._lock = self.session._lock
@@ -520,9 +538,13 @@ class Sender:
def _check_error(self, exc=SendError):
self.session._check_error(exc)
+ if self.error:
+ raise exc(*self.error)
def _ewait(self, predicate, timeout=None, exc=SendError):
- return self.session._ewait(predicate, timeout, exc)
+ result = self.session._ewait(lambda: self.error or predicate(), timeout, exc)
+ self._check_error(exc)
+ return result
@synchronized
def pending(self):
@@ -558,11 +580,16 @@ class Sender:
if not self.session.connection._connected or self.session.closing:
raise Disconnected()
+ self._ewait(lambda: self.linked)
+
if isinstance(object, Message):
message = object
else:
message = Message(object)
+ if message.durable is None:
+ message.durable = self.durable
+
if self.capacity is not UNLIMITED:
if self.capacity <= 0:
raise InsufficientCapacity("capacity = %s" % self.capacity)
@@ -573,15 +600,19 @@ class Sender:
message._sender = self
self.session.outgoing.append(message)
self.queued += 1
- mno = self.queued
self._wakeup()
if sync:
- self._ewait(lambda: self.acked >= mno)
+ self.sync()
assert message not in self.session.outgoing
@synchronized
+ def sync(self):
+ mno = self.queued
+ self._ewait(lambda: self.acked >= mno)
+
+ @synchronized
def close(self):
"""
Close the Sender.
@@ -609,21 +640,23 @@ class Receiver:
L{listen}.
"""
- def __init__(self, session, index, source, filter, started):
+ def __init__(self, session, index, source, options, started):
self.session = session
self.index = index
self.destination = str(self.index)
self.source = source
- self.filter = filter
+ self.options = options
self.started = started
- self.capacity = UNLIMITED
+ self.capacity = options.get("capacity", UNLIMITED)
self.granted = Serial(0)
- self.drain = False
+ self.draining = False
self.impending = Serial(0)
self.received = Serial(0)
self.returned = Serial(0)
+ self.error = None
+ self.linked = False
self.closing = False
self.closed = False
self.listener = None
@@ -634,9 +667,13 @@ class Receiver:
def _check_error(self, exc=ReceiveError):
self.session._check_error(exc)
+ if self.error:
+ raise exc(*self.error)
def _ewait(self, predicate, timeout=None, exc=ReceiveError):
- return self.session._ewait(predicate, timeout, exc)
+ result = self.session._ewait(lambda: self.error or predicate(), timeout, exc)
+ self._check_error(exc)
+ return result
@synchronized
def pending(self):
@@ -680,17 +717,18 @@ class Receiver:
@type timeout: float
@param timeout: the time to wait for a message to be available
"""
+
+ self._ewait(lambda: self.linked)
+
if self._capacity() == 0:
self.granted = self.returned + 1
self._wakeup()
self._ewait(lambda: self.impending >= self.granted)
msg = self.session._get(self._pred, timeout=timeout)
if msg is None:
- self.drain = True
- self.granted = self.received
+ self.draining = True
self._wakeup()
- self._ewait(lambda: self.impending == self.received)
- self.drain = False
+ self._ewait(lambda: not self.draining)
self._grant()
self._wakeup()
msg = self.session._get(self._pred, timeout=0)
@@ -738,7 +776,7 @@ class Receiver:
self.closing = True
self._wakeup()
try:
- self._ewait(lambda: self.closed)
+ self.session._ewait(lambda: self.closed)
finally:
self.session.receivers.remove(self)
@@ -778,6 +816,8 @@ def get_type(content):
def get_codec(content_type):
return TYPE_CODEC[content_type]
+UNSPECIFIED = object()
+
class Message:
"""
@@ -802,7 +842,9 @@ class Message:
@ivar content: the message content
"""
- def __init__(self, content=None):
+ def __init__(self, content=None, content_type=UNSPECIFIED, id=None,
+ subject=None, to=None, user_id=None, reply_to=None,
+ correlation_id=None, durable=None, properties=None):
"""
Construct a new message with the supplied content. The
content-type of the message will be automatically inferred from
@@ -810,20 +852,44 @@ class Message:
@type content: str, unicode, buffer, dict, list
@param content: the message content
- """
- self.id = None
- self.subject = None
- self.user_id = None
- self.to = None
- self.reply_to = None
- self.correlation_id = None
- self.durable = False
- self.properties = {}
- self.content_type = get_type(content)
+
+ @type content_type: str
+ @param content_type: the content-type of the message
+ """
+ self.id = id
+ self.subject = subject
+ self.to = to
+ self.user_id = user_id
+ self.reply_to = reply_to
+ self.correlation_id = correlation_id
+ self.durable = durable
+ if properties is None:
+ self.properties = {}
+ else:
+ self.properties = properties
+ if content_type is UNSPECIFIED:
+ self.content_type = get_type(content)
+ else:
+ self.content_type = content_type
self.content = content
def __repr__(self):
- return "Message(%r)" % self.content
+ args = []
+ for name in ["id", "subject", "to", "user_id", "reply_to",
+ "correlation_id"]:
+ value = self.__dict__[name]
+ if value is not None: args.append("%s=%r" % (name, value))
+ for name in ["durable", "properties"]:
+ value = self.__dict__[name]
+ if value: args.append("%s=%r" % (name, value))
+ if self.content_type != get_type(self.content):
+ args.append("content_type=%r" % self.content_type)
+ if self.content is not None:
+ if args:
+ args.append("content=%r" % self.content)
+ else:
+ args.append(repr(self.content))
+ return "Message(%s)" % ", ".join(args)
__all__ = ["Connection", "Session", "Sender", "Receiver", "Pattern", "Message",
"ConnectionError", "ConnectError", "SessionError", "Disconnected",
diff --git a/qpid/python/qpid/ops.py b/qpid/python/qpid/ops.py
index 11e7d11fe9..277d059203 100644
--- a/qpid/python/qpid/ops.py
+++ b/qpid/python/qpid/ops.py
@@ -80,7 +80,7 @@ class Compound(object):
return "%s(%s)" % (self.__class__.__name__,
", ".join(["%s=%r" % (f.name, getattr(self, f.name))
for f in self.ARGS
- if getattr(self, f.name) is not f.default]))
+ if getattr(self, f.name) != f.default]))
class Command(Compound):
UNENCODED=[Field("channel", "uint16", 0),
@@ -209,8 +209,8 @@ def make(nd):
from qpid_config import amqp_spec as file
pclfile = "%s.ops.pcl" % file
-if False and (os.path.exists(pclfile) and
- os.path.getmtime(pclfile) > os.path.getmtime(file)):
+if os.path.exists(pclfile) and \
+ os.path.getmtime(pclfile) > os.path.getmtime(file):
f = open(pclfile, "read")
types = pickle.load(f)
f.close()
diff --git a/qpid/python/qpid/selector.py b/qpid/python/qpid/selector.py
new file mode 100644
index 0000000000..46052e1108
--- /dev/null
+++ b/qpid/python/qpid/selector.py
@@ -0,0 +1,156 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import atexit, os, time
+from compat import select, set
+from threading import Thread, Lock
+
+class Acceptor:
+
+ def __init__(self, sock, handler):
+ self.sock = sock
+ self.handler = handler
+
+ def fileno(self):
+ return self.sock.fileno()
+
+ def reading(self):
+ return True
+
+ def writing(self):
+ return False
+
+ def readable(self):
+ sock, addr = self.sock.accept()
+ self.handler(sock)
+
+class Sink:
+
+ def __init__(self, fd):
+ self.fd = fd
+
+ def fileno(self):
+ return self.fd
+
+ def reading(self):
+ return True
+
+ def readable(self):
+ os.read(self.fd, 65536)
+
+ def __repr__(self):
+ return "Sink(%r)" % self.fd
+
+class Selector:
+
+ lock = Lock()
+ DEFAULT = None
+
+ @staticmethod
+ def default():
+ Selector.lock.acquire()
+ try:
+ if Selector.DEFAULT is None:
+ sel = Selector()
+ atexit.register(sel.stop)
+ sel.start()
+ Selector.DEFAULT = sel
+ return Selector.DEFAULT
+ finally:
+ Selector.lock.release()
+
+ def __init__(self):
+ self.selectables = set()
+ self.reading = set()
+ self.writing = set()
+ self.wait_fd, self.wakeup_fd = os.pipe()
+ self.reading.add(Sink(self.wait_fd))
+ self.stopped = False
+ self.thread = None
+
+ def wakeup(self):
+ os.write(self.wakeup_fd, "\0")
+
+ def register(self, selectable):
+ self.selectables.add(selectable)
+ self.modify(selectable)
+
+ def _update(self, selectable):
+ if selectable.reading():
+ self.reading.add(selectable)
+ else:
+ self.reading.discard(selectable)
+ if selectable.writing():
+ self.writing.add(selectable)
+ else:
+ self.writing.discard(selectable)
+ return selectable.timing()
+
+ def modify(self, selectable):
+ self._update(selectable)
+ self.wakeup()
+
+ def unregister(self, selectable):
+ self.reading.discard(selectable)
+ self.writing.discard(selectable)
+ self.selectables.discard(selectable)
+ self.wakeup()
+
+ def start(self):
+ self.stopped = False
+ self.thread = Thread(target=self.run)
+ self.thread.setDaemon(True)
+ self.thread.start();
+
+ def run(self):
+ while not self.stopped:
+ wakeup = None
+ for sel in self.selectables.copy():
+ t = self._update(sel)
+ if t is not None:
+ if wakeup is None:
+ wakeup = t
+ else:
+ wakeup = min(wakeup, t)
+
+ if wakeup is None:
+ timeout = None
+ else:
+ timeout = max(0, wakeup - time.time())
+
+ rd, wr, ex = select(self.reading, self.writing, (), timeout)
+
+ for sel in wr:
+ if sel.writing():
+ sel.writeable()
+
+ for sel in rd:
+ if sel.reading():
+ sel.readable()
+
+ now = time.time()
+ for sel in self.selectables.copy():
+ w = sel.timing()
+ if w is not None and now > w:
+ sel.timeout()
+
+ def stop(self, timeout=None):
+ self.stopped = True
+ self.wakeup()
+ self.thread.join(timeout)
+ self.thread = None
diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py
index 7623c1f93b..2e4c0ca1ab 100644
--- a/qpid/python/qpid/tests/messaging.py
+++ b/qpid/python/qpid/tests/messaging.py
@@ -24,7 +24,7 @@ import time
from qpid.tests import Test
from qpid.harness import Skipped
from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \
- InsufficientCapacity, Message, UNLIMITED, uuid4
+ InsufficientCapacity, Message, ReceiveError, SendError, UNLIMITED, uuid4
from Queue import Queue, Empty as QueueEmpty
class Base(Test):
@@ -50,6 +50,8 @@ class Base(Test):
raise Skipped(e)
self.ssn = self.setup_session()
self.snd = self.setup_sender()
+ if self.snd is not None:
+ self.snd.durable = self.durable()
self.rcv = self.setup_receiver()
def teardown(self):
@@ -63,11 +65,12 @@ class Base(Test):
return "%s[%s, %s]" % (base, count, self.test_id)
def ping(self, ssn):
+ PING_Q = 'ping-queue {create: always}'
# send a message
- sender = ssn.sender("ping-queue")
+ sender = ssn.sender(PING_Q, durable=self.durable())
content = self.content("ping")
sender.send(content)
- receiver = ssn.receiver("ping-queue")
+ receiver = ssn.receiver(PING_Q)
msg = receiver.fetch(0)
ssn.acknowledge()
assert msg.content == content, "expected %r, got %r" % (content, msg.content)
@@ -97,16 +100,27 @@ class Base(Test):
def delay(self):
return float(self.config.defines.get("delay", "2"))
+ def get_bool(self, name):
+ return self.config.defines.get(name, "false").lower() in ("true", "yes", "1")
+
+ def durable(self):
+ return self.get_bool("durable")
+
+ def reconnect(self):
+ return self.get_bool("reconnect")
+
class SetupTests(Base):
def testOpen(self):
# XXX: need to flesh out URL support/syntax
- self.conn = Connection.open(self.broker.host, self.broker.port)
+ self.conn = Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
self.ping(self.conn.session())
def testConnect(self):
# XXX: need to flesh out URL support/syntax
- self.conn = Connection(self.broker.host, self.broker.port)
+ self.conn = Connection(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
self.conn.connect()
self.ping(self.conn.session())
@@ -121,7 +135,8 @@ class SetupTests(Base):
class ConnectionTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port)
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
def testSessionAnon(self):
ssn1 = self.conn.session()
@@ -174,17 +189,21 @@ class ConnectionTests(Base):
self.conn.close()
assert not self.conn.connected()
+ACK_Q = 'test-ack-queue {create: always}'
+
class SessionTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port)
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
def setup_session(self):
return self.conn.session()
def testSender(self):
- snd = self.ssn.sender("test-snd-queue")
- snd2 = self.ssn.sender(snd.target)
+ snd = self.ssn.sender('test-snd-queue {create: always}',
+ durable=self.durable())
+ snd2 = self.ssn.sender(snd.target, durable=self.durable())
assert snd is not snd2
snd2.close()
@@ -196,47 +215,49 @@ class SessionTests(Base):
self.ssn.acknowledge(msg)
def testReceiver(self):
- rcv = self.ssn.receiver("test-rcv-queue")
+ rcv = self.ssn.receiver('test-rcv-queue {create: always}')
rcv2 = self.ssn.receiver(rcv.source)
assert rcv is not rcv2
rcv2.close()
content = self.content("testReceiver")
- snd = self.ssn.sender(rcv.source)
+ snd = self.ssn.sender(rcv.source, durable=self.durable())
snd.send(content)
msg = rcv.fetch(0)
assert msg.content == content
self.ssn.acknowledge(msg)
def testStart(self):
- rcv = self.ssn.receiver("test-start-queue")
+ START_Q = 'test-start-queue {create: always}'
+ rcv = self.ssn.receiver(START_Q)
assert not rcv.started
self.ssn.start()
assert rcv.started
- rcv = self.ssn.receiver("test-start-queue")
+ rcv = self.ssn.receiver(START_Q)
assert rcv.started
def testStop(self):
+ STOP_Q = 'test-stop-queue {create: always}'
self.ssn.start()
- rcv = self.ssn.receiver("test-stop-queue")
+ rcv = self.ssn.receiver(STOP_Q)
assert rcv.started
self.ssn.stop()
assert not rcv.started
- rcv = self.ssn.receiver("test-stop-queue")
+ rcv = self.ssn.receiver(STOP_Q)
assert not rcv.started
# XXX, we need a convenient way to assert that required queues are
# empty on setup, and possibly also to drain queues on teardown
def ackTest(self, acker, ack_capacity=None):
# send a bunch of messages
- snd = self.ssn.sender("test-ack-queue")
+ snd = self.ssn.sender(ACK_Q, durable=self.durable())
contents = [self.content("ackTest", i) for i in range(15)]
for c in contents:
snd.send(c)
# drain the queue, verify the messages are there and then close
# without acking
- rcv = self.ssn.receiver(snd.target)
+ rcv = self.ssn.receiver(ACK_Q)
self.drain(rcv, expected=contents)
self.ssn.close()
@@ -245,7 +266,7 @@ class SessionTests(Base):
self.ssn = self.conn.session()
if ack_capacity is not None:
self.ssn.ack_capacity = ack_capacity
- rcv = self.ssn.receiver("test-ack-queue")
+ rcv = self.ssn.receiver(ACK_Q)
self.drain(rcv, expected=contents)
acker(self.ssn)
self.ssn.close()
@@ -253,7 +274,7 @@ class SessionTests(Base):
# drain the queue a final time and verify that the messages were
# dequeued
self.ssn = self.conn.session()
- rcv = self.ssn.receiver("test-ack-queue")
+ rcv = self.ssn.receiver(ACK_Q)
self.assertEmpty(rcv)
def testAcknowledge(self):
@@ -271,7 +292,7 @@ class SessionTests(Base):
pass
finally:
self.ssn.ack_capacity = UNLIMITED
- self.drain(self.ssn.receiver("test-ack-queue"))
+ self.drain(self.ssn.receiver(ACK_Q))
self.ssn.acknowledge()
def testAcknowledgeAsyncAckCap1(self):
@@ -284,7 +305,7 @@ class SessionTests(Base):
self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
def send(self, ssn, queue, base, count=1):
- snd = ssn.sender(queue)
+ snd = ssn.sender(queue, durable=self.durable())
contents = []
for i in range(count):
c = self.content(base, i)
@@ -294,10 +315,12 @@ class SessionTests(Base):
return contents
def txTest(self, commit):
+ TX_Q = 'test-tx-queue {create: always}'
+ TX_Q_COPY = 'test-tx-queue-copy {create: always}'
txssn = self.conn.session(transactional=True)
- contents = self.send(self.ssn, "test-tx-queue", "txTest", 3)
- txrcv = txssn.receiver("test-tx-queue")
- txsnd = txssn.sender("test-tx-queue-copy")
+ contents = self.send(self.ssn, TX_Q, "txTest", 3)
+ txrcv = txssn.receiver(TX_Q)
+ txsnd = txssn.sender(TX_Q_COPY, durable=self.durable())
rcv = self.ssn.receiver(txrcv.source)
copy_rcv = self.ssn.receiver(txsnd.target)
self.assertEmpty(copy_rcv)
@@ -323,9 +346,10 @@ class SessionTests(Base):
self.txTest(False)
def txTestSend(self, commit):
+ TX_SEND_Q = 'test-tx-send-queue {create: always}'
txssn = self.conn.session(transactional=True)
- contents = self.send(txssn, "test-tx-send-queue", "txTestSend", 3)
- rcv = self.ssn.receiver("test-tx-send-queue")
+ contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
+ rcv = self.ssn.receiver(TX_SEND_Q)
self.assertEmpty(rcv)
if commit:
@@ -345,10 +369,11 @@ class SessionTests(Base):
self.txTestSend(False)
def txTestAck(self, commit):
+ TX_ACK_Q = 'test-tx-ack-queue {create: always}'
txssn = self.conn.session(transactional=True)
- txrcv = txssn.receiver("test-tx-ack-queue")
+ txrcv = txssn.receiver(TX_ACK_Q)
self.assertEmpty(txrcv)
- contents = self.send(self.ssn, "test-tx-ack-queue", "txTestAck", 3)
+ contents = self.send(self.ssn, TX_ACK_Q, "txTestAck", 3)
assert contents == self.drain(txrcv)
if commit:
@@ -366,11 +391,11 @@ class SessionTests(Base):
txssn.close()
txssn = self.conn.session(transactional=True)
- txrcv = txssn.receiver("test-tx-ack-queue")
+ txrcv = txssn.receiver(TX_ACK_Q)
assert contents == self.drain(txrcv)
txssn.acknowledge()
txssn.commit()
- rcv = self.ssn.receiver("test-tx-ack-queue")
+ rcv = self.ssn.receiver(TX_ACK_Q)
self.assertEmpty(rcv)
txssn.close()
self.assertEmpty(rcv)
@@ -389,19 +414,22 @@ class SessionTests(Base):
except Disconnected:
pass
+RECEIVER_Q = 'test-receiver-queue {create: always}'
+
class ReceiverTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port)
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
def setup_session(self):
return self.conn.session()
def setup_sender(self):
- return self.ssn.sender("test-receiver-queue")
+ return self.ssn.sender(RECEIVER_Q)
def setup_receiver(self):
- return self.ssn.receiver("test-receiver-queue")
+ return self.ssn.receiver(RECEIVER_Q)
def send(self, base, count = None):
content = self.content(base, count)
@@ -516,7 +544,7 @@ class ReceiverTests(Base):
self.assertPending(self.rcv, 5)
drained = self.drain(self.rcv)
- assert len(drained) == 10
+ assert len(drained) == 10, "%s, %s" % (len(drained), drained)
self.assertPending(self.rcv, 0)
self.ssn.acknowledge()
@@ -538,19 +566,81 @@ class ReceiverTests(Base):
# XXX: need testClose
+NOSUCH_Q = "this-queue-should-not-exist"
+UNPARSEABLE_ADDR = "{bad address}"
+UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
+
+class AddressErrorTests(Base):
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def sendErrorTest(self, addr, exc, check=lambda e: True):
+ snd = self.ssn.sender(addr, durable=self.durable())
+ try:
+ snd.send("hello")
+ assert False, "send succeeded"
+ except exc, e:
+ assert check(e), "unexpected error: %s" % e
+ snd.close()
+
+ def fetchErrorTest(self, addr, exc, check=lambda e: True):
+ rcv = self.ssn.receiver(addr)
+ try:
+ rcv.fetch(timeout=0)
+ assert False, "fetch succeeded"
+ except exc, e:
+ assert check(e), "unexpected error: %s" % e
+ rcv.close()
+
+ def testNoTarget(self):
+ # XXX: should have specific exception for this
+ self.sendErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e))
+
+ def testNoSource(self):
+ # XXX: should have specific exception for this
+ self.fetchErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e))
+
+ def testUnparseableTarget(self):
+ # XXX: should have specific exception for this
+ self.sendErrorTest(UNPARSEABLE_ADDR, SendError,
+ lambda e: "expecting ID" in str(e))
+
+ def testUnparseableSource(self):
+ # XXX: should have specific exception for this
+ self.fetchErrorTest(UNPARSEABLE_ADDR, ReceiveError,
+ lambda e: "expecting ID" in str(e))
+
+ def testUnlexableTarget(self):
+ # XXX: should have specific exception for this
+ self.sendErrorTest(UNLEXABLE_ADDR, SendError,
+ lambda e: "unrecognized character" in str(e))
+
+ def testUnlexableSource(self):
+ # XXX: should have specific exception for this
+ self.fetchErrorTest(UNLEXABLE_ADDR, ReceiveError,
+ lambda e: "unrecognized character" in str(e))
+
+SENDER_Q = 'test-sender-q {create: always}'
+
class SenderTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port)
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
def setup_session(self):
return self.conn.session()
def setup_sender(self):
- return self.ssn.sender("test-sender-queue")
+ return self.ssn.sender(SENDER_Q)
def setup_receiver(self):
- return self.ssn.receiver("test-sender-queue")
+ return self.ssn.receiver(SENDER_Q)
def checkContent(self, content):
self.snd.send(content)
@@ -611,6 +701,7 @@ class SenderTests(Base):
except InsufficientCapacity:
caught = True
break
+ self.snd.sync()
self.drain(self.rcv, expected=msgs)
self.ssn.acknowledge()
assert caught, "did not exceed capacity"
@@ -643,19 +734,22 @@ class MessageTests(Base):
m.content = u"<html/>"
assert m.content_type == "text/html; charset=utf8"
+ECHO_Q = 'test-message-echo-queue {create: always}'
+
class MessageEchoTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port)
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
def setup_session(self):
return self.conn.session()
def setup_sender(self):
- return self.ssn.sender("test-message-echo-queue")
+ return self.ssn.sender(ECHO_Q)
def setup_receiver(self):
- return self.ssn.receiver("test-message-echo-queue")
+ return self.ssn.receiver(ECHO_Q)
def check(self, msg):
self.snd.send(msg)