diff options
Diffstat (limited to 'qpid/python')
-rw-r--r-- | qpid/python/Makefile | 2 | ||||
-rwxr-xr-x | qpid/python/examples/api/drain | 62 | ||||
-rwxr-xr-x | qpid/python/examples/api/ping | 76 | ||||
-rwxr-xr-x | qpid/python/qpid-python-test | 42 | ||||
-rw-r--r-- | qpid/python/qpid/address.py | 171 | ||||
-rw-r--r-- | qpid/python/qpid/compat.py | 11 | ||||
-rw-r--r-- | qpid/python/qpid/driver.py | 864 | ||||
-rw-r--r-- | qpid/python/qpid/messaging.py | 148 | ||||
-rw-r--r-- | qpid/python/qpid/ops.py | 6 | ||||
-rw-r--r-- | qpid/python/qpid/selector.py | 156 | ||||
-rw-r--r-- | qpid/python/qpid/tests/messaging.py | 174 |
11 files changed, 1346 insertions, 366 deletions
diff --git a/qpid/python/Makefile b/qpid/python/Makefile index 31547c8f57..ff4a9af4f1 100644 --- a/qpid/python/Makefile +++ b/qpid/python/Makefile @@ -36,7 +36,7 @@ SRCS=$(shell find $(DIRS) -name "*.py") qpid_config.py BUILD=build TARGETS=$(SRCS:%.py=$(BUILD)/%.py) -PYCC=python -c "import compileall, sys; compileall.compile_dir(sys.argv[1])" +PYCC=python -O -c "import compileall; compileall.main()" all: build diff --git a/qpid/python/examples/api/drain b/qpid/python/examples/api/drain new file mode 100755 index 0000000000..485985f16d --- /dev/null +++ b/qpid/python/examples/api/drain @@ -0,0 +1,62 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse +from qpid.messaging import * +from qpid.util import URL + +parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...", + description="Drain messages from the supplied address.") +parser.add_option("-b", "--broker", default="localhost", + help="connect to specified BROKER (default %default)") +parser.add_option("-t", "--timeout", type=float, default=0, + help="timeout in seconds to wait before exiting (default %default)") +parser.add_option("-f", "--forever", action="store_true", + help="ignore timeout and wait forever") + +opts, args = parser.parse_args() + +url = URL(opts.broker) +if args: + addr = args.pop(0) +else: + parser.error("address is required") +if opts.forever: + timeout = None +else: + timeout = opts.timeout + +# XXX: should make URL default the port for us +conn = Connection.open(url.host, url.port or AMQP_PORT, + username=url.user, password=url.password) +ssn = conn.session() +rcv = ssn.receiver(addr) + +while True: + try: + print rcv.fetch(timeout=timeout) + ssn.acknowledge() + except Empty: + break + except ReceiveError, e: + print e + break + +conn.close() diff --git a/qpid/python/examples/api/ping b/qpid/python/examples/api/ping new file mode 100755 index 0000000000..59b367cca6 --- /dev/null +++ b/qpid/python/examples/api/ping @@ -0,0 +1,76 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, time +from qpid.messaging import * +from qpid.util import URL + +parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]", + description="Drain messages from the supplied address.") +parser.add_option("-b", "--broker", default="localhost", + help="connect to specified BROKER (default %default)") +parser.add_option("-c", "--count", type=int, default=1, + help="stop after count messages have been sent, zero disables (default %default)") +parser.add_option("-t", "--timeout", type=float, default=None, + help="exit after the specified time") +parser.add_option("-m", "--map", action="store_true", + help="interpret content as map") +parser.add_option("-i", "--id", help="use the supplied id instead of generating one") + +opts, args = parser.parse_args() + +url = URL(opts.broker) +if opts.id is None: + ping_id = str(uuid4()) +else: + ping_id = opts.id +if args: + addr = args.pop(0) +else: + parser.error("address is required") +if args: + content = " ".join(args) + if opts.map: + content = eval(content) +else: + content = None + +# XXX: should make URL default the port for us +conn = Connection.open(url.host, url.port or AMQP_PORT, + username=url.user, password=url.password) +ssn = conn.session() +snd = ssn.sender(addr) + +count = 0 +start = time.time() +while (opts.count == 0 or count < opts.count) and \ + (opts.timeout is None or time.time() - start < opts.timeout): + msg = Message(content) + msg.properties["ping-id"] = "%s:%s" % (ping_id, count) + + try: + snd.send(msg) + count += 1 + print msg + except SendError, e: + print e + break + +conn.close() diff --git a/qpid/python/qpid-python-test b/qpid/python/qpid-python-test index 528acaa124..b569020368 100755 --- a/qpid/python/qpid-python-test +++ b/qpid/python/qpid-python-test @@ -20,7 +20,7 @@ # TODO: summarize, test harness preconditions (e.g. broker is alive) -import fcntl, logging, optparse, os, struct, sys, termios, traceback, types +import logging, optparse, os, struct, sys, traceback, types from fnmatch import fnmatchcase as match from getopt import GetoptError from logging import getLogger, StreamHandler, Formatter, Filter, \ @@ -126,27 +126,33 @@ def is_included(path): def is_smart(): return sys.stdout.isatty() and os.environ.get("TERM", "dumb") != "dumb" -def width(): - if is_smart(): - s = struct.pack("HHHH", 0, 0, 0, 0) - fd_stdout = sys.stdout.fileno() - x = fcntl.ioctl(fd_stdout, termios.TIOCGWINSZ, s) - rows, cols, xpx, ypx = struct.unpack("HHHH", x) - return cols - else: - try: - return int(os.environ.get("COLUMNS", "80")) - except ValueError: - return 80 +try: + import fcntl, termios -WIDTH = width() + def width(): + if is_smart(): + s = struct.pack("HHHH", 0, 0, 0, 0) + fd_stdout = sys.stdout.fileno() + x = fcntl.ioctl(fd_stdout, termios.TIOCGWINSZ, s) + rows, cols, xpx, ypx = struct.unpack("HHHH", x) + return cols + else: + try: + return int(os.environ.get("COLUMNS", "80")) + except ValueError: + return 80 -def resize(sig, frm): - global WIDTH WIDTH = width() -import signal -signal.signal(signal.SIGWINCH, resize) + def resize(sig, frm): + global WIDTH + WIDTH = width() + + import signal + signal.signal(signal.SIGWINCH, resize) + +except ImportError: + WIDTH = 80 def vt100_attrs(*attrs): return "\x1B[%sm" % ";".join(map(str, attrs)) diff --git a/qpid/python/qpid/address.py b/qpid/python/qpid/address.py new file mode 100644 index 0000000000..5976d4889b --- /dev/null +++ b/qpid/python/qpid/address.py @@ -0,0 +1,171 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import re + +TYPES = [] + +class Type: + + def __init__(self, name, pattern=None): + self.name = name + self.pattern = pattern + if self.pattern: + TYPES.append(self) + + def __repr__(self): + return self.name + +LBRACE = Type("LBRACE", r"\{") +RBRACE = Type("RBRACE", r"\}") +COLON = Type("COLON", r":") +COMMA = Type("COMMA", r",") +SLASH = Type("SLASH", r"/") +ID = Type("ID", r'[a-zA-Z_][a-zA-Z0-9_.-]*') +NUMBER = Type("NUMBER", r'[+-]?[0-9]*\.?[0-9]+') +STRING = Type("STRING", r""""(?:[^\\"]|\\.)*"|'(?:[^\\']|\\.)*'""") +WSPACE = Type("WSPACE", r"[ \n\r\t]+") +EOF = Type("EOF") + +class Token: + + def __init__(self, type, value): + self.type = type + self.value = value + + def __repr__(self): + return "%s: %r" % (self.type, self.value) + +joined = "|".join(["(%s)" % t.pattern for t in TYPES]) +LEXER = re.compile(joined) + +class LexError(Exception): + pass + +def line_info(st, pos): + idx = 0 + lineno = 1 + column = 0 + line_pos = 0 + while idx < pos: + if st[idx] == "\n": + lineno += 1 + column = 0 + line_pos = idx + column += 1 + idx += 1 + + end = st.find("\n", line_pos) + if end < 0: + end = len(st) + line = st[line_pos:end] + + return line, lineno, column + +def lex(st): + pos = 0 + while pos < len(st): + m = LEXER.match(st, pos) + if m is None: + line, ln, col = line_info(st, pos) + raise LexError("unrecognized character in <string>:%s,%s: %s" % (ln, col, line)) + else: + idx = m.lastindex + t = Token(TYPES[idx - 1], m.group(idx)) + yield t + pos = m.end() + yield Token(EOF, None) + +class ParseError(Exception): pass + +class Parser: + + def __init__(self, tokens): + self.tokens = [t for t in tokens if t.type is not WSPACE] + self.idx = 0 + + def next(self): + return self.tokens[self.idx] + + def matches(self, *types): + return self.next().type in types + + def eat(self, *types): + if types and not self.matches(*types): + raise ParseError("expecting %s -- got %s" % (", ".join(map(str, types)), self.next())) + else: + t = self.next() + self.idx += 1 + return t + + def parse(self): + result = self.address() + self.eat(EOF) + return result + + def address(self): + name = self.eat(ID).value + subject = None + options = None + if self.matches(SLASH): + self.eat(SLASH) + if self.matches(ID): + subject = self.eat(ID).value + else: + subject = "" + elif self.matches(LBRACE): + options = self.map() + return name, subject, options + + def map(self): + self.eat(LBRACE) + result = {} + while True: + if self.matches(RBRACE): + self.eat(RBRACE) + break + else: + if self.matches(ID): + n, v = self.nameval() + result[n] = v + elif self.matches(COMMA): + self.eat(COMMA) + else: + raise ParseError("expecting (ID, COMMA), got %s" % self.next()) + return result + + def nameval(self): + name = self.eat(ID).value + self.eat(COLON) + val = self.value() + return (name, val) + + def value(self): + if self.matches(NUMBER, STRING): + return eval(self.eat().value) + elif self.matches(ID): + return self.eat().value + elif self.matches(LBRACE): + return self.map() + else: + raise ParseError("expecting (NUMBER, STRING, LBRACE) got %s" % self.next()) + +def parse(addr): + return Parser(lex(addr)).parse() + +__all__ = ["parse"] diff --git a/qpid/python/qpid/compat.py b/qpid/python/qpid/compat.py index 49273193df..53ab757e89 100644 --- a/qpid/python/qpid/compat.py +++ b/qpid/python/qpid/compat.py @@ -17,6 +17,8 @@ # under the License. # +import sys + try: set = set except NameError: @@ -30,6 +32,13 @@ except ImportError: try: from traceback import format_exc except ImportError: - import sys, traceback + import traceback def format_exc(): return "".join(traceback.format_exception(*sys.exc_info())) + +if tuple(sys.version_info[0:2]) < (2, 4): + from select import select as old_select + def select(rlist, wlist, xlist, timeout=None): + return old_select(list(rlist), list(wlist), list(xlist), timeout) +else: + from select import select diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py index 2e07c82a0d..7c293fe146 100644 --- a/qpid/python/qpid/driver.py +++ b/qpid/python/qpid/driver.py @@ -17,25 +17,23 @@ # under the License. # -import compat, connection, socket, sys, time +import address, compat, connection, socket, struct, sys, time from concurrency import synchronized -from datatypes import RangedSet, Message as Message010 -from exceptions import Timeout +from datatypes import RangedSet, Serial +from exceptions import Timeout, VersionError +from framing import OpEncoder, SegmentEncoder, FrameEncoder, FrameDecoder, SegmentDecoder, OpDecoder from logging import getLogger from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED -from ops import delivery_mode -from session import Client, INCOMPLETE, SessionDetached +from ops import * +from selector import Selector from threading import Condition, Thread from util import connect log = getLogger("qpid.messaging") -def parse_addr(address): - parts = address.split("/", 1) - if len(parts) == 1: - return parts[0], None - else: - return parts[0], parts[i1] +def addr2reply_to(addr): + name, subject, options = address.parse(addr) + return ReplyTo(name, subject) def reply_to2addr(reply_to): if reply_to.routing_key is None: @@ -50,287 +48,617 @@ class Attachment: def __init__(self, target): self.target = target +# XXX + DURABLE_DEFAULT=True +# XXX + FILTER_DEFAULTS = { "topic": Pattern("*") } -def delegate(handler, session): - class Delegate(Client): - - def message_transfer(self, cmd): - return handler._message_transfer(session, cmd) - return Delegate +# XXX + +CLIENT_PROPERTIES = {"product": "qpid python client", + "version": "development", + "platform": os.name, + "qpid.client_process": os.path.basename(sys.argv[0]), + "qpid.client_pid": os.getpid(), + "qpid.client_ppid": os.getppid()} + +def noop(): pass + +class SessionState: + + def __init__(self, driver, session, name, channel): + self.driver = driver + self.session = session + self.name = name + self.channel = channel + self.detached = False + self.committing = False + self.aborting = False + + # sender state + self.sent = Serial(0) + self.acknowledged = RangedSet() + self.completions = {} + self.min_completion = self.sent + self.max_completion = self.sent + self.results = {} + + # receiver state + self.received = None + self.executed = RangedSet() + + # XXX: need to periodically exchange completion/known_completion + + def write_query(self, query, handler): + id = self.sent + query.sync = True + self.write_cmd(query, lambda: handler(self.results.pop(id))) + + def write_cmd(self, cmd, completion=noop): + if self.detached: + raise Exception("detached") + cmd.id = self.sent + self.sent += 1 + self.completions[cmd.id] = completion + self.max_completion = cmd.id + self.write_op(cmd) + + def write_op(self, op): + op.channel = self.channel + self.driver.write_op(op) + +# XXX +HEADER="!4s4B" + +EMPTY_DP = DeliveryProperties() +EMPTY_MP = MessageProperties() class Driver: def __init__(self, connection): self.connection = connection self._lock = self.connection._lock - self._wakeup_cond = Condition() - self._socket = None - self._conn = None + + self._selector = Selector.default() + self.reset() + + def reset(self): + self._opening = False + self._closing = False self._connected = False self._attachments = {} - self._modcount = self.connection._modcount - self.thread = Thread(target=self.run) - self.thread.setDaemon(True) - # XXX: need to figure out how to join on this thread + self._channel_max = 65536 + self._channels = 0 + self._sessions = {} + + self._socket = None + self._buf = "" + self._hdr = "" + self._op_enc = OpEncoder() + self._seg_enc = SegmentEncoder() + self._frame_enc = FrameEncoder() + self._frame_dec = FrameDecoder() + self._seg_dec = SegmentDecoder() + self._op_dec = OpDecoder() + self._timeout = None + + for ssn in self.connection.sessions.values(): + for m in ssn.acked + ssn.unacked + ssn.incoming: + m._transfer_id = None + for snd in ssn.senders: + snd.linked = False + for rcv in ssn.receivers: + rcv.impending = rcv.received + rcv.linked = False + + @synchronized def wakeup(self): - self._wakeup_cond.acquire() - try: - self._wakeup_cond.notifyAll() - finally: - self._wakeup_cond.release() + self.dispatch() + self._selector.wakeup() def start(self): - self.thread.start() + self._selector.register(self) + + def fileno(self): + return self._socket.fileno() - def run(self): - while True: - self._wakeup_cond.acquire() + @synchronized + def reading(self): + return self._socket is not None + + @synchronized + def writing(self): + return self._socket is not None and self._buf + + @synchronized + def timing(self): + return self._timeout + + @synchronized + def readable(self): + error = None + recoverable = False + try: + data = self._socket.recv(64*1024) + if data: + log.debug("READ: %r", data) + else: + log.debug("ABORTED: %s", self._socket.getpeername()) + error = "connection aborted" + recoverable = True + except socket.error, e: + error = e + recoverable = True + + if not error: try: - if self.connection._modcount <= self._modcount: - self._wakeup_cond.wait(10) - finally: - self._wakeup_cond.release() - self.dispatch(self.connection._modcount) + if len(self._hdr) < 8: + r = 8 - len(self._hdr) + self._hdr += data[:r] + data = data[r:] + + if len(self._hdr) == 8: + self.do_header(self._hdr) + + self._frame_dec.write(data) + self._seg_dec.write(*self._frame_dec.read()) + self._op_dec.write(*self._seg_dec.read()) + for op in self._op_dec.read(): + self.assign_id(op) + log.debug("RCVD: %r", op) + op.dispatch(self) + except VersionError, e: + error = e + except: + msg = compat.format_exc() + error = msg + + if error: + self._error(error, recoverable) + else: + self.dispatch() + + self.connection._waiter.notifyAll() + + def assign_id(self, op): + if isinstance(op, Command): + sst = self.get_sst(op) + op.id = sst.received + sst.received += 1 @synchronized - def dispatch(self, modcount): + def writeable(self): try: - if self._conn is None and self.connection._connected: + n = self._socket.send(self._buf) + log.debug("SENT: %r", self._buf[:n]) + self._buf = self._buf[n:] + except socket.error, e: + self._error(e, True) + self.connection._waiter.notifyAll() + + @synchronized + def timeout(self): + log.warn("retrying ...") + self.dispatch() + self.connection._waiter.notifyAll() + + def _error(self, err, recoverable): + if self._socket is not None: + self._socket.close() + self.reset() + if recoverable and self.connection.reconnect: + self._timeout = time.time() + 3 + log.warn("recoverable error: %s" % err) + log.warn("sleeping 3 seconds") + else: + self.connection.error = (err,) + + def write_op(self, op): + log.debug("SENT: %r", op) + self._op_enc.write(op) + self._seg_enc.write(*self._op_enc.read()) + self._frame_enc.write(*self._seg_enc.read()) + self._buf += self._frame_enc.read() + + def do_header(self, hdr): + cli_major = 0; cli_minor = 10 + magic, _, _, major, minor = struct.unpack(HEADER, hdr) + if major != cli_major or minor != cli_minor: + raise VersionError("client: %s-%s, server: %s-%s" % + (cli_major, cli_minor, major, minor)) + + def do_connection_start(self, start): + # XXX: should we use some sort of callback for this? + r = "\0%s\0%s" % (self.connection.username, self.connection.password) + m = self.connection.mechanism + self.write_op(ConnectionStartOk(client_properties=CLIENT_PROPERTIES, + mechanism=m, response=r)) + + def do_connection_tune(self, tune): + # XXX: is heartbeat protocol specific? + if tune.channel_max is not None: + self.channel_max = tune.channel_max + self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat, + channel_max=self.channel_max)) + self.write_op(ConnectionOpen()) + + def do_connection_open_ok(self, open_ok): + self._connected = True + + def connection_heartbeat(self, hrt): + self.write_op(ConnectionHeartbeat()) + + def do_connection_close(self, close): + self.write_op(ConnectionCloseOk()) + if close.reply_code != close_code.normal: + self.connection.error = (close.reply_code, close.reply_text) + # XXX: should we do a half shutdown on the socket here? + # XXX: we really need to test this, we may end up reporting a + # connection abort after this, if we were to do a shutdown on read + # and stop reading, then we wouldn't report the abort, that's + # probably the right thing to do + + def do_connection_close_ok(self, close_ok): + self._socket.close() + self.reset() + + def do_session_attached(self, atc): + pass + + def do_session_command_point(self, cp): + sst = self.get_sst(cp) + sst.received = cp.command_id + + def do_session_completed(self, sc): + sst = self.get_sst(sc) + for r in sc.commands: + sst.acknowledged.add(r.lower, r.upper) + + if not sc.commands.empty(): + while sst.min_completion in sc.commands: + if sst.completions.has_key(sst.min_completion): + sst.completions.pop(sst.min_completion)() + sst.min_completion += 1 + + def session_known_completed(self, kcmp): + sst = self.get_sst(kcmp) + executed = RangedSet() + for e in sst.executed.ranges: + for ke in kcmp.ranges: + if e.lower in ke and e.upper in ke: + break + else: + executed.add_range(e) + sst.executed = completed + + def do_session_flush(self, sf): + sst = self.get_sst(sf) + if sf.expected: + if sst.received is None: + exp = None + else: + exp = RangedSet(sst.received) + sst.write_op(SessionExpected(exp)) + if sf.confirmed: + sst.write_op(SessionConfirmed(sst.executed)) + if sf.completed: + sst.write_op(SessionCompleted(sst.executed)) + + def do_execution_result(self, er): + sst = self.get_sst(er) + sst.results[er.command_id] = er.value + + def do_execution_exception(self, ex): + sst = self.get_sst(ex) + sst.session.error = (ex,) + + def dispatch(self): + try: + if self._socket is None and self.connection._connected and not self._opening: self.connect() - elif self._conn is not None and not self.connection._connected: + elif self._socket is not None and not self.connection._connected and not self._closing: self.disconnect() - if self._conn is not None: + if self._connected and not self._closing: for ssn in self.connection.sessions.values(): self.attach(ssn) self.process(ssn) - - exi = None except: - exi = sys.exc_info() - - if exi: msg = compat.format_exc() - recoverable = ["aborted", "Connection refused", "SessionDetached", "Connection reset by peer", - "Bad file descriptor", "start timed out", "Broken pipe"] - for r in recoverable: - if self.connection.reconnect and r in msg: - print "waiting to retry" - self.reset() - time.sleep(3) - print "retrying..." - return - else: - self.connection.error = (msg,) - - self._modcount = modcount - self.connection._waiter.notifyAll() + self.connection.error = (msg,) def connect(self): - if self._conn is not None: - return try: + # XXX: should make this non blocking self._socket = connect(self.connection.host, self.connection.port) + self._timeout = None except socket.error, e: - raise ConnectError(e) - self._conn = connection.Connection(self._socket) - try: - self._conn.start(timeout=10) - self._connected = True - except connection.VersionError, e: - raise ConnectError(e) - except Timeout: - print "start timed out" - raise ConnectError("start timed out") + if self.connection.reconnect: + self._error(e, True) + return + else: + raise e + self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10) + self._opening = True def disconnect(self): - self._conn.close() - self.reset() - - def reset(self): - self._conn = None - self._connected = False - self._attachments.clear() - for ssn in self.connection.sessions.values(): - for m in ssn.acked + ssn.unacked + ssn.incoming: - m._transfer_id = None - for rcv in ssn.receivers: - rcv.impending = rcv.received - - def connected(self): - return self._conn is not None + self.write_op(ConnectionClose(close_code.normal)) + self._closing = True def attach(self, ssn): - _ssn = self._attachments.get(ssn) - if _ssn is None: - _ssn = self._conn.session(ssn.name, delegate=delegate(self, ssn)) - _ssn.auto_sync = False - _ssn.invoke_lock = self._lock - _ssn.lock = self._lock - _ssn.condition = self.connection._condition + sst = self._attachments.get(ssn) + if sst is None and not ssn.closed: + for i in xrange(0, self.channel_max): + if not self._sessions.has_key(i): + ch = i + break + else: + raise RuntimeError("all channels used") + sst = SessionState(self, ssn, ssn.name, ch) + sst.write_op(SessionAttach(name=ssn.name)) + sst.write_op(SessionCommandPoint(sst.sent, 0)) + sst.outgoing_idx = 0 + sst.acked = [] if ssn.transactional: - # XXX: adding an attribute to qpid.session.Session - _ssn.acked = [] - _ssn.tx_select() - self._attachments[ssn] = _ssn + sst.write_cmd(TxSelect()) + self._attachments[ssn] = sst + self._sessions[sst.channel] = sst for snd in ssn.senders: self.link_out(snd) for rcv in ssn.receivers: self.link_in(rcv) - if ssn.closing: - _ssn.close() - del self._attachments[ssn] - ssn.closed = True + if sst is not None and ssn.closing and not sst.detached: + sst.detached = True + sst.write_op(SessionDetach(name=ssn.name)) + + def get_sst(self, op): + return self._sessions[op.channel] - def _exchange_query(self, ssn, address): - # XXX: auto sync hack is to avoid deadlock on future - result = ssn.exchange_query(name=address, sync=True) - ssn.sync() - return result.get() + def do_session_detached(self, dtc): + sst = self._sessions.pop(dtc.channel) + ssn = sst.session + del self._attachments[ssn] + ssn.closed = True + + def do_session_detach(self, dtc): + sst = self.get_sst(dtc) + sst.write_op(SessionDetached(name=dtc.name)) + self.do_session_detached(dtc) def link_out(self, snd): - _ssn = self._attachments[snd.session] + sst = self._attachments.get(snd.session) _snd = self._attachments.get(snd) - if _snd is None: + if _snd is None and not snd.closing and not snd.closed: _snd = Attachment(snd) - node, _snd._subject = parse_addr(snd.target) - result = self._exchange_query(_ssn, node) - if result.not_found: - # XXX: should check 'create' option - _ssn.queue_declare(queue=node, durable=DURABLE_DEFAULT, sync=True) - _ssn.sync() - _snd._exchange = "" - _snd._routing_key = node - else: - _snd._exchange = node - _snd._routing_key = _snd._subject + + try: + _snd.name, _snd.subject, _snd.options = address.parse(snd.target) + except address.LexError, e: + snd.error = e + snd.closed = True + return + except address.ParseError, e: + snd.error = e + snd.closed = True + return + + # XXX: subject + if _snd.options is None: + _snd.options = {} + + def do_link(): + snd.linked = True + + def do_queue_q(result): + if sst.detached: + return + + if result.queue: + _snd._exchange = "" + _snd._routing_key = _snd.name + do_link() + else: + snd.error = ("no such queue: %s" % _snd.name,) + del self._attachments[snd] + snd.closed = True + + def do_exchange_q(result): + if sst.detached: + return + + if result.not_found: + if _snd.options.get("create") in ("always", "receiver"): + sst.write_cmd(QueueDeclare(queue=_snd.name, durable=DURABLE_DEFAULT)) + _snd._exchange = "" + _snd._routing_key = _snd.name + else: + sst.write_query(QueueQuery(queue=_snd.name), do_queue_q) + return + else: + _snd._exchange = _snd.name + _snd._routing_key = _snd.subject + do_link() + + sst.write_query(ExchangeQuery(name=_snd.name), do_exchange_q) self._attachments[snd] = _snd - if snd.closed: + if snd.closing and not snd.closed: del self._attachments[snd] - return None - else: - return _snd + snd.closed = True def link_in(self, rcv): - _ssn = self._attachments[rcv.session] + sst = self._attachments.get(rcv.session) _rcv = self._attachments.get(rcv) - if _rcv is None: + if _rcv is None and not rcv.closing and not rcv.closed: _rcv = Attachment(rcv) - result = self._exchange_query(_ssn, rcv.source) - if result.not_found: - _rcv._queue = rcv.source - # XXX: should check 'create' option - _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT) - else: - _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination) - _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True) - if rcv.filter is None: - f = FILTER_DEFAULTS[result.type] + _rcv.canceled = False + _rcv.draining = False + + try: + _rcv.name, _rcv.subject, _rcv.options = address.parse(rcv.source) + except address.LexError, e: + rcv.error = e + rcv.closed = True + return + except address.ParseError, e: + rcv.error = e + rcv.closed = True + return + + # XXX: subject + if _rcv.options is None: + _rcv.options = {} + + def do_link(): + sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination)) + sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit)) + rcv.linked = True + + def do_queue_q(result): + if sst.detached: + return + if result.queue: + _rcv._queue = _rcv.name + do_link() + else: + rcv.error = ("no such queue: %s" % _rcv.name,) + del self._attachments[rcv] + rcv.closed = True + + def do_exchange_q(result): + if sst.detached: + return + if result.not_found: + if _rcv.options.get("create") in ("always", "receiver"): + _rcv._queue = _rcv.name + sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT)) + else: + sst.write_query(QueueQuery(queue=_rcv.name), do_queue_q) + return else: - f = rcv.filter - f._bind(_ssn, rcv.source, _rcv._queue) - _ssn.message_subscribe(queue=_rcv._queue, destination=rcv.destination) - _ssn.message_set_flow_mode(rcv.destination, _ssn.flow_mode.credit, sync=True) + _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination) + sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) + filter = _rcv.options.get("filter") + if _rcv.subject is None and filter is None: + f = FILTER_DEFAULTS[result.type] + elif _rcv.subject and filter: + # XXX + raise Exception("can't supply both subject and filter") + elif _rcv.subject: + # XXX + from messaging import Pattern + f = Pattern(_rcv.subject) + else: + f = filter + f._bind(sst, _rcv.name, _rcv._queue) + do_link() + sst.write_query(ExchangeQuery(name=_rcv.name), do_exchange_q) self._attachments[rcv] = _rcv - # XXX: need to kill syncs - _ssn.sync() - - if rcv.closing: - _ssn.message_cancel(rcv.destination, sync=True) - # XXX: need to kill syncs - _ssn.sync() - del self._attachments[rcv] - rcv.closed = True - return None - else: - return _rcv + + if rcv.closing and not rcv.closed: + if rcv.linked: + if not _rcv.canceled: + def close_rcv(): + del self._attachments[rcv] + rcv.closed = True + sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv) + _rcv.canceled = True + else: + rcv.closed = True def process(self, ssn): if ssn.closing: return - _ssn = self._attachments[ssn] + sst = self._attachments[ssn] - while ssn.outgoing: - msg = ssn.outgoing[0] + while sst.outgoing_idx < len(ssn.outgoing): + msg = ssn.outgoing[sst.outgoing_idx] snd = msg._sender - self.send(snd, msg) - ssn.outgoing.pop(0) + # XXX: should check for sender error here + _snd = self._attachments.get(snd) + if _snd and snd.linked: + self.send(snd, msg) + sst.outgoing_idx += 1 + else: + break for rcv in ssn.receivers: self.process_receiver(rcv) if ssn.acked: - messages = ssn.acked[:] - ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None]) - for range in ids: - _ssn.receiver._completed.add_range(range) - ch = _ssn.channel - if ch is None: - raise SessionDetached() - ch.session_completed(_ssn.receiver._completed) - _ssn.message_accept(ids, sync=True) - # XXX: really need to make this async so that we don't give up the lock - _ssn.sync() - - # XXX: we're ignoring acks that get lost when disconnected - for m in messages: - ssn.acked.remove(m) - if ssn.transactional: - _ssn.acked.append(m) - - if ssn.committing: - _ssn.tx_commit(sync=True) - # XXX: need to kill syncs - _ssn.sync() - del _ssn.acked[:] - ssn.committing = False - ssn.committed = True - ssn.aborting = False - ssn.aborted = False - - if ssn.aborting: - for rcv in ssn.receivers: - _ssn.message_stop(rcv.destination) - _ssn.sync() - - messages = _ssn.acked + ssn.unacked + ssn.incoming - ids = RangedSet(*[m._transfer_id for m in messages]) - for range in ids: - _ssn.receiver._completed.add_range(range) - _ssn.channel.session_completed(_ssn.receiver._completed) - _ssn.message_release(ids) - _ssn.tx_rollback(sync=True) - _ssn.sync() - - del ssn.incoming[:] - del ssn.unacked[:] - del _ssn.acked[:] + messages = [m for m in ssn.acked if m not in sst.acked] + if messages: + # XXX: we're ignoring acks that get lost when disconnected, + # could we deal this via some message-id based purge? + ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None]) + for range in ids: + sst.executed.add_range(range) + sst.write_op(SessionCompleted(sst.executed)) + def ack_ack(): + for m in messages: + ssn.acked.remove(m) + if not ssn.transactional: + sst.acked.remove(m) + sst.write_cmd(MessageAccept(ids, sync=True), ack_ack) + sst.acked.extend(messages) + + if ssn.committing and not sst.committing: + def commit_ok(): + del sst.acked[:] + ssn.committing = False + ssn.committed = True + ssn.aborting = False + ssn.aborted = False + sst.write_cmd(TxCommit(sync=True), commit_ok) + sst.committing = True + + if ssn.aborting and not sst.aborting: + sst.aborting = True + def do_rb(): + messages = sst.acked + ssn.unacked + ssn.incoming + ids = RangedSet(*[m._transfer_id for m in messages]) + for range in ids: + sst.executed.add_range(range) + sst.write_op(SessionCompleted(sst.executed)) + sst.write_cmd(MessageRelease(ids)) + sst.write_cmd(TxRollback(sync=True), do_rb_ok) + + def do_rb_ok(): + del ssn.incoming[:] + del ssn.unacked[:] + del sst.acked[:] + + for rcv in ssn.receivers: + rcv.impending = rcv.received + rcv.returned = rcv.received + # XXX: do we need to update granted here as well? + + for rcv in ssn.receivers: + self.process_receiver(rcv) + + ssn.aborting = False + ssn.aborted = True + ssn.committing = False + ssn.committed = False + sst.aborting = False for rcv in ssn.receivers: - rcv.impending = rcv.received - rcv.returned = rcv.received - # XXX: do we need to update granted here as well? - - for rcv in ssn.receivers: - self.process_receiver(rcv) - - ssn.aborting = False - ssn.aborted = True - ssn.committing = False - ssn.committed = False + sst.write_cmd(MessageStop(rcv.destination)) + sst.write_cmd(ExecutionSync(sync=True), do_rb) def grant(self, rcv): - _ssn = self._attachments[rcv.session] - _rcv = self.link_in(rcv) + sst = self._attachments[rcv.session] + _rcv = self._attachments.get(rcv) + if _rcv is None or not rcv.linked or _rcv.canceled or _rcv.draining: + return if rcv.granted is UNLIMITED: if rcv.impending is UNLIMITED: @@ -343,30 +671,37 @@ class Driver: delta = max(rcv.granted, rcv.received) - rcv.impending if delta is UNLIMITED: - _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value) - _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, UNLIMITED.value) + sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value)) + sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, UNLIMITED.value)) rcv.impending = UNLIMITED elif delta > 0: - _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value) - _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, delta) + sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value)) + sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, delta)) rcv.impending += delta - elif delta < 0: - if rcv.drain: - _ssn.message_flush(rcv.destination, sync=True) - else: - _ssn.message_stop(rcv.destination, sync=True) - # XXX: need to kill syncs - _ssn.sync() - rcv.impending = rcv.received - self.grant(rcv) + elif delta < 0 and not rcv.draining: + _rcv.draining = True + def do_stop(): + rcv.impending = rcv.received + _rcv.draining = False + self.grant(rcv) + sst.write_cmd(MessageStop(rcv.destination, sync=True), do_stop) + + if rcv.draining: + def do_flush(): + rcv.impending = rcv.received + rcv.granted = rcv.impending + _rcv.draining = False + rcv.draining = False + sst.write_cmd(MessageFlush(rcv.destination, sync=True), do_flush) + def process_receiver(self, rcv): if rcv.closed: return self.grant(rcv) def send(self, snd, msg): - _ssn = self._attachments[snd.session] - _snd = self.link_out(snd) + sst = self._attachments[snd.session] + _snd = self._attachments[snd] # XXX: what if subject is specified for a normal queue? if _snd._routing_key is None: @@ -375,16 +710,16 @@ class Driver: rk = _snd._routing_key # XXX: do we need to query to figure out how to create the reply-to interoperably? if msg.reply_to: - rt = _ssn.reply_to(*parse_addr(msg.reply_to)) + rt = addr2reply_to(msg.reply_to) else: rt = None - dp = _ssn.delivery_properties(routing_key=rk) - mp = _ssn.message_properties(message_id=msg.id, - user_id=msg.user_id, - reply_to=rt, - correlation_id=msg.correlation_id, - content_type=msg.content_type, - application_headers=msg.properties) + dp = DeliveryProperties(routing_key=rk) + mp = MessageProperties(message_id=msg.id, + user_id=msg.user_id, + reply_to=rt, + correlation_id=msg.correlation_id, + content_type=msg.content_type, + application_headers=msg.properties) if msg.subject is not None: if mp.application_headers is None: mp.application_headers = {} @@ -397,37 +732,42 @@ class Driver: dp.delivery_mode = delivery_mode.persistent enc, dec = get_codec(msg.content_type) body = enc(msg.content) - _ssn.message_transfer(destination=_snd._exchange, - message=Message010(dp, mp, body), - sync=True) - log.debug("SENT [%s] %s", snd.session, msg) - # XXX: really need to make this async so that we don't give up the lock - _ssn.sync() - # XXX: should we log the ack somehow too? - snd.acked += 1 - - @synchronized - def _message_transfer(self, ssn, cmd): - m = Message010(cmd.payload) - m.headers = cmd.headers - m.id = cmd.id - msg = self._decode(m) - rcv = ssn.receivers[int(cmd.destination)] + def msg_acked(): + # XXX: should we log the ack somehow too? + snd.acked += 1 + m = snd.session.outgoing.pop(0) + sst.outgoing_idx -= 1 + assert msg == m + sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp), + payload=body, sync=True), msg_acked) + + def do_message_transfer(self, xfr): + sst = self.get_sst(xfr) + ssn = sst.session + + msg = self._decode(xfr) + rcv = ssn.receivers[int(xfr.destination)] msg._receiver = rcv if rcv.impending is not UNLIMITED: - assert rcv.received < rcv.impending + assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) rcv.received += 1 log.debug("RECV [%s] %s", ssn, msg) ssn.incoming.append(msg) self.connection._waiter.notifyAll() - return INCOMPLETE - def _decode(self, message): - dp = message.get("delivery_properties") - mp = message.get("message_properties") + def _decode(self, xfr): + dp = EMPTY_DP + mp = EMPTY_MP + + for h in xfr.headers: + if isinstance(h, DeliveryProperties): + dp = h + elif isinstance(h, MessageProperties): + mp = h + ap = mp.application_headers enc, dec = get_codec(mp.content_type) - content = dec(message.body) + content = dec(xfr.payload) msg = Message(content) msg.id = mp.message_id if ap is not None: @@ -440,5 +780,5 @@ class Driver: msg.durable = dp.delivery_mode == delivery_mode.persistent msg.properties = mp.application_headers msg.content_type = mp.content_type - msg._transfer_id = message.id + msg._transfer_id = xfr.id return msg diff --git a/qpid/python/qpid/messaging.py b/qpid/python/qpid/messaging.py index d755aa5054..3e3c8f36cb 100644 --- a/qpid/python/qpid/messaging.py +++ b/qpid/python/qpid/messaging.py @@ -77,7 +77,8 @@ class Connection: """ @static - def open(host, port=None): + def open(host, port=None, username="guest", password="guest", + mechanism="PLAIN", heartbeat=None, **options): """ Creates an AMQP connection and connects it to the given host and port. @@ -88,11 +89,12 @@ class Connection: @rtype: Connection @return: a connected Connection """ - conn = Connection(host, port) + conn = Connection(host, port, username, password, mechanism, heartbeat, **options) conn.connect() return conn - def __init__(self, host, port=None): + def __init__(self, host, port=None, username="guest", password="guest", + mechanism="PLAIN", heartbeat=None, **options): """ Creates a connection. A newly created connection must be connected with the Connection.connect() method before it can be started. @@ -106,11 +108,16 @@ class Connection: """ self.host = host self.port = default(port, AMQP_PORT) + self.username = username + self.password = password + self.mechanism = mechanism + self.heartbeat = heartbeat + self.started = False self.id = str(uuid4()) self.session_counter = 0 self.sessions = {} - self.reconnect = False + self.reconnect = options.get("reconnect", False) self._connected = False self._lock = RLock() self._condition = Condition(self._lock) @@ -230,9 +237,10 @@ class Pattern: self.value = value # XXX: this should become part of the driver - def _bind(self, ssn, exchange, queue): - ssn.exchange_bind(exchange=exchange, queue=queue, - binding_key=self.value.replace("*", "#")) + def _bind(self, sst, exchange, queue): + from qpid.ops import ExchangeBind + sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue, + binding_key=self.value.replace("*", "#"))) class SessionError(Exception): pass @@ -282,6 +290,7 @@ class Session: # XXX: I hate this name. self.ack_capacity = UNLIMITED + self.error = None self.closing = False self.closed = False @@ -302,12 +311,16 @@ class Session: def _check_error(self, exc=SessionError): self.connection._check_error(exc) + if self.error: + raise exc(*self.error) def _ewait(self, predicate, timeout=None, exc=SessionError): - return self.connection._ewait(predicate, timeout, exc) + result = self.connection._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result @synchronized - def sender(self, target): + def sender(self, target, **options): """ Creates a L{Sender} that may be used to send L{Messages<Message>} to the specified target. @@ -317,7 +330,7 @@ class Session: @rtype: Sender @return: a new Sender for the specified target """ - sender = Sender(self, len(self.senders), target) + sender = Sender(self, len(self.senders), target, options) self.senders.append(sender) self._wakeup() # XXX: because of the lack of waiting here we can end up getting @@ -327,7 +340,7 @@ class Session: return sender @synchronized - def receiver(self, source, filter=None): + def receiver(self, source, **options): """ Creates a receiver that may be used to actively fetch or to listen for the arrival of L{Messages<Message>} from the specified source. @@ -337,7 +350,7 @@ class Session: @rtype: Receiver @return: a new Receiver for the specified source """ - receiver = Receiver(self, len(self.receivers), source, filter, + receiver = Receiver(self, len(self.receivers), source, options, self.started) self.receivers.append(receiver) self._wakeup() @@ -368,8 +381,8 @@ class Session: @synchronized def _get(self, predicate, timeout=None): - if self._wait(lambda: ((self._peek(predicate) is not None) or self.closing), - timeout): + if self._ewait(lambda: ((self._peek(predicate) is not None) or self.closing), + timeout): msg = self._pop(predicate) if msg is not None: msg._receiver.returned += 1 @@ -505,13 +518,18 @@ class Sender: Sends outgoing messages. """ - def __init__(self, session, index, target): + def __init__(self, session, index, target, options): self.session = session self.index = index self.target = target - self.capacity = UNLIMITED + self.options = options + self.capacity = options.get("capacity", UNLIMITED) + self.durable = options.get("durable") self.queued = Serial(0) self.acked = Serial(0) + self.error = None + self.linked = False + self.closing = False self.closed = False self._lock = self.session._lock @@ -520,9 +538,13 @@ class Sender: def _check_error(self, exc=SendError): self.session._check_error(exc) + if self.error: + raise exc(*self.error) def _ewait(self, predicate, timeout=None, exc=SendError): - return self.session._ewait(predicate, timeout, exc) + result = self.session._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result @synchronized def pending(self): @@ -558,11 +580,16 @@ class Sender: if not self.session.connection._connected or self.session.closing: raise Disconnected() + self._ewait(lambda: self.linked) + if isinstance(object, Message): message = object else: message = Message(object) + if message.durable is None: + message.durable = self.durable + if self.capacity is not UNLIMITED: if self.capacity <= 0: raise InsufficientCapacity("capacity = %s" % self.capacity) @@ -573,15 +600,19 @@ class Sender: message._sender = self self.session.outgoing.append(message) self.queued += 1 - mno = self.queued self._wakeup() if sync: - self._ewait(lambda: self.acked >= mno) + self.sync() assert message not in self.session.outgoing @synchronized + def sync(self): + mno = self.queued + self._ewait(lambda: self.acked >= mno) + + @synchronized def close(self): """ Close the Sender. @@ -609,21 +640,23 @@ class Receiver: L{listen}. """ - def __init__(self, session, index, source, filter, started): + def __init__(self, session, index, source, options, started): self.session = session self.index = index self.destination = str(self.index) self.source = source - self.filter = filter + self.options = options self.started = started - self.capacity = UNLIMITED + self.capacity = options.get("capacity", UNLIMITED) self.granted = Serial(0) - self.drain = False + self.draining = False self.impending = Serial(0) self.received = Serial(0) self.returned = Serial(0) + self.error = None + self.linked = False self.closing = False self.closed = False self.listener = None @@ -634,9 +667,13 @@ class Receiver: def _check_error(self, exc=ReceiveError): self.session._check_error(exc) + if self.error: + raise exc(*self.error) def _ewait(self, predicate, timeout=None, exc=ReceiveError): - return self.session._ewait(predicate, timeout, exc) + result = self.session._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result @synchronized def pending(self): @@ -680,17 +717,18 @@ class Receiver: @type timeout: float @param timeout: the time to wait for a message to be available """ + + self._ewait(lambda: self.linked) + if self._capacity() == 0: self.granted = self.returned + 1 self._wakeup() self._ewait(lambda: self.impending >= self.granted) msg = self.session._get(self._pred, timeout=timeout) if msg is None: - self.drain = True - self.granted = self.received + self.draining = True self._wakeup() - self._ewait(lambda: self.impending == self.received) - self.drain = False + self._ewait(lambda: not self.draining) self._grant() self._wakeup() msg = self.session._get(self._pred, timeout=0) @@ -738,7 +776,7 @@ class Receiver: self.closing = True self._wakeup() try: - self._ewait(lambda: self.closed) + self.session._ewait(lambda: self.closed) finally: self.session.receivers.remove(self) @@ -778,6 +816,8 @@ def get_type(content): def get_codec(content_type): return TYPE_CODEC[content_type] +UNSPECIFIED = object() + class Message: """ @@ -802,7 +842,9 @@ class Message: @ivar content: the message content """ - def __init__(self, content=None): + def __init__(self, content=None, content_type=UNSPECIFIED, id=None, + subject=None, to=None, user_id=None, reply_to=None, + correlation_id=None, durable=None, properties=None): """ Construct a new message with the supplied content. The content-type of the message will be automatically inferred from @@ -810,20 +852,44 @@ class Message: @type content: str, unicode, buffer, dict, list @param content: the message content - """ - self.id = None - self.subject = None - self.user_id = None - self.to = None - self.reply_to = None - self.correlation_id = None - self.durable = False - self.properties = {} - self.content_type = get_type(content) + + @type content_type: str + @param content_type: the content-type of the message + """ + self.id = id + self.subject = subject + self.to = to + self.user_id = user_id + self.reply_to = reply_to + self.correlation_id = correlation_id + self.durable = durable + if properties is None: + self.properties = {} + else: + self.properties = properties + if content_type is UNSPECIFIED: + self.content_type = get_type(content) + else: + self.content_type = content_type self.content = content def __repr__(self): - return "Message(%r)" % self.content + args = [] + for name in ["id", "subject", "to", "user_id", "reply_to", + "correlation_id"]: + value = self.__dict__[name] + if value is not None: args.append("%s=%r" % (name, value)) + for name in ["durable", "properties"]: + value = self.__dict__[name] + if value: args.append("%s=%r" % (name, value)) + if self.content_type != get_type(self.content): + args.append("content_type=%r" % self.content_type) + if self.content is not None: + if args: + args.append("content=%r" % self.content) + else: + args.append(repr(self.content)) + return "Message(%s)" % ", ".join(args) __all__ = ["Connection", "Session", "Sender", "Receiver", "Pattern", "Message", "ConnectionError", "ConnectError", "SessionError", "Disconnected", diff --git a/qpid/python/qpid/ops.py b/qpid/python/qpid/ops.py index 11e7d11fe9..277d059203 100644 --- a/qpid/python/qpid/ops.py +++ b/qpid/python/qpid/ops.py @@ -80,7 +80,7 @@ class Compound(object): return "%s(%s)" % (self.__class__.__name__, ", ".join(["%s=%r" % (f.name, getattr(self, f.name)) for f in self.ARGS - if getattr(self, f.name) is not f.default])) + if getattr(self, f.name) != f.default])) class Command(Compound): UNENCODED=[Field("channel", "uint16", 0), @@ -209,8 +209,8 @@ def make(nd): from qpid_config import amqp_spec as file pclfile = "%s.ops.pcl" % file -if False and (os.path.exists(pclfile) and - os.path.getmtime(pclfile) > os.path.getmtime(file)): +if os.path.exists(pclfile) and \ + os.path.getmtime(pclfile) > os.path.getmtime(file): f = open(pclfile, "read") types = pickle.load(f) f.close() diff --git a/qpid/python/qpid/selector.py b/qpid/python/qpid/selector.py new file mode 100644 index 0000000000..46052e1108 --- /dev/null +++ b/qpid/python/qpid/selector.py @@ -0,0 +1,156 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import atexit, os, time +from compat import select, set +from threading import Thread, Lock + +class Acceptor: + + def __init__(self, sock, handler): + self.sock = sock + self.handler = handler + + def fileno(self): + return self.sock.fileno() + + def reading(self): + return True + + def writing(self): + return False + + def readable(self): + sock, addr = self.sock.accept() + self.handler(sock) + +class Sink: + + def __init__(self, fd): + self.fd = fd + + def fileno(self): + return self.fd + + def reading(self): + return True + + def readable(self): + os.read(self.fd, 65536) + + def __repr__(self): + return "Sink(%r)" % self.fd + +class Selector: + + lock = Lock() + DEFAULT = None + + @staticmethod + def default(): + Selector.lock.acquire() + try: + if Selector.DEFAULT is None: + sel = Selector() + atexit.register(sel.stop) + sel.start() + Selector.DEFAULT = sel + return Selector.DEFAULT + finally: + Selector.lock.release() + + def __init__(self): + self.selectables = set() + self.reading = set() + self.writing = set() + self.wait_fd, self.wakeup_fd = os.pipe() + self.reading.add(Sink(self.wait_fd)) + self.stopped = False + self.thread = None + + def wakeup(self): + os.write(self.wakeup_fd, "\0") + + def register(self, selectable): + self.selectables.add(selectable) + self.modify(selectable) + + def _update(self, selectable): + if selectable.reading(): + self.reading.add(selectable) + else: + self.reading.discard(selectable) + if selectable.writing(): + self.writing.add(selectable) + else: + self.writing.discard(selectable) + return selectable.timing() + + def modify(self, selectable): + self._update(selectable) + self.wakeup() + + def unregister(self, selectable): + self.reading.discard(selectable) + self.writing.discard(selectable) + self.selectables.discard(selectable) + self.wakeup() + + def start(self): + self.stopped = False + self.thread = Thread(target=self.run) + self.thread.setDaemon(True) + self.thread.start(); + + def run(self): + while not self.stopped: + wakeup = None + for sel in self.selectables.copy(): + t = self._update(sel) + if t is not None: + if wakeup is None: + wakeup = t + else: + wakeup = min(wakeup, t) + + if wakeup is None: + timeout = None + else: + timeout = max(0, wakeup - time.time()) + + rd, wr, ex = select(self.reading, self.writing, (), timeout) + + for sel in wr: + if sel.writing(): + sel.writeable() + + for sel in rd: + if sel.reading(): + sel.readable() + + now = time.time() + for sel in self.selectables.copy(): + w = sel.timing() + if w is not None and now > w: + sel.timeout() + + def stop(self, timeout=None): + self.stopped = True + self.wakeup() + self.thread.join(timeout) + self.thread = None diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py index 7623c1f93b..2e4c0ca1ab 100644 --- a/qpid/python/qpid/tests/messaging.py +++ b/qpid/python/qpid/tests/messaging.py @@ -24,7 +24,7 @@ import time from qpid.tests import Test from qpid.harness import Skipped from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \ - InsufficientCapacity, Message, UNLIMITED, uuid4 + InsufficientCapacity, Message, ReceiveError, SendError, UNLIMITED, uuid4 from Queue import Queue, Empty as QueueEmpty class Base(Test): @@ -50,6 +50,8 @@ class Base(Test): raise Skipped(e) self.ssn = self.setup_session() self.snd = self.setup_sender() + if self.snd is not None: + self.snd.durable = self.durable() self.rcv = self.setup_receiver() def teardown(self): @@ -63,11 +65,12 @@ class Base(Test): return "%s[%s, %s]" % (base, count, self.test_id) def ping(self, ssn): + PING_Q = 'ping-queue {create: always}' # send a message - sender = ssn.sender("ping-queue") + sender = ssn.sender(PING_Q, durable=self.durable()) content = self.content("ping") sender.send(content) - receiver = ssn.receiver("ping-queue") + receiver = ssn.receiver(PING_Q) msg = receiver.fetch(0) ssn.acknowledge() assert msg.content == content, "expected %r, got %r" % (content, msg.content) @@ -97,16 +100,27 @@ class Base(Test): def delay(self): return float(self.config.defines.get("delay", "2")) + def get_bool(self, name): + return self.config.defines.get(name, "false").lower() in ("true", "yes", "1") + + def durable(self): + return self.get_bool("durable") + + def reconnect(self): + return self.get_bool("reconnect") + class SetupTests(Base): def testOpen(self): # XXX: need to flesh out URL support/syntax - self.conn = Connection.open(self.broker.host, self.broker.port) + self.conn = Connection.open(self.broker.host, self.broker.port, + reconnect=self.reconnect()) self.ping(self.conn.session()) def testConnect(self): # XXX: need to flesh out URL support/syntax - self.conn = Connection(self.broker.host, self.broker.port) + self.conn = Connection(self.broker.host, self.broker.port, + reconnect=self.reconnect()) self.conn.connect() self.ping(self.conn.session()) @@ -121,7 +135,8 @@ class SetupTests(Base): class ConnectionTests(Base): def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port) + return Connection.open(self.broker.host, self.broker.port, + reconnect=self.reconnect()) def testSessionAnon(self): ssn1 = self.conn.session() @@ -174,17 +189,21 @@ class ConnectionTests(Base): self.conn.close() assert not self.conn.connected() +ACK_Q = 'test-ack-queue {create: always}' + class SessionTests(Base): def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port) + return Connection.open(self.broker.host, self.broker.port, + reconnect=self.reconnect()) def setup_session(self): return self.conn.session() def testSender(self): - snd = self.ssn.sender("test-snd-queue") - snd2 = self.ssn.sender(snd.target) + snd = self.ssn.sender('test-snd-queue {create: always}', + durable=self.durable()) + snd2 = self.ssn.sender(snd.target, durable=self.durable()) assert snd is not snd2 snd2.close() @@ -196,47 +215,49 @@ class SessionTests(Base): self.ssn.acknowledge(msg) def testReceiver(self): - rcv = self.ssn.receiver("test-rcv-queue") + rcv = self.ssn.receiver('test-rcv-queue {create: always}') rcv2 = self.ssn.receiver(rcv.source) assert rcv is not rcv2 rcv2.close() content = self.content("testReceiver") - snd = self.ssn.sender(rcv.source) + snd = self.ssn.sender(rcv.source, durable=self.durable()) snd.send(content) msg = rcv.fetch(0) assert msg.content == content self.ssn.acknowledge(msg) def testStart(self): - rcv = self.ssn.receiver("test-start-queue") + START_Q = 'test-start-queue {create: always}' + rcv = self.ssn.receiver(START_Q) assert not rcv.started self.ssn.start() assert rcv.started - rcv = self.ssn.receiver("test-start-queue") + rcv = self.ssn.receiver(START_Q) assert rcv.started def testStop(self): + STOP_Q = 'test-stop-queue {create: always}' self.ssn.start() - rcv = self.ssn.receiver("test-stop-queue") + rcv = self.ssn.receiver(STOP_Q) assert rcv.started self.ssn.stop() assert not rcv.started - rcv = self.ssn.receiver("test-stop-queue") + rcv = self.ssn.receiver(STOP_Q) assert not rcv.started # XXX, we need a convenient way to assert that required queues are # empty on setup, and possibly also to drain queues on teardown def ackTest(self, acker, ack_capacity=None): # send a bunch of messages - snd = self.ssn.sender("test-ack-queue") + snd = self.ssn.sender(ACK_Q, durable=self.durable()) contents = [self.content("ackTest", i) for i in range(15)] for c in contents: snd.send(c) # drain the queue, verify the messages are there and then close # without acking - rcv = self.ssn.receiver(snd.target) + rcv = self.ssn.receiver(ACK_Q) self.drain(rcv, expected=contents) self.ssn.close() @@ -245,7 +266,7 @@ class SessionTests(Base): self.ssn = self.conn.session() if ack_capacity is not None: self.ssn.ack_capacity = ack_capacity - rcv = self.ssn.receiver("test-ack-queue") + rcv = self.ssn.receiver(ACK_Q) self.drain(rcv, expected=contents) acker(self.ssn) self.ssn.close() @@ -253,7 +274,7 @@ class SessionTests(Base): # drain the queue a final time and verify that the messages were # dequeued self.ssn = self.conn.session() - rcv = self.ssn.receiver("test-ack-queue") + rcv = self.ssn.receiver(ACK_Q) self.assertEmpty(rcv) def testAcknowledge(self): @@ -271,7 +292,7 @@ class SessionTests(Base): pass finally: self.ssn.ack_capacity = UNLIMITED - self.drain(self.ssn.receiver("test-ack-queue")) + self.drain(self.ssn.receiver(ACK_Q)) self.ssn.acknowledge() def testAcknowledgeAsyncAckCap1(self): @@ -284,7 +305,7 @@ class SessionTests(Base): self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED) def send(self, ssn, queue, base, count=1): - snd = ssn.sender(queue) + snd = ssn.sender(queue, durable=self.durable()) contents = [] for i in range(count): c = self.content(base, i) @@ -294,10 +315,12 @@ class SessionTests(Base): return contents def txTest(self, commit): + TX_Q = 'test-tx-queue {create: always}' + TX_Q_COPY = 'test-tx-queue-copy {create: always}' txssn = self.conn.session(transactional=True) - contents = self.send(self.ssn, "test-tx-queue", "txTest", 3) - txrcv = txssn.receiver("test-tx-queue") - txsnd = txssn.sender("test-tx-queue-copy") + contents = self.send(self.ssn, TX_Q, "txTest", 3) + txrcv = txssn.receiver(TX_Q) + txsnd = txssn.sender(TX_Q_COPY, durable=self.durable()) rcv = self.ssn.receiver(txrcv.source) copy_rcv = self.ssn.receiver(txsnd.target) self.assertEmpty(copy_rcv) @@ -323,9 +346,10 @@ class SessionTests(Base): self.txTest(False) def txTestSend(self, commit): + TX_SEND_Q = 'test-tx-send-queue {create: always}' txssn = self.conn.session(transactional=True) - contents = self.send(txssn, "test-tx-send-queue", "txTestSend", 3) - rcv = self.ssn.receiver("test-tx-send-queue") + contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3) + rcv = self.ssn.receiver(TX_SEND_Q) self.assertEmpty(rcv) if commit: @@ -345,10 +369,11 @@ class SessionTests(Base): self.txTestSend(False) def txTestAck(self, commit): + TX_ACK_Q = 'test-tx-ack-queue {create: always}' txssn = self.conn.session(transactional=True) - txrcv = txssn.receiver("test-tx-ack-queue") + txrcv = txssn.receiver(TX_ACK_Q) self.assertEmpty(txrcv) - contents = self.send(self.ssn, "test-tx-ack-queue", "txTestAck", 3) + contents = self.send(self.ssn, TX_ACK_Q, "txTestAck", 3) assert contents == self.drain(txrcv) if commit: @@ -366,11 +391,11 @@ class SessionTests(Base): txssn.close() txssn = self.conn.session(transactional=True) - txrcv = txssn.receiver("test-tx-ack-queue") + txrcv = txssn.receiver(TX_ACK_Q) assert contents == self.drain(txrcv) txssn.acknowledge() txssn.commit() - rcv = self.ssn.receiver("test-tx-ack-queue") + rcv = self.ssn.receiver(TX_ACK_Q) self.assertEmpty(rcv) txssn.close() self.assertEmpty(rcv) @@ -389,19 +414,22 @@ class SessionTests(Base): except Disconnected: pass +RECEIVER_Q = 'test-receiver-queue {create: always}' + class ReceiverTests(Base): def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port) + return Connection.open(self.broker.host, self.broker.port, + reconnect=self.reconnect()) def setup_session(self): return self.conn.session() def setup_sender(self): - return self.ssn.sender("test-receiver-queue") + return self.ssn.sender(RECEIVER_Q) def setup_receiver(self): - return self.ssn.receiver("test-receiver-queue") + return self.ssn.receiver(RECEIVER_Q) def send(self, base, count = None): content = self.content(base, count) @@ -516,7 +544,7 @@ class ReceiverTests(Base): self.assertPending(self.rcv, 5) drained = self.drain(self.rcv) - assert len(drained) == 10 + assert len(drained) == 10, "%s, %s" % (len(drained), drained) self.assertPending(self.rcv, 0) self.ssn.acknowledge() @@ -538,19 +566,81 @@ class ReceiverTests(Base): # XXX: need testClose +NOSUCH_Q = "this-queue-should-not-exist" +UNPARSEABLE_ADDR = "{bad address}" +UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3" + +class AddressErrorTests(Base): + + def setup_connection(self): + return Connection.open(self.broker.host, self.broker.port, + reconnect=self.reconnect()) + + def setup_session(self): + return self.conn.session() + + def sendErrorTest(self, addr, exc, check=lambda e: True): + snd = self.ssn.sender(addr, durable=self.durable()) + try: + snd.send("hello") + assert False, "send succeeded" + except exc, e: + assert check(e), "unexpected error: %s" % e + snd.close() + + def fetchErrorTest(self, addr, exc, check=lambda e: True): + rcv = self.ssn.receiver(addr) + try: + rcv.fetch(timeout=0) + assert False, "fetch succeeded" + except exc, e: + assert check(e), "unexpected error: %s" % e + rcv.close() + + def testNoTarget(self): + # XXX: should have specific exception for this + self.sendErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e)) + + def testNoSource(self): + # XXX: should have specific exception for this + self.fetchErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e)) + + def testUnparseableTarget(self): + # XXX: should have specific exception for this + self.sendErrorTest(UNPARSEABLE_ADDR, SendError, + lambda e: "expecting ID" in str(e)) + + def testUnparseableSource(self): + # XXX: should have specific exception for this + self.fetchErrorTest(UNPARSEABLE_ADDR, ReceiveError, + lambda e: "expecting ID" in str(e)) + + def testUnlexableTarget(self): + # XXX: should have specific exception for this + self.sendErrorTest(UNLEXABLE_ADDR, SendError, + lambda e: "unrecognized character" in str(e)) + + def testUnlexableSource(self): + # XXX: should have specific exception for this + self.fetchErrorTest(UNLEXABLE_ADDR, ReceiveError, + lambda e: "unrecognized character" in str(e)) + +SENDER_Q = 'test-sender-q {create: always}' + class SenderTests(Base): def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port) + return Connection.open(self.broker.host, self.broker.port, + reconnect=self.reconnect()) def setup_session(self): return self.conn.session() def setup_sender(self): - return self.ssn.sender("test-sender-queue") + return self.ssn.sender(SENDER_Q) def setup_receiver(self): - return self.ssn.receiver("test-sender-queue") + return self.ssn.receiver(SENDER_Q) def checkContent(self, content): self.snd.send(content) @@ -611,6 +701,7 @@ class SenderTests(Base): except InsufficientCapacity: caught = True break + self.snd.sync() self.drain(self.rcv, expected=msgs) self.ssn.acknowledge() assert caught, "did not exceed capacity" @@ -643,19 +734,22 @@ class MessageTests(Base): m.content = u"<html/>" assert m.content_type == "text/html; charset=utf8" +ECHO_Q = 'test-message-echo-queue {create: always}' + class MessageEchoTests(Base): def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port) + return Connection.open(self.broker.host, self.broker.port, + reconnect=self.reconnect()) def setup_session(self): return self.conn.session() def setup_sender(self): - return self.ssn.sender("test-message-echo-queue") + return self.ssn.sender(ECHO_Q) def setup_receiver(self): - return self.ssn.receiver("test-message-echo-queue") + return self.ssn.receiver(ECHO_Q) def check(self, msg): self.snd.send(msg) |