summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-04-22 16:11:34 +0000
committerRafael H. Schloming <rhs@apache.org>2008-04-22 16:11:34 +0000
commit6b4af6b24c0e29007c28998d4d7d19383c0ae702 (patch)
treee0c01ddcddbb5b5bfa2cfdd22980d39ed46810b4
parentab24602c21632ffb3f0748331819b2e099b188da (diff)
downloadqpid-python-6b4af6b24c0e29007c28998d4d7d19383c0ae702.tar.gz
QPID-947: update cpp and python management to 0-10 final
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@650565 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/framing/FieldTable.cpp2
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp18
-rw-r--r--qpid/cpp/src/tests/FieldTable.cpp12
-rw-r--r--qpid/cpp/src/tests/MessageTest.cpp2
-rwxr-xr-xqpid/cpp/src/tests/federation.py49
-rwxr-xr-xqpid/cpp/src/tests/run_federation_tests2
-rw-r--r--qpid/cpp/src/tests/topic_listener.cpp6
-rw-r--r--qpid/python/qpid/client.py2
-rw-r--r--qpid/python/qpid/codec.py6
-rw-r--r--qpid/python/qpid/codec010.py8
-rw-r--r--qpid/python/qpid/connection.py566
-rw-r--r--qpid/python/qpid/connection010.py181
-rw-r--r--qpid/python/qpid/connection08.py483
-rw-r--r--qpid/python/qpid/delegate.py2
-rw-r--r--qpid/python/qpid/delegates.py10
-rw-r--r--qpid/python/qpid/framer.py26
-rw-r--r--qpid/python/qpid/management.py274
-rw-r--r--qpid/python/qpid/message.py2
-rw-r--r--qpid/python/qpid/peer.py2
-rw-r--r--qpid/python/qpid/session.py31
-rw-r--r--qpid/python/qpid/spec.py486
-rw-r--r--qpid/python/qpid/spec08.py512
-rw-r--r--qpid/python/qpid/testlib.py6
-rw-r--r--qpid/python/tests/__init__.py2
-rw-r--r--qpid/python/tests/connection.py (renamed from qpid/python/tests/connection010.py)2
-rw-r--r--qpid/python/tests_0-10/management.py (renamed from qpid/python/tests_0-10_preview/management.py)22
-rw-r--r--qpid/python/tests_0-10/persistence.py67
27 files changed, 1453 insertions, 1328 deletions
diff --git a/qpid/cpp/src/qpid/framing/FieldTable.cpp b/qpid/cpp/src/qpid/framing/FieldTable.cpp
index 089bc5d4a5..ac2a0f286d 100644
--- a/qpid/cpp/src/qpid/framing/FieldTable.cpp
+++ b/qpid/cpp/src/qpid/framing/FieldTable.cpp
@@ -66,7 +66,7 @@ void FieldTable::set(const std::string& name, const ValuePtr& value){
}
void FieldTable::setString(const std::string& name, const std::string& value){
- values[name] = ValuePtr(new StringValue(value));
+ values[name] = ValuePtr(new Str16Value(value));
}
void FieldTable::setInt(const std::string& name, int value){
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 769593c8d2..9b4290232d 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -24,7 +24,7 @@
#include "qpid/log/Statement.h"
#include <qpid/broker/Message.h>
#include <qpid/broker/MessageDelivery.h>
-#include "qpid/framing/MessageXTransferBody.h"
+#include "qpid/framing/MessageTransferBody.h"
#include <list>
#include <iostream>
#include <fstream>
@@ -218,8 +218,8 @@ void ManagementAgent::SendBuffer (Buffer& buf,
return;
intrusive_ptr<Message> msg (new Message ());
- AMQFrame method (in_place<MessageXTransferBody>(
- ProtocolVersion(), 0, exchange->getName (), 0, 0));
+ AMQFrame method (in_place<MessageTransferBody>(
+ ProtocolVersion(), exchange->getName (), 0, 0));
AMQFrame header (in_place<AMQHeaderBody>());
AMQFrame content(in_place<AMQContentBody>());
@@ -233,8 +233,8 @@ void ManagementAgent::SendBuffer (Buffer& buf,
msg->getFrames().append(method);
msg->getFrames().append(header);
- PreviewMessageProperties* props =
- msg->getFrames().getHeaders()->get<PreviewMessageProperties>(true);
+ MessageProperties* props =
+ msg->getFrames().getHeaders()->get<MessageProperties>(true);
props->setContentLength(length);
msg->getFrames().append(content);
@@ -394,8 +394,8 @@ void ManagementAgent::dispatchMethod (Message& msg,
uint64_t objId = inBuffer.getLongLong ();
string replyToKey;
- const framing::PreviewMessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::PreviewMessageProperties>();
+ const framing::MessageProperties* p =
+ msg.getFrames().getHeaders()->get<framing::MessageProperties>();
if (p && p->hasReplyTo())
{
const framing::ReplyTo& rt = p->getReplyTo ();
@@ -601,8 +601,8 @@ void ManagementAgent::dispatchAgentCommand (Message& msg)
uint32_t sequence;
string replyToKey;
- const framing::PreviewMessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::PreviewMessageProperties>();
+ const framing::MessageProperties* p =
+ msg.getFrames().getHeaders()->get<framing::MessageProperties>();
if (p && p->hasReplyTo())
{
const framing::ReplyTo& rt = p->getReplyTo ();
diff --git a/qpid/cpp/src/tests/FieldTable.cpp b/qpid/cpp/src/tests/FieldTable.cpp
index 716a5d3d39..ba506d4101 100644
--- a/qpid/cpp/src/tests/FieldTable.cpp
+++ b/qpid/cpp/src/tests/FieldTable.cpp
@@ -32,7 +32,7 @@ QPID_AUTO_TEST_CASE(testMe)
{
FieldTable ft;
ft.setString("A", "BCDE");
- BOOST_CHECK(StringValue("BCDE") == *ft.get("A"));
+ BOOST_CHECK(string("BCDE") == ft.getString("A"));
char buff[100];
Buffer wbuffer(buff, 100);
@@ -41,7 +41,7 @@ QPID_AUTO_TEST_CASE(testMe)
Buffer rbuffer(buff, 100);
FieldTable ft2;
rbuffer.get(ft2);
- BOOST_CHECK(StringValue("BCDE") == *ft2.get("A"));
+ BOOST_CHECK(string("BCDE") == ft2.getString("A"));
}
@@ -55,8 +55,8 @@ QPID_AUTO_TEST_CASE(testAssignment)
b = a;
a.setString("A", "CCCC");
- BOOST_CHECK(StringValue("CCCC") == *a.get("A"));
- BOOST_CHECK(StringValue("BBBB") == *b.get("A"));
+ BOOST_CHECK(string("CCCC") == a.getString("A"));
+ BOOST_CHECK(string("BBBB") == b.getString("A"));
BOOST_CHECK_EQUAL(1234, a.getInt("B"));
BOOST_CHECK_EQUAL(1234, b.getInt("B"));
BOOST_CHECK(IntegerValue(1234) == *a.get("B"));
@@ -74,10 +74,10 @@ QPID_AUTO_TEST_CASE(testAssignment)
Buffer rbuffer(buff, c.size());
rbuffer.get(d);
BOOST_CHECK_EQUAL(c, d);
- BOOST_CHECK(StringValue("CCCC") == *c.get("A"));
+ BOOST_CHECK(string("CCCC") == c.getString("A"));
BOOST_CHECK(IntegerValue(1234) == *c.get("B"));
}
- BOOST_CHECK(StringValue("CCCC") == *d.get("A"));
+ BOOST_CHECK(string("CCCC") == d.getString("A"));
BOOST_CHECK(IntegerValue(1234) == *d.get("B"));
}
diff --git a/qpid/cpp/src/tests/MessageTest.cpp b/qpid/cpp/src/tests/MessageTest.cpp
index d7688c74a9..f642658825 100644
--- a/qpid/cpp/src/tests/MessageTest.cpp
+++ b/qpid/cpp/src/tests/MessageTest.cpp
@@ -87,7 +87,7 @@ class MessageTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL((uint64_t) data1.size() + data2.size(), msg->contentSize());
CPPUNIT_ASSERT_EQUAL((uint64_t) data1.size() + data2.size(), msg->getProperties<MessageProperties>()->getContentLength());
CPPUNIT_ASSERT_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
- CPPUNIT_ASSERT(StringValue("xyz") == *msg->getProperties<MessageProperties>()->getApplicationHeaders().get("abc"));
+ CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getProperties<MessageProperties>()->getApplicationHeaders().getString("abc"));
CPPUNIT_ASSERT_EQUAL((uint8_t) PERSISTENT, msg->getProperties<DeliveryProperties>()->getDeliveryMode());
CPPUNIT_ASSERT(msg->isPersistent());
}
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index 28ebc4a24d..dbd244bcda 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -19,10 +19,10 @@
#
import sys
-from qpid.testlib import TestBase, testrunner
+from qpid.testlib import TestBase010, testrunner
from qpid.management import managementChannel, managementClient
+from qpid.datatypes import Message
from qpid.queue import Empty
-from qpid.content import Content
def scan_args(name, default=None, args=sys.argv[1:]):
@@ -51,9 +51,9 @@ def remote_port():
class Helper:
def __init__(self, parent):
self.parent = parent
- self.channel = parent.client.channel(2)
- self.mc = managementClient(self.channel.spec)
- self.mch = self.mc.addChannel(self.channel)
+ self.session = parent.conn.session("2")
+ self.mc = managementClient(self.session.spec)
+ self.mch = self.mc.addChannel(self.session)
self.mc.syncWaitForStable(self.mch)
def get_objects(self, type):
@@ -75,7 +75,7 @@ class Helper:
def assertEqual(self, a, b):
self.parent.assertEqual(a, b)
-class FederationTests(TestBase):
+class FederationTests(TestBase010):
def test_bridge_create_and_close(self):
mgmt = Helper(self)
@@ -94,8 +94,8 @@ class FederationTests(TestBase):
mgmt.call_method(link, "close")
self.assertEqual(len(mgmt.get_objects("link")), 0)
- def test_pull_from_exchange(self):
- channel = self.channel
+ def DISABLED_test_pull_from_exchange(self):
+ session = self.session
mgmt = Helper(self)
broker = mgmt.get_object("broker")
@@ -107,18 +107,18 @@ class FederationTests(TestBase):
bridge = mgmt.get_object("bridge")
#setup queue to receive messages from local broker
- channel.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
- channel.queue_bind(queue="fed1", exchange="amq.fanout")
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="amq.fanout")
self.subscribe(queue="fed1", destination="f1")
- queue = self.client.queue("f1")
+ queue = session.incoming("f1")
#send messages to remote broker and confirm it is routed to local broker
r_conn = self.connect(host=remote_host(), port=remote_port())
- r_channel = r_conn.channel(1)
- r_channel.session_open()
+ r_session = r_conn.session("1")
for i in range(1, 11):
- r_channel.message_transfer(destination="amq.direct", content=Content(properties={'routing_key' : "my-key"}, body="Message %d" % i))
+ dp = r_session.delivery_properties(routing_key="my-key")
+ r_session.message_transfer(destination="amq.direct", message=Message(dp, "Message %d" % i))
for i in range(1, 11):
msg = queue.get(timeout=5)
@@ -135,22 +135,22 @@ class FederationTests(TestBase):
mgmt.call_method(link, "close")
self.assertEqual(len(mgmt.get_objects("link")), 0)
- def test_pull_from_queue(self):
- channel = self.channel
+ def DISABLED_test_pull_from_queue(self):
+ session = self.session
#setup queue on remote broker and add some messages
r_conn = self.connect(host=remote_host(), port=remote_port())
- r_channel = r_conn.channel(1)
- r_channel.session_open()
- r_channel.queue_declare(queue="my-bridge-queue", exclusive=True, auto_delete=True)
+ r_session = r_conn.session("1")
+ r_session.queue_declare(queue="my-bridge-queue", exclusive=True, auto_delete=True)
for i in range(1, 6):
- r_channel.message_transfer(content=Content(properties={'routing_key' : "my-bridge-queue"}, body="Message %d" % i))
+ dp = r_session.delivery_properties(routing_key="my-bridge-queue")
+ r_session.message_transfer(message=Message(dp, "Message %d" % i))
#setup queue to receive messages from local broker
- channel.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
- channel.queue_bind(queue="fed1", exchange="amq.fanout")
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="amq.fanout")
self.subscribe(queue="fed1", destination="f1")
- queue = self.client.queue("f1")
+ queue = session.incoming("f1")
mgmt = Helper(self)
broker = mgmt.get_object("broker")
@@ -163,7 +163,8 @@ class FederationTests(TestBase):
#add some more messages (i.e. after bridge was created)
for i in range(6, 11):
- r_channel.message_transfer(content=Content(properties={'routing_key' : "my-bridge-queue"}, body="Message %d" % i))
+ dp = r_session.delivery_properties(routing_key="my-bridge-queue")
+ r_session.message_transfer(message=Message(dp, "Message %d" % i))
for i in range(1, 11):
msg = queue.get(timeout=5)
diff --git a/qpid/cpp/src/tests/run_federation_tests b/qpid/cpp/src/tests/run_federation_tests
index 1f5917af0e..4a5fd39a41 100755
--- a/qpid/cpp/src/tests/run_federation_tests
+++ b/qpid/cpp/src/tests/run_federation_tests
@@ -19,6 +19,6 @@ if test -d ../../../python ; then
start_brokers
echo "Running federation tests using brokers on ports $LOCAL_PORT $REMOTE_PORT"
export PYTHONPATH=../../../python
- ./federation.py -v -s ../../../specs/amqp.0-10-preview.xml -b localhost:$LOCAL_PORT --remote-port $REMOTE_PORT || { echo "FAIL federation tests"; exit 1; }
+ ./federation.py -v -s ../../../specs/amqp.0-10-qpid-errata.xml -b localhost:$LOCAL_PORT --remote-port $REMOTE_PORT || { echo "FAIL federation tests"; exit 1; }
fi
diff --git a/qpid/cpp/src/tests/topic_listener.cpp b/qpid/cpp/src/tests/topic_listener.cpp
index 3dd042605e..f1007ee4c2 100644
--- a/qpid/cpp/src/tests/topic_listener.cpp
+++ b/qpid/cpp/src/tests/topic_listener.cpp
@@ -154,11 +154,11 @@ void Listener::received(Message& message){
init = true;
cout << "Batch started." << endl;
}
- FieldTable::ValuePtr type(message.getHeaders().get("TYPE"));
+ string type = message.getHeaders().getString("TYPE");
- if(!!type && StringValue("TERMINATION_REQUEST") == *type){
+ if(string("TERMINATION_REQUEST") == type){
shutdown();
- }else if(!!type && StringValue("REPORT_REQUEST") == *type){
+ }else if(string("REPORT_REQUEST") == type){
mgr.getAckPolicy().ackOutstanding(session);//acknowledge everything upto this point
cout <<"Batch ended, sending report." << endl;
//send a report:
diff --git a/qpid/python/qpid/client.py b/qpid/python/qpid/client.py
index 13c3f13fe5..4605710de8 100644
--- a/qpid/python/qpid/client.py
+++ b/qpid/python/qpid/client.py
@@ -25,7 +25,7 @@ interacting with the server.
import os, threading
from peer import Peer, Channel, Closed
from delegate import Delegate
-from connection import Connection, Frame, connect
+from connection08 import Connection, Frame, connect
from spec import load
from queue import Queue
from reference import ReferenceId, References
diff --git a/qpid/python/qpid/codec.py b/qpid/python/qpid/codec.py
index dfa74b6a2f..8026b209dc 100644
--- a/qpid/python/qpid/codec.py
+++ b/qpid/python/qpid/codec.py
@@ -26,7 +26,7 @@ fields.
The unit test for this module is located in tests/codec.py
"""
-import re, qpid, spec
+import re, qpid, spec08
from cStringIO import StringIO
from struct import *
from reference import ReferenceId
@@ -156,7 +156,7 @@ class Codec:
"""
calls the appropriate encode function e.g. encode_octet, encode_short etc.
"""
- if isinstance(type, spec.Struct):
+ if isinstance(type, spec08.Struct):
self.encode_struct(type, value)
else:
getattr(self, "encode_" + type)(value)
@@ -165,7 +165,7 @@ class Codec:
"""
calls the appropriate decode function e.g. decode_octet, decode_short etc.
"""
- if isinstance(type, spec.Struct):
+ if isinstance(type, spec08.Struct):
return self.decode_struct(type)
else:
return getattr(self, "decode_" + type)()
diff --git a/qpid/python/qpid/codec010.py b/qpid/python/qpid/codec010.py
index f82a7a49dc..27fcd5d418 100644
--- a/qpid/python/qpid/codec010.py
+++ b/qpid/python/qpid/codec010.py
@@ -221,12 +221,18 @@ class Codec(Packer):
attr = "write_uint%d" % (width*8)
getattr(self, attr)(n)
+ def read_uuid(self):
+ return self.unpack("16s")
+
def write_uuid(self, s):
self.pack("16s", s)
- def read_uuid(self):
+ def read_bin128(self):
return self.unpack("16s")
+ def write_bin128(self, b):
+ self.pack("16s", b)
+
class StringCodec(Codec):
diff --git a/qpid/python/qpid/connection.py b/qpid/python/qpid/connection.py
index eafad7067a..dc72cd9cb8 100644
--- a/qpid/python/qpid/connection.py
+++ b/qpid/python/qpid/connection.py
@@ -17,467 +17,165 @@
# under the License.
#
-"""
-A Connection class containing socket code that uses the spec metadata
-to read and write Frame objects. This could be used by a client,
-server, or even a proxy implementation.
-"""
+import datatypes, session
+from threading import Thread, Condition, RLock
+from util import wait
+from framer import Closed
+from assembler import Assembler, Segment
+from codec010 import StringCodec
+from session import Session
+from invoker import Invoker
+from spec010 import Control, Command
+from exceptions import *
+from logging import getLogger
+import delegates
-import socket, codec, logging, qpid
-from cStringIO import StringIO
-from spec import load
-from codec import EOF
+class ChannelBusy(Exception): pass
-class SockIO:
+class ChannelsBusy(Exception): pass
- def __init__(self, sock):
- self.sock = sock
+class SessionBusy(Exception): pass
- def write(self, buf):
-# print "OUT: %r" % buf
- self.sock.sendall(buf)
+def client(*args):
+ return delegates.Client(*args)
- def read(self, n):
- data = ""
- while len(data) < n:
- try:
- s = self.sock.recv(n - len(data))
- except socket.error:
- break
- if len(s) == 0:
- break
-# print "IN: %r" % s
- data += s
- return data
-
- def flush(self):
- pass
-
- def close(self):
- self.sock.shutdown(socket.SHUT_RDWR)
+def server(*args):
+ return delegates.Server(*args)
-def connect(host, port):
- sock = socket.socket()
- sock.connect((host, port))
- sock.setblocking(1)
- return SockIO(sock)
+class Connection(Assembler):
-def listen(host, port, predicate = lambda: True):
- sock = socket.socket()
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind((host, port))
- sock.listen(5)
- while predicate():
- s, a = sock.accept()
- yield SockIO(s)
-
-class Connection:
-
- def __init__(self, io, spec):
- self.codec = codec.Codec(io, spec)
+ def __init__(self, sock, spec, delegate=client):
+ Assembler.__init__(self, sock)
self.spec = spec
- self.FRAME_END = self.spec.constants.byname["frame_end"].id
- self.write = getattr(self, "write_%s_%s" % (self.spec.major, self.spec.minor))
- self.read = getattr(self, "read_%s_%s" % (self.spec.major, self.spec.minor))
-
- def flush(self):
- self.codec.flush()
-
- INIT="!4s4B"
+ self.track = self.spec["track"]
- def init(self):
- self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major,
- self.spec.minor)
+ self.lock = RLock()
+ self.attached = {}
+ self.sessions = {}
- def tini(self):
- self.codec.unpack(Connection.INIT)
+ self.condition = Condition()
+ self.opened = False
- def write_8_0(self, frame):
- c = self.codec
- c.encode_octet(self.spec.constants.byname[frame.type].id)
- c.encode_short(frame.channel)
- body = StringIO()
- enc = codec.Codec(body, self.spec)
- frame.encode(enc)
- enc.flush()
- c.encode_longstr(body.getvalue())
- c.encode_octet(self.FRAME_END)
+ self.thread = Thread(target=self.run)
+ self.thread.setDaemon(True)
- def read_8_0(self):
- c = self.codec
- type = self.spec.constants.byid[c.decode_octet()].name
- channel = c.decode_short()
- body = c.decode_longstr()
- dec = codec.Codec(StringIO(body), self.spec)
- frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
- frame.channel = channel
- end = c.decode_octet()
- if end != self.FRAME_END:
- garbage = ""
- while end != self.FRAME_END:
- garbage += chr(end)
- end = c.decode_octet()
- raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage)
- return frame
+ self.channel_max = 65535
- def write_0_10(self, frame):
- c = self.codec
- flags = 0
- if frame.bof: flags |= 0x08
- if frame.eof: flags |= 0x04
- if frame.bos: flags |= 0x02
- if frame.eos: flags |= 0x01
+ self.delegate = delegate(self)
- c.encode_octet(flags) # TODO: currently fixed at ver=0, B=E=b=e=1
- c.encode_octet(self.spec.constants.byname[frame.type].id)
- body = StringIO()
- enc = codec.Codec(body, self.spec)
- frame.encode(enc)
- enc.flush()
- frame_size = len(body.getvalue()) + 12 # TODO: Magic number (frame header size)
- c.encode_short(frame_size)
- c.encode_octet(0) # Reserved
- c.encode_octet(frame.subchannel & 0x0f)
- c.encode_short(frame.channel)
- c.encode_long(0) # Reserved
- c.write(body.getvalue())
- c.encode_octet(self.FRAME_END)
-
- def read_0_10(self):
- c = self.codec
- flags = c.decode_octet() # TODO: currently ignoring flags
- framing_version = (flags & 0xc0) >> 6
- if framing_version != 0:
- raise "frame error: unknown framing version"
- type = self.spec.constants.byid[c.decode_octet()].name
- frame_size = c.decode_short()
- if frame_size < 12: # TODO: Magic number (frame header size)
- raise "frame error: frame size too small"
- reserved1 = c.decode_octet()
- field = c.decode_octet()
- subchannel = field & 0x0f
- channel = c.decode_short()
- reserved2 = c.decode_long() # TODO: reserved maybe need to ensure 0
- if (flags & 0x30) != 0 or reserved1 != 0 or (field & 0xf0) != 0:
- raise "frame error: reserved bits not all zero"
- body_size = frame_size - 12 # TODO: Magic number (frame header size)
- body = c.read(body_size)
- dec = codec.Codec(StringIO(body), self.spec)
+ def attach(self, name, ch, delegate, force=False):
+ self.lock.acquire()
try:
- frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
- except EOF:
- raise "truncated frame body: %r" % body
- frame.channel = channel
- frame.subchannel = subchannel
- end = c.decode_octet()
- if end != self.FRAME_END:
- garbage = ""
- while end != self.FRAME_END:
- garbage += chr(end)
- end = c.decode_octet()
- raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage)
- return frame
-
- def write_99_0(self, frame):
- self.write_0_10(frame)
-
- def read_99_0(self):
- return self.read_0_10()
-
-class Frame:
-
- DECODERS = {}
-
- class __metaclass__(type):
-
- def __new__(cls, name, bases, dict):
- for attr in ("encode", "decode", "type"):
- if not dict.has_key(attr):
- raise TypeError("%s must define %s" % (name, attr))
- dict["decode"] = staticmethod(dict["decode"])
- if dict.has_key("__init__"):
- __init__ = dict["__init__"]
- def init(self, *args, **kwargs):
- args = list(args)
- self.init(args, kwargs)
- __init__(self, *args, **kwargs)
- dict["__init__"] = init
- t = type.__new__(cls, name, bases, dict)
- if t.type != None:
- Frame.DECODERS[t.type] = t
- return t
-
- type = None
-
- def init(self, args, kwargs):
- self.channel = kwargs.pop("channel", 0)
- self.subchannel = kwargs.pop("subchannel", 0)
- self.bos = True
- self.eos = True
- self.bof = True
- self.eof = True
-
- def encode(self, enc): abstract
-
- def decode(spec, dec, size): abstract
-
-class Method(Frame):
-
- type = "frame_method"
-
- def __init__(self, method, args):
- if len(args) != len(method.fields):
- argspec = ["%s: %s" % (f.name, f.type)
- for f in method.fields]
- raise TypeError("%s.%s expecting (%s), got %s" %
- (method.klass.name, method.name, ", ".join(argspec),
- args))
- self.method = method
- self.method_type = method
- self.args = args
- self.eof = not method.content
-
- def encode(self, c):
- version = (c.spec.major, c.spec.minor)
- if version == (0, 10) or version == (99, 0):
- c.encode_octet(self.method.klass.id)
- c.encode_octet(self.method.id)
- else:
- c.encode_short(self.method.klass.id)
- c.encode_short(self.method.id)
- for field, arg in zip(self.method.fields, self.args):
- c.encode(field.type, arg)
-
- def decode(spec, c, size):
- version = (c.spec.major, c.spec.minor)
- if version == (0, 10) or version == (99, 0):
- klass = spec.classes.byid[c.decode_octet()]
- meth = klass.methods.byid[c.decode_octet()]
- else:
- klass = spec.classes.byid[c.decode_short()]
- meth = klass.methods.byid[c.decode_short()]
- args = tuple([c.decode(f.type) for f in meth.fields])
- return Method(meth, args)
-
- def __str__(self):
- return "[%s] %s %s" % (self.channel, self.method,
- ", ".join([str(a) for a in self.args]))
-
-class Request(Frame):
-
- type = "frame_request"
-
- def __init__(self, id, response_mark, method):
- self.id = id
- self.response_mark = response_mark
- self.method = method
- self.method_type = method.method_type
- self.args = method.args
-
- def encode(self, enc):
- enc.encode_longlong(self.id)
- enc.encode_longlong(self.response_mark)
- # reserved
- enc.encode_long(0)
- self.method.encode(enc)
-
- def decode(spec, dec, size):
- id = dec.decode_longlong()
- mark = dec.decode_longlong()
- # reserved
- dec.decode_long()
- method = Method.decode(spec, dec, size - 20)
- return Request(id, mark, method)
-
- def __str__(self):
- return "[%s] Request(%s) %s" % (self.channel, self.id, self.method)
-
-class Response(Frame):
-
- type = "frame_response"
-
- def __init__(self, id, request_id, batch_offset, method):
- self.id = id
- self.request_id = request_id
- self.batch_offset = batch_offset
- self.method = method
- self.method_type = method.method_type
- self.args = method.args
-
- def encode(self, enc):
- enc.encode_longlong(self.id)
- enc.encode_longlong(self.request_id)
- enc.encode_long(self.batch_offset)
- self.method.encode(enc)
-
- def decode(spec, dec, size):
- id = dec.decode_longlong()
- request_id = dec.decode_longlong()
- batch_offset = dec.decode_long()
- method = Method.decode(spec, dec, size - 20)
- return Response(id, request_id, batch_offset, method)
-
- def __str__(self):
- return "[%s] Response(%s,%s,%s) %s" % (self.channel, self.id, self.request_id, self.batch_offset, self.method)
-
-def uses_struct_encoding(spec):
- return (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0)
-
-class Header(Frame):
-
- type = "frame_header"
-
- def __init__(self, klass, weight, size, properties):
- self.klass = klass
- self.weight = weight
- self.size = size
- self.properties = properties
- self.eof = size == 0
- self.bof = False
-
- def __getitem__(self, name):
- return self.properties[name]
-
- def __setitem__(self, name, value):
- self.properties[name] = value
-
- def __delitem__(self, name):
- del self.properties[name]
-
- def encode(self, c):
- if uses_struct_encoding(c.spec):
- self.encode_structs(c)
- else:
- self.encode_legacy(c)
-
- def encode_structs(self, c):
- # XXX
- structs = [qpid.Struct(c.spec.domains.byname["delivery_properties"].type),
- qpid.Struct(c.spec.domains.byname["message_properties"].type)]
-
- # XXX
- props = self.properties.copy()
- for k in self.properties:
- for s in structs:
- if s.exists(k):
- s.set(k, props.pop(k))
- if props:
- raise TypeError("no such property: %s" % (", ".join(props)))
-
- # message properties store the content-length now, and weight is
- # deprecated
- if self.size != None:
- structs[1].content_length = self.size
-
- for s in structs:
- c.encode_long_struct(s)
-
- def encode_legacy(self, c):
- c.encode_short(self.klass.id)
- c.encode_short(self.weight)
- c.encode_longlong(self.size)
-
- # property flags
- nprops = len(self.klass.fields)
- flags = 0
- for i in range(nprops):
- f = self.klass.fields.items[i]
- flags <<= 1
- if self.properties.get(f.name) != None:
- flags |= 1
- # the last bit indicates more flags
- if i > 0 and (i % 15) == 0:
- flags <<= 1
- if nprops > (i + 1):
- flags |= 1
- c.encode_short(flags)
- flags = 0
- flags <<= ((16 - (nprops % 15)) % 16)
- c.encode_short(flags)
-
- # properties
- for f in self.klass.fields:
- v = self.properties.get(f.name)
- if v != None:
- c.encode(f.type, v)
-
- def decode(spec, c, size):
- if uses_struct_encoding(spec):
- return Header.decode_structs(spec, c, size)
+ ssn = self.attached.get(ch.id)
+ if ssn is not None:
+ if ssn.name != name:
+ raise ChannelBusy(ch, ssn)
+ else:
+ ssn = self.sessions.get(name)
+ if ssn is None:
+ ssn = Session(name, self.spec, delegate=delegate)
+ self.sessions[name] = ssn
+ elif ssn.channel is not None:
+ if force:
+ del self.attached[ssn.channel.id]
+ ssn.channel = None
+ else:
+ raise SessionBusy(ssn)
+ self.attached[ch.id] = ssn
+ ssn.channel = ch
+ ch.session = ssn
+ return ssn
+ finally:
+ self.lock.release()
+
+ def detach(self, name, ch):
+ self.lock.acquire()
+ try:
+ self.attached.pop(ch.id, None)
+ ssn = self.sessions.pop(name, None)
+ if ssn is not None:
+ ssn.channel = None
+ return ssn
+ finally:
+ self.lock.release()
+
+ def __channel(self):
+ # XXX: ch 0?
+ for i in xrange(self.channel_max):
+ if not self.attached.has_key(i):
+ return i
else:
- return Header.decode_legacy(spec, c, size)
+ raise ChannelsBusy()
- @staticmethod
- def decode_structs(spec, c, size):
- structs = []
- start = c.nread
- while c.nread - start < size:
- structs.append(c.decode_long_struct())
-
- # XXX
- props = {}
- length = None
- for s in structs:
- for f in s.type.fields:
- if s.has(f.name):
- props[f.name] = s.get(f.name)
- if f.name == "content_length":
- length = s.get(f.name)
- return Header(None, 0, length, props)
-
- @staticmethod
- def decode_legacy(spec, c, size):
- klass = spec.classes.byid[c.decode_short()]
- weight = c.decode_short()
- size = c.decode_longlong()
-
- # property flags
- bits = []
+ def session(self, name, timeout=None, delegate=session.client):
+ self.lock.acquire()
+ try:
+ ch = Channel(self, self.__channel())
+ ssn = self.attach(name, ch, delegate)
+ ssn.channel.session_attach(name)
+ if wait(ssn.condition, lambda: ssn.channel is not None, timeout):
+ return ssn
+ else:
+ self.detach(name, ch)
+ raise Timeout()
+ finally:
+ self.lock.release()
+
+ def start(self, timeout=None):
+ self.delegate.start()
+ self.thread.start()
+ if not wait(self.condition, lambda: self.opened, timeout):
+ raise Timeout()
+
+ def run(self):
+ # XXX: we don't really have a good way to exit this loop without
+ # getting the other end to kill the socket
while True:
- flags = c.decode_short()
- for i in range(15, 0, -1):
- if flags >> i & 0x1 != 0:
- bits.append(True)
- else:
- bits.append(False)
- if flags & 0x1 == 0:
+ try:
+ seg = self.read_segment()
+ except Closed:
break
+ self.delegate.received(seg)
- # properties
- properties = {}
- for b, f in zip(bits, klass.fields):
- if b:
- # Note: decode returns a unicode u'' string but only
- # plain '' strings can be used as keywords so we need to
- # stringify the names.
- properties[str(f.name)] = c.decode(f.type)
- return Header(klass, weight, size, properties)
+ def close(self, timeout=None):
+ if not self.opened: return
+ Channel(self, 0).connection_close(200)
+ if not wait(self.condition, lambda: not self.opened, timeout):
+ raise Timeout()
+ self.thread.join(timeout=timeout)
def __str__(self):
- return "%s %s %s %s" % (self.klass, self.weight, self.size,
- self.properties)
+ return "%s:%s" % self.sock.getsockname()
+
+ def __repr__(self):
+ return str(self)
-class Body(Frame):
+log = getLogger("qpid.io.ctl")
- type = "frame_body"
+class Channel(Invoker):
- def __init__(self, content):
- self.content = content
- self.eof = True
- self.bof = False
+ def __init__(self, connection, id):
+ self.connection = connection
+ self.id = id
+ self.session = None
- def encode(self, enc):
- enc.write(self.content)
+ def resolve_method(self, name):
+ inst = self.connection.spec.instructions.get(name)
+ if inst is not None and isinstance(inst, Control):
+ return inst
+ else:
+ return None
- def decode(spec, dec, size):
- return Body(dec.read(size))
+ def invoke(self, type, args, kwargs):
+ ctl = type.new(args, kwargs)
+ sc = StringCodec(self.connection.spec)
+ sc.write_control(ctl)
+ self.connection.write_segment(Segment(True, True, type.segment_type,
+ type.track, self.id, sc.encoded))
+ log.debug("SENT %s", ctl)
def __str__(self):
- return "Body(%r)" % self.content
+ return "%s[%s]" % (self.connection, self.id)
-# TODO:
-# OOB_METHOD = "frame_oob_method"
-# OOB_HEADER = "frame_oob_header"
-# OOB_BODY = "frame_oob_body"
-# TRACE = "frame_trace"
-# HEARTBEAT = "frame_heartbeat"
+ def __repr__(self):
+ return str(self)
diff --git a/qpid/python/qpid/connection010.py b/qpid/python/qpid/connection010.py
deleted file mode 100644
index 59aa41c88d..0000000000
--- a/qpid/python/qpid/connection010.py
+++ /dev/null
@@ -1,181 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import datatypes, session
-from threading import Thread, Condition, RLock
-from util import wait
-from framer import Closed
-from assembler import Assembler, Segment
-from codec010 import StringCodec
-from session import Session
-from invoker import Invoker
-from spec010 import Control, Command
-from exceptions import *
-from logging import getLogger
-import delegates
-
-class ChannelBusy(Exception): pass
-
-class ChannelsBusy(Exception): pass
-
-class SessionBusy(Exception): pass
-
-def client(*args):
- return delegates.Client(*args)
-
-def server(*args):
- return delegates.Server(*args)
-
-class Connection(Assembler):
-
- def __init__(self, sock, spec, delegate=client):
- Assembler.__init__(self, sock)
- self.spec = spec
- self.track = self.spec["track"]
-
- self.lock = RLock()
- self.attached = {}
- self.sessions = {}
-
- self.condition = Condition()
- self.opened = False
-
- self.thread = Thread(target=self.run)
- self.thread.setDaemon(True)
-
- self.channel_max = 65535
-
- self.delegate = delegate(self)
-
- def attach(self, name, ch, delegate, force=False):
- self.lock.acquire()
- try:
- ssn = self.attached.get(ch.id)
- if ssn is not None:
- if ssn.name != name:
- raise ChannelBusy(ch, ssn)
- else:
- ssn = self.sessions.get(name)
- if ssn is None:
- ssn = Session(name, self.spec, delegate=delegate)
- self.sessions[name] = ssn
- elif ssn.channel is not None:
- if force:
- del self.attached[ssn.channel.id]
- ssn.channel = None
- else:
- raise SessionBusy(ssn)
- self.attached[ch.id] = ssn
- ssn.channel = ch
- ch.session = ssn
- return ssn
- finally:
- self.lock.release()
-
- def detach(self, name, ch):
- self.lock.acquire()
- try:
- self.attached.pop(ch.id, None)
- ssn = self.sessions.pop(name, None)
- if ssn is not None:
- ssn.channel = None
- return ssn
- finally:
- self.lock.release()
-
- def __channel(self):
- # XXX: ch 0?
- for i in xrange(self.channel_max):
- if not self.attached.has_key(i):
- return i
- else:
- raise ChannelsBusy()
-
- def session(self, name, timeout=None, delegate=session.client):
- self.lock.acquire()
- try:
- ch = Channel(self, self.__channel())
- ssn = self.attach(name, ch, delegate)
- ssn.channel.session_attach(name)
- if wait(ssn.condition, lambda: ssn.channel is not None, timeout):
- return ssn
- else:
- self.detach(name, ch)
- raise Timeout()
- finally:
- self.lock.release()
-
- def start(self, timeout=None):
- self.delegate.start()
- self.thread.start()
- if not wait(self.condition, lambda: self.opened, timeout):
- raise Timeout()
-
- def run(self):
- # XXX: we don't really have a good way to exit this loop without
- # getting the other end to kill the socket
- while True:
- try:
- seg = self.read_segment()
- except Closed:
- break
- self.delegate.received(seg)
-
- def close(self, timeout=None):
- if not self.opened: return
- Channel(self, 0).connection_close()
- if not wait(self.condition, lambda: not self.opened, timeout):
- raise Timeout()
- self.thread.join(timeout=timeout)
-
- def __str__(self):
- return "%s:%s" % self.sock.getsockname()
-
- def __repr__(self):
- return str(self)
-
-log = getLogger("qpid.io.ctl")
-
-class Channel(Invoker):
-
- def __init__(self, connection, id):
- self.connection = connection
- self.id = id
- self.session = None
-
- def resolve_method(self, name):
- inst = self.connection.spec.instructions.get(name)
- if inst is not None and isinstance(inst, Control):
- return inst
- else:
- return None
-
- def invoke(self, type, args, kwargs):
- ctl = type.new(args, kwargs)
- sc = StringCodec(self.connection.spec)
- sc.write_control(ctl)
- self.connection.write_segment(Segment(True, True, type.segment_type,
- type.track, self.id, sc.encoded))
- log.debug("SENT %s", ctl)
-
- def __str__(self):
- return "%s[%s]" % (self.connection, self.id)
-
- def __repr__(self):
- return str(self)
diff --git a/qpid/python/qpid/connection08.py b/qpid/python/qpid/connection08.py
new file mode 100644
index 0000000000..eafad7067a
--- /dev/null
+++ b/qpid/python/qpid/connection08.py
@@ -0,0 +1,483 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+A Connection class containing socket code that uses the spec metadata
+to read and write Frame objects. This could be used by a client,
+server, or even a proxy implementation.
+"""
+
+import socket, codec, logging, qpid
+from cStringIO import StringIO
+from spec import load
+from codec import EOF
+
+class SockIO:
+
+ def __init__(self, sock):
+ self.sock = sock
+
+ def write(self, buf):
+# print "OUT: %r" % buf
+ self.sock.sendall(buf)
+
+ def read(self, n):
+ data = ""
+ while len(data) < n:
+ try:
+ s = self.sock.recv(n - len(data))
+ except socket.error:
+ break
+ if len(s) == 0:
+ break
+# print "IN: %r" % s
+ data += s
+ return data
+
+ def flush(self):
+ pass
+
+ def close(self):
+ self.sock.shutdown(socket.SHUT_RDWR)
+
+def connect(host, port):
+ sock = socket.socket()
+ sock.connect((host, port))
+ sock.setblocking(1)
+ return SockIO(sock)
+
+def listen(host, port, predicate = lambda: True):
+ sock = socket.socket()
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind((host, port))
+ sock.listen(5)
+ while predicate():
+ s, a = sock.accept()
+ yield SockIO(s)
+
+class Connection:
+
+ def __init__(self, io, spec):
+ self.codec = codec.Codec(io, spec)
+ self.spec = spec
+ self.FRAME_END = self.spec.constants.byname["frame_end"].id
+ self.write = getattr(self, "write_%s_%s" % (self.spec.major, self.spec.minor))
+ self.read = getattr(self, "read_%s_%s" % (self.spec.major, self.spec.minor))
+
+ def flush(self):
+ self.codec.flush()
+
+ INIT="!4s4B"
+
+ def init(self):
+ self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major,
+ self.spec.minor)
+
+ def tini(self):
+ self.codec.unpack(Connection.INIT)
+
+ def write_8_0(self, frame):
+ c = self.codec
+ c.encode_octet(self.spec.constants.byname[frame.type].id)
+ c.encode_short(frame.channel)
+ body = StringIO()
+ enc = codec.Codec(body, self.spec)
+ frame.encode(enc)
+ enc.flush()
+ c.encode_longstr(body.getvalue())
+ c.encode_octet(self.FRAME_END)
+
+ def read_8_0(self):
+ c = self.codec
+ type = self.spec.constants.byid[c.decode_octet()].name
+ channel = c.decode_short()
+ body = c.decode_longstr()
+ dec = codec.Codec(StringIO(body), self.spec)
+ frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
+ frame.channel = channel
+ end = c.decode_octet()
+ if end != self.FRAME_END:
+ garbage = ""
+ while end != self.FRAME_END:
+ garbage += chr(end)
+ end = c.decode_octet()
+ raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage)
+ return frame
+
+ def write_0_10(self, frame):
+ c = self.codec
+ flags = 0
+ if frame.bof: flags |= 0x08
+ if frame.eof: flags |= 0x04
+ if frame.bos: flags |= 0x02
+ if frame.eos: flags |= 0x01
+
+ c.encode_octet(flags) # TODO: currently fixed at ver=0, B=E=b=e=1
+ c.encode_octet(self.spec.constants.byname[frame.type].id)
+ body = StringIO()
+ enc = codec.Codec(body, self.spec)
+ frame.encode(enc)
+ enc.flush()
+ frame_size = len(body.getvalue()) + 12 # TODO: Magic number (frame header size)
+ c.encode_short(frame_size)
+ c.encode_octet(0) # Reserved
+ c.encode_octet(frame.subchannel & 0x0f)
+ c.encode_short(frame.channel)
+ c.encode_long(0) # Reserved
+ c.write(body.getvalue())
+ c.encode_octet(self.FRAME_END)
+
+ def read_0_10(self):
+ c = self.codec
+ flags = c.decode_octet() # TODO: currently ignoring flags
+ framing_version = (flags & 0xc0) >> 6
+ if framing_version != 0:
+ raise "frame error: unknown framing version"
+ type = self.spec.constants.byid[c.decode_octet()].name
+ frame_size = c.decode_short()
+ if frame_size < 12: # TODO: Magic number (frame header size)
+ raise "frame error: frame size too small"
+ reserved1 = c.decode_octet()
+ field = c.decode_octet()
+ subchannel = field & 0x0f
+ channel = c.decode_short()
+ reserved2 = c.decode_long() # TODO: reserved maybe need to ensure 0
+ if (flags & 0x30) != 0 or reserved1 != 0 or (field & 0xf0) != 0:
+ raise "frame error: reserved bits not all zero"
+ body_size = frame_size - 12 # TODO: Magic number (frame header size)
+ body = c.read(body_size)
+ dec = codec.Codec(StringIO(body), self.spec)
+ try:
+ frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
+ except EOF:
+ raise "truncated frame body: %r" % body
+ frame.channel = channel
+ frame.subchannel = subchannel
+ end = c.decode_octet()
+ if end != self.FRAME_END:
+ garbage = ""
+ while end != self.FRAME_END:
+ garbage += chr(end)
+ end = c.decode_octet()
+ raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage)
+ return frame
+
+ def write_99_0(self, frame):
+ self.write_0_10(frame)
+
+ def read_99_0(self):
+ return self.read_0_10()
+
+class Frame:
+
+ DECODERS = {}
+
+ class __metaclass__(type):
+
+ def __new__(cls, name, bases, dict):
+ for attr in ("encode", "decode", "type"):
+ if not dict.has_key(attr):
+ raise TypeError("%s must define %s" % (name, attr))
+ dict["decode"] = staticmethod(dict["decode"])
+ if dict.has_key("__init__"):
+ __init__ = dict["__init__"]
+ def init(self, *args, **kwargs):
+ args = list(args)
+ self.init(args, kwargs)
+ __init__(self, *args, **kwargs)
+ dict["__init__"] = init
+ t = type.__new__(cls, name, bases, dict)
+ if t.type != None:
+ Frame.DECODERS[t.type] = t
+ return t
+
+ type = None
+
+ def init(self, args, kwargs):
+ self.channel = kwargs.pop("channel", 0)
+ self.subchannel = kwargs.pop("subchannel", 0)
+ self.bos = True
+ self.eos = True
+ self.bof = True
+ self.eof = True
+
+ def encode(self, enc): abstract
+
+ def decode(spec, dec, size): abstract
+
+class Method(Frame):
+
+ type = "frame_method"
+
+ def __init__(self, method, args):
+ if len(args) != len(method.fields):
+ argspec = ["%s: %s" % (f.name, f.type)
+ for f in method.fields]
+ raise TypeError("%s.%s expecting (%s), got %s" %
+ (method.klass.name, method.name, ", ".join(argspec),
+ args))
+ self.method = method
+ self.method_type = method
+ self.args = args
+ self.eof = not method.content
+
+ def encode(self, c):
+ version = (c.spec.major, c.spec.minor)
+ if version == (0, 10) or version == (99, 0):
+ c.encode_octet(self.method.klass.id)
+ c.encode_octet(self.method.id)
+ else:
+ c.encode_short(self.method.klass.id)
+ c.encode_short(self.method.id)
+ for field, arg in zip(self.method.fields, self.args):
+ c.encode(field.type, arg)
+
+ def decode(spec, c, size):
+ version = (c.spec.major, c.spec.minor)
+ if version == (0, 10) or version == (99, 0):
+ klass = spec.classes.byid[c.decode_octet()]
+ meth = klass.methods.byid[c.decode_octet()]
+ else:
+ klass = spec.classes.byid[c.decode_short()]
+ meth = klass.methods.byid[c.decode_short()]
+ args = tuple([c.decode(f.type) for f in meth.fields])
+ return Method(meth, args)
+
+ def __str__(self):
+ return "[%s] %s %s" % (self.channel, self.method,
+ ", ".join([str(a) for a in self.args]))
+
+class Request(Frame):
+
+ type = "frame_request"
+
+ def __init__(self, id, response_mark, method):
+ self.id = id
+ self.response_mark = response_mark
+ self.method = method
+ self.method_type = method.method_type
+ self.args = method.args
+
+ def encode(self, enc):
+ enc.encode_longlong(self.id)
+ enc.encode_longlong(self.response_mark)
+ # reserved
+ enc.encode_long(0)
+ self.method.encode(enc)
+
+ def decode(spec, dec, size):
+ id = dec.decode_longlong()
+ mark = dec.decode_longlong()
+ # reserved
+ dec.decode_long()
+ method = Method.decode(spec, dec, size - 20)
+ return Request(id, mark, method)
+
+ def __str__(self):
+ return "[%s] Request(%s) %s" % (self.channel, self.id, self.method)
+
+class Response(Frame):
+
+ type = "frame_response"
+
+ def __init__(self, id, request_id, batch_offset, method):
+ self.id = id
+ self.request_id = request_id
+ self.batch_offset = batch_offset
+ self.method = method
+ self.method_type = method.method_type
+ self.args = method.args
+
+ def encode(self, enc):
+ enc.encode_longlong(self.id)
+ enc.encode_longlong(self.request_id)
+ enc.encode_long(self.batch_offset)
+ self.method.encode(enc)
+
+ def decode(spec, dec, size):
+ id = dec.decode_longlong()
+ request_id = dec.decode_longlong()
+ batch_offset = dec.decode_long()
+ method = Method.decode(spec, dec, size - 20)
+ return Response(id, request_id, batch_offset, method)
+
+ def __str__(self):
+ return "[%s] Response(%s,%s,%s) %s" % (self.channel, self.id, self.request_id, self.batch_offset, self.method)
+
+def uses_struct_encoding(spec):
+ return (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0)
+
+class Header(Frame):
+
+ type = "frame_header"
+
+ def __init__(self, klass, weight, size, properties):
+ self.klass = klass
+ self.weight = weight
+ self.size = size
+ self.properties = properties
+ self.eof = size == 0
+ self.bof = False
+
+ def __getitem__(self, name):
+ return self.properties[name]
+
+ def __setitem__(self, name, value):
+ self.properties[name] = value
+
+ def __delitem__(self, name):
+ del self.properties[name]
+
+ def encode(self, c):
+ if uses_struct_encoding(c.spec):
+ self.encode_structs(c)
+ else:
+ self.encode_legacy(c)
+
+ def encode_structs(self, c):
+ # XXX
+ structs = [qpid.Struct(c.spec.domains.byname["delivery_properties"].type),
+ qpid.Struct(c.spec.domains.byname["message_properties"].type)]
+
+ # XXX
+ props = self.properties.copy()
+ for k in self.properties:
+ for s in structs:
+ if s.exists(k):
+ s.set(k, props.pop(k))
+ if props:
+ raise TypeError("no such property: %s" % (", ".join(props)))
+
+ # message properties store the content-length now, and weight is
+ # deprecated
+ if self.size != None:
+ structs[1].content_length = self.size
+
+ for s in structs:
+ c.encode_long_struct(s)
+
+ def encode_legacy(self, c):
+ c.encode_short(self.klass.id)
+ c.encode_short(self.weight)
+ c.encode_longlong(self.size)
+
+ # property flags
+ nprops = len(self.klass.fields)
+ flags = 0
+ for i in range(nprops):
+ f = self.klass.fields.items[i]
+ flags <<= 1
+ if self.properties.get(f.name) != None:
+ flags |= 1
+ # the last bit indicates more flags
+ if i > 0 and (i % 15) == 0:
+ flags <<= 1
+ if nprops > (i + 1):
+ flags |= 1
+ c.encode_short(flags)
+ flags = 0
+ flags <<= ((16 - (nprops % 15)) % 16)
+ c.encode_short(flags)
+
+ # properties
+ for f in self.klass.fields:
+ v = self.properties.get(f.name)
+ if v != None:
+ c.encode(f.type, v)
+
+ def decode(spec, c, size):
+ if uses_struct_encoding(spec):
+ return Header.decode_structs(spec, c, size)
+ else:
+ return Header.decode_legacy(spec, c, size)
+
+ @staticmethod
+ def decode_structs(spec, c, size):
+ structs = []
+ start = c.nread
+ while c.nread - start < size:
+ structs.append(c.decode_long_struct())
+
+ # XXX
+ props = {}
+ length = None
+ for s in structs:
+ for f in s.type.fields:
+ if s.has(f.name):
+ props[f.name] = s.get(f.name)
+ if f.name == "content_length":
+ length = s.get(f.name)
+ return Header(None, 0, length, props)
+
+ @staticmethod
+ def decode_legacy(spec, c, size):
+ klass = spec.classes.byid[c.decode_short()]
+ weight = c.decode_short()
+ size = c.decode_longlong()
+
+ # property flags
+ bits = []
+ while True:
+ flags = c.decode_short()
+ for i in range(15, 0, -1):
+ if flags >> i & 0x1 != 0:
+ bits.append(True)
+ else:
+ bits.append(False)
+ if flags & 0x1 == 0:
+ break
+
+ # properties
+ properties = {}
+ for b, f in zip(bits, klass.fields):
+ if b:
+ # Note: decode returns a unicode u'' string but only
+ # plain '' strings can be used as keywords so we need to
+ # stringify the names.
+ properties[str(f.name)] = c.decode(f.type)
+ return Header(klass, weight, size, properties)
+
+ def __str__(self):
+ return "%s %s %s %s" % (self.klass, self.weight, self.size,
+ self.properties)
+
+class Body(Frame):
+
+ type = "frame_body"
+
+ def __init__(self, content):
+ self.content = content
+ self.eof = True
+ self.bof = False
+
+ def encode(self, enc):
+ enc.write(self.content)
+
+ def decode(spec, dec, size):
+ return Body(dec.read(size))
+
+ def __str__(self):
+ return "Body(%r)" % self.content
+
+# TODO:
+# OOB_METHOD = "frame_oob_method"
+# OOB_HEADER = "frame_oob_header"
+# OOB_BODY = "frame_oob_body"
+# TRACE = "frame_trace"
+# HEARTBEAT = "frame_heartbeat"
diff --git a/qpid/python/qpid/delegate.py b/qpid/python/qpid/delegate.py
index c4c47592b4..b447c4aa29 100644
--- a/qpid/python/qpid/delegate.py
+++ b/qpid/python/qpid/delegate.py
@@ -22,7 +22,7 @@ Delegate implementation intended for use with the peer module.
"""
import threading, inspect, traceback, sys
-from connection import Method, Request, Response
+from connection08 import Method, Request, Response
def _handler_name(method):
return "%s_%s" % (method.klass.name, method.name)
diff --git a/qpid/python/qpid/delegates.py b/qpid/python/qpid/delegates.py
index 9df2cd04bd..c958313671 100644
--- a/qpid/python/qpid/delegates.py
+++ b/qpid/python/qpid/delegates.py
@@ -17,7 +17,7 @@
# under the License.
#
-import os, connection010, session
+import os, connection, session
from util import notify
from datatypes import RangedSet
from logging import getLogger
@@ -35,7 +35,7 @@ class Delegate:
def received(self, seg):
ssn = self.connection.attached.get(seg.channel)
if ssn is None:
- ch = connection010.Channel(self.connection, seg.channel)
+ ch = connection.Channel(self.connection, seg.channel)
else:
ch = ssn.channel
@@ -61,9 +61,9 @@ class Delegate:
try:
self.connection.attach(a.name, ch, self.delegate, a.force)
ch.session_attached(a.name)
- except connection010.ChannelBusy:
+ except connection.ChannelBusy:
ch.session_detached(a.name)
- except connection010.SessionBusy:
+ except connection.SessionBusy:
ch.session_detached(a.name)
def session_attached(self, ch, a):
@@ -105,7 +105,7 @@ class Server(Delegate):
def start(self):
self.connection.read_header()
self.connection.write_header(self.spec.major, self.spec.minor)
- connection010.Channel(self.connection, 0).connection_start()
+ connection.Channel(self.connection, 0).connection_start()
def connection_start_ok(self, ch, start_ok):
ch.connection_tune()
diff --git a/qpid/python/qpid/framer.py b/qpid/python/qpid/framer.py
index 11fe385d46..fb0e677cee 100644
--- a/qpid/python/qpid/framer.py
+++ b/qpid/python/qpid/framer.py
@@ -19,6 +19,7 @@
import struct, socket
from packer import Packer
+from threading import Lock
from logging import getLogger
raw = getLogger("qpid.io.raw")
@@ -75,6 +76,7 @@ class Framer(Packer):
def __init__(self, sock):
self.sock = sock
+ self.sock_lock = Lock()
def aborted(self):
return False
@@ -116,16 +118,24 @@ class Framer(Packer):
return self.unpack(Framer.HEADER)
def write_header(self, major, minor):
- self.pack(Framer.HEADER, "AMQP", 1, 1, major, minor)
+ self.sock_lock.acquire()
+ try:
+ self.pack(Framer.HEADER, "AMQP", 1, 1, major, minor)
+ finally:
+ self.sock_lock.release()
def write_frame(self, frame):
- size = len(frame.payload) + struct.calcsize(Frame.HEADER)
- track = frame.track & 0x0F
- self.pack(Frame.HEADER, frame.flags, frame.type, size, track, frame.channel)
- self.write(frame.payload)
- # XXX: NOT 0-10 FINAL, TEMPORARY WORKAROUND for C++
- self.write("\xCE")
- frm.debug("SENT %s", frame)
+ self.sock_lock.acquire()
+ try:
+ size = len(frame.payload) + struct.calcsize(Frame.HEADER)
+ track = frame.track & 0x0F
+ self.pack(Frame.HEADER, frame.flags, frame.type, size, track, frame.channel)
+ self.write(frame.payload)
+ # XXX: NOT 0-10 FINAL, TEMPORARY WORKAROUND for C++
+ self.write("\xCE")
+ frm.debug("SENT %s", frame)
+ finally:
+ self.sock_lock.release()
def read_frame(self):
flags, type, size, track, channel = self.unpack(Frame.HEADER)
diff --git a/qpid/python/qpid/management.py b/qpid/python/qpid/management.py
index 6b25d5ea08..3a7a564e19 100644
--- a/qpid/python/qpid/management.py
+++ b/qpid/python/qpid/management.py
@@ -25,12 +25,10 @@ import qpid
import struct
import socket
from threading import Thread
-from message import Message
+from datatypes import Message, RangedSet
from time import time
-from qpid.client import Client
-from qpid.content import Content
from cStringIO import StringIO
-from codec import Codec, EOF
+from codec010 import StringCodec as Codec
from threading import Lock, Condition
@@ -83,40 +81,39 @@ class methodResult:
class managementChannel:
""" This class represents a connection to an AMQP broker. """
- def __init__ (self, ch, topicCb, replyCb, cbContext, _detlife=0):
+ def __init__ (self, ssn, topicCb, replyCb, cbContext, _detlife=0):
""" Given a channel on an established AMQP broker connection, this method
opens a session and performs all of the declarations and bindings needed
to participate in the management protocol. """
- response = ch.session_open (detached_lifetime=_detlife)
- self.sessionId = response.session_id
- self.topicName = "mgmt-%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack ("!LHHHHL", response.session_id)
- self.replyName = "repl-%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack ("!LHHHHL", response.session_id)
- self.qpidChannel = ch
+ self.sessionId = ssn.name
+ self.topicName = "mgmt-%s" % self.sessionId
+ self.replyName = "repl-%s" % self.sessionId
+ self.qpidChannel = ssn
self.tcb = topicCb
self.rcb = replyCb
self.context = cbContext
self.reqsOutstanding = 0
- ch.queue_declare (queue=self.topicName, exclusive=True, auto_delete=True)
- ch.queue_declare (queue=self.replyName, exclusive=True, auto_delete=True)
+ ssn.queue_declare (queue=self.topicName, exclusive=True, auto_delete=True)
+ ssn.queue_declare (queue=self.replyName, exclusive=True, auto_delete=True)
- ch.queue_bind (exchange="qpid.management",
- queue=self.topicName, routing_key="mgmt.#")
- ch.queue_bind (exchange="amq.direct",
- queue=self.replyName, routing_key=self.replyName)
- ch.message_subscribe (queue=self.topicName, destination="tdest")
- ch.message_subscribe (queue=self.replyName, destination="rdest")
+ ssn.exchange_bind (exchange="qpid.management",
+ queue=self.topicName, binding_key="mgmt.#")
+ ssn.exchange_bind (exchange="amq.direct",
+ queue=self.replyName, binding_key=self.replyName)
+ ssn.message_subscribe (queue=self.topicName, destination="tdest")
+ ssn.message_subscribe (queue=self.replyName, destination="rdest")
- ch.client.queue ("tdest").listen (self.topicCb)
- ch.client.queue ("rdest").listen (self.replyCb)
+ ssn.incoming ("tdest").listen (self.topicCb)
+ ssn.incoming ("rdest").listen (self.replyCb)
- ch.message_flow_mode (destination="tdest", mode=1)
- ch.message_flow (destination="tdest", unit=0, value=0xFFFFFFFF)
- ch.message_flow (destination="tdest", unit=1, value=0xFFFFFFFF)
+ ssn.message_set_flow_mode (destination="tdest", flow_mode=1)
+ ssn.message_flow (destination="tdest", unit=0, value=0xFFFFFFFF)
+ ssn.message_flow (destination="tdest", unit=1, value=0xFFFFFFFF)
- ch.message_flow_mode (destination="rdest", mode=1)
- ch.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF)
- ch.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF)
+ ssn.message_set_flow_mode (destination="rdest", flow_mode=1)
+ ssn.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF)
+ ssn.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF)
def topicCb (self, msg):
""" Receive messages via the topic queue on this channel. """
@@ -127,7 +124,18 @@ class managementChannel:
self.rcb (self, msg)
def send (self, exchange, msg):
- self.qpidChannel.message_transfer (destination=exchange, content=msg)
+ self.qpidChannel.message_transfer (destination=exchange, message=msg)
+
+ def accept (self, msg):
+ self.qpidChannel.message_accept(RangedSet(msg.id))
+
+ def message (self, body, routing_key="agent"):
+ dp = self.qpidChannel.delivery_properties()
+ dp.routing_key = routing_key
+ mp = self.qpidChannel.message_properties()
+ mp.content_type = "application/octet-stream"
+ mp.reply_to = self.qpidChannel.reply_to("amq.direct", self.replyName)
+ return Message(dp, mp, body)
class managementClient:
@@ -177,14 +185,9 @@ class managementClient:
self.channels.append (mch)
self.incOutstanding (mch)
- codec = Codec (StringIO (), self.spec)
+ codec = Codec (self.spec)
self.setHeader (codec, ord ('B'))
- msg = Content (codec.stream.getvalue ())
- msg["content_type"] = "application/octet-stream"
- msg["routing_key"] = "agent"
- msg["reply_to"] = self.spec.struct ("reply_to")
- msg["reply_to"]["exchange_name"] = "amq.direct"
- msg["reply_to"]["routing_key"] = mch.replyName
+ msg = mch.message(codec.encoded)
mch.send ("qpid.management", msg)
return mch
@@ -198,17 +201,12 @@ class managementClient:
def getObjects (self, channel, userSequence, className):
""" Request immediate content from broker """
- codec = Codec (StringIO (), self.spec)
+ codec = Codec (self.spec)
self.setHeader (codec, ord ('G'), userSequence)
ft = {}
ft["_class"] = className
- codec.encode_table (ft)
- msg = Content (codec.stream.getvalue ())
- msg["content_type"] = "application/octet-stream"
- msg["routing_key"] = "agent"
- msg["reply_to"] = self.spec.struct ("reply_to")
- msg["reply_to"]["exchange_name"] = "amq.direct"
- msg["reply_to"]["routing_key"] = channel.replyName
+ codec.write_map (ft)
+ msg = channel.message(codec.encoded)
channel.send ("qpid.management", msg)
def syncWaitForStable (self, channel):
@@ -270,19 +268,19 @@ class managementClient:
#========================================================
def topicCb (self, ch, msg):
""" Receive messages via the topic queue of a particular channel. """
- codec = Codec (StringIO (msg.content.body), self.spec)
+ codec = Codec (self.spec, msg.body)
hdr = self.checkHeader (codec)
if hdr == None:
raise ValueError ("outer header invalid");
self.parse (ch, codec, hdr[0], hdr[1])
- msg.complete ()
+ ch.accept(msg)
def replyCb (self, ch, msg):
""" Receive messages via the reply queue of a particular channel. """
- codec = Codec (StringIO (msg.content.body), self.spec)
+ codec = Codec (self.spec, msg.body)
hdr = self.checkHeader (codec)
if hdr == None:
- msg.complete ()
+ ch.accept(msg)
return
if hdr[0] == 'm':
@@ -297,102 +295,102 @@ class managementClient:
self.handleClassInd (ch, codec)
else:
self.parse (ch, codec, hdr[0], hdr[1])
- msg.complete ()
+ ch.accept(msg)
#========================================================
# Internal Functions
#========================================================
def setHeader (self, codec, opcode, seq = 0):
""" Compose the header of a management message. """
- codec.encode_octet (ord ('A'))
- codec.encode_octet (ord ('M'))
- codec.encode_octet (ord ('1'))
- codec.encode_octet (opcode)
- codec.encode_long (seq)
+ codec.write_uint8 (ord ('A'))
+ codec.write_uint8 (ord ('M'))
+ codec.write_uint8 (ord ('1'))
+ codec.write_uint8 (opcode)
+ codec.write_uint32 (seq)
def checkHeader (self, codec):
""" Check the header of a management message and extract the opcode and
class. """
- octet = chr (codec.decode_octet ())
+ octet = chr (codec.read_uint8 ())
if octet != 'A':
return None
- octet = chr (codec.decode_octet ())
+ octet = chr (codec.read_uint8 ())
if octet != 'M':
return None
- octet = chr (codec.decode_octet ())
+ octet = chr (codec.read_uint8 ())
if octet != '1':
return None
- opcode = chr (codec.decode_octet ())
- seq = codec.decode_long ()
+ opcode = chr (codec.read_uint8 ())
+ seq = codec.read_uint32 ()
return (opcode, seq)
def encodeValue (self, codec, value, typecode):
""" Encode, into the codec, a value based on its typecode. """
if typecode == 1:
- codec.encode_octet (int (value))
+ codec.write_uint8 (int (value))
elif typecode == 2:
- codec.encode_short (int (value))
+ codec.write_uint16 (int (value))
elif typecode == 3:
- codec.encode_long (long (value))
+ codec.write_uint32 (long (value))
elif typecode == 4:
- codec.encode_longlong (long (value))
+ codec.write_uint64 (long (value))
elif typecode == 5:
- codec.encode_octet (int (value))
+ codec.write_uint8 (int (value))
elif typecode == 6:
- codec.encode_shortstr (value)
+ codec.write_str8 (value)
elif typecode == 7:
- codec.encode_longstr (value)
+ codec.write_vbin32 (value)
elif typecode == 8: # ABSTIME
- codec.encode_longlong (long (value))
+ codec.write_uint64 (long (value))
elif typecode == 9: # DELTATIME
- codec.encode_longlong (long (value))
+ codec.write_uint64 (long (value))
elif typecode == 10: # REF
- codec.encode_longlong (long (value))
+ codec.write_uint64 (long (value))
elif typecode == 11: # BOOL
- codec.encode_octet (int (value))
+ codec.write_uint8 (int (value))
elif typecode == 12: # FLOAT
- codec.encode_float (float (value))
+ codec.write_float (float (value))
elif typecode == 13: # DOUBLE
- codec.encode_double (double (value))
+ codec.write_double (double (value))
elif typecode == 14: # UUID
- codec.encode_uuid (value)
+ codec.write_uuid (value)
elif typecode == 15: # FTABLE
- codec.encode_table (value)
+ codec.write_map (value)
else:
raise ValueError ("Invalid type code: %d" % typecode)
def decodeValue (self, codec, typecode):
""" Decode, from the codec, a value based on its typecode. """
if typecode == 1:
- data = codec.decode_octet ()
+ data = codec.read_uint8 ()
elif typecode == 2:
- data = codec.decode_short ()
+ data = codec.read_uint16 ()
elif typecode == 3:
- data = codec.decode_long ()
+ data = codec.read_uint32 ()
elif typecode == 4:
- data = codec.decode_longlong ()
+ data = codec.read_uint64 ()
elif typecode == 5:
- data = codec.decode_octet ()
+ data = codec.read_uint8 ()
elif typecode == 6:
- data = codec.decode_shortstr ()
+ data = codec.read_str8 ()
elif typecode == 7:
- data = codec.decode_longstr ()
+ data = codec.read_vbin32 ()
elif typecode == 8: # ABSTIME
- data = codec.decode_longlong ()
+ data = codec.read_uint64 ()
elif typecode == 9: # DELTATIME
- data = codec.decode_longlong ()
+ data = codec.read_uint64 ()
elif typecode == 10: # REF
- data = codec.decode_longlong ()
+ data = codec.read_uint64 ()
elif typecode == 11: # BOOL
- data = codec.decode_octet ()
+ data = codec.read_uint8 ()
elif typecode == 12: # FLOAT
- data = codec.decode_float ()
+ data = codec.read_float ()
elif typecode == 13: # DOUBLE
- data = codec.decode_double ()
+ data = codec.read_double ()
elif typecode == 14: # UUID
- data = codec.decode_uuid ()
+ data = codec.read_uuid ()
elif typecode == 15: # FTABLE
- data = codec.decode_table ()
+ data = codec.read_map ()
else:
raise ValueError ("Invalid type code: %d" % typecode)
return data
@@ -415,8 +413,8 @@ class managementClient:
self.ctrlCb (ch.context, self.CTRL_SCHEMA_LOADED, None)
def handleMethodReply (self, ch, codec, sequence):
- status = codec.decode_long ()
- sText = codec.decode_shortstr ()
+ status = codec.read_uint32 ()
+ sText = codec.read_str8 ()
data = self.seqMgr.release (sequence)
if data == None:
@@ -451,8 +449,8 @@ class managementClient:
self.methodCb (ch.context, userSequence, status, sText, args)
def handleCommandComplete (self, ch, codec, seq):
- code = codec.decode_long ()
- text = codec.decode_shortstr ()
+ code = codec.read_uint32 ()
+ text = codec.read_str8 ()
data = (seq, code, text)
context = self.seqMgr.release (seq)
if context == "outstanding":
@@ -467,75 +465,60 @@ class managementClient:
def handleBrokerResponse (self, ch, codec):
if self.ctrlCb != None:
- uuid = codec.decode_uuid ()
+ uuid = codec.read_uuid ()
data = (uuid, ch.sessionId)
self.ctrlCb (ch.context, self.CTRL_BROKER_INFO, data)
# Send a package request
- sendCodec = Codec (StringIO (), self.spec)
+ sendCodec = Codec (self.spec)
seq = self.seqMgr.reserve ("outstanding")
self.setHeader (sendCodec, ord ('P'), seq)
- smsg = Content (sendCodec.stream.getvalue ())
- smsg["content_type"] = "application/octet-stream"
- smsg["routing_key"] = "agent"
- smsg["reply_to"] = self.spec.struct ("reply_to")
- smsg["reply_to"]["exchange_name"] = "amq.direct"
- smsg["reply_to"]["routing_key"] = ch.replyName
+ smsg = ch.message(sendCodec.encoded)
ch.send ("qpid.management", smsg)
-
+
def handlePackageInd (self, ch, codec):
- pname = codec.decode_shortstr ()
+ pname = codec.read_str8 ()
if pname not in self.packages:
self.packages[pname] = {}
# Send a class request
- sendCodec = Codec (StringIO (), self.spec)
+ sendCodec = Codec (self.spec)
seq = self.seqMgr.reserve ("outstanding")
self.setHeader (sendCodec, ord ('Q'), seq)
self.incOutstanding (ch)
- sendCodec.encode_shortstr (pname)
- smsg = Content (sendCodec.stream.getvalue ())
- smsg["content_type"] = "application/octet-stream"
- smsg["routing_key"] = "agent"
- smsg["reply_to"] = self.spec.struct ("reply_to")
- smsg["reply_to"]["exchange_name"] = "amq.direct"
- smsg["reply_to"]["routing_key"] = ch.replyName
+ sendCodec.write_str8 (pname)
+ smsg = ch.message(sendCodec.encoded)
ch.send ("qpid.management", smsg)
def handleClassInd (self, ch, codec):
- pname = codec.decode_shortstr ()
- cname = codec.decode_shortstr ()
- hash = codec.decode_bin128 ()
+ pname = codec.read_str8 ()
+ cname = codec.read_str8 ()
+ hash = codec.read_bin128 ()
if pname not in self.packages:
return
if (cname, hash) not in self.packages[pname]:
# Send a schema request
- sendCodec = Codec (StringIO (), self.spec)
+ sendCodec = Codec (self.spec)
seq = self.seqMgr.reserve ("outstanding")
self.setHeader (sendCodec, ord ('S'), seq)
self.incOutstanding (ch)
- sendCodec.encode_shortstr (pname)
- sendCodec.encode_shortstr (cname)
- sendCodec.encode_bin128 (hash)
- smsg = Content (sendCodec.stream.getvalue ())
- smsg["content_type"] = "application/octet-stream"
- smsg["routing_key"] = "agent"
- smsg["reply_to"] = self.spec.struct ("reply_to")
- smsg["reply_to"]["exchange_name"] = "amq.direct"
- smsg["reply_to"]["routing_key"] = ch.replyName
+ sendCodec.write_str8 (pname)
+ sendCodec.write_str8 (cname)
+ sendCodec.write_bin128 (hash)
+ smsg = ch.message(sendCodec.encoded)
ch.send ("qpid.management", smsg)
def parseSchema (self, ch, codec):
""" Parse a received schema-description message. """
self.decOutstanding (ch)
- packageName = codec.decode_shortstr ()
- className = codec.decode_shortstr ()
- hash = codec.decode_bin128 ()
- configCount = codec.decode_short ()
- instCount = codec.decode_short ()
- methodCount = codec.decode_short ()
- eventCount = codec.decode_short ()
+ packageName = codec.read_str8 ()
+ className = codec.read_str8 ()
+ hash = codec.read_bin128 ()
+ configCount = codec.read_uint16 ()
+ instCount = codec.read_uint16 ()
+ methodCount = codec.read_uint16 ()
+ eventCount = codec.read_uint16 ()
if packageName not in self.packages:
return
@@ -555,7 +538,7 @@ class managementClient:
insts.append (("id", 4, None, None))
for idx in range (configCount):
- ft = codec.decode_table ()
+ ft = codec.read_map ()
name = ft["name"]
type = ft["type"]
access = ft["access"]
@@ -582,7 +565,7 @@ class managementClient:
configs.append (config)
for idx in range (instCount):
- ft = codec.decode_table ()
+ ft = codec.read_map ()
name = ft["name"]
type = ft["type"]
unit = None
@@ -598,7 +581,7 @@ class managementClient:
insts.append (inst)
for idx in range (methodCount):
- ft = codec.decode_table ()
+ ft = codec.read_map ()
mname = ft["name"]
argCount = ft["argCount"]
if "desc" in ft:
@@ -608,7 +591,7 @@ class managementClient:
args = []
for aidx in range (argCount):
- ft = codec.decode_table ()
+ ft = codec.read_map ()
name = ft["name"]
type = ft["type"]
dir = ft["dir"].upper ()
@@ -654,9 +637,9 @@ class managementClient:
if cls == 'I' and self.instCb == None:
return
- packageName = codec.decode_shortstr ()
- className = codec.decode_shortstr ()
- hash = codec.decode_bin128 ()
+ packageName = codec.read_str8 ()
+ className = codec.read_str8 ()
+ hash = codec.read_bin128 ()
classKey = (packageName, className, hash)
if classKey not in self.schema:
@@ -665,9 +648,9 @@ class managementClient:
row = []
timestamps = []
- timestamps.append (codec.decode_longlong ()) # Current Time
- timestamps.append (codec.decode_longlong ()) # Create Time
- timestamps.append (codec.decode_longlong ()) # Delete Time
+ timestamps.append (codec.read_uint64 ()) # Current Time
+ timestamps.append (codec.read_uint64 ()) # Create Time
+ timestamps.append (codec.read_uint64 ()) # Delete Time
schemaClass = self.schema[classKey]
if cls == 'C' or cls == 'B':
@@ -712,10 +695,10 @@ class managementClient:
def method (self, channel, userSequence, objId, classId, methodName, args):
""" Invoke a method on an object """
- codec = Codec (StringIO (), self.spec)
+ codec = Codec (self.spec)
sequence = self.seqMgr.reserve ((userSequence, classId, methodName))
self.setHeader (codec, ord ('M'), sequence)
- codec.encode_longlong (objId) # ID of object
+ codec.write_uint64 (objId) # ID of object
# Encode args according to schema
if classId not in self.schema:
@@ -745,11 +728,6 @@ class managementClient:
packageName = classId[0]
className = classId[1]
- msg = Content (codec.stream.getvalue ())
- msg["content_type"] = "application/octet-stream"
- msg["routing_key"] = "agent.method." + packageName + "." + \
- className + "." + methodName
- msg["reply_to"] = self.spec.struct ("reply_to")
- msg["reply_to"]["exchange_name"] = "amq.direct"
- msg["reply_to"]["routing_key"] = channel.replyName
+ msg = channel.message(codec.encoded, "agent.method." + packageName + "." + \
+ className + "." + methodName)
channel.send ("qpid.management", msg)
diff --git a/qpid/python/qpid/message.py b/qpid/python/qpid/message.py
index c9ea5a8a0c..eb3ef5c03c 100644
--- a/qpid/python/qpid/message.py
+++ b/qpid/python/qpid/message.py
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
#
-from connection import Method, Request
+from connection08 import Method, Request
from sets import Set
class Message:
diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py
index a464e95593..0932efeab3 100644
--- a/qpid/python/qpid/peer.py
+++ b/qpid/python/qpid/peer.py
@@ -25,7 +25,7 @@ incoming method frames to a delegate.
"""
import thread, threading, traceback, socket, sys, logging
-from connection import EOF, Method, Header, Body, Request, Response
+from connection08 import EOF, Method, Header, Body, Request, Response
from message import Message
from queue import Queue, Closed as QueueClosed
from content import Content
diff --git a/qpid/python/qpid/session.py b/qpid/python/qpid/session.py
index 427a403b90..f649b95a2c 100644
--- a/qpid/python/qpid/session.py
+++ b/qpid/python/qpid/session.py
@@ -17,7 +17,7 @@
# under the License.
#
-from threading import Condition, RLock, currentThread
+from threading import Condition, RLock, Lock, currentThread
from invoker import Invoker
from datatypes import RangedSet, Struct, Future
from codec010 import StringCodec
@@ -29,8 +29,11 @@ from exceptions import *
from logging import getLogger
log = getLogger("qpid.io.cmd")
+msg = getLogger("qpid.io.msg")
-class SessionDetached(Exception): pass
+class SessionException(Exception): pass
+class SessionClosed(SessionException): pass
+class SessionDetached(SessionException): pass
def client(*args):
return Client(*args)
@@ -38,8 +41,6 @@ def client(*args):
def server(*args):
return Server(*args)
-class SessionException(Exception): pass
-
INCOMPLETE = object()
class Session(Invoker):
@@ -50,6 +51,8 @@ class Session(Invoker):
self.auto_sync = auto_sync
self.timeout = timeout
self.channel = None
+ self.invoke_lock = Lock()
+ self.closed = False
self.condition = Condition()
@@ -97,7 +100,12 @@ class Session(Invoker):
raise SessionException(self.error())
def close(self, timeout=None):
- self.channel.session_detach(self.name)
+ self.invoke_lock.acquire()
+ try:
+ self.closed = True
+ self.channel.session_detach(self.name)
+ finally:
+ self.invoke_lock.release()
if not wait(self.condition, lambda: self.channel is None, timeout):
raise Timeout()
@@ -119,6 +127,16 @@ class Session(Invoker):
if not hasattr(type, "track"):
return type.new(args, kwargs)
+ self.invoke_lock.acquire()
+ try:
+ return self.do_invoke(type, args, kwargs)
+ finally:
+ self.invoke_lock.release()
+
+ def do_invoke(self, type, args, kwargs):
+ if self.closed:
+ raise SessionClosed()
+
if self.channel == None:
raise SessionDetached()
@@ -160,6 +178,7 @@ class Session(Invoker):
seg = Segment(False, True, self.spec["segment_type.body"].value,
type.track, self.channel.id, message.body)
self.send(seg)
+ msg.debug("SENT %s", message)
if type.result:
if self.auto_sync:
@@ -304,8 +323,6 @@ class Delegate:
finally:
self.session.lock.release()
-msg = getLogger("qpid.io.msg")
-
class Client(Delegate):
def message_transfer(self, cmd, headers, body):
diff --git a/qpid/python/qpid/spec.py b/qpid/python/qpid/spec.py
index 623b2e9e9f..64a14b0f61 100644
--- a/qpid/python/qpid/spec.py
+++ b/qpid/python/qpid/spec.py
@@ -29,484 +29,18 @@ class so that the generated code can be reused in a variety of
situations.
"""
-import re, textwrap, new, mllib, qpid
-
-class SpecContainer:
-
- def __init__(self):
- self.items = []
- self.byname = {}
- self.byid = {}
- self.indexes = {}
-
- def add(self, item):
- if self.byname.has_key(item.name):
- raise ValueError("duplicate name: %s" % item)
- if item.id == None:
- item.id = len(self)
- elif self.byid.has_key(item.id):
- raise ValueError("duplicate id: %s" % item)
- self.indexes[item] = len(self.items)
- self.items.append(item)
- self.byname[item.name] = item
- self.byid[item.id] = item
-
- def index(self, item):
- try:
- return self.indexes[item]
- except KeyError:
- raise ValueError(item)
-
- def __iter__(self):
- return iter(self.items)
-
- def __len__(self):
- return len(self.items)
-
-class Metadata:
-
- PRINT = []
-
- def __init__(self):
- pass
-
- def __str__(self):
- args = map(lambda f: "%s=%s" % (f, getattr(self, f)), self.PRINT)
- return "%s(%s)" % (self.__class__.__name__, ", ".join(args))
-
- def __repr__(self):
- return str(self)
-
-class Spec(Metadata):
-
- PRINT=["major", "minor", "file"]
-
- def __init__(self, major, minor, file):
- Metadata.__init__(self)
- self.major = major
- self.minor = minor
- self.file = file
- self.constants = SpecContainer()
- self.domains = SpecContainer()
- self.classes = SpecContainer()
- # methods indexed by classname_methname
- self.methods = {}
- # structs by type code
- self.structs = {}
-
- def post_load(self):
- self.module = self.define_module("amqp%s%s" % (self.major, self.minor))
- self.klass = self.define_class("Amqp%s%s" % (self.major, self.minor))
-
- def method(self, name):
- if not self.methods.has_key(name):
- for cls in self.classes:
- clen = len(cls.name)
- if name.startswith(cls.name) and name[clen] == "_":
- end = name[clen + 1:]
- if cls.methods.byname.has_key(end):
- self.methods[name] = cls.methods.byname[end]
- return self.methods.get(name)
-
- def parse_method(self, name):
- parts = re.split(r"\s*\.\s*", name)
- if len(parts) != 2:
- raise ValueError(name)
- klass, meth = parts
- return self.classes.byname[klass].methods.byname[meth]
-
- def struct(self, name, *args, **kwargs):
- type = self.domains.byname[name].type
- return qpid.Struct(type, *args, **kwargs)
-
- def define_module(self, name, doc = None):
- module = new.module(name, doc)
- module.__file__ = self.file
- for c in self.classes:
- cls = c.define_class(c.name)
- cls.__module__ = module.__name__
- setattr(module, c.name, cls)
- return module
-
- def define_class(self, name):
- methods = {}
- for c in self.classes:
- for m in c.methods:
- meth = m.klass.name + "_" + m.name
- methods[meth] = m.define_method(meth)
- return type(name, (), methods)
-
-class Constant(Metadata):
-
- PRINT=["name", "id"]
-
- def __init__(self, spec, name, id, klass, docs):
- Metadata.__init__(self)
- self.spec = spec
- self.name = name
- self.id = id
- self.klass = klass
- self.docs = docs
-
-class Domain(Metadata):
-
- PRINT=["name", "type"]
-
- def __init__(self, spec, name, type, description, docs):
- Metadata.__init__(self)
- self.spec = spec
- self.id = None
- self.name = name
- self.type = type
- self.description = description
- self.docs = docs
-
-class Struct(Metadata):
-
- PRINT=["size", "type", "pack"]
-
- def __init__(self, size, type, pack):
- Metadata.__init__(self)
- self.size = size
- self.type = type
- self.pack = pack
- self.fields = SpecContainer()
-
-class Class(Metadata):
-
- PRINT=["name", "id"]
-
- def __init__(self, spec, name, id, handler, docs):
- Metadata.__init__(self)
- self.spec = spec
- self.name = name
- self.id = id
- self.handler = handler
- self.fields = SpecContainer()
- self.methods = SpecContainer()
- self.docs = docs
-
- def define_class(self, name):
- methods = {}
- for m in self.methods:
- methods[m.name] = m.define_method(m.name)
- return type(name, (), methods)
-
-class Method(Metadata):
-
- PRINT=["name", "id"]
-
- def __init__(self, klass, name, id, content, responses, result, synchronous,
- description, docs):
- Metadata.__init__(self)
- self.klass = klass
- self.name = name
- self.id = id
- self.content = content
- self.responses = responses
- self.result = result
- self.synchronous = synchronous
- self.fields = SpecContainer()
- self.description = description
- self.docs = docs
- self.response = False
-
- def is_l4_command(self):
- return self.klass.name not in ["execution", "channel", "connection", "session"]
-
- def arguments(self, *args, **kwargs):
- nargs = len(args) + len(kwargs)
- maxargs = len(self.fields)
- if nargs > maxargs:
- self._type_error("takes at most %s arguments (%s) given", maxargs, nargs)
- result = []
- for f in self.fields:
- idx = self.fields.index(f)
- if idx < len(args):
- result.append(args[idx])
- elif kwargs.has_key(f.name):
- result.append(kwargs.pop(f.name))
- else:
- result.append(Method.DEFAULTS[f.type])
- for key, value in kwargs.items():
- if self.fields.byname.has_key(key):
- self._type_error("got multiple values for keyword argument '%s'", key)
- else:
- self._type_error("got an unexpected keyword argument '%s'", key)
- return tuple(result)
-
- def _type_error(self, msg, *args):
- raise TypeError("%s %s" % (self.name, msg % args))
-
- def docstring(self):
- s = "\n\n".join([fill(d, 2) for d in [self.description] + self.docs])
- for f in self.fields:
- if f.docs:
- s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, f.name)] +
- [fill(d, 4) for d in f.docs[1:]])
- if self.responses:
- s += "\n\nValid responses: "
- for r in self.responses:
- s += r.name + " "
- return s
-
- METHOD = "__method__"
- DEFAULTS = {"bit": False,
- "shortstr": "",
- "longstr": "",
- "table": {},
- "array": [],
- "octet": 0,
- "short": 0,
- "long": 0,
- "longlong": 0,
- "timestamp": 0,
- "content": None,
- "uuid": "",
- "rfc1982_long": 0,
- "rfc1982_long_set": [],
- "long_struct": None}
-
- def define_method(self, name):
- g = {Method.METHOD: self}
- l = {}
- args = [(f.name, Method.DEFAULTS[f.type]) for f in self.fields]
- methargs = args[:]
- if self.content:
- args += [("content", None)]
- code = "def %s(self, %s):\n" % \
- (name, ", ".join(["%s = %r" % a for a in args]))
- code += " %r\n" % self.docstring()
- argnames = ", ".join([a[0] for a in methargs])
- code += " return self.invoke(%s" % Method.METHOD
- if argnames:
- code += ", (%s,)" % argnames
- else:
- code += ", ()"
- if self.content:
- code += ", content"
- code += ")"
- exec code in g, l
- return l[name]
-
-class Field(Metadata):
-
- PRINT=["name", "id", "type"]
-
- def __init__(self, name, id, type, domain, description, docs):
- Metadata.__init__(self)
- self.name = name
- self.id = id
- self.type = type
- self.domain = domain
- self.description = description
- self.docs = docs
-
- def default(self):
- if isinstance(self.type, Struct):
- return None
- else:
- return Method.DEFAULTS[self.type]
-
-WIDTHS = {
- "octet": 1,
- "short": 2,
- "long": 4
- }
-
-def width(st, default=None):
- if st in (None, "none", ""):
- return default
- else:
- return WIDTHS[st]
-
-def get_result(nd, spec):
- result = nd["result"]
- if not result: return None
- name = result["@domain"]
- if name != None: return spec.domains.byname[name]
- st_nd = result["struct"]
- st = Struct(width(st_nd["@size"]), int(result.parent.parent["@index"])*256 +
- int(st_nd["@type"]), width(st_nd["@pack"], 2))
- spec.structs[st.type] = st
- load_fields(st_nd, st.fields, spec.domains.byname)
- return st
-
-def get_desc(nd):
- label = nd["@label"]
- if not label:
- label = nd.text()
- if label:
- label = label.strip()
- return label
-
-def get_docs(nd):
- return [n.text() for n in nd.query["doc"]]
-
-def load_fields(nd, l, domains):
- for f_nd in nd.query["field"]:
- type = f_nd["@domain"]
- if type == None:
- type = f_nd["@type"]
- type = pythonize(type)
- domain = None
- while domains.has_key(type) and domains[type].type != type:
- domain = domains[type]
- type = domain.type
- l.add(Field(pythonize(f_nd["@name"]), f_nd.index(), type, domain,
- get_desc(f_nd), get_docs(f_nd)))
+import os, mllib, spec08, spec010
def load(specfile, *errata):
- doc = mllib.xml_parse(specfile)
- spec_root = doc["amqp"]
- spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile)
-
- for root in [spec_root] + map(lambda x: mllib.xml_parse(x)["amqp"], errata):
- # constants
- for nd in root.query["constant"]:
- val = nd["@value"]
- if val.startswith("0x"): val = int(val, 16)
- else: val = int(val)
- const = Constant(spec, pythonize(nd["@name"]), val, nd["@class"],
- get_docs(nd))
- try:
- spec.constants.add(const)
- except ValueError, e:
- pass
- #print "Warning:", e
-
- # domains are typedefs
- structs = []
- for nd in root.query["domain"]:
- type = nd["@type"]
- if type == None:
- st_nd = nd["struct"]
- code = st_nd["@type"]
- if code not in (None, "", "none"):
- code = int(code)
- type = Struct(width(st_nd["@size"]), code, width(st_nd["@pack"], 2))
- if type.type != None:
- spec.structs[type.type] = type
- structs.append((type, st_nd))
- else:
- type = pythonize(type)
- domain = Domain(spec, pythonize(nd["@name"]), type, get_desc(nd),
- get_docs(nd))
- spec.domains.add(domain)
-
- # structs
- for st, st_nd in structs:
- load_fields(st_nd, st.fields, spec.domains.byname)
-
- # classes
- for c_nd in root.query["class"]:
- cname = pythonize(c_nd["@name"])
- if spec.classes.byname.has_key(cname):
- klass = spec.classes.byname[cname]
- else:
- klass = Class(spec, cname, int(c_nd["@index"]), c_nd["@handler"],
- get_docs(c_nd))
- spec.classes.add(klass)
-
- added_methods = []
- load_fields(c_nd, klass.fields, spec.domains.byname)
- for m_nd in c_nd.query["method"]:
- mname = pythonize(m_nd["@name"])
- if klass.methods.byname.has_key(mname):
- meth = klass.methods.byname[mname]
- else:
- meth = Method(klass, mname,
- int(m_nd["@index"]),
- m_nd["@content"] == "1",
- [pythonize(nd["@name"]) for nd in m_nd.query["response"]],
- get_result(m_nd, spec),
- m_nd["@synchronous"] == "1",
- get_desc(m_nd),
- get_docs(m_nd))
- klass.methods.add(meth)
- added_methods.append(meth)
- load_fields(m_nd, meth.fields, spec.domains.byname)
- # resolve the responses
- for m in added_methods:
- m.responses = [klass.methods.byname[r] for r in m.responses]
- for resp in m.responses:
- resp.response = True
-
- spec.post_load()
- return spec
-
-REPLACE = {" ": "_", "-": "_"}
-KEYWORDS = {"global": "global_",
- "return": "return_"}
+ for name in (specfile,) + errata:
+ if not os.path.exists(name):
+ raise IOError("No such file or directory: '%s'" % name)
-def pythonize(name):
- name = str(name)
- for key, val in REPLACE.items():
- name = name.replace(key, val)
- try:
- name = KEYWORDS[name]
- except KeyError:
- pass
- return name
+ doc = mllib.xml_parse(specfile)
+ major = doc["amqp/@major"]
+ minor = doc["amqp/@minor"]
-def fill(text, indent, heading = None):
- sub = indent * " "
- if heading:
- init = (indent - 2) * " " + heading + " -- "
+ if major == "0" and minor == "10":
+ return spec010.load(specfile, *errata)
else:
- init = sub
- w = textwrap.TextWrapper(initial_indent = init, subsequent_indent = sub)
- return w.fill(" ".join(text.split()))
-
-class Rule(Metadata):
-
- PRINT = ["text", "implement", "tests"]
-
- def __init__(self, text, implement, tests, path):
- self.text = text
- self.implement = implement
- self.tests = tests
- self.path = path
-
-def find_rules(node, rules):
- if node.name == "rule":
- rules.append(Rule(node.text, node.get("@implement"),
- [ch.text for ch in node if ch.name == "test"],
- node.path()))
- if node.name == "doc" and node.get("@name") == "rule":
- tests = []
- if node.has("@test"):
- tests.append(node["@test"])
- rules.append(Rule(node.text, None, tests, node.path()))
- for child in node:
- find_rules(child, rules)
-
-def load_rules(specfile):
- rules = []
- find_rules(xmlutil.parse(specfile), rules)
- return rules
-
-def test_summary():
- template = """
- <html><head><title>AMQP Tests</title></head>
- <body>
- <table width="80%%" align="center">
- %s
- </table>
- </body>
- </html>
- """
- rows = []
- for rule in load_rules("amqp.org/specs/amqp7.xml"):
- if rule.tests:
- tests = ", ".join(rule.tests)
- else:
- tests = "&nbsp;"
- rows.append('<tr bgcolor="#EEEEEE"><td><b>Path:</b> %s</td>'
- '<td><b>Implement:</b> %s</td>'
- '<td><b>Tests:</b> %s</td></tr>' %
- (rule.path[len("/root/amqp"):], rule.implement, tests))
- rows.append('<tr><td colspan="3">%s</td></tr>' % rule.text)
- rows.append('<tr><td colspan="3">&nbsp;</td></tr>')
-
- print template % "\n".join(rows)
+ return spec08.load(specfile, *errata)
diff --git a/qpid/python/qpid/spec08.py b/qpid/python/qpid/spec08.py
new file mode 100644
index 0000000000..623b2e9e9f
--- /dev/null
+++ b/qpid/python/qpid/spec08.py
@@ -0,0 +1,512 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+This module loads protocol metadata into python objects. It provides
+access to spec metadata via a python object model, and can also
+dynamically creating python methods, classes, and modules based on the
+spec metadata. All the generated methods have proper signatures and
+doc strings based on the spec metadata so the python help system can
+be used to browse the spec documentation. The generated methods all
+dispatch to the self.invoke(meth, args) callback of the containing
+class so that the generated code can be reused in a variety of
+situations.
+"""
+
+import re, textwrap, new, mllib, qpid
+
+class SpecContainer:
+
+ def __init__(self):
+ self.items = []
+ self.byname = {}
+ self.byid = {}
+ self.indexes = {}
+
+ def add(self, item):
+ if self.byname.has_key(item.name):
+ raise ValueError("duplicate name: %s" % item)
+ if item.id == None:
+ item.id = len(self)
+ elif self.byid.has_key(item.id):
+ raise ValueError("duplicate id: %s" % item)
+ self.indexes[item] = len(self.items)
+ self.items.append(item)
+ self.byname[item.name] = item
+ self.byid[item.id] = item
+
+ def index(self, item):
+ try:
+ return self.indexes[item]
+ except KeyError:
+ raise ValueError(item)
+
+ def __iter__(self):
+ return iter(self.items)
+
+ def __len__(self):
+ return len(self.items)
+
+class Metadata:
+
+ PRINT = []
+
+ def __init__(self):
+ pass
+
+ def __str__(self):
+ args = map(lambda f: "%s=%s" % (f, getattr(self, f)), self.PRINT)
+ return "%s(%s)" % (self.__class__.__name__, ", ".join(args))
+
+ def __repr__(self):
+ return str(self)
+
+class Spec(Metadata):
+
+ PRINT=["major", "minor", "file"]
+
+ def __init__(self, major, minor, file):
+ Metadata.__init__(self)
+ self.major = major
+ self.minor = minor
+ self.file = file
+ self.constants = SpecContainer()
+ self.domains = SpecContainer()
+ self.classes = SpecContainer()
+ # methods indexed by classname_methname
+ self.methods = {}
+ # structs by type code
+ self.structs = {}
+
+ def post_load(self):
+ self.module = self.define_module("amqp%s%s" % (self.major, self.minor))
+ self.klass = self.define_class("Amqp%s%s" % (self.major, self.minor))
+
+ def method(self, name):
+ if not self.methods.has_key(name):
+ for cls in self.classes:
+ clen = len(cls.name)
+ if name.startswith(cls.name) and name[clen] == "_":
+ end = name[clen + 1:]
+ if cls.methods.byname.has_key(end):
+ self.methods[name] = cls.methods.byname[end]
+ return self.methods.get(name)
+
+ def parse_method(self, name):
+ parts = re.split(r"\s*\.\s*", name)
+ if len(parts) != 2:
+ raise ValueError(name)
+ klass, meth = parts
+ return self.classes.byname[klass].methods.byname[meth]
+
+ def struct(self, name, *args, **kwargs):
+ type = self.domains.byname[name].type
+ return qpid.Struct(type, *args, **kwargs)
+
+ def define_module(self, name, doc = None):
+ module = new.module(name, doc)
+ module.__file__ = self.file
+ for c in self.classes:
+ cls = c.define_class(c.name)
+ cls.__module__ = module.__name__
+ setattr(module, c.name, cls)
+ return module
+
+ def define_class(self, name):
+ methods = {}
+ for c in self.classes:
+ for m in c.methods:
+ meth = m.klass.name + "_" + m.name
+ methods[meth] = m.define_method(meth)
+ return type(name, (), methods)
+
+class Constant(Metadata):
+
+ PRINT=["name", "id"]
+
+ def __init__(self, spec, name, id, klass, docs):
+ Metadata.__init__(self)
+ self.spec = spec
+ self.name = name
+ self.id = id
+ self.klass = klass
+ self.docs = docs
+
+class Domain(Metadata):
+
+ PRINT=["name", "type"]
+
+ def __init__(self, spec, name, type, description, docs):
+ Metadata.__init__(self)
+ self.spec = spec
+ self.id = None
+ self.name = name
+ self.type = type
+ self.description = description
+ self.docs = docs
+
+class Struct(Metadata):
+
+ PRINT=["size", "type", "pack"]
+
+ def __init__(self, size, type, pack):
+ Metadata.__init__(self)
+ self.size = size
+ self.type = type
+ self.pack = pack
+ self.fields = SpecContainer()
+
+class Class(Metadata):
+
+ PRINT=["name", "id"]
+
+ def __init__(self, spec, name, id, handler, docs):
+ Metadata.__init__(self)
+ self.spec = spec
+ self.name = name
+ self.id = id
+ self.handler = handler
+ self.fields = SpecContainer()
+ self.methods = SpecContainer()
+ self.docs = docs
+
+ def define_class(self, name):
+ methods = {}
+ for m in self.methods:
+ methods[m.name] = m.define_method(m.name)
+ return type(name, (), methods)
+
+class Method(Metadata):
+
+ PRINT=["name", "id"]
+
+ def __init__(self, klass, name, id, content, responses, result, synchronous,
+ description, docs):
+ Metadata.__init__(self)
+ self.klass = klass
+ self.name = name
+ self.id = id
+ self.content = content
+ self.responses = responses
+ self.result = result
+ self.synchronous = synchronous
+ self.fields = SpecContainer()
+ self.description = description
+ self.docs = docs
+ self.response = False
+
+ def is_l4_command(self):
+ return self.klass.name not in ["execution", "channel", "connection", "session"]
+
+ def arguments(self, *args, **kwargs):
+ nargs = len(args) + len(kwargs)
+ maxargs = len(self.fields)
+ if nargs > maxargs:
+ self._type_error("takes at most %s arguments (%s) given", maxargs, nargs)
+ result = []
+ for f in self.fields:
+ idx = self.fields.index(f)
+ if idx < len(args):
+ result.append(args[idx])
+ elif kwargs.has_key(f.name):
+ result.append(kwargs.pop(f.name))
+ else:
+ result.append(Method.DEFAULTS[f.type])
+ for key, value in kwargs.items():
+ if self.fields.byname.has_key(key):
+ self._type_error("got multiple values for keyword argument '%s'", key)
+ else:
+ self._type_error("got an unexpected keyword argument '%s'", key)
+ return tuple(result)
+
+ def _type_error(self, msg, *args):
+ raise TypeError("%s %s" % (self.name, msg % args))
+
+ def docstring(self):
+ s = "\n\n".join([fill(d, 2) for d in [self.description] + self.docs])
+ for f in self.fields:
+ if f.docs:
+ s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, f.name)] +
+ [fill(d, 4) for d in f.docs[1:]])
+ if self.responses:
+ s += "\n\nValid responses: "
+ for r in self.responses:
+ s += r.name + " "
+ return s
+
+ METHOD = "__method__"
+ DEFAULTS = {"bit": False,
+ "shortstr": "",
+ "longstr": "",
+ "table": {},
+ "array": [],
+ "octet": 0,
+ "short": 0,
+ "long": 0,
+ "longlong": 0,
+ "timestamp": 0,
+ "content": None,
+ "uuid": "",
+ "rfc1982_long": 0,
+ "rfc1982_long_set": [],
+ "long_struct": None}
+
+ def define_method(self, name):
+ g = {Method.METHOD: self}
+ l = {}
+ args = [(f.name, Method.DEFAULTS[f.type]) for f in self.fields]
+ methargs = args[:]
+ if self.content:
+ args += [("content", None)]
+ code = "def %s(self, %s):\n" % \
+ (name, ", ".join(["%s = %r" % a for a in args]))
+ code += " %r\n" % self.docstring()
+ argnames = ", ".join([a[0] for a in methargs])
+ code += " return self.invoke(%s" % Method.METHOD
+ if argnames:
+ code += ", (%s,)" % argnames
+ else:
+ code += ", ()"
+ if self.content:
+ code += ", content"
+ code += ")"
+ exec code in g, l
+ return l[name]
+
+class Field(Metadata):
+
+ PRINT=["name", "id", "type"]
+
+ def __init__(self, name, id, type, domain, description, docs):
+ Metadata.__init__(self)
+ self.name = name
+ self.id = id
+ self.type = type
+ self.domain = domain
+ self.description = description
+ self.docs = docs
+
+ def default(self):
+ if isinstance(self.type, Struct):
+ return None
+ else:
+ return Method.DEFAULTS[self.type]
+
+WIDTHS = {
+ "octet": 1,
+ "short": 2,
+ "long": 4
+ }
+
+def width(st, default=None):
+ if st in (None, "none", ""):
+ return default
+ else:
+ return WIDTHS[st]
+
+def get_result(nd, spec):
+ result = nd["result"]
+ if not result: return None
+ name = result["@domain"]
+ if name != None: return spec.domains.byname[name]
+ st_nd = result["struct"]
+ st = Struct(width(st_nd["@size"]), int(result.parent.parent["@index"])*256 +
+ int(st_nd["@type"]), width(st_nd["@pack"], 2))
+ spec.structs[st.type] = st
+ load_fields(st_nd, st.fields, spec.domains.byname)
+ return st
+
+def get_desc(nd):
+ label = nd["@label"]
+ if not label:
+ label = nd.text()
+ if label:
+ label = label.strip()
+ return label
+
+def get_docs(nd):
+ return [n.text() for n in nd.query["doc"]]
+
+def load_fields(nd, l, domains):
+ for f_nd in nd.query["field"]:
+ type = f_nd["@domain"]
+ if type == None:
+ type = f_nd["@type"]
+ type = pythonize(type)
+ domain = None
+ while domains.has_key(type) and domains[type].type != type:
+ domain = domains[type]
+ type = domain.type
+ l.add(Field(pythonize(f_nd["@name"]), f_nd.index(), type, domain,
+ get_desc(f_nd), get_docs(f_nd)))
+
+def load(specfile, *errata):
+ doc = mllib.xml_parse(specfile)
+ spec_root = doc["amqp"]
+ spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile)
+
+ for root in [spec_root] + map(lambda x: mllib.xml_parse(x)["amqp"], errata):
+ # constants
+ for nd in root.query["constant"]:
+ val = nd["@value"]
+ if val.startswith("0x"): val = int(val, 16)
+ else: val = int(val)
+ const = Constant(spec, pythonize(nd["@name"]), val, nd["@class"],
+ get_docs(nd))
+ try:
+ spec.constants.add(const)
+ except ValueError, e:
+ pass
+ #print "Warning:", e
+
+ # domains are typedefs
+ structs = []
+ for nd in root.query["domain"]:
+ type = nd["@type"]
+ if type == None:
+ st_nd = nd["struct"]
+ code = st_nd["@type"]
+ if code not in (None, "", "none"):
+ code = int(code)
+ type = Struct(width(st_nd["@size"]), code, width(st_nd["@pack"], 2))
+ if type.type != None:
+ spec.structs[type.type] = type
+ structs.append((type, st_nd))
+ else:
+ type = pythonize(type)
+ domain = Domain(spec, pythonize(nd["@name"]), type, get_desc(nd),
+ get_docs(nd))
+ spec.domains.add(domain)
+
+ # structs
+ for st, st_nd in structs:
+ load_fields(st_nd, st.fields, spec.domains.byname)
+
+ # classes
+ for c_nd in root.query["class"]:
+ cname = pythonize(c_nd["@name"])
+ if spec.classes.byname.has_key(cname):
+ klass = spec.classes.byname[cname]
+ else:
+ klass = Class(spec, cname, int(c_nd["@index"]), c_nd["@handler"],
+ get_docs(c_nd))
+ spec.classes.add(klass)
+
+ added_methods = []
+ load_fields(c_nd, klass.fields, spec.domains.byname)
+ for m_nd in c_nd.query["method"]:
+ mname = pythonize(m_nd["@name"])
+ if klass.methods.byname.has_key(mname):
+ meth = klass.methods.byname[mname]
+ else:
+ meth = Method(klass, mname,
+ int(m_nd["@index"]),
+ m_nd["@content"] == "1",
+ [pythonize(nd["@name"]) for nd in m_nd.query["response"]],
+ get_result(m_nd, spec),
+ m_nd["@synchronous"] == "1",
+ get_desc(m_nd),
+ get_docs(m_nd))
+ klass.methods.add(meth)
+ added_methods.append(meth)
+ load_fields(m_nd, meth.fields, spec.domains.byname)
+ # resolve the responses
+ for m in added_methods:
+ m.responses = [klass.methods.byname[r] for r in m.responses]
+ for resp in m.responses:
+ resp.response = True
+
+ spec.post_load()
+ return spec
+
+REPLACE = {" ": "_", "-": "_"}
+KEYWORDS = {"global": "global_",
+ "return": "return_"}
+
+def pythonize(name):
+ name = str(name)
+ for key, val in REPLACE.items():
+ name = name.replace(key, val)
+ try:
+ name = KEYWORDS[name]
+ except KeyError:
+ pass
+ return name
+
+def fill(text, indent, heading = None):
+ sub = indent * " "
+ if heading:
+ init = (indent - 2) * " " + heading + " -- "
+ else:
+ init = sub
+ w = textwrap.TextWrapper(initial_indent = init, subsequent_indent = sub)
+ return w.fill(" ".join(text.split()))
+
+class Rule(Metadata):
+
+ PRINT = ["text", "implement", "tests"]
+
+ def __init__(self, text, implement, tests, path):
+ self.text = text
+ self.implement = implement
+ self.tests = tests
+ self.path = path
+
+def find_rules(node, rules):
+ if node.name == "rule":
+ rules.append(Rule(node.text, node.get("@implement"),
+ [ch.text for ch in node if ch.name == "test"],
+ node.path()))
+ if node.name == "doc" and node.get("@name") == "rule":
+ tests = []
+ if node.has("@test"):
+ tests.append(node["@test"])
+ rules.append(Rule(node.text, None, tests, node.path()))
+ for child in node:
+ find_rules(child, rules)
+
+def load_rules(specfile):
+ rules = []
+ find_rules(xmlutil.parse(specfile), rules)
+ return rules
+
+def test_summary():
+ template = """
+ <html><head><title>AMQP Tests</title></head>
+ <body>
+ <table width="80%%" align="center">
+ %s
+ </table>
+ </body>
+ </html>
+ """
+ rows = []
+ for rule in load_rules("amqp.org/specs/amqp7.xml"):
+ if rule.tests:
+ tests = ", ".join(rule.tests)
+ else:
+ tests = "&nbsp;"
+ rows.append('<tr bgcolor="#EEEEEE"><td><b>Path:</b> %s</td>'
+ '<td><b>Implement:</b> %s</td>'
+ '<td><b>Tests:</b> %s</td></tr>' %
+ (rule.path[len("/root/amqp"):], rule.implement, tests))
+ rows.append('<tr><td colspan="3">%s</td></tr>' % rule.text)
+ rows.append('<tr><td colspan="3">&nbsp;</td></tr>')
+
+ print template % "\n".join(rows)
diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py
index a0018c671f..adda1a650f 100644
--- a/qpid/python/qpid/testlib.py
+++ b/qpid/python/qpid/testlib.py
@@ -30,7 +30,7 @@ from qpid.content import Content
from qpid.message import Message
#0-10 support
-from qpid.connection010 import Connection
+from qpid.connection import Connection
from qpid.spec010 import load
from qpid.util import connect
@@ -357,9 +357,9 @@ class TestBase010(unittest.TestCase):
self.conn.start(timeout=10)
self.session = self.conn.session("test-session", timeout=10)
- def connect(self):
+ def connect(self, host=None, port=None):
spec = testrunner.spec
- conn = Connection(connect(testrunner.host, testrunner.port), spec)
+ conn = Connection(connect(host or testrunner.host, port or testrunner.port), spec)
conn.start(timeout=10)
return conn
diff --git a/qpid/python/tests/__init__.py b/qpid/python/tests/__init__.py
index 521e2f15c9..8ad514fc2f 100644
--- a/qpid/python/tests/__init__.py
+++ b/qpid/python/tests/__init__.py
@@ -25,6 +25,6 @@ from spec import *
from framer import *
from assembler import *
from datatypes import *
-from connection010 import *
+from connection import *
from spec010 import *
from codec010 import *
diff --git a/qpid/python/tests/connection010.py b/qpid/python/tests/connection.py
index a953e034a2..6925480ed3 100644
--- a/qpid/python/tests/connection010.py
+++ b/qpid/python/tests/connection.py
@@ -20,7 +20,7 @@
from threading import *
from unittest import TestCase
from qpid.util import connect, listen
-from qpid.connection010 import *
+from qpid.connection import *
from qpid.datatypes import Message
from qpid.testlib import testrunner
from qpid.delegates import Server
diff --git a/qpid/python/tests_0-10_preview/management.py b/qpid/python/tests_0-10/management.py
index de6161ae96..e893dbbd87 100644
--- a/qpid/python/tests_0-10_preview/management.py
+++ b/qpid/python/tests_0-10/management.py
@@ -18,10 +18,10 @@
#
from qpid.datatypes import Message, RangedSet
-from qpid.testlib import TestBase
+from qpid.testlib import TestBase010
from qpid.management import managementChannel, managementClient
-class ManagementTest (TestBase):
+class ManagementTest (TestBase010):
"""
Tests for the management hooks
"""
@@ -30,10 +30,10 @@ class ManagementTest (TestBase):
"""
Call the "echo" method on the broker to verify it is alive and talking.
"""
- channel = self.client.channel(2)
+ session = self.session
- mc = managementClient (channel.spec)
- mch = mc.addChannel (channel)
+ mc = managementClient (session.spec)
+ mch = mc.addChannel (session)
mc.syncWaitForStable (mch)
brokers = mc.syncGetObjects (mch, "broker")
@@ -52,20 +52,20 @@ class ManagementTest (TestBase):
self.assertEqual (res.body, body)
def test_system_object (self):
- channel = self.client.channel(2)
+ session = self.session
- mc = managementClient (channel.spec)
- mch = mc.addChannel (channel)
+ mc = managementClient (session.spec)
+ mch = mc.addChannel (session)
mc.syncWaitForStable (mch)
systems = mc.syncGetObjects (mch, "system")
self.assertEqual (len (systems), 1)
def test_standard_exchanges (self):
- channel = self.client.channel(2)
+ session = self.session
- mc = managementClient (channel.spec)
- mch = mc.addChannel (channel)
+ mc = managementClient (session.spec)
+ mch = mc.addChannel (session)
mc.syncWaitForStable (mch)
exchanges = mc.syncGetObjects (mch, "exchange")
diff --git a/qpid/python/tests_0-10/persistence.py b/qpid/python/tests_0-10/persistence.py
new file mode 100644
index 0000000000..a4b5691910
--- /dev/null
+++ b/qpid/python/tests_0-10/persistence.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.datatypes import Message, RangedSet
+from qpid.testlib import testrunner, TestBase010
+
+class PersistenceTests(TestBase010):
+ def test_delete_queue_after_publish(self):
+ session = self.session
+ session.auto_sync = False
+
+ #create queue
+ session.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+ #send message
+ for i in range(1, 10):
+ dp = session.delivery_properties(routing_key="q", delivery_mode=2)
+ session.message_transfer(message=Message(dp, "my-message"))
+
+ session.auto_sync = True
+ #explicitly delete queue
+ session.queue_delete(queue = "q")
+
+ def test_ack_message_from_deleted_queue(self):
+ session = self.session
+ session.auto_sync = False
+
+ #create queue
+ session.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+ #send message
+ dp = session.delivery_properties(routing_key="q", delivery_mode=2)
+ session.message_transfer(message=Message(dp, "my-message"))
+
+ #create consumer
+ session.message_subscribe(queue = "q", destination = "a", accept_mode = 1, acquire_mode=0)
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = 0, value = 10, destination = "a")
+ queue = session.incoming("a")
+
+ #consume the message, cancel subscription (triggering auto-delete), then ack it
+ msg = queue.get(timeout = 5)
+ session.message_cancel(destination = "a")
+ session.message_accept(RangedSet(msg.id))
+
+ def test_queue_deletion(self):
+ session = self.session
+ session.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
+ session.exchange_bind(exchange="amq.topic", queue="durable-subscriber-queue", binding_key="xyz")
+ dp = session.delivery_properties(routing_key="xyz", delivery_mode=2)
+ session.message_transfer(destination="amq.topic", message=Message(dp, "my-message"))
+ session.queue_delete(queue = "durable-subscriber-queue")