summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rwxr-xr-xpython/hello-world12
-rw-r--r--python/qpid/__init__.py23
-rw-r--r--python/qpid/client.py6
-rw-r--r--python/qpid/codec.py27
-rw-r--r--python/qpid/connection.py6
-rw-r--r--python/qpid/content.py8
-rw-r--r--python/qpid/message.py2
-rw-r--r--python/qpid/peer.py16
-rw-r--r--python/qpid/spec.py16
-rwxr-xr-xpython/server44
-rw-r--r--python/tests/codec.py7
11 files changed, 139 insertions, 28 deletions
diff --git a/python/hello-world b/python/hello-world
index 8b7a2752c5..4fb065ae53 100755
--- a/python/hello-world
+++ b/python/hello-world
@@ -3,13 +3,13 @@ import qpid
from qpid.client import Client
from qpid.content import Content
-client = Client("127.0.0.1", 5672, qpid.spec.load("../specs/amqp.0-9.xml",
- "../specs/amqp-errata.0-9.xml"))
+client = Client("127.0.0.1", 5672, qpid.spec.load("../specs/amqp.0-10-preview.xml"))
client.start({"LOGIN": "guest", "PASSWORD": "guest"})
ch = client.channel(1)
-ch.channel_open()
+ch.session_open()
ch.queue_declare(queue="test")
ch.queue_bind(exchange="amq.direct", queue="test", routing_key="test")
-ch.message_consume(queue="test", destination="test")
-ch.message_transfer(destination="amq.direct", routing_key="test",
- body="hello world")
+print ch.queue_query(queue="test")
+ch.message_subscribe(queue="test", destination="test")
+ch.message_transfer(destination="amq.direct",
+ content=Content("hello world"))
diff --git a/python/qpid/__init__.py b/python/qpid/__init__.py
index 4363f175fb..4aeccae38e 100644
--- a/python/qpid/__init__.py
+++ b/python/qpid/__init__.py
@@ -18,3 +18,26 @@
#
import spec, codec, connection, content, peer, delegate, client
+
+class Struct:
+
+ def __init__(self, type):
+ self.__dict__["type"] = type
+ self.__dict__["_values"] = {}
+
+ def _check(self, attr):
+ field = self.type.fields.byname.get(attr)
+ if field == None:
+ raise AttributeError(attr)
+ return field
+
+ def __setattr__(self, attr, value):
+ self._check(attr)
+ self._values[attr] = value
+
+ def __getattr__(self, attr):
+ field = self._check(attr)
+ return self._values.get(attr, field.default())
+
+ def __str__(self):
+ return "%s %s" % (self.type.type, self._values)
diff --git a/python/qpid/client.py b/python/qpid/client.py
index f1800204db..3efd79c389 100644
--- a/python/qpid/client.py
+++ b/python/qpid/client.py
@@ -110,7 +110,7 @@ class ClientDelegate(Delegate):
#todo: just override the params, i.e. don't require them
# all to be included in tune_params
msg.tune_ok(**self.client.tune_params)
- else:
+ else:
msg.tune_ok(*msg.frame.args)
self.client.started.set()
@@ -143,6 +143,10 @@ class ClientDelegate(Delegate):
def execution_complete(self, ch, msg):
ch.completion.complete(msg.cumulative_execution_mark)
+ def execution_result(self, ch, msg):
+ future = ch.futures[msg.command_id]
+ future.put_response(ch, msg.data)
+
def close(self, reason):
self.client.closed = True
self.client.reason = reason
diff --git a/python/qpid/codec.py b/python/qpid/codec.py
index a11486376d..3920f2c8d9 100644
--- a/python/qpid/codec.py
+++ b/python/qpid/codec.py
@@ -26,7 +26,7 @@ fields.
The unit test for this module is located in tests/codec.py
"""
-import re
+import re, qpid
from cStringIO import StringIO
from struct import *
from reference import ReferenceId
@@ -40,11 +40,12 @@ class Codec:
class that handles encoding/decoding of AMQP primitives
"""
- def __init__(self, stream):
+ def __init__(self, stream, spec):
"""
initializing the stream/fields used
"""
self.stream = stream
+ self.spec = spec
self.nwrote = 0
self.nread = 0
self.incoming_bits = []
@@ -163,7 +164,7 @@ class Codec:
# short int's valid range is [0,65535]
if (o < 0 or o > 65535):
- raise ValueError('Valid range of short int is [0,65535]')
+ raise ValueError('Valid range of short int is [0,65535]: %s' % o)
self.pack("!H", o)
@@ -255,7 +256,7 @@ class Codec:
encodes a table data structure in network byte order
"""
enc = StringIO()
- codec = Codec(enc)
+ codec = Codec(enc, self.spec)
if tbl:
for key, value in tbl.items():
if len(key) > 128:
@@ -356,3 +357,21 @@ class Codec:
def decode_uuid(self):
return self.decode_longstr()
+
+ def encode_long_struct(self, s):
+ enc = StringIO()
+ codec = Codec(enc, self.spec)
+ type = s.type
+ codec.encode_short(type.type)
+ for f in type.fields:
+ codec.encode(f.type, getattr(s, f.name))
+ codec.flush()
+ self.encode_longstr(enc.getvalue())
+
+ def decode_long_struct(self):
+ codec = Codec(StringIO(self.decode_longstr()), self.spec)
+ type = self.spec.structs[codec.decode_short()]
+ s = qpid.Struct(type)
+ for f in type.fields:
+ setattr(s, f.name, codec.decode(f.type))
+ return s
diff --git a/python/qpid/connection.py b/python/qpid/connection.py
index cdfa2c2dc0..58235117ef 100644
--- a/python/qpid/connection.py
+++ b/python/qpid/connection.py
@@ -74,7 +74,7 @@ def listen(host, port, predicate = lambda: True):
class Connection:
def __init__(self, io, spec):
- self.codec = codec.Codec(io)
+ self.codec = codec.Codec(io, spec)
self.spec = spec
self.FRAME_END = self.spec.constants.byname["frame_end"].id
@@ -95,7 +95,7 @@ class Connection:
c.encode_octet(self.spec.constants.byname[frame.type].id)
c.encode_short(frame.channel)
body = StringIO()
- enc = codec.Codec(body)
+ enc = codec.Codec(body, self.spec)
frame.encode(enc)
enc.flush()
c.encode_longstr(body.getvalue())
@@ -106,7 +106,7 @@ class Connection:
type = self.spec.constants.byid[c.decode_octet()].name
channel = c.decode_short()
body = c.decode_longstr()
- dec = codec.Codec(StringIO(body))
+ dec = codec.Codec(StringIO(body), self.spec)
frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
frame.channel = channel
end = c.decode_octet()
diff --git a/python/qpid/content.py b/python/qpid/content.py
index bcbea1697c..9391f4f1a8 100644
--- a/python/qpid/content.py
+++ b/python/qpid/content.py
@@ -48,3 +48,11 @@ class Content:
def __delitem__(self, name):
del self.properties[name]
+
+ def __str__(self):
+ if self.children:
+ return "%s [%s] %s" % (self.properties,
+ ", ".join(map(str, self.children)),
+ self.body)
+ else:
+ return "%s %s" % (self.properties, self.body)
diff --git a/python/qpid/message.py b/python/qpid/message.py
index 7e28b63037..c9ea5a8a0c 100644
--- a/python/qpid/message.py
+++ b/python/qpid/message.py
@@ -29,7 +29,7 @@ class Message:
if self.method.is_l4_command():
self.command_id = self.channel.incoming_completion.sequence.next()
#print "allocated: ", self.command_id, "to ", self.method.klass.name, "_", self.method.name
-
+
def __len__(self):
return len(self.frame.args)
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index 03c48bef90..6762f774f4 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -189,6 +189,7 @@ class Channel:
self.completion = OutgoingCompletion()
self.incoming_completion = IncomingCompletion(self)
+ self.futures = {}
# Use reliable framing if version == 0-9.
if spec.major == 0 and spec.minor == 9:
@@ -261,6 +262,7 @@ class Channel:
self.completion.reset()
self.incoming_completion.reset()
self.completion.next_command(type)
+
content = kwargs.pop("content", None)
frame = Method(type, type.arguments(*args, **kwargs))
return self.invoker(frame, content)
@@ -275,7 +277,7 @@ class Channel:
self.request(frame, self.queue_response, content)
if not frame.method.responses:
- if self.use_execution_layer and type.is_l4_command():
+ if self.use_execution_layer and frame.method_type.is_l4_command():
self.execution_flush()
self.completion.wait()
if self.closed:
@@ -287,7 +289,6 @@ class Channel:
return Message(self, resp, read_content(self.responses))
else:
return Message(self, resp)
-
except QueueClosed, e:
if self.closed:
raise Closed(self.reason)
@@ -296,6 +297,11 @@ class Channel:
# used for 0-8 and 0-10
def invoke_method(self, frame, content = None):
+ if frame.method.result:
+ cmd_id = self.completion.command_id
+ future = Future()
+ self.futures[cmd_id] = future
+
self.write(frame, content)
try:
@@ -316,6 +322,11 @@ class Channel:
return Message(self, resp, content)
else:
raise ValueError(resp)
+ elif frame.method.result:
+ if self.synchronous:
+ return future.get_response(timeout=10)
+ else:
+ return future
elif self.synchronous and not frame.method.response \
and self.use_execution_layer and frame.method.is_l4_command():
self.execution_flush()
@@ -324,7 +335,6 @@ class Channel:
raise Closed(self.reason)
if not completed:
self.close("Timed-out waiting for completion")
-
except QueueClosed, e:
if self.closed:
raise Closed(self.reason)
diff --git a/python/qpid/spec.py b/python/qpid/spec.py
index f9d305c133..8a511bcb3d 100644
--- a/python/qpid/spec.py
+++ b/python/qpid/spec.py
@@ -91,6 +91,8 @@ class Spec(Metadata):
self.classes = SpecContainer()
# methods indexed by classname_methname
self.methods = {}
+ # structs by type code
+ self.structs = {}
def post_load(self):
self.module = self.define_module("amqp%s%s" % (self.major, self.minor))
@@ -295,13 +297,18 @@ class Field(Metadata):
self.description = description
self.docs = docs
+ def default(self):
+ return Method.DEFAULTS[self.type]
+
def get_result(nd, spec):
result = nd["result"]
if not result: return None
name = result["@domain"]
if name != None: return spec.domains.byname[name]
st_nd = result["struct"]
- st = Struct(st_nd["@size"], st_nd["@type"], st_nd["@pack"])
+ st = Struct(st_nd["@size"], int(result.parent.parent["@index"])*256 +
+ int(st_nd["@type"]), st_nd["@pack"])
+ spec.structs[st.type] = st
load_fields(st_nd, st.fields, spec.domains.byname)
return st
@@ -352,7 +359,12 @@ def load(specfile, *errata):
type = nd["@type"]
if type == None:
st_nd = nd["struct"]
- type = Struct(st_nd["@size"], st_nd["@type"], st_nd["@pack"])
+ code = st_nd["@type"]
+ if code not in (None, "", "none"):
+ code = int(code)
+ type = Struct(st_nd["@size"], code, st_nd["@pack"])
+ if type.type != None:
+ spec.structs[type.type] = type
structs.append((type, st_nd))
else:
type = pythonize(type)
diff --git a/python/server b/python/server
index 56f0f32081..37416314e2 100755
--- a/python/server
+++ b/python/server
@@ -3,22 +3,54 @@ import qpid
from qpid.connection import Connection, listen
from qpid.delegate import Delegate
from qpid.peer import Peer
+from qpid import Struct
class Server(Delegate):
+ def __init__(self):
+ Delegate.__init__(self)
+ self.queues = {}
+ self.bindings = {}
+
def connection_open(self, ch, msg):
msg.open_ok()
- def channel_open(self, ch, msg):
- print "channel %s open" % ch.id
- msg.open_ok()
+ def session_open(self, ch, msg):
+ print "session open on channel %s" % ch.id
+ msg.attached()
+
+ def execution_flush(self, ch, msg):
+ pass
+
+ def queue_declare(self, ch, msg):
+ self.queues[msg.queue] = []
+ print "queue declared: %s" % msg.queue
+ msg.complete()
+
+ def queue_bind(self, ch, msg):
+ if self.bindings.has_key(msg.exchange):
+ queues = self.bindings[msg.exchange]
+ else:
+ queues = set()
+ self.bindings[msg.exchange] = queues
+ queues.add((msg.routing_key, msg.queue))
+ msg.complete()
+
+ def queue_query(self, ch, msg):
+ st = Struct(msg.method.result)
+ ch.execution_result(msg.command_id, st)
+ msg.complete()
+
+ def message_subscribe(self, ch, msg):
+ print msg
+ msg.complete()
def message_transfer(self, ch, msg):
- print msg.body
- msg.ok()
+ print msg.content
+ msg.complete()
-spec = qpid.spec.load("../specs/amqp.0-9.xml")
+spec = qpid.spec.load("../specs/amqp.0-10-preview.xml")
for io in listen("0.0.0.0", 5672):
c = Connection(io, spec)
diff --git a/python/tests/codec.py b/python/tests/codec.py
index 785f1aba6b..689a2cf4c1 100644
--- a/python/tests/codec.py
+++ b/python/tests/codec.py
@@ -20,6 +20,7 @@
import unittest
from qpid.codec import Codec
+from qpid.spec import load
from cStringIO import StringIO
from qpid.reference import ReferenceId
@@ -52,11 +53,13 @@ __doc__ = """
"""
+SPEC = load("../specs/amqp.0-10-preview.xml")
# --------------------------------------
# --------------------------------------
class BaseDataTypes(unittest.TestCase):
+
"""
Base class containing common functions
"""
@@ -66,7 +69,7 @@ class BaseDataTypes(unittest.TestCase):
"""
standard setUp for unitetest (refer unittest documentation for details)
"""
- self.codec = Codec(StringIO())
+ self.codec = Codec(StringIO(), SPEC)
# ------------------
def tearDown(self):
@@ -504,7 +507,7 @@ def test(type, value):
else:
values = [value]
stream = StringIO()
- codec = Codec(stream)
+ codec = Codec(stream, SPEC)
for v in values:
codec.encode(type, v)
codec.flush()