diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-04-22 16:11:34 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-04-22 16:11:34 +0000 |
commit | 6b4af6b24c0e29007c28998d4d7d19383c0ae702 (patch) | |
tree | e0c01ddcddbb5b5bfa2cfdd22980d39ed46810b4 | |
parent | ab24602c21632ffb3f0748331819b2e099b188da (diff) | |
download | qpid-python-6b4af6b24c0e29007c28998d4d7d19383c0ae702.tar.gz |
QPID-947: update cpp and python management to 0-10 final
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@650565 13f79535-47bb-0310-9956-ffa450edef68
27 files changed, 1453 insertions, 1328 deletions
diff --git a/qpid/cpp/src/qpid/framing/FieldTable.cpp b/qpid/cpp/src/qpid/framing/FieldTable.cpp index 089bc5d4a5..ac2a0f286d 100644 --- a/qpid/cpp/src/qpid/framing/FieldTable.cpp +++ b/qpid/cpp/src/qpid/framing/FieldTable.cpp @@ -66,7 +66,7 @@ void FieldTable::set(const std::string& name, const ValuePtr& value){ } void FieldTable::setString(const std::string& name, const std::string& value){ - values[name] = ValuePtr(new StringValue(value)); + values[name] = ValuePtr(new Str16Value(value)); } void FieldTable::setInt(const std::string& name, int value){ diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 769593c8d2..9b4290232d 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -24,7 +24,7 @@ #include "qpid/log/Statement.h" #include <qpid/broker/Message.h> #include <qpid/broker/MessageDelivery.h> -#include "qpid/framing/MessageXTransferBody.h" +#include "qpid/framing/MessageTransferBody.h" #include <list> #include <iostream> #include <fstream> @@ -218,8 +218,8 @@ void ManagementAgent::SendBuffer (Buffer& buf, return; intrusive_ptr<Message> msg (new Message ()); - AMQFrame method (in_place<MessageXTransferBody>( - ProtocolVersion(), 0, exchange->getName (), 0, 0)); + AMQFrame method (in_place<MessageTransferBody>( + ProtocolVersion(), exchange->getName (), 0, 0)); AMQFrame header (in_place<AMQHeaderBody>()); AMQFrame content(in_place<AMQContentBody>()); @@ -233,8 +233,8 @@ void ManagementAgent::SendBuffer (Buffer& buf, msg->getFrames().append(method); msg->getFrames().append(header); - PreviewMessageProperties* props = - msg->getFrames().getHeaders()->get<PreviewMessageProperties>(true); + MessageProperties* props = + msg->getFrames().getHeaders()->get<MessageProperties>(true); props->setContentLength(length); msg->getFrames().append(content); @@ -394,8 +394,8 @@ void ManagementAgent::dispatchMethod (Message& msg, uint64_t objId = inBuffer.getLongLong (); string replyToKey; - const framing::PreviewMessageProperties* p = - msg.getFrames().getHeaders()->get<framing::PreviewMessageProperties>(); + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); if (p && p->hasReplyTo()) { const framing::ReplyTo& rt = p->getReplyTo (); @@ -601,8 +601,8 @@ void ManagementAgent::dispatchAgentCommand (Message& msg) uint32_t sequence; string replyToKey; - const framing::PreviewMessageProperties* p = - msg.getFrames().getHeaders()->get<framing::PreviewMessageProperties>(); + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); if (p && p->hasReplyTo()) { const framing::ReplyTo& rt = p->getReplyTo (); diff --git a/qpid/cpp/src/tests/FieldTable.cpp b/qpid/cpp/src/tests/FieldTable.cpp index 716a5d3d39..ba506d4101 100644 --- a/qpid/cpp/src/tests/FieldTable.cpp +++ b/qpid/cpp/src/tests/FieldTable.cpp @@ -32,7 +32,7 @@ QPID_AUTO_TEST_CASE(testMe) { FieldTable ft; ft.setString("A", "BCDE"); - BOOST_CHECK(StringValue("BCDE") == *ft.get("A")); + BOOST_CHECK(string("BCDE") == ft.getString("A")); char buff[100]; Buffer wbuffer(buff, 100); @@ -41,7 +41,7 @@ QPID_AUTO_TEST_CASE(testMe) Buffer rbuffer(buff, 100); FieldTable ft2; rbuffer.get(ft2); - BOOST_CHECK(StringValue("BCDE") == *ft2.get("A")); + BOOST_CHECK(string("BCDE") == ft2.getString("A")); } @@ -55,8 +55,8 @@ QPID_AUTO_TEST_CASE(testAssignment) b = a; a.setString("A", "CCCC"); - BOOST_CHECK(StringValue("CCCC") == *a.get("A")); - BOOST_CHECK(StringValue("BBBB") == *b.get("A")); + BOOST_CHECK(string("CCCC") == a.getString("A")); + BOOST_CHECK(string("BBBB") == b.getString("A")); BOOST_CHECK_EQUAL(1234, a.getInt("B")); BOOST_CHECK_EQUAL(1234, b.getInt("B")); BOOST_CHECK(IntegerValue(1234) == *a.get("B")); @@ -74,10 +74,10 @@ QPID_AUTO_TEST_CASE(testAssignment) Buffer rbuffer(buff, c.size()); rbuffer.get(d); BOOST_CHECK_EQUAL(c, d); - BOOST_CHECK(StringValue("CCCC") == *c.get("A")); + BOOST_CHECK(string("CCCC") == c.getString("A")); BOOST_CHECK(IntegerValue(1234) == *c.get("B")); } - BOOST_CHECK(StringValue("CCCC") == *d.get("A")); + BOOST_CHECK(string("CCCC") == d.getString("A")); BOOST_CHECK(IntegerValue(1234) == *d.get("B")); } diff --git a/qpid/cpp/src/tests/MessageTest.cpp b/qpid/cpp/src/tests/MessageTest.cpp index d7688c74a9..f642658825 100644 --- a/qpid/cpp/src/tests/MessageTest.cpp +++ b/qpid/cpp/src/tests/MessageTest.cpp @@ -87,7 +87,7 @@ class MessageTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL((uint64_t) data1.size() + data2.size(), msg->contentSize()); CPPUNIT_ASSERT_EQUAL((uint64_t) data1.size() + data2.size(), msg->getProperties<MessageProperties>()->getContentLength()); CPPUNIT_ASSERT_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId()); - CPPUNIT_ASSERT(StringValue("xyz") == *msg->getProperties<MessageProperties>()->getApplicationHeaders().get("abc")); + CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getProperties<MessageProperties>()->getApplicationHeaders().getString("abc")); CPPUNIT_ASSERT_EQUAL((uint8_t) PERSISTENT, msg->getProperties<DeliveryProperties>()->getDeliveryMode()); CPPUNIT_ASSERT(msg->isPersistent()); } diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index 28ebc4a24d..dbd244bcda 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -19,10 +19,10 @@ # import sys -from qpid.testlib import TestBase, testrunner +from qpid.testlib import TestBase010, testrunner from qpid.management import managementChannel, managementClient +from qpid.datatypes import Message from qpid.queue import Empty -from qpid.content import Content def scan_args(name, default=None, args=sys.argv[1:]): @@ -51,9 +51,9 @@ def remote_port(): class Helper: def __init__(self, parent): self.parent = parent - self.channel = parent.client.channel(2) - self.mc = managementClient(self.channel.spec) - self.mch = self.mc.addChannel(self.channel) + self.session = parent.conn.session("2") + self.mc = managementClient(self.session.spec) + self.mch = self.mc.addChannel(self.session) self.mc.syncWaitForStable(self.mch) def get_objects(self, type): @@ -75,7 +75,7 @@ class Helper: def assertEqual(self, a, b): self.parent.assertEqual(a, b) -class FederationTests(TestBase): +class FederationTests(TestBase010): def test_bridge_create_and_close(self): mgmt = Helper(self) @@ -94,8 +94,8 @@ class FederationTests(TestBase): mgmt.call_method(link, "close") self.assertEqual(len(mgmt.get_objects("link")), 0) - def test_pull_from_exchange(self): - channel = self.channel + def DISABLED_test_pull_from_exchange(self): + session = self.session mgmt = Helper(self) broker = mgmt.get_object("broker") @@ -107,18 +107,18 @@ class FederationTests(TestBase): bridge = mgmt.get_object("bridge") #setup queue to receive messages from local broker - channel.queue_declare(queue="fed1", exclusive=True, auto_delete=True) - channel.queue_bind(queue="fed1", exchange="amq.fanout") + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="amq.fanout") self.subscribe(queue="fed1", destination="f1") - queue = self.client.queue("f1") + queue = session.incoming("f1") #send messages to remote broker and confirm it is routed to local broker r_conn = self.connect(host=remote_host(), port=remote_port()) - r_channel = r_conn.channel(1) - r_channel.session_open() + r_session = r_conn.session("1") for i in range(1, 11): - r_channel.message_transfer(destination="amq.direct", content=Content(properties={'routing_key' : "my-key"}, body="Message %d" % i)) + dp = r_session.delivery_properties(routing_key="my-key") + r_session.message_transfer(destination="amq.direct", message=Message(dp, "Message %d" % i)) for i in range(1, 11): msg = queue.get(timeout=5) @@ -135,22 +135,22 @@ class FederationTests(TestBase): mgmt.call_method(link, "close") self.assertEqual(len(mgmt.get_objects("link")), 0) - def test_pull_from_queue(self): - channel = self.channel + def DISABLED_test_pull_from_queue(self): + session = self.session #setup queue on remote broker and add some messages r_conn = self.connect(host=remote_host(), port=remote_port()) - r_channel = r_conn.channel(1) - r_channel.session_open() - r_channel.queue_declare(queue="my-bridge-queue", exclusive=True, auto_delete=True) + r_session = r_conn.session("1") + r_session.queue_declare(queue="my-bridge-queue", exclusive=True, auto_delete=True) for i in range(1, 6): - r_channel.message_transfer(content=Content(properties={'routing_key' : "my-bridge-queue"}, body="Message %d" % i)) + dp = r_session.delivery_properties(routing_key="my-bridge-queue") + r_session.message_transfer(message=Message(dp, "Message %d" % i)) #setup queue to receive messages from local broker - channel.queue_declare(queue="fed1", exclusive=True, auto_delete=True) - channel.queue_bind(queue="fed1", exchange="amq.fanout") + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="amq.fanout") self.subscribe(queue="fed1", destination="f1") - queue = self.client.queue("f1") + queue = session.incoming("f1") mgmt = Helper(self) broker = mgmt.get_object("broker") @@ -163,7 +163,8 @@ class FederationTests(TestBase): #add some more messages (i.e. after bridge was created) for i in range(6, 11): - r_channel.message_transfer(content=Content(properties={'routing_key' : "my-bridge-queue"}, body="Message %d" % i)) + dp = r_session.delivery_properties(routing_key="my-bridge-queue") + r_session.message_transfer(message=Message(dp, "Message %d" % i)) for i in range(1, 11): msg = queue.get(timeout=5) diff --git a/qpid/cpp/src/tests/run_federation_tests b/qpid/cpp/src/tests/run_federation_tests index 1f5917af0e..4a5fd39a41 100755 --- a/qpid/cpp/src/tests/run_federation_tests +++ b/qpid/cpp/src/tests/run_federation_tests @@ -19,6 +19,6 @@ if test -d ../../../python ; then start_brokers echo "Running federation tests using brokers on ports $LOCAL_PORT $REMOTE_PORT" export PYTHONPATH=../../../python - ./federation.py -v -s ../../../specs/amqp.0-10-preview.xml -b localhost:$LOCAL_PORT --remote-port $REMOTE_PORT || { echo "FAIL federation tests"; exit 1; } + ./federation.py -v -s ../../../specs/amqp.0-10-qpid-errata.xml -b localhost:$LOCAL_PORT --remote-port $REMOTE_PORT || { echo "FAIL federation tests"; exit 1; } fi diff --git a/qpid/cpp/src/tests/topic_listener.cpp b/qpid/cpp/src/tests/topic_listener.cpp index 3dd042605e..f1007ee4c2 100644 --- a/qpid/cpp/src/tests/topic_listener.cpp +++ b/qpid/cpp/src/tests/topic_listener.cpp @@ -154,11 +154,11 @@ void Listener::received(Message& message){ init = true; cout << "Batch started." << endl; } - FieldTable::ValuePtr type(message.getHeaders().get("TYPE")); + string type = message.getHeaders().getString("TYPE"); - if(!!type && StringValue("TERMINATION_REQUEST") == *type){ + if(string("TERMINATION_REQUEST") == type){ shutdown(); - }else if(!!type && StringValue("REPORT_REQUEST") == *type){ + }else if(string("REPORT_REQUEST") == type){ mgr.getAckPolicy().ackOutstanding(session);//acknowledge everything upto this point cout <<"Batch ended, sending report." << endl; //send a report: diff --git a/qpid/python/qpid/client.py b/qpid/python/qpid/client.py index 13c3f13fe5..4605710de8 100644 --- a/qpid/python/qpid/client.py +++ b/qpid/python/qpid/client.py @@ -25,7 +25,7 @@ interacting with the server. import os, threading from peer import Peer, Channel, Closed from delegate import Delegate -from connection import Connection, Frame, connect +from connection08 import Connection, Frame, connect from spec import load from queue import Queue from reference import ReferenceId, References diff --git a/qpid/python/qpid/codec.py b/qpid/python/qpid/codec.py index dfa74b6a2f..8026b209dc 100644 --- a/qpid/python/qpid/codec.py +++ b/qpid/python/qpid/codec.py @@ -26,7 +26,7 @@ fields. The unit test for this module is located in tests/codec.py """ -import re, qpid, spec +import re, qpid, spec08 from cStringIO import StringIO from struct import * from reference import ReferenceId @@ -156,7 +156,7 @@ class Codec: """ calls the appropriate encode function e.g. encode_octet, encode_short etc. """ - if isinstance(type, spec.Struct): + if isinstance(type, spec08.Struct): self.encode_struct(type, value) else: getattr(self, "encode_" + type)(value) @@ -165,7 +165,7 @@ class Codec: """ calls the appropriate decode function e.g. decode_octet, decode_short etc. """ - if isinstance(type, spec.Struct): + if isinstance(type, spec08.Struct): return self.decode_struct(type) else: return getattr(self, "decode_" + type)() diff --git a/qpid/python/qpid/codec010.py b/qpid/python/qpid/codec010.py index f82a7a49dc..27fcd5d418 100644 --- a/qpid/python/qpid/codec010.py +++ b/qpid/python/qpid/codec010.py @@ -221,12 +221,18 @@ class Codec(Packer): attr = "write_uint%d" % (width*8) getattr(self, attr)(n) + def read_uuid(self): + return self.unpack("16s") + def write_uuid(self, s): self.pack("16s", s) - def read_uuid(self): + def read_bin128(self): return self.unpack("16s") + def write_bin128(self, b): + self.pack("16s", b) + class StringCodec(Codec): diff --git a/qpid/python/qpid/connection.py b/qpid/python/qpid/connection.py index eafad7067a..dc72cd9cb8 100644 --- a/qpid/python/qpid/connection.py +++ b/qpid/python/qpid/connection.py @@ -17,467 +17,165 @@ # under the License. # -""" -A Connection class containing socket code that uses the spec metadata -to read and write Frame objects. This could be used by a client, -server, or even a proxy implementation. -""" +import datatypes, session +from threading import Thread, Condition, RLock +from util import wait +from framer import Closed +from assembler import Assembler, Segment +from codec010 import StringCodec +from session import Session +from invoker import Invoker +from spec010 import Control, Command +from exceptions import * +from logging import getLogger +import delegates -import socket, codec, logging, qpid -from cStringIO import StringIO -from spec import load -from codec import EOF +class ChannelBusy(Exception): pass -class SockIO: +class ChannelsBusy(Exception): pass - def __init__(self, sock): - self.sock = sock +class SessionBusy(Exception): pass - def write(self, buf): -# print "OUT: %r" % buf - self.sock.sendall(buf) +def client(*args): + return delegates.Client(*args) - def read(self, n): - data = "" - while len(data) < n: - try: - s = self.sock.recv(n - len(data)) - except socket.error: - break - if len(s) == 0: - break -# print "IN: %r" % s - data += s - return data - - def flush(self): - pass - - def close(self): - self.sock.shutdown(socket.SHUT_RDWR) +def server(*args): + return delegates.Server(*args) -def connect(host, port): - sock = socket.socket() - sock.connect((host, port)) - sock.setblocking(1) - return SockIO(sock) +class Connection(Assembler): -def listen(host, port, predicate = lambda: True): - sock = socket.socket() - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind((host, port)) - sock.listen(5) - while predicate(): - s, a = sock.accept() - yield SockIO(s) - -class Connection: - - def __init__(self, io, spec): - self.codec = codec.Codec(io, spec) + def __init__(self, sock, spec, delegate=client): + Assembler.__init__(self, sock) self.spec = spec - self.FRAME_END = self.spec.constants.byname["frame_end"].id - self.write = getattr(self, "write_%s_%s" % (self.spec.major, self.spec.minor)) - self.read = getattr(self, "read_%s_%s" % (self.spec.major, self.spec.minor)) - - def flush(self): - self.codec.flush() - - INIT="!4s4B" + self.track = self.spec["track"] - def init(self): - self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major, - self.spec.minor) + self.lock = RLock() + self.attached = {} + self.sessions = {} - def tini(self): - self.codec.unpack(Connection.INIT) + self.condition = Condition() + self.opened = False - def write_8_0(self, frame): - c = self.codec - c.encode_octet(self.spec.constants.byname[frame.type].id) - c.encode_short(frame.channel) - body = StringIO() - enc = codec.Codec(body, self.spec) - frame.encode(enc) - enc.flush() - c.encode_longstr(body.getvalue()) - c.encode_octet(self.FRAME_END) + self.thread = Thread(target=self.run) + self.thread.setDaemon(True) - def read_8_0(self): - c = self.codec - type = self.spec.constants.byid[c.decode_octet()].name - channel = c.decode_short() - body = c.decode_longstr() - dec = codec.Codec(StringIO(body), self.spec) - frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) - frame.channel = channel - end = c.decode_octet() - if end != self.FRAME_END: - garbage = "" - while end != self.FRAME_END: - garbage += chr(end) - end = c.decode_octet() - raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage) - return frame + self.channel_max = 65535 - def write_0_10(self, frame): - c = self.codec - flags = 0 - if frame.bof: flags |= 0x08 - if frame.eof: flags |= 0x04 - if frame.bos: flags |= 0x02 - if frame.eos: flags |= 0x01 + self.delegate = delegate(self) - c.encode_octet(flags) # TODO: currently fixed at ver=0, B=E=b=e=1 - c.encode_octet(self.spec.constants.byname[frame.type].id) - body = StringIO() - enc = codec.Codec(body, self.spec) - frame.encode(enc) - enc.flush() - frame_size = len(body.getvalue()) + 12 # TODO: Magic number (frame header size) - c.encode_short(frame_size) - c.encode_octet(0) # Reserved - c.encode_octet(frame.subchannel & 0x0f) - c.encode_short(frame.channel) - c.encode_long(0) # Reserved - c.write(body.getvalue()) - c.encode_octet(self.FRAME_END) - - def read_0_10(self): - c = self.codec - flags = c.decode_octet() # TODO: currently ignoring flags - framing_version = (flags & 0xc0) >> 6 - if framing_version != 0: - raise "frame error: unknown framing version" - type = self.spec.constants.byid[c.decode_octet()].name - frame_size = c.decode_short() - if frame_size < 12: # TODO: Magic number (frame header size) - raise "frame error: frame size too small" - reserved1 = c.decode_octet() - field = c.decode_octet() - subchannel = field & 0x0f - channel = c.decode_short() - reserved2 = c.decode_long() # TODO: reserved maybe need to ensure 0 - if (flags & 0x30) != 0 or reserved1 != 0 or (field & 0xf0) != 0: - raise "frame error: reserved bits not all zero" - body_size = frame_size - 12 # TODO: Magic number (frame header size) - body = c.read(body_size) - dec = codec.Codec(StringIO(body), self.spec) + def attach(self, name, ch, delegate, force=False): + self.lock.acquire() try: - frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) - except EOF: - raise "truncated frame body: %r" % body - frame.channel = channel - frame.subchannel = subchannel - end = c.decode_octet() - if end != self.FRAME_END: - garbage = "" - while end != self.FRAME_END: - garbage += chr(end) - end = c.decode_octet() - raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage) - return frame - - def write_99_0(self, frame): - self.write_0_10(frame) - - def read_99_0(self): - return self.read_0_10() - -class Frame: - - DECODERS = {} - - class __metaclass__(type): - - def __new__(cls, name, bases, dict): - for attr in ("encode", "decode", "type"): - if not dict.has_key(attr): - raise TypeError("%s must define %s" % (name, attr)) - dict["decode"] = staticmethod(dict["decode"]) - if dict.has_key("__init__"): - __init__ = dict["__init__"] - def init(self, *args, **kwargs): - args = list(args) - self.init(args, kwargs) - __init__(self, *args, **kwargs) - dict["__init__"] = init - t = type.__new__(cls, name, bases, dict) - if t.type != None: - Frame.DECODERS[t.type] = t - return t - - type = None - - def init(self, args, kwargs): - self.channel = kwargs.pop("channel", 0) - self.subchannel = kwargs.pop("subchannel", 0) - self.bos = True - self.eos = True - self.bof = True - self.eof = True - - def encode(self, enc): abstract - - def decode(spec, dec, size): abstract - -class Method(Frame): - - type = "frame_method" - - def __init__(self, method, args): - if len(args) != len(method.fields): - argspec = ["%s: %s" % (f.name, f.type) - for f in method.fields] - raise TypeError("%s.%s expecting (%s), got %s" % - (method.klass.name, method.name, ", ".join(argspec), - args)) - self.method = method - self.method_type = method - self.args = args - self.eof = not method.content - - def encode(self, c): - version = (c.spec.major, c.spec.minor) - if version == (0, 10) or version == (99, 0): - c.encode_octet(self.method.klass.id) - c.encode_octet(self.method.id) - else: - c.encode_short(self.method.klass.id) - c.encode_short(self.method.id) - for field, arg in zip(self.method.fields, self.args): - c.encode(field.type, arg) - - def decode(spec, c, size): - version = (c.spec.major, c.spec.minor) - if version == (0, 10) or version == (99, 0): - klass = spec.classes.byid[c.decode_octet()] - meth = klass.methods.byid[c.decode_octet()] - else: - klass = spec.classes.byid[c.decode_short()] - meth = klass.methods.byid[c.decode_short()] - args = tuple([c.decode(f.type) for f in meth.fields]) - return Method(meth, args) - - def __str__(self): - return "[%s] %s %s" % (self.channel, self.method, - ", ".join([str(a) for a in self.args])) - -class Request(Frame): - - type = "frame_request" - - def __init__(self, id, response_mark, method): - self.id = id - self.response_mark = response_mark - self.method = method - self.method_type = method.method_type - self.args = method.args - - def encode(self, enc): - enc.encode_longlong(self.id) - enc.encode_longlong(self.response_mark) - # reserved - enc.encode_long(0) - self.method.encode(enc) - - def decode(spec, dec, size): - id = dec.decode_longlong() - mark = dec.decode_longlong() - # reserved - dec.decode_long() - method = Method.decode(spec, dec, size - 20) - return Request(id, mark, method) - - def __str__(self): - return "[%s] Request(%s) %s" % (self.channel, self.id, self.method) - -class Response(Frame): - - type = "frame_response" - - def __init__(self, id, request_id, batch_offset, method): - self.id = id - self.request_id = request_id - self.batch_offset = batch_offset - self.method = method - self.method_type = method.method_type - self.args = method.args - - def encode(self, enc): - enc.encode_longlong(self.id) - enc.encode_longlong(self.request_id) - enc.encode_long(self.batch_offset) - self.method.encode(enc) - - def decode(spec, dec, size): - id = dec.decode_longlong() - request_id = dec.decode_longlong() - batch_offset = dec.decode_long() - method = Method.decode(spec, dec, size - 20) - return Response(id, request_id, batch_offset, method) - - def __str__(self): - return "[%s] Response(%s,%s,%s) %s" % (self.channel, self.id, self.request_id, self.batch_offset, self.method) - -def uses_struct_encoding(spec): - return (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0) - -class Header(Frame): - - type = "frame_header" - - def __init__(self, klass, weight, size, properties): - self.klass = klass - self.weight = weight - self.size = size - self.properties = properties - self.eof = size == 0 - self.bof = False - - def __getitem__(self, name): - return self.properties[name] - - def __setitem__(self, name, value): - self.properties[name] = value - - def __delitem__(self, name): - del self.properties[name] - - def encode(self, c): - if uses_struct_encoding(c.spec): - self.encode_structs(c) - else: - self.encode_legacy(c) - - def encode_structs(self, c): - # XXX - structs = [qpid.Struct(c.spec.domains.byname["delivery_properties"].type), - qpid.Struct(c.spec.domains.byname["message_properties"].type)] - - # XXX - props = self.properties.copy() - for k in self.properties: - for s in structs: - if s.exists(k): - s.set(k, props.pop(k)) - if props: - raise TypeError("no such property: %s" % (", ".join(props))) - - # message properties store the content-length now, and weight is - # deprecated - if self.size != None: - structs[1].content_length = self.size - - for s in structs: - c.encode_long_struct(s) - - def encode_legacy(self, c): - c.encode_short(self.klass.id) - c.encode_short(self.weight) - c.encode_longlong(self.size) - - # property flags - nprops = len(self.klass.fields) - flags = 0 - for i in range(nprops): - f = self.klass.fields.items[i] - flags <<= 1 - if self.properties.get(f.name) != None: - flags |= 1 - # the last bit indicates more flags - if i > 0 and (i % 15) == 0: - flags <<= 1 - if nprops > (i + 1): - flags |= 1 - c.encode_short(flags) - flags = 0 - flags <<= ((16 - (nprops % 15)) % 16) - c.encode_short(flags) - - # properties - for f in self.klass.fields: - v = self.properties.get(f.name) - if v != None: - c.encode(f.type, v) - - def decode(spec, c, size): - if uses_struct_encoding(spec): - return Header.decode_structs(spec, c, size) + ssn = self.attached.get(ch.id) + if ssn is not None: + if ssn.name != name: + raise ChannelBusy(ch, ssn) + else: + ssn = self.sessions.get(name) + if ssn is None: + ssn = Session(name, self.spec, delegate=delegate) + self.sessions[name] = ssn + elif ssn.channel is not None: + if force: + del self.attached[ssn.channel.id] + ssn.channel = None + else: + raise SessionBusy(ssn) + self.attached[ch.id] = ssn + ssn.channel = ch + ch.session = ssn + return ssn + finally: + self.lock.release() + + def detach(self, name, ch): + self.lock.acquire() + try: + self.attached.pop(ch.id, None) + ssn = self.sessions.pop(name, None) + if ssn is not None: + ssn.channel = None + return ssn + finally: + self.lock.release() + + def __channel(self): + # XXX: ch 0? + for i in xrange(self.channel_max): + if not self.attached.has_key(i): + return i else: - return Header.decode_legacy(spec, c, size) + raise ChannelsBusy() - @staticmethod - def decode_structs(spec, c, size): - structs = [] - start = c.nread - while c.nread - start < size: - structs.append(c.decode_long_struct()) - - # XXX - props = {} - length = None - for s in structs: - for f in s.type.fields: - if s.has(f.name): - props[f.name] = s.get(f.name) - if f.name == "content_length": - length = s.get(f.name) - return Header(None, 0, length, props) - - @staticmethod - def decode_legacy(spec, c, size): - klass = spec.classes.byid[c.decode_short()] - weight = c.decode_short() - size = c.decode_longlong() - - # property flags - bits = [] + def session(self, name, timeout=None, delegate=session.client): + self.lock.acquire() + try: + ch = Channel(self, self.__channel()) + ssn = self.attach(name, ch, delegate) + ssn.channel.session_attach(name) + if wait(ssn.condition, lambda: ssn.channel is not None, timeout): + return ssn + else: + self.detach(name, ch) + raise Timeout() + finally: + self.lock.release() + + def start(self, timeout=None): + self.delegate.start() + self.thread.start() + if not wait(self.condition, lambda: self.opened, timeout): + raise Timeout() + + def run(self): + # XXX: we don't really have a good way to exit this loop without + # getting the other end to kill the socket while True: - flags = c.decode_short() - for i in range(15, 0, -1): - if flags >> i & 0x1 != 0: - bits.append(True) - else: - bits.append(False) - if flags & 0x1 == 0: + try: + seg = self.read_segment() + except Closed: break + self.delegate.received(seg) - # properties - properties = {} - for b, f in zip(bits, klass.fields): - if b: - # Note: decode returns a unicode u'' string but only - # plain '' strings can be used as keywords so we need to - # stringify the names. - properties[str(f.name)] = c.decode(f.type) - return Header(klass, weight, size, properties) + def close(self, timeout=None): + if not self.opened: return + Channel(self, 0).connection_close(200) + if not wait(self.condition, lambda: not self.opened, timeout): + raise Timeout() + self.thread.join(timeout=timeout) def __str__(self): - return "%s %s %s %s" % (self.klass, self.weight, self.size, - self.properties) + return "%s:%s" % self.sock.getsockname() + + def __repr__(self): + return str(self) -class Body(Frame): +log = getLogger("qpid.io.ctl") - type = "frame_body" +class Channel(Invoker): - def __init__(self, content): - self.content = content - self.eof = True - self.bof = False + def __init__(self, connection, id): + self.connection = connection + self.id = id + self.session = None - def encode(self, enc): - enc.write(self.content) + def resolve_method(self, name): + inst = self.connection.spec.instructions.get(name) + if inst is not None and isinstance(inst, Control): + return inst + else: + return None - def decode(spec, dec, size): - return Body(dec.read(size)) + def invoke(self, type, args, kwargs): + ctl = type.new(args, kwargs) + sc = StringCodec(self.connection.spec) + sc.write_control(ctl) + self.connection.write_segment(Segment(True, True, type.segment_type, + type.track, self.id, sc.encoded)) + log.debug("SENT %s", ctl) def __str__(self): - return "Body(%r)" % self.content + return "%s[%s]" % (self.connection, self.id) -# TODO: -# OOB_METHOD = "frame_oob_method" -# OOB_HEADER = "frame_oob_header" -# OOB_BODY = "frame_oob_body" -# TRACE = "frame_trace" -# HEARTBEAT = "frame_heartbeat" + def __repr__(self): + return str(self) diff --git a/qpid/python/qpid/connection010.py b/qpid/python/qpid/connection010.py deleted file mode 100644 index 59aa41c88d..0000000000 --- a/qpid/python/qpid/connection010.py +++ /dev/null @@ -1,181 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import datatypes, session -from threading import Thread, Condition, RLock -from util import wait -from framer import Closed -from assembler import Assembler, Segment -from codec010 import StringCodec -from session import Session -from invoker import Invoker -from spec010 import Control, Command -from exceptions import * -from logging import getLogger -import delegates - -class ChannelBusy(Exception): pass - -class ChannelsBusy(Exception): pass - -class SessionBusy(Exception): pass - -def client(*args): - return delegates.Client(*args) - -def server(*args): - return delegates.Server(*args) - -class Connection(Assembler): - - def __init__(self, sock, spec, delegate=client): - Assembler.__init__(self, sock) - self.spec = spec - self.track = self.spec["track"] - - self.lock = RLock() - self.attached = {} - self.sessions = {} - - self.condition = Condition() - self.opened = False - - self.thread = Thread(target=self.run) - self.thread.setDaemon(True) - - self.channel_max = 65535 - - self.delegate = delegate(self) - - def attach(self, name, ch, delegate, force=False): - self.lock.acquire() - try: - ssn = self.attached.get(ch.id) - if ssn is not None: - if ssn.name != name: - raise ChannelBusy(ch, ssn) - else: - ssn = self.sessions.get(name) - if ssn is None: - ssn = Session(name, self.spec, delegate=delegate) - self.sessions[name] = ssn - elif ssn.channel is not None: - if force: - del self.attached[ssn.channel.id] - ssn.channel = None - else: - raise SessionBusy(ssn) - self.attached[ch.id] = ssn - ssn.channel = ch - ch.session = ssn - return ssn - finally: - self.lock.release() - - def detach(self, name, ch): - self.lock.acquire() - try: - self.attached.pop(ch.id, None) - ssn = self.sessions.pop(name, None) - if ssn is not None: - ssn.channel = None - return ssn - finally: - self.lock.release() - - def __channel(self): - # XXX: ch 0? - for i in xrange(self.channel_max): - if not self.attached.has_key(i): - return i - else: - raise ChannelsBusy() - - def session(self, name, timeout=None, delegate=session.client): - self.lock.acquire() - try: - ch = Channel(self, self.__channel()) - ssn = self.attach(name, ch, delegate) - ssn.channel.session_attach(name) - if wait(ssn.condition, lambda: ssn.channel is not None, timeout): - return ssn - else: - self.detach(name, ch) - raise Timeout() - finally: - self.lock.release() - - def start(self, timeout=None): - self.delegate.start() - self.thread.start() - if not wait(self.condition, lambda: self.opened, timeout): - raise Timeout() - - def run(self): - # XXX: we don't really have a good way to exit this loop without - # getting the other end to kill the socket - while True: - try: - seg = self.read_segment() - except Closed: - break - self.delegate.received(seg) - - def close(self, timeout=None): - if not self.opened: return - Channel(self, 0).connection_close() - if not wait(self.condition, lambda: not self.opened, timeout): - raise Timeout() - self.thread.join(timeout=timeout) - - def __str__(self): - return "%s:%s" % self.sock.getsockname() - - def __repr__(self): - return str(self) - -log = getLogger("qpid.io.ctl") - -class Channel(Invoker): - - def __init__(self, connection, id): - self.connection = connection - self.id = id - self.session = None - - def resolve_method(self, name): - inst = self.connection.spec.instructions.get(name) - if inst is not None and isinstance(inst, Control): - return inst - else: - return None - - def invoke(self, type, args, kwargs): - ctl = type.new(args, kwargs) - sc = StringCodec(self.connection.spec) - sc.write_control(ctl) - self.connection.write_segment(Segment(True, True, type.segment_type, - type.track, self.id, sc.encoded)) - log.debug("SENT %s", ctl) - - def __str__(self): - return "%s[%s]" % (self.connection, self.id) - - def __repr__(self): - return str(self) diff --git a/qpid/python/qpid/connection08.py b/qpid/python/qpid/connection08.py new file mode 100644 index 0000000000..eafad7067a --- /dev/null +++ b/qpid/python/qpid/connection08.py @@ -0,0 +1,483 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +A Connection class containing socket code that uses the spec metadata +to read and write Frame objects. This could be used by a client, +server, or even a proxy implementation. +""" + +import socket, codec, logging, qpid +from cStringIO import StringIO +from spec import load +from codec import EOF + +class SockIO: + + def __init__(self, sock): + self.sock = sock + + def write(self, buf): +# print "OUT: %r" % buf + self.sock.sendall(buf) + + def read(self, n): + data = "" + while len(data) < n: + try: + s = self.sock.recv(n - len(data)) + except socket.error: + break + if len(s) == 0: + break +# print "IN: %r" % s + data += s + return data + + def flush(self): + pass + + def close(self): + self.sock.shutdown(socket.SHUT_RDWR) + +def connect(host, port): + sock = socket.socket() + sock.connect((host, port)) + sock.setblocking(1) + return SockIO(sock) + +def listen(host, port, predicate = lambda: True): + sock = socket.socket() + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((host, port)) + sock.listen(5) + while predicate(): + s, a = sock.accept() + yield SockIO(s) + +class Connection: + + def __init__(self, io, spec): + self.codec = codec.Codec(io, spec) + self.spec = spec + self.FRAME_END = self.spec.constants.byname["frame_end"].id + self.write = getattr(self, "write_%s_%s" % (self.spec.major, self.spec.minor)) + self.read = getattr(self, "read_%s_%s" % (self.spec.major, self.spec.minor)) + + def flush(self): + self.codec.flush() + + INIT="!4s4B" + + def init(self): + self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major, + self.spec.minor) + + def tini(self): + self.codec.unpack(Connection.INIT) + + def write_8_0(self, frame): + c = self.codec + c.encode_octet(self.spec.constants.byname[frame.type].id) + c.encode_short(frame.channel) + body = StringIO() + enc = codec.Codec(body, self.spec) + frame.encode(enc) + enc.flush() + c.encode_longstr(body.getvalue()) + c.encode_octet(self.FRAME_END) + + def read_8_0(self): + c = self.codec + type = self.spec.constants.byid[c.decode_octet()].name + channel = c.decode_short() + body = c.decode_longstr() + dec = codec.Codec(StringIO(body), self.spec) + frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) + frame.channel = channel + end = c.decode_octet() + if end != self.FRAME_END: + garbage = "" + while end != self.FRAME_END: + garbage += chr(end) + end = c.decode_octet() + raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage) + return frame + + def write_0_10(self, frame): + c = self.codec + flags = 0 + if frame.bof: flags |= 0x08 + if frame.eof: flags |= 0x04 + if frame.bos: flags |= 0x02 + if frame.eos: flags |= 0x01 + + c.encode_octet(flags) # TODO: currently fixed at ver=0, B=E=b=e=1 + c.encode_octet(self.spec.constants.byname[frame.type].id) + body = StringIO() + enc = codec.Codec(body, self.spec) + frame.encode(enc) + enc.flush() + frame_size = len(body.getvalue()) + 12 # TODO: Magic number (frame header size) + c.encode_short(frame_size) + c.encode_octet(0) # Reserved + c.encode_octet(frame.subchannel & 0x0f) + c.encode_short(frame.channel) + c.encode_long(0) # Reserved + c.write(body.getvalue()) + c.encode_octet(self.FRAME_END) + + def read_0_10(self): + c = self.codec + flags = c.decode_octet() # TODO: currently ignoring flags + framing_version = (flags & 0xc0) >> 6 + if framing_version != 0: + raise "frame error: unknown framing version" + type = self.spec.constants.byid[c.decode_octet()].name + frame_size = c.decode_short() + if frame_size < 12: # TODO: Magic number (frame header size) + raise "frame error: frame size too small" + reserved1 = c.decode_octet() + field = c.decode_octet() + subchannel = field & 0x0f + channel = c.decode_short() + reserved2 = c.decode_long() # TODO: reserved maybe need to ensure 0 + if (flags & 0x30) != 0 or reserved1 != 0 or (field & 0xf0) != 0: + raise "frame error: reserved bits not all zero" + body_size = frame_size - 12 # TODO: Magic number (frame header size) + body = c.read(body_size) + dec = codec.Codec(StringIO(body), self.spec) + try: + frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) + except EOF: + raise "truncated frame body: %r" % body + frame.channel = channel + frame.subchannel = subchannel + end = c.decode_octet() + if end != self.FRAME_END: + garbage = "" + while end != self.FRAME_END: + garbage += chr(end) + end = c.decode_octet() + raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage) + return frame + + def write_99_0(self, frame): + self.write_0_10(frame) + + def read_99_0(self): + return self.read_0_10() + +class Frame: + + DECODERS = {} + + class __metaclass__(type): + + def __new__(cls, name, bases, dict): + for attr in ("encode", "decode", "type"): + if not dict.has_key(attr): + raise TypeError("%s must define %s" % (name, attr)) + dict["decode"] = staticmethod(dict["decode"]) + if dict.has_key("__init__"): + __init__ = dict["__init__"] + def init(self, *args, **kwargs): + args = list(args) + self.init(args, kwargs) + __init__(self, *args, **kwargs) + dict["__init__"] = init + t = type.__new__(cls, name, bases, dict) + if t.type != None: + Frame.DECODERS[t.type] = t + return t + + type = None + + def init(self, args, kwargs): + self.channel = kwargs.pop("channel", 0) + self.subchannel = kwargs.pop("subchannel", 0) + self.bos = True + self.eos = True + self.bof = True + self.eof = True + + def encode(self, enc): abstract + + def decode(spec, dec, size): abstract + +class Method(Frame): + + type = "frame_method" + + def __init__(self, method, args): + if len(args) != len(method.fields): + argspec = ["%s: %s" % (f.name, f.type) + for f in method.fields] + raise TypeError("%s.%s expecting (%s), got %s" % + (method.klass.name, method.name, ", ".join(argspec), + args)) + self.method = method + self.method_type = method + self.args = args + self.eof = not method.content + + def encode(self, c): + version = (c.spec.major, c.spec.minor) + if version == (0, 10) or version == (99, 0): + c.encode_octet(self.method.klass.id) + c.encode_octet(self.method.id) + else: + c.encode_short(self.method.klass.id) + c.encode_short(self.method.id) + for field, arg in zip(self.method.fields, self.args): + c.encode(field.type, arg) + + def decode(spec, c, size): + version = (c.spec.major, c.spec.minor) + if version == (0, 10) or version == (99, 0): + klass = spec.classes.byid[c.decode_octet()] + meth = klass.methods.byid[c.decode_octet()] + else: + klass = spec.classes.byid[c.decode_short()] + meth = klass.methods.byid[c.decode_short()] + args = tuple([c.decode(f.type) for f in meth.fields]) + return Method(meth, args) + + def __str__(self): + return "[%s] %s %s" % (self.channel, self.method, + ", ".join([str(a) for a in self.args])) + +class Request(Frame): + + type = "frame_request" + + def __init__(self, id, response_mark, method): + self.id = id + self.response_mark = response_mark + self.method = method + self.method_type = method.method_type + self.args = method.args + + def encode(self, enc): + enc.encode_longlong(self.id) + enc.encode_longlong(self.response_mark) + # reserved + enc.encode_long(0) + self.method.encode(enc) + + def decode(spec, dec, size): + id = dec.decode_longlong() + mark = dec.decode_longlong() + # reserved + dec.decode_long() + method = Method.decode(spec, dec, size - 20) + return Request(id, mark, method) + + def __str__(self): + return "[%s] Request(%s) %s" % (self.channel, self.id, self.method) + +class Response(Frame): + + type = "frame_response" + + def __init__(self, id, request_id, batch_offset, method): + self.id = id + self.request_id = request_id + self.batch_offset = batch_offset + self.method = method + self.method_type = method.method_type + self.args = method.args + + def encode(self, enc): + enc.encode_longlong(self.id) + enc.encode_longlong(self.request_id) + enc.encode_long(self.batch_offset) + self.method.encode(enc) + + def decode(spec, dec, size): + id = dec.decode_longlong() + request_id = dec.decode_longlong() + batch_offset = dec.decode_long() + method = Method.decode(spec, dec, size - 20) + return Response(id, request_id, batch_offset, method) + + def __str__(self): + return "[%s] Response(%s,%s,%s) %s" % (self.channel, self.id, self.request_id, self.batch_offset, self.method) + +def uses_struct_encoding(spec): + return (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0) + +class Header(Frame): + + type = "frame_header" + + def __init__(self, klass, weight, size, properties): + self.klass = klass + self.weight = weight + self.size = size + self.properties = properties + self.eof = size == 0 + self.bof = False + + def __getitem__(self, name): + return self.properties[name] + + def __setitem__(self, name, value): + self.properties[name] = value + + def __delitem__(self, name): + del self.properties[name] + + def encode(self, c): + if uses_struct_encoding(c.spec): + self.encode_structs(c) + else: + self.encode_legacy(c) + + def encode_structs(self, c): + # XXX + structs = [qpid.Struct(c.spec.domains.byname["delivery_properties"].type), + qpid.Struct(c.spec.domains.byname["message_properties"].type)] + + # XXX + props = self.properties.copy() + for k in self.properties: + for s in structs: + if s.exists(k): + s.set(k, props.pop(k)) + if props: + raise TypeError("no such property: %s" % (", ".join(props))) + + # message properties store the content-length now, and weight is + # deprecated + if self.size != None: + structs[1].content_length = self.size + + for s in structs: + c.encode_long_struct(s) + + def encode_legacy(self, c): + c.encode_short(self.klass.id) + c.encode_short(self.weight) + c.encode_longlong(self.size) + + # property flags + nprops = len(self.klass.fields) + flags = 0 + for i in range(nprops): + f = self.klass.fields.items[i] + flags <<= 1 + if self.properties.get(f.name) != None: + flags |= 1 + # the last bit indicates more flags + if i > 0 and (i % 15) == 0: + flags <<= 1 + if nprops > (i + 1): + flags |= 1 + c.encode_short(flags) + flags = 0 + flags <<= ((16 - (nprops % 15)) % 16) + c.encode_short(flags) + + # properties + for f in self.klass.fields: + v = self.properties.get(f.name) + if v != None: + c.encode(f.type, v) + + def decode(spec, c, size): + if uses_struct_encoding(spec): + return Header.decode_structs(spec, c, size) + else: + return Header.decode_legacy(spec, c, size) + + @staticmethod + def decode_structs(spec, c, size): + structs = [] + start = c.nread + while c.nread - start < size: + structs.append(c.decode_long_struct()) + + # XXX + props = {} + length = None + for s in structs: + for f in s.type.fields: + if s.has(f.name): + props[f.name] = s.get(f.name) + if f.name == "content_length": + length = s.get(f.name) + return Header(None, 0, length, props) + + @staticmethod + def decode_legacy(spec, c, size): + klass = spec.classes.byid[c.decode_short()] + weight = c.decode_short() + size = c.decode_longlong() + + # property flags + bits = [] + while True: + flags = c.decode_short() + for i in range(15, 0, -1): + if flags >> i & 0x1 != 0: + bits.append(True) + else: + bits.append(False) + if flags & 0x1 == 0: + break + + # properties + properties = {} + for b, f in zip(bits, klass.fields): + if b: + # Note: decode returns a unicode u'' string but only + # plain '' strings can be used as keywords so we need to + # stringify the names. + properties[str(f.name)] = c.decode(f.type) + return Header(klass, weight, size, properties) + + def __str__(self): + return "%s %s %s %s" % (self.klass, self.weight, self.size, + self.properties) + +class Body(Frame): + + type = "frame_body" + + def __init__(self, content): + self.content = content + self.eof = True + self.bof = False + + def encode(self, enc): + enc.write(self.content) + + def decode(spec, dec, size): + return Body(dec.read(size)) + + def __str__(self): + return "Body(%r)" % self.content + +# TODO: +# OOB_METHOD = "frame_oob_method" +# OOB_HEADER = "frame_oob_header" +# OOB_BODY = "frame_oob_body" +# TRACE = "frame_trace" +# HEARTBEAT = "frame_heartbeat" diff --git a/qpid/python/qpid/delegate.py b/qpid/python/qpid/delegate.py index c4c47592b4..b447c4aa29 100644 --- a/qpid/python/qpid/delegate.py +++ b/qpid/python/qpid/delegate.py @@ -22,7 +22,7 @@ Delegate implementation intended for use with the peer module. """ import threading, inspect, traceback, sys -from connection import Method, Request, Response +from connection08 import Method, Request, Response def _handler_name(method): return "%s_%s" % (method.klass.name, method.name) diff --git a/qpid/python/qpid/delegates.py b/qpid/python/qpid/delegates.py index 9df2cd04bd..c958313671 100644 --- a/qpid/python/qpid/delegates.py +++ b/qpid/python/qpid/delegates.py @@ -17,7 +17,7 @@ # under the License. # -import os, connection010, session +import os, connection, session from util import notify from datatypes import RangedSet from logging import getLogger @@ -35,7 +35,7 @@ class Delegate: def received(self, seg): ssn = self.connection.attached.get(seg.channel) if ssn is None: - ch = connection010.Channel(self.connection, seg.channel) + ch = connection.Channel(self.connection, seg.channel) else: ch = ssn.channel @@ -61,9 +61,9 @@ class Delegate: try: self.connection.attach(a.name, ch, self.delegate, a.force) ch.session_attached(a.name) - except connection010.ChannelBusy: + except connection.ChannelBusy: ch.session_detached(a.name) - except connection010.SessionBusy: + except connection.SessionBusy: ch.session_detached(a.name) def session_attached(self, ch, a): @@ -105,7 +105,7 @@ class Server(Delegate): def start(self): self.connection.read_header() self.connection.write_header(self.spec.major, self.spec.minor) - connection010.Channel(self.connection, 0).connection_start() + connection.Channel(self.connection, 0).connection_start() def connection_start_ok(self, ch, start_ok): ch.connection_tune() diff --git a/qpid/python/qpid/framer.py b/qpid/python/qpid/framer.py index 11fe385d46..fb0e677cee 100644 --- a/qpid/python/qpid/framer.py +++ b/qpid/python/qpid/framer.py @@ -19,6 +19,7 @@ import struct, socket from packer import Packer +from threading import Lock from logging import getLogger raw = getLogger("qpid.io.raw") @@ -75,6 +76,7 @@ class Framer(Packer): def __init__(self, sock): self.sock = sock + self.sock_lock = Lock() def aborted(self): return False @@ -116,16 +118,24 @@ class Framer(Packer): return self.unpack(Framer.HEADER) def write_header(self, major, minor): - self.pack(Framer.HEADER, "AMQP", 1, 1, major, minor) + self.sock_lock.acquire() + try: + self.pack(Framer.HEADER, "AMQP", 1, 1, major, minor) + finally: + self.sock_lock.release() def write_frame(self, frame): - size = len(frame.payload) + struct.calcsize(Frame.HEADER) - track = frame.track & 0x0F - self.pack(Frame.HEADER, frame.flags, frame.type, size, track, frame.channel) - self.write(frame.payload) - # XXX: NOT 0-10 FINAL, TEMPORARY WORKAROUND for C++ - self.write("\xCE") - frm.debug("SENT %s", frame) + self.sock_lock.acquire() + try: + size = len(frame.payload) + struct.calcsize(Frame.HEADER) + track = frame.track & 0x0F + self.pack(Frame.HEADER, frame.flags, frame.type, size, track, frame.channel) + self.write(frame.payload) + # XXX: NOT 0-10 FINAL, TEMPORARY WORKAROUND for C++ + self.write("\xCE") + frm.debug("SENT %s", frame) + finally: + self.sock_lock.release() def read_frame(self): flags, type, size, track, channel = self.unpack(Frame.HEADER) diff --git a/qpid/python/qpid/management.py b/qpid/python/qpid/management.py index 6b25d5ea08..3a7a564e19 100644 --- a/qpid/python/qpid/management.py +++ b/qpid/python/qpid/management.py @@ -25,12 +25,10 @@ import qpid import struct import socket from threading import Thread -from message import Message +from datatypes import Message, RangedSet from time import time -from qpid.client import Client -from qpid.content import Content from cStringIO import StringIO -from codec import Codec, EOF +from codec010 import StringCodec as Codec from threading import Lock, Condition @@ -83,40 +81,39 @@ class methodResult: class managementChannel: """ This class represents a connection to an AMQP broker. """ - def __init__ (self, ch, topicCb, replyCb, cbContext, _detlife=0): + def __init__ (self, ssn, topicCb, replyCb, cbContext, _detlife=0): """ Given a channel on an established AMQP broker connection, this method opens a session and performs all of the declarations and bindings needed to participate in the management protocol. """ - response = ch.session_open (detached_lifetime=_detlife) - self.sessionId = response.session_id - self.topicName = "mgmt-%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack ("!LHHHHL", response.session_id) - self.replyName = "repl-%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack ("!LHHHHL", response.session_id) - self.qpidChannel = ch + self.sessionId = ssn.name + self.topicName = "mgmt-%s" % self.sessionId + self.replyName = "repl-%s" % self.sessionId + self.qpidChannel = ssn self.tcb = topicCb self.rcb = replyCb self.context = cbContext self.reqsOutstanding = 0 - ch.queue_declare (queue=self.topicName, exclusive=True, auto_delete=True) - ch.queue_declare (queue=self.replyName, exclusive=True, auto_delete=True) + ssn.queue_declare (queue=self.topicName, exclusive=True, auto_delete=True) + ssn.queue_declare (queue=self.replyName, exclusive=True, auto_delete=True) - ch.queue_bind (exchange="qpid.management", - queue=self.topicName, routing_key="mgmt.#") - ch.queue_bind (exchange="amq.direct", - queue=self.replyName, routing_key=self.replyName) - ch.message_subscribe (queue=self.topicName, destination="tdest") - ch.message_subscribe (queue=self.replyName, destination="rdest") + ssn.exchange_bind (exchange="qpid.management", + queue=self.topicName, binding_key="mgmt.#") + ssn.exchange_bind (exchange="amq.direct", + queue=self.replyName, binding_key=self.replyName) + ssn.message_subscribe (queue=self.topicName, destination="tdest") + ssn.message_subscribe (queue=self.replyName, destination="rdest") - ch.client.queue ("tdest").listen (self.topicCb) - ch.client.queue ("rdest").listen (self.replyCb) + ssn.incoming ("tdest").listen (self.topicCb) + ssn.incoming ("rdest").listen (self.replyCb) - ch.message_flow_mode (destination="tdest", mode=1) - ch.message_flow (destination="tdest", unit=0, value=0xFFFFFFFF) - ch.message_flow (destination="tdest", unit=1, value=0xFFFFFFFF) + ssn.message_set_flow_mode (destination="tdest", flow_mode=1) + ssn.message_flow (destination="tdest", unit=0, value=0xFFFFFFFF) + ssn.message_flow (destination="tdest", unit=1, value=0xFFFFFFFF) - ch.message_flow_mode (destination="rdest", mode=1) - ch.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF) - ch.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF) + ssn.message_set_flow_mode (destination="rdest", flow_mode=1) + ssn.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF) + ssn.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF) def topicCb (self, msg): """ Receive messages via the topic queue on this channel. """ @@ -127,7 +124,18 @@ class managementChannel: self.rcb (self, msg) def send (self, exchange, msg): - self.qpidChannel.message_transfer (destination=exchange, content=msg) + self.qpidChannel.message_transfer (destination=exchange, message=msg) + + def accept (self, msg): + self.qpidChannel.message_accept(RangedSet(msg.id)) + + def message (self, body, routing_key="agent"): + dp = self.qpidChannel.delivery_properties() + dp.routing_key = routing_key + mp = self.qpidChannel.message_properties() + mp.content_type = "application/octet-stream" + mp.reply_to = self.qpidChannel.reply_to("amq.direct", self.replyName) + return Message(dp, mp, body) class managementClient: @@ -177,14 +185,9 @@ class managementClient: self.channels.append (mch) self.incOutstanding (mch) - codec = Codec (StringIO (), self.spec) + codec = Codec (self.spec) self.setHeader (codec, ord ('B')) - msg = Content (codec.stream.getvalue ()) - msg["content_type"] = "application/octet-stream" - msg["routing_key"] = "agent" - msg["reply_to"] = self.spec.struct ("reply_to") - msg["reply_to"]["exchange_name"] = "amq.direct" - msg["reply_to"]["routing_key"] = mch.replyName + msg = mch.message(codec.encoded) mch.send ("qpid.management", msg) return mch @@ -198,17 +201,12 @@ class managementClient: def getObjects (self, channel, userSequence, className): """ Request immediate content from broker """ - codec = Codec (StringIO (), self.spec) + codec = Codec (self.spec) self.setHeader (codec, ord ('G'), userSequence) ft = {} ft["_class"] = className - codec.encode_table (ft) - msg = Content (codec.stream.getvalue ()) - msg["content_type"] = "application/octet-stream" - msg["routing_key"] = "agent" - msg["reply_to"] = self.spec.struct ("reply_to") - msg["reply_to"]["exchange_name"] = "amq.direct" - msg["reply_to"]["routing_key"] = channel.replyName + codec.write_map (ft) + msg = channel.message(codec.encoded) channel.send ("qpid.management", msg) def syncWaitForStable (self, channel): @@ -270,19 +268,19 @@ class managementClient: #======================================================== def topicCb (self, ch, msg): """ Receive messages via the topic queue of a particular channel. """ - codec = Codec (StringIO (msg.content.body), self.spec) + codec = Codec (self.spec, msg.body) hdr = self.checkHeader (codec) if hdr == None: raise ValueError ("outer header invalid"); self.parse (ch, codec, hdr[0], hdr[1]) - msg.complete () + ch.accept(msg) def replyCb (self, ch, msg): """ Receive messages via the reply queue of a particular channel. """ - codec = Codec (StringIO (msg.content.body), self.spec) + codec = Codec (self.spec, msg.body) hdr = self.checkHeader (codec) if hdr == None: - msg.complete () + ch.accept(msg) return if hdr[0] == 'm': @@ -297,102 +295,102 @@ class managementClient: self.handleClassInd (ch, codec) else: self.parse (ch, codec, hdr[0], hdr[1]) - msg.complete () + ch.accept(msg) #======================================================== # Internal Functions #======================================================== def setHeader (self, codec, opcode, seq = 0): """ Compose the header of a management message. """ - codec.encode_octet (ord ('A')) - codec.encode_octet (ord ('M')) - codec.encode_octet (ord ('1')) - codec.encode_octet (opcode) - codec.encode_long (seq) + codec.write_uint8 (ord ('A')) + codec.write_uint8 (ord ('M')) + codec.write_uint8 (ord ('1')) + codec.write_uint8 (opcode) + codec.write_uint32 (seq) def checkHeader (self, codec): """ Check the header of a management message and extract the opcode and class. """ - octet = chr (codec.decode_octet ()) + octet = chr (codec.read_uint8 ()) if octet != 'A': return None - octet = chr (codec.decode_octet ()) + octet = chr (codec.read_uint8 ()) if octet != 'M': return None - octet = chr (codec.decode_octet ()) + octet = chr (codec.read_uint8 ()) if octet != '1': return None - opcode = chr (codec.decode_octet ()) - seq = codec.decode_long () + opcode = chr (codec.read_uint8 ()) + seq = codec.read_uint32 () return (opcode, seq) def encodeValue (self, codec, value, typecode): """ Encode, into the codec, a value based on its typecode. """ if typecode == 1: - codec.encode_octet (int (value)) + codec.write_uint8 (int (value)) elif typecode == 2: - codec.encode_short (int (value)) + codec.write_uint16 (int (value)) elif typecode == 3: - codec.encode_long (long (value)) + codec.write_uint32 (long (value)) elif typecode == 4: - codec.encode_longlong (long (value)) + codec.write_uint64 (long (value)) elif typecode == 5: - codec.encode_octet (int (value)) + codec.write_uint8 (int (value)) elif typecode == 6: - codec.encode_shortstr (value) + codec.write_str8 (value) elif typecode == 7: - codec.encode_longstr (value) + codec.write_vbin32 (value) elif typecode == 8: # ABSTIME - codec.encode_longlong (long (value)) + codec.write_uint64 (long (value)) elif typecode == 9: # DELTATIME - codec.encode_longlong (long (value)) + codec.write_uint64 (long (value)) elif typecode == 10: # REF - codec.encode_longlong (long (value)) + codec.write_uint64 (long (value)) elif typecode == 11: # BOOL - codec.encode_octet (int (value)) + codec.write_uint8 (int (value)) elif typecode == 12: # FLOAT - codec.encode_float (float (value)) + codec.write_float (float (value)) elif typecode == 13: # DOUBLE - codec.encode_double (double (value)) + codec.write_double (double (value)) elif typecode == 14: # UUID - codec.encode_uuid (value) + codec.write_uuid (value) elif typecode == 15: # FTABLE - codec.encode_table (value) + codec.write_map (value) else: raise ValueError ("Invalid type code: %d" % typecode) def decodeValue (self, codec, typecode): """ Decode, from the codec, a value based on its typecode. """ if typecode == 1: - data = codec.decode_octet () + data = codec.read_uint8 () elif typecode == 2: - data = codec.decode_short () + data = codec.read_uint16 () elif typecode == 3: - data = codec.decode_long () + data = codec.read_uint32 () elif typecode == 4: - data = codec.decode_longlong () + data = codec.read_uint64 () elif typecode == 5: - data = codec.decode_octet () + data = codec.read_uint8 () elif typecode == 6: - data = codec.decode_shortstr () + data = codec.read_str8 () elif typecode == 7: - data = codec.decode_longstr () + data = codec.read_vbin32 () elif typecode == 8: # ABSTIME - data = codec.decode_longlong () + data = codec.read_uint64 () elif typecode == 9: # DELTATIME - data = codec.decode_longlong () + data = codec.read_uint64 () elif typecode == 10: # REF - data = codec.decode_longlong () + data = codec.read_uint64 () elif typecode == 11: # BOOL - data = codec.decode_octet () + data = codec.read_uint8 () elif typecode == 12: # FLOAT - data = codec.decode_float () + data = codec.read_float () elif typecode == 13: # DOUBLE - data = codec.decode_double () + data = codec.read_double () elif typecode == 14: # UUID - data = codec.decode_uuid () + data = codec.read_uuid () elif typecode == 15: # FTABLE - data = codec.decode_table () + data = codec.read_map () else: raise ValueError ("Invalid type code: %d" % typecode) return data @@ -415,8 +413,8 @@ class managementClient: self.ctrlCb (ch.context, self.CTRL_SCHEMA_LOADED, None) def handleMethodReply (self, ch, codec, sequence): - status = codec.decode_long () - sText = codec.decode_shortstr () + status = codec.read_uint32 () + sText = codec.read_str8 () data = self.seqMgr.release (sequence) if data == None: @@ -451,8 +449,8 @@ class managementClient: self.methodCb (ch.context, userSequence, status, sText, args) def handleCommandComplete (self, ch, codec, seq): - code = codec.decode_long () - text = codec.decode_shortstr () + code = codec.read_uint32 () + text = codec.read_str8 () data = (seq, code, text) context = self.seqMgr.release (seq) if context == "outstanding": @@ -467,75 +465,60 @@ class managementClient: def handleBrokerResponse (self, ch, codec): if self.ctrlCb != None: - uuid = codec.decode_uuid () + uuid = codec.read_uuid () data = (uuid, ch.sessionId) self.ctrlCb (ch.context, self.CTRL_BROKER_INFO, data) # Send a package request - sendCodec = Codec (StringIO (), self.spec) + sendCodec = Codec (self.spec) seq = self.seqMgr.reserve ("outstanding") self.setHeader (sendCodec, ord ('P'), seq) - smsg = Content (sendCodec.stream.getvalue ()) - smsg["content_type"] = "application/octet-stream" - smsg["routing_key"] = "agent" - smsg["reply_to"] = self.spec.struct ("reply_to") - smsg["reply_to"]["exchange_name"] = "amq.direct" - smsg["reply_to"]["routing_key"] = ch.replyName + smsg = ch.message(sendCodec.encoded) ch.send ("qpid.management", smsg) - + def handlePackageInd (self, ch, codec): - pname = codec.decode_shortstr () + pname = codec.read_str8 () if pname not in self.packages: self.packages[pname] = {} # Send a class request - sendCodec = Codec (StringIO (), self.spec) + sendCodec = Codec (self.spec) seq = self.seqMgr.reserve ("outstanding") self.setHeader (sendCodec, ord ('Q'), seq) self.incOutstanding (ch) - sendCodec.encode_shortstr (pname) - smsg = Content (sendCodec.stream.getvalue ()) - smsg["content_type"] = "application/octet-stream" - smsg["routing_key"] = "agent" - smsg["reply_to"] = self.spec.struct ("reply_to") - smsg["reply_to"]["exchange_name"] = "amq.direct" - smsg["reply_to"]["routing_key"] = ch.replyName + sendCodec.write_str8 (pname) + smsg = ch.message(sendCodec.encoded) ch.send ("qpid.management", smsg) def handleClassInd (self, ch, codec): - pname = codec.decode_shortstr () - cname = codec.decode_shortstr () - hash = codec.decode_bin128 () + pname = codec.read_str8 () + cname = codec.read_str8 () + hash = codec.read_bin128 () if pname not in self.packages: return if (cname, hash) not in self.packages[pname]: # Send a schema request - sendCodec = Codec (StringIO (), self.spec) + sendCodec = Codec (self.spec) seq = self.seqMgr.reserve ("outstanding") self.setHeader (sendCodec, ord ('S'), seq) self.incOutstanding (ch) - sendCodec.encode_shortstr (pname) - sendCodec.encode_shortstr (cname) - sendCodec.encode_bin128 (hash) - smsg = Content (sendCodec.stream.getvalue ()) - smsg["content_type"] = "application/octet-stream" - smsg["routing_key"] = "agent" - smsg["reply_to"] = self.spec.struct ("reply_to") - smsg["reply_to"]["exchange_name"] = "amq.direct" - smsg["reply_to"]["routing_key"] = ch.replyName + sendCodec.write_str8 (pname) + sendCodec.write_str8 (cname) + sendCodec.write_bin128 (hash) + smsg = ch.message(sendCodec.encoded) ch.send ("qpid.management", smsg) def parseSchema (self, ch, codec): """ Parse a received schema-description message. """ self.decOutstanding (ch) - packageName = codec.decode_shortstr () - className = codec.decode_shortstr () - hash = codec.decode_bin128 () - configCount = codec.decode_short () - instCount = codec.decode_short () - methodCount = codec.decode_short () - eventCount = codec.decode_short () + packageName = codec.read_str8 () + className = codec.read_str8 () + hash = codec.read_bin128 () + configCount = codec.read_uint16 () + instCount = codec.read_uint16 () + methodCount = codec.read_uint16 () + eventCount = codec.read_uint16 () if packageName not in self.packages: return @@ -555,7 +538,7 @@ class managementClient: insts.append (("id", 4, None, None)) for idx in range (configCount): - ft = codec.decode_table () + ft = codec.read_map () name = ft["name"] type = ft["type"] access = ft["access"] @@ -582,7 +565,7 @@ class managementClient: configs.append (config) for idx in range (instCount): - ft = codec.decode_table () + ft = codec.read_map () name = ft["name"] type = ft["type"] unit = None @@ -598,7 +581,7 @@ class managementClient: insts.append (inst) for idx in range (methodCount): - ft = codec.decode_table () + ft = codec.read_map () mname = ft["name"] argCount = ft["argCount"] if "desc" in ft: @@ -608,7 +591,7 @@ class managementClient: args = [] for aidx in range (argCount): - ft = codec.decode_table () + ft = codec.read_map () name = ft["name"] type = ft["type"] dir = ft["dir"].upper () @@ -654,9 +637,9 @@ class managementClient: if cls == 'I' and self.instCb == None: return - packageName = codec.decode_shortstr () - className = codec.decode_shortstr () - hash = codec.decode_bin128 () + packageName = codec.read_str8 () + className = codec.read_str8 () + hash = codec.read_bin128 () classKey = (packageName, className, hash) if classKey not in self.schema: @@ -665,9 +648,9 @@ class managementClient: row = [] timestamps = [] - timestamps.append (codec.decode_longlong ()) # Current Time - timestamps.append (codec.decode_longlong ()) # Create Time - timestamps.append (codec.decode_longlong ()) # Delete Time + timestamps.append (codec.read_uint64 ()) # Current Time + timestamps.append (codec.read_uint64 ()) # Create Time + timestamps.append (codec.read_uint64 ()) # Delete Time schemaClass = self.schema[classKey] if cls == 'C' or cls == 'B': @@ -712,10 +695,10 @@ class managementClient: def method (self, channel, userSequence, objId, classId, methodName, args): """ Invoke a method on an object """ - codec = Codec (StringIO (), self.spec) + codec = Codec (self.spec) sequence = self.seqMgr.reserve ((userSequence, classId, methodName)) self.setHeader (codec, ord ('M'), sequence) - codec.encode_longlong (objId) # ID of object + codec.write_uint64 (objId) # ID of object # Encode args according to schema if classId not in self.schema: @@ -745,11 +728,6 @@ class managementClient: packageName = classId[0] className = classId[1] - msg = Content (codec.stream.getvalue ()) - msg["content_type"] = "application/octet-stream" - msg["routing_key"] = "agent.method." + packageName + "." + \ - className + "." + methodName - msg["reply_to"] = self.spec.struct ("reply_to") - msg["reply_to"]["exchange_name"] = "amq.direct" - msg["reply_to"]["routing_key"] = channel.replyName + msg = channel.message(codec.encoded, "agent.method." + packageName + "." + \ + className + "." + methodName) channel.send ("qpid.management", msg) diff --git a/qpid/python/qpid/message.py b/qpid/python/qpid/message.py index c9ea5a8a0c..eb3ef5c03c 100644 --- a/qpid/python/qpid/message.py +++ b/qpid/python/qpid/message.py @@ -16,7 +16,7 @@ # specific language governing permissions and limitations # under the License. # -from connection import Method, Request +from connection08 import Method, Request from sets import Set class Message: diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py index a464e95593..0932efeab3 100644 --- a/qpid/python/qpid/peer.py +++ b/qpid/python/qpid/peer.py @@ -25,7 +25,7 @@ incoming method frames to a delegate. """ import thread, threading, traceback, socket, sys, logging -from connection import EOF, Method, Header, Body, Request, Response +from connection08 import EOF, Method, Header, Body, Request, Response from message import Message from queue import Queue, Closed as QueueClosed from content import Content diff --git a/qpid/python/qpid/session.py b/qpid/python/qpid/session.py index 427a403b90..f649b95a2c 100644 --- a/qpid/python/qpid/session.py +++ b/qpid/python/qpid/session.py @@ -17,7 +17,7 @@ # under the License. # -from threading import Condition, RLock, currentThread +from threading import Condition, RLock, Lock, currentThread from invoker import Invoker from datatypes import RangedSet, Struct, Future from codec010 import StringCodec @@ -29,8 +29,11 @@ from exceptions import * from logging import getLogger log = getLogger("qpid.io.cmd") +msg = getLogger("qpid.io.msg") -class SessionDetached(Exception): pass +class SessionException(Exception): pass +class SessionClosed(SessionException): pass +class SessionDetached(SessionException): pass def client(*args): return Client(*args) @@ -38,8 +41,6 @@ def client(*args): def server(*args): return Server(*args) -class SessionException(Exception): pass - INCOMPLETE = object() class Session(Invoker): @@ -50,6 +51,8 @@ class Session(Invoker): self.auto_sync = auto_sync self.timeout = timeout self.channel = None + self.invoke_lock = Lock() + self.closed = False self.condition = Condition() @@ -97,7 +100,12 @@ class Session(Invoker): raise SessionException(self.error()) def close(self, timeout=None): - self.channel.session_detach(self.name) + self.invoke_lock.acquire() + try: + self.closed = True + self.channel.session_detach(self.name) + finally: + self.invoke_lock.release() if not wait(self.condition, lambda: self.channel is None, timeout): raise Timeout() @@ -119,6 +127,16 @@ class Session(Invoker): if not hasattr(type, "track"): return type.new(args, kwargs) + self.invoke_lock.acquire() + try: + return self.do_invoke(type, args, kwargs) + finally: + self.invoke_lock.release() + + def do_invoke(self, type, args, kwargs): + if self.closed: + raise SessionClosed() + if self.channel == None: raise SessionDetached() @@ -160,6 +178,7 @@ class Session(Invoker): seg = Segment(False, True, self.spec["segment_type.body"].value, type.track, self.channel.id, message.body) self.send(seg) + msg.debug("SENT %s", message) if type.result: if self.auto_sync: @@ -304,8 +323,6 @@ class Delegate: finally: self.session.lock.release() -msg = getLogger("qpid.io.msg") - class Client(Delegate): def message_transfer(self, cmd, headers, body): diff --git a/qpid/python/qpid/spec.py b/qpid/python/qpid/spec.py index 623b2e9e9f..64a14b0f61 100644 --- a/qpid/python/qpid/spec.py +++ b/qpid/python/qpid/spec.py @@ -29,484 +29,18 @@ class so that the generated code can be reused in a variety of situations. """ -import re, textwrap, new, mllib, qpid - -class SpecContainer: - - def __init__(self): - self.items = [] - self.byname = {} - self.byid = {} - self.indexes = {} - - def add(self, item): - if self.byname.has_key(item.name): - raise ValueError("duplicate name: %s" % item) - if item.id == None: - item.id = len(self) - elif self.byid.has_key(item.id): - raise ValueError("duplicate id: %s" % item) - self.indexes[item] = len(self.items) - self.items.append(item) - self.byname[item.name] = item - self.byid[item.id] = item - - def index(self, item): - try: - return self.indexes[item] - except KeyError: - raise ValueError(item) - - def __iter__(self): - return iter(self.items) - - def __len__(self): - return len(self.items) - -class Metadata: - - PRINT = [] - - def __init__(self): - pass - - def __str__(self): - args = map(lambda f: "%s=%s" % (f, getattr(self, f)), self.PRINT) - return "%s(%s)" % (self.__class__.__name__, ", ".join(args)) - - def __repr__(self): - return str(self) - -class Spec(Metadata): - - PRINT=["major", "minor", "file"] - - def __init__(self, major, minor, file): - Metadata.__init__(self) - self.major = major - self.minor = minor - self.file = file - self.constants = SpecContainer() - self.domains = SpecContainer() - self.classes = SpecContainer() - # methods indexed by classname_methname - self.methods = {} - # structs by type code - self.structs = {} - - def post_load(self): - self.module = self.define_module("amqp%s%s" % (self.major, self.minor)) - self.klass = self.define_class("Amqp%s%s" % (self.major, self.minor)) - - def method(self, name): - if not self.methods.has_key(name): - for cls in self.classes: - clen = len(cls.name) - if name.startswith(cls.name) and name[clen] == "_": - end = name[clen + 1:] - if cls.methods.byname.has_key(end): - self.methods[name] = cls.methods.byname[end] - return self.methods.get(name) - - def parse_method(self, name): - parts = re.split(r"\s*\.\s*", name) - if len(parts) != 2: - raise ValueError(name) - klass, meth = parts - return self.classes.byname[klass].methods.byname[meth] - - def struct(self, name, *args, **kwargs): - type = self.domains.byname[name].type - return qpid.Struct(type, *args, **kwargs) - - def define_module(self, name, doc = None): - module = new.module(name, doc) - module.__file__ = self.file - for c in self.classes: - cls = c.define_class(c.name) - cls.__module__ = module.__name__ - setattr(module, c.name, cls) - return module - - def define_class(self, name): - methods = {} - for c in self.classes: - for m in c.methods: - meth = m.klass.name + "_" + m.name - methods[meth] = m.define_method(meth) - return type(name, (), methods) - -class Constant(Metadata): - - PRINT=["name", "id"] - - def __init__(self, spec, name, id, klass, docs): - Metadata.__init__(self) - self.spec = spec - self.name = name - self.id = id - self.klass = klass - self.docs = docs - -class Domain(Metadata): - - PRINT=["name", "type"] - - def __init__(self, spec, name, type, description, docs): - Metadata.__init__(self) - self.spec = spec - self.id = None - self.name = name - self.type = type - self.description = description - self.docs = docs - -class Struct(Metadata): - - PRINT=["size", "type", "pack"] - - def __init__(self, size, type, pack): - Metadata.__init__(self) - self.size = size - self.type = type - self.pack = pack - self.fields = SpecContainer() - -class Class(Metadata): - - PRINT=["name", "id"] - - def __init__(self, spec, name, id, handler, docs): - Metadata.__init__(self) - self.spec = spec - self.name = name - self.id = id - self.handler = handler - self.fields = SpecContainer() - self.methods = SpecContainer() - self.docs = docs - - def define_class(self, name): - methods = {} - for m in self.methods: - methods[m.name] = m.define_method(m.name) - return type(name, (), methods) - -class Method(Metadata): - - PRINT=["name", "id"] - - def __init__(self, klass, name, id, content, responses, result, synchronous, - description, docs): - Metadata.__init__(self) - self.klass = klass - self.name = name - self.id = id - self.content = content - self.responses = responses - self.result = result - self.synchronous = synchronous - self.fields = SpecContainer() - self.description = description - self.docs = docs - self.response = False - - def is_l4_command(self): - return self.klass.name not in ["execution", "channel", "connection", "session"] - - def arguments(self, *args, **kwargs): - nargs = len(args) + len(kwargs) - maxargs = len(self.fields) - if nargs > maxargs: - self._type_error("takes at most %s arguments (%s) given", maxargs, nargs) - result = [] - for f in self.fields: - idx = self.fields.index(f) - if idx < len(args): - result.append(args[idx]) - elif kwargs.has_key(f.name): - result.append(kwargs.pop(f.name)) - else: - result.append(Method.DEFAULTS[f.type]) - for key, value in kwargs.items(): - if self.fields.byname.has_key(key): - self._type_error("got multiple values for keyword argument '%s'", key) - else: - self._type_error("got an unexpected keyword argument '%s'", key) - return tuple(result) - - def _type_error(self, msg, *args): - raise TypeError("%s %s" % (self.name, msg % args)) - - def docstring(self): - s = "\n\n".join([fill(d, 2) for d in [self.description] + self.docs]) - for f in self.fields: - if f.docs: - s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, f.name)] + - [fill(d, 4) for d in f.docs[1:]]) - if self.responses: - s += "\n\nValid responses: " - for r in self.responses: - s += r.name + " " - return s - - METHOD = "__method__" - DEFAULTS = {"bit": False, - "shortstr": "", - "longstr": "", - "table": {}, - "array": [], - "octet": 0, - "short": 0, - "long": 0, - "longlong": 0, - "timestamp": 0, - "content": None, - "uuid": "", - "rfc1982_long": 0, - "rfc1982_long_set": [], - "long_struct": None} - - def define_method(self, name): - g = {Method.METHOD: self} - l = {} - args = [(f.name, Method.DEFAULTS[f.type]) for f in self.fields] - methargs = args[:] - if self.content: - args += [("content", None)] - code = "def %s(self, %s):\n" % \ - (name, ", ".join(["%s = %r" % a for a in args])) - code += " %r\n" % self.docstring() - argnames = ", ".join([a[0] for a in methargs]) - code += " return self.invoke(%s" % Method.METHOD - if argnames: - code += ", (%s,)" % argnames - else: - code += ", ()" - if self.content: - code += ", content" - code += ")" - exec code in g, l - return l[name] - -class Field(Metadata): - - PRINT=["name", "id", "type"] - - def __init__(self, name, id, type, domain, description, docs): - Metadata.__init__(self) - self.name = name - self.id = id - self.type = type - self.domain = domain - self.description = description - self.docs = docs - - def default(self): - if isinstance(self.type, Struct): - return None - else: - return Method.DEFAULTS[self.type] - -WIDTHS = { - "octet": 1, - "short": 2, - "long": 4 - } - -def width(st, default=None): - if st in (None, "none", ""): - return default - else: - return WIDTHS[st] - -def get_result(nd, spec): - result = nd["result"] - if not result: return None - name = result["@domain"] - if name != None: return spec.domains.byname[name] - st_nd = result["struct"] - st = Struct(width(st_nd["@size"]), int(result.parent.parent["@index"])*256 + - int(st_nd["@type"]), width(st_nd["@pack"], 2)) - spec.structs[st.type] = st - load_fields(st_nd, st.fields, spec.domains.byname) - return st - -def get_desc(nd): - label = nd["@label"] - if not label: - label = nd.text() - if label: - label = label.strip() - return label - -def get_docs(nd): - return [n.text() for n in nd.query["doc"]] - -def load_fields(nd, l, domains): - for f_nd in nd.query["field"]: - type = f_nd["@domain"] - if type == None: - type = f_nd["@type"] - type = pythonize(type) - domain = None - while domains.has_key(type) and domains[type].type != type: - domain = domains[type] - type = domain.type - l.add(Field(pythonize(f_nd["@name"]), f_nd.index(), type, domain, - get_desc(f_nd), get_docs(f_nd))) +import os, mllib, spec08, spec010 def load(specfile, *errata): - doc = mllib.xml_parse(specfile) - spec_root = doc["amqp"] - spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile) - - for root in [spec_root] + map(lambda x: mllib.xml_parse(x)["amqp"], errata): - # constants - for nd in root.query["constant"]: - val = nd["@value"] - if val.startswith("0x"): val = int(val, 16) - else: val = int(val) - const = Constant(spec, pythonize(nd["@name"]), val, nd["@class"], - get_docs(nd)) - try: - spec.constants.add(const) - except ValueError, e: - pass - #print "Warning:", e - - # domains are typedefs - structs = [] - for nd in root.query["domain"]: - type = nd["@type"] - if type == None: - st_nd = nd["struct"] - code = st_nd["@type"] - if code not in (None, "", "none"): - code = int(code) - type = Struct(width(st_nd["@size"]), code, width(st_nd["@pack"], 2)) - if type.type != None: - spec.structs[type.type] = type - structs.append((type, st_nd)) - else: - type = pythonize(type) - domain = Domain(spec, pythonize(nd["@name"]), type, get_desc(nd), - get_docs(nd)) - spec.domains.add(domain) - - # structs - for st, st_nd in structs: - load_fields(st_nd, st.fields, spec.domains.byname) - - # classes - for c_nd in root.query["class"]: - cname = pythonize(c_nd["@name"]) - if spec.classes.byname.has_key(cname): - klass = spec.classes.byname[cname] - else: - klass = Class(spec, cname, int(c_nd["@index"]), c_nd["@handler"], - get_docs(c_nd)) - spec.classes.add(klass) - - added_methods = [] - load_fields(c_nd, klass.fields, spec.domains.byname) - for m_nd in c_nd.query["method"]: - mname = pythonize(m_nd["@name"]) - if klass.methods.byname.has_key(mname): - meth = klass.methods.byname[mname] - else: - meth = Method(klass, mname, - int(m_nd["@index"]), - m_nd["@content"] == "1", - [pythonize(nd["@name"]) for nd in m_nd.query["response"]], - get_result(m_nd, spec), - m_nd["@synchronous"] == "1", - get_desc(m_nd), - get_docs(m_nd)) - klass.methods.add(meth) - added_methods.append(meth) - load_fields(m_nd, meth.fields, spec.domains.byname) - # resolve the responses - for m in added_methods: - m.responses = [klass.methods.byname[r] for r in m.responses] - for resp in m.responses: - resp.response = True - - spec.post_load() - return spec - -REPLACE = {" ": "_", "-": "_"} -KEYWORDS = {"global": "global_", - "return": "return_"} + for name in (specfile,) + errata: + if not os.path.exists(name): + raise IOError("No such file or directory: '%s'" % name) -def pythonize(name): - name = str(name) - for key, val in REPLACE.items(): - name = name.replace(key, val) - try: - name = KEYWORDS[name] - except KeyError: - pass - return name + doc = mllib.xml_parse(specfile) + major = doc["amqp/@major"] + minor = doc["amqp/@minor"] -def fill(text, indent, heading = None): - sub = indent * " " - if heading: - init = (indent - 2) * " " + heading + " -- " + if major == "0" and minor == "10": + return spec010.load(specfile, *errata) else: - init = sub - w = textwrap.TextWrapper(initial_indent = init, subsequent_indent = sub) - return w.fill(" ".join(text.split())) - -class Rule(Metadata): - - PRINT = ["text", "implement", "tests"] - - def __init__(self, text, implement, tests, path): - self.text = text - self.implement = implement - self.tests = tests - self.path = path - -def find_rules(node, rules): - if node.name == "rule": - rules.append(Rule(node.text, node.get("@implement"), - [ch.text for ch in node if ch.name == "test"], - node.path())) - if node.name == "doc" and node.get("@name") == "rule": - tests = [] - if node.has("@test"): - tests.append(node["@test"]) - rules.append(Rule(node.text, None, tests, node.path())) - for child in node: - find_rules(child, rules) - -def load_rules(specfile): - rules = [] - find_rules(xmlutil.parse(specfile), rules) - return rules - -def test_summary(): - template = """ - <html><head><title>AMQP Tests</title></head> - <body> - <table width="80%%" align="center"> - %s - </table> - </body> - </html> - """ - rows = [] - for rule in load_rules("amqp.org/specs/amqp7.xml"): - if rule.tests: - tests = ", ".join(rule.tests) - else: - tests = " " - rows.append('<tr bgcolor="#EEEEEE"><td><b>Path:</b> %s</td>' - '<td><b>Implement:</b> %s</td>' - '<td><b>Tests:</b> %s</td></tr>' % - (rule.path[len("/root/amqp"):], rule.implement, tests)) - rows.append('<tr><td colspan="3">%s</td></tr>' % rule.text) - rows.append('<tr><td colspan="3"> </td></tr>') - - print template % "\n".join(rows) + return spec08.load(specfile, *errata) diff --git a/qpid/python/qpid/spec08.py b/qpid/python/qpid/spec08.py new file mode 100644 index 0000000000..623b2e9e9f --- /dev/null +++ b/qpid/python/qpid/spec08.py @@ -0,0 +1,512 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +This module loads protocol metadata into python objects. It provides +access to spec metadata via a python object model, and can also +dynamically creating python methods, classes, and modules based on the +spec metadata. All the generated methods have proper signatures and +doc strings based on the spec metadata so the python help system can +be used to browse the spec documentation. The generated methods all +dispatch to the self.invoke(meth, args) callback of the containing +class so that the generated code can be reused in a variety of +situations. +""" + +import re, textwrap, new, mllib, qpid + +class SpecContainer: + + def __init__(self): + self.items = [] + self.byname = {} + self.byid = {} + self.indexes = {} + + def add(self, item): + if self.byname.has_key(item.name): + raise ValueError("duplicate name: %s" % item) + if item.id == None: + item.id = len(self) + elif self.byid.has_key(item.id): + raise ValueError("duplicate id: %s" % item) + self.indexes[item] = len(self.items) + self.items.append(item) + self.byname[item.name] = item + self.byid[item.id] = item + + def index(self, item): + try: + return self.indexes[item] + except KeyError: + raise ValueError(item) + + def __iter__(self): + return iter(self.items) + + def __len__(self): + return len(self.items) + +class Metadata: + + PRINT = [] + + def __init__(self): + pass + + def __str__(self): + args = map(lambda f: "%s=%s" % (f, getattr(self, f)), self.PRINT) + return "%s(%s)" % (self.__class__.__name__, ", ".join(args)) + + def __repr__(self): + return str(self) + +class Spec(Metadata): + + PRINT=["major", "minor", "file"] + + def __init__(self, major, minor, file): + Metadata.__init__(self) + self.major = major + self.minor = minor + self.file = file + self.constants = SpecContainer() + self.domains = SpecContainer() + self.classes = SpecContainer() + # methods indexed by classname_methname + self.methods = {} + # structs by type code + self.structs = {} + + def post_load(self): + self.module = self.define_module("amqp%s%s" % (self.major, self.minor)) + self.klass = self.define_class("Amqp%s%s" % (self.major, self.minor)) + + def method(self, name): + if not self.methods.has_key(name): + for cls in self.classes: + clen = len(cls.name) + if name.startswith(cls.name) and name[clen] == "_": + end = name[clen + 1:] + if cls.methods.byname.has_key(end): + self.methods[name] = cls.methods.byname[end] + return self.methods.get(name) + + def parse_method(self, name): + parts = re.split(r"\s*\.\s*", name) + if len(parts) != 2: + raise ValueError(name) + klass, meth = parts + return self.classes.byname[klass].methods.byname[meth] + + def struct(self, name, *args, **kwargs): + type = self.domains.byname[name].type + return qpid.Struct(type, *args, **kwargs) + + def define_module(self, name, doc = None): + module = new.module(name, doc) + module.__file__ = self.file + for c in self.classes: + cls = c.define_class(c.name) + cls.__module__ = module.__name__ + setattr(module, c.name, cls) + return module + + def define_class(self, name): + methods = {} + for c in self.classes: + for m in c.methods: + meth = m.klass.name + "_" + m.name + methods[meth] = m.define_method(meth) + return type(name, (), methods) + +class Constant(Metadata): + + PRINT=["name", "id"] + + def __init__(self, spec, name, id, klass, docs): + Metadata.__init__(self) + self.spec = spec + self.name = name + self.id = id + self.klass = klass + self.docs = docs + +class Domain(Metadata): + + PRINT=["name", "type"] + + def __init__(self, spec, name, type, description, docs): + Metadata.__init__(self) + self.spec = spec + self.id = None + self.name = name + self.type = type + self.description = description + self.docs = docs + +class Struct(Metadata): + + PRINT=["size", "type", "pack"] + + def __init__(self, size, type, pack): + Metadata.__init__(self) + self.size = size + self.type = type + self.pack = pack + self.fields = SpecContainer() + +class Class(Metadata): + + PRINT=["name", "id"] + + def __init__(self, spec, name, id, handler, docs): + Metadata.__init__(self) + self.spec = spec + self.name = name + self.id = id + self.handler = handler + self.fields = SpecContainer() + self.methods = SpecContainer() + self.docs = docs + + def define_class(self, name): + methods = {} + for m in self.methods: + methods[m.name] = m.define_method(m.name) + return type(name, (), methods) + +class Method(Metadata): + + PRINT=["name", "id"] + + def __init__(self, klass, name, id, content, responses, result, synchronous, + description, docs): + Metadata.__init__(self) + self.klass = klass + self.name = name + self.id = id + self.content = content + self.responses = responses + self.result = result + self.synchronous = synchronous + self.fields = SpecContainer() + self.description = description + self.docs = docs + self.response = False + + def is_l4_command(self): + return self.klass.name not in ["execution", "channel", "connection", "session"] + + def arguments(self, *args, **kwargs): + nargs = len(args) + len(kwargs) + maxargs = len(self.fields) + if nargs > maxargs: + self._type_error("takes at most %s arguments (%s) given", maxargs, nargs) + result = [] + for f in self.fields: + idx = self.fields.index(f) + if idx < len(args): + result.append(args[idx]) + elif kwargs.has_key(f.name): + result.append(kwargs.pop(f.name)) + else: + result.append(Method.DEFAULTS[f.type]) + for key, value in kwargs.items(): + if self.fields.byname.has_key(key): + self._type_error("got multiple values for keyword argument '%s'", key) + else: + self._type_error("got an unexpected keyword argument '%s'", key) + return tuple(result) + + def _type_error(self, msg, *args): + raise TypeError("%s %s" % (self.name, msg % args)) + + def docstring(self): + s = "\n\n".join([fill(d, 2) for d in [self.description] + self.docs]) + for f in self.fields: + if f.docs: + s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, f.name)] + + [fill(d, 4) for d in f.docs[1:]]) + if self.responses: + s += "\n\nValid responses: " + for r in self.responses: + s += r.name + " " + return s + + METHOD = "__method__" + DEFAULTS = {"bit": False, + "shortstr": "", + "longstr": "", + "table": {}, + "array": [], + "octet": 0, + "short": 0, + "long": 0, + "longlong": 0, + "timestamp": 0, + "content": None, + "uuid": "", + "rfc1982_long": 0, + "rfc1982_long_set": [], + "long_struct": None} + + def define_method(self, name): + g = {Method.METHOD: self} + l = {} + args = [(f.name, Method.DEFAULTS[f.type]) for f in self.fields] + methargs = args[:] + if self.content: + args += [("content", None)] + code = "def %s(self, %s):\n" % \ + (name, ", ".join(["%s = %r" % a for a in args])) + code += " %r\n" % self.docstring() + argnames = ", ".join([a[0] for a in methargs]) + code += " return self.invoke(%s" % Method.METHOD + if argnames: + code += ", (%s,)" % argnames + else: + code += ", ()" + if self.content: + code += ", content" + code += ")" + exec code in g, l + return l[name] + +class Field(Metadata): + + PRINT=["name", "id", "type"] + + def __init__(self, name, id, type, domain, description, docs): + Metadata.__init__(self) + self.name = name + self.id = id + self.type = type + self.domain = domain + self.description = description + self.docs = docs + + def default(self): + if isinstance(self.type, Struct): + return None + else: + return Method.DEFAULTS[self.type] + +WIDTHS = { + "octet": 1, + "short": 2, + "long": 4 + } + +def width(st, default=None): + if st in (None, "none", ""): + return default + else: + return WIDTHS[st] + +def get_result(nd, spec): + result = nd["result"] + if not result: return None + name = result["@domain"] + if name != None: return spec.domains.byname[name] + st_nd = result["struct"] + st = Struct(width(st_nd["@size"]), int(result.parent.parent["@index"])*256 + + int(st_nd["@type"]), width(st_nd["@pack"], 2)) + spec.structs[st.type] = st + load_fields(st_nd, st.fields, spec.domains.byname) + return st + +def get_desc(nd): + label = nd["@label"] + if not label: + label = nd.text() + if label: + label = label.strip() + return label + +def get_docs(nd): + return [n.text() for n in nd.query["doc"]] + +def load_fields(nd, l, domains): + for f_nd in nd.query["field"]: + type = f_nd["@domain"] + if type == None: + type = f_nd["@type"] + type = pythonize(type) + domain = None + while domains.has_key(type) and domains[type].type != type: + domain = domains[type] + type = domain.type + l.add(Field(pythonize(f_nd["@name"]), f_nd.index(), type, domain, + get_desc(f_nd), get_docs(f_nd))) + +def load(specfile, *errata): + doc = mllib.xml_parse(specfile) + spec_root = doc["amqp"] + spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile) + + for root in [spec_root] + map(lambda x: mllib.xml_parse(x)["amqp"], errata): + # constants + for nd in root.query["constant"]: + val = nd["@value"] + if val.startswith("0x"): val = int(val, 16) + else: val = int(val) + const = Constant(spec, pythonize(nd["@name"]), val, nd["@class"], + get_docs(nd)) + try: + spec.constants.add(const) + except ValueError, e: + pass + #print "Warning:", e + + # domains are typedefs + structs = [] + for nd in root.query["domain"]: + type = nd["@type"] + if type == None: + st_nd = nd["struct"] + code = st_nd["@type"] + if code not in (None, "", "none"): + code = int(code) + type = Struct(width(st_nd["@size"]), code, width(st_nd["@pack"], 2)) + if type.type != None: + spec.structs[type.type] = type + structs.append((type, st_nd)) + else: + type = pythonize(type) + domain = Domain(spec, pythonize(nd["@name"]), type, get_desc(nd), + get_docs(nd)) + spec.domains.add(domain) + + # structs + for st, st_nd in structs: + load_fields(st_nd, st.fields, spec.domains.byname) + + # classes + for c_nd in root.query["class"]: + cname = pythonize(c_nd["@name"]) + if spec.classes.byname.has_key(cname): + klass = spec.classes.byname[cname] + else: + klass = Class(spec, cname, int(c_nd["@index"]), c_nd["@handler"], + get_docs(c_nd)) + spec.classes.add(klass) + + added_methods = [] + load_fields(c_nd, klass.fields, spec.domains.byname) + for m_nd in c_nd.query["method"]: + mname = pythonize(m_nd["@name"]) + if klass.methods.byname.has_key(mname): + meth = klass.methods.byname[mname] + else: + meth = Method(klass, mname, + int(m_nd["@index"]), + m_nd["@content"] == "1", + [pythonize(nd["@name"]) for nd in m_nd.query["response"]], + get_result(m_nd, spec), + m_nd["@synchronous"] == "1", + get_desc(m_nd), + get_docs(m_nd)) + klass.methods.add(meth) + added_methods.append(meth) + load_fields(m_nd, meth.fields, spec.domains.byname) + # resolve the responses + for m in added_methods: + m.responses = [klass.methods.byname[r] for r in m.responses] + for resp in m.responses: + resp.response = True + + spec.post_load() + return spec + +REPLACE = {" ": "_", "-": "_"} +KEYWORDS = {"global": "global_", + "return": "return_"} + +def pythonize(name): + name = str(name) + for key, val in REPLACE.items(): + name = name.replace(key, val) + try: + name = KEYWORDS[name] + except KeyError: + pass + return name + +def fill(text, indent, heading = None): + sub = indent * " " + if heading: + init = (indent - 2) * " " + heading + " -- " + else: + init = sub + w = textwrap.TextWrapper(initial_indent = init, subsequent_indent = sub) + return w.fill(" ".join(text.split())) + +class Rule(Metadata): + + PRINT = ["text", "implement", "tests"] + + def __init__(self, text, implement, tests, path): + self.text = text + self.implement = implement + self.tests = tests + self.path = path + +def find_rules(node, rules): + if node.name == "rule": + rules.append(Rule(node.text, node.get("@implement"), + [ch.text for ch in node if ch.name == "test"], + node.path())) + if node.name == "doc" and node.get("@name") == "rule": + tests = [] + if node.has("@test"): + tests.append(node["@test"]) + rules.append(Rule(node.text, None, tests, node.path())) + for child in node: + find_rules(child, rules) + +def load_rules(specfile): + rules = [] + find_rules(xmlutil.parse(specfile), rules) + return rules + +def test_summary(): + template = """ + <html><head><title>AMQP Tests</title></head> + <body> + <table width="80%%" align="center"> + %s + </table> + </body> + </html> + """ + rows = [] + for rule in load_rules("amqp.org/specs/amqp7.xml"): + if rule.tests: + tests = ", ".join(rule.tests) + else: + tests = " " + rows.append('<tr bgcolor="#EEEEEE"><td><b>Path:</b> %s</td>' + '<td><b>Implement:</b> %s</td>' + '<td><b>Tests:</b> %s</td></tr>' % + (rule.path[len("/root/amqp"):], rule.implement, tests)) + rows.append('<tr><td colspan="3">%s</td></tr>' % rule.text) + rows.append('<tr><td colspan="3"> </td></tr>') + + print template % "\n".join(rows) diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py index a0018c671f..adda1a650f 100644 --- a/qpid/python/qpid/testlib.py +++ b/qpid/python/qpid/testlib.py @@ -30,7 +30,7 @@ from qpid.content import Content from qpid.message import Message #0-10 support -from qpid.connection010 import Connection +from qpid.connection import Connection from qpid.spec010 import load from qpid.util import connect @@ -357,9 +357,9 @@ class TestBase010(unittest.TestCase): self.conn.start(timeout=10) self.session = self.conn.session("test-session", timeout=10) - def connect(self): + def connect(self, host=None, port=None): spec = testrunner.spec - conn = Connection(connect(testrunner.host, testrunner.port), spec) + conn = Connection(connect(host or testrunner.host, port or testrunner.port), spec) conn.start(timeout=10) return conn diff --git a/qpid/python/tests/__init__.py b/qpid/python/tests/__init__.py index 521e2f15c9..8ad514fc2f 100644 --- a/qpid/python/tests/__init__.py +++ b/qpid/python/tests/__init__.py @@ -25,6 +25,6 @@ from spec import * from framer import * from assembler import * from datatypes import * -from connection010 import * +from connection import * from spec010 import * from codec010 import * diff --git a/qpid/python/tests/connection010.py b/qpid/python/tests/connection.py index a953e034a2..6925480ed3 100644 --- a/qpid/python/tests/connection010.py +++ b/qpid/python/tests/connection.py @@ -20,7 +20,7 @@ from threading import * from unittest import TestCase from qpid.util import connect, listen -from qpid.connection010 import * +from qpid.connection import * from qpid.datatypes import Message from qpid.testlib import testrunner from qpid.delegates import Server diff --git a/qpid/python/tests_0-10_preview/management.py b/qpid/python/tests_0-10/management.py index de6161ae96..e893dbbd87 100644 --- a/qpid/python/tests_0-10_preview/management.py +++ b/qpid/python/tests_0-10/management.py @@ -18,10 +18,10 @@ # from qpid.datatypes import Message, RangedSet -from qpid.testlib import TestBase +from qpid.testlib import TestBase010 from qpid.management import managementChannel, managementClient -class ManagementTest (TestBase): +class ManagementTest (TestBase010): """ Tests for the management hooks """ @@ -30,10 +30,10 @@ class ManagementTest (TestBase): """ Call the "echo" method on the broker to verify it is alive and talking. """ - channel = self.client.channel(2) + session = self.session - mc = managementClient (channel.spec) - mch = mc.addChannel (channel) + mc = managementClient (session.spec) + mch = mc.addChannel (session) mc.syncWaitForStable (mch) brokers = mc.syncGetObjects (mch, "broker") @@ -52,20 +52,20 @@ class ManagementTest (TestBase): self.assertEqual (res.body, body) def test_system_object (self): - channel = self.client.channel(2) + session = self.session - mc = managementClient (channel.spec) - mch = mc.addChannel (channel) + mc = managementClient (session.spec) + mch = mc.addChannel (session) mc.syncWaitForStable (mch) systems = mc.syncGetObjects (mch, "system") self.assertEqual (len (systems), 1) def test_standard_exchanges (self): - channel = self.client.channel(2) + session = self.session - mc = managementClient (channel.spec) - mch = mc.addChannel (channel) + mc = managementClient (session.spec) + mch = mc.addChannel (session) mc.syncWaitForStable (mch) exchanges = mc.syncGetObjects (mch, "exchange") diff --git a/qpid/python/tests_0-10/persistence.py b/qpid/python/tests_0-10/persistence.py new file mode 100644 index 0000000000..a4b5691910 --- /dev/null +++ b/qpid/python/tests_0-10/persistence.py @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.datatypes import Message, RangedSet +from qpid.testlib import testrunner, TestBase010 + +class PersistenceTests(TestBase010): + def test_delete_queue_after_publish(self): + session = self.session + session.auto_sync = False + + #create queue + session.queue_declare(queue = "q", auto_delete=True, durable=True) + + #send message + for i in range(1, 10): + dp = session.delivery_properties(routing_key="q", delivery_mode=2) + session.message_transfer(message=Message(dp, "my-message")) + + session.auto_sync = True + #explicitly delete queue + session.queue_delete(queue = "q") + + def test_ack_message_from_deleted_queue(self): + session = self.session + session.auto_sync = False + + #create queue + session.queue_declare(queue = "q", auto_delete=True, durable=True) + + #send message + dp = session.delivery_properties(routing_key="q", delivery_mode=2) + session.message_transfer(message=Message(dp, "my-message")) + + #create consumer + session.message_subscribe(queue = "q", destination = "a", accept_mode = 1, acquire_mode=0) + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + session.message_flow(unit = 0, value = 10, destination = "a") + queue = session.incoming("a") + + #consume the message, cancel subscription (triggering auto-delete), then ack it + msg = queue.get(timeout = 5) + session.message_cancel(destination = "a") + session.message_accept(RangedSet(msg.id)) + + def test_queue_deletion(self): + session = self.session + session.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True) + session.exchange_bind(exchange="amq.topic", queue="durable-subscriber-queue", binding_key="xyz") + dp = session.delivery_properties(routing_key="xyz", delivery_mode=2) + session.message_transfer(destination="amq.topic", message=Message(dp, "my-message")) + session.queue_delete(queue = "durable-subscriber-queue") |