summaryrefslogtreecommitdiff
path: root/python/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid')
-rw-r--r--python/qpid/__init__.py23
-rw-r--r--python/qpid/client.py6
-rw-r--r--python/qpid/codec.py27
-rw-r--r--python/qpid/connection.py6
-rw-r--r--python/qpid/content.py8
-rw-r--r--python/qpid/message.py2
-rw-r--r--python/qpid/peer.py16
-rw-r--r--python/qpid/spec.py16
8 files changed, 90 insertions, 14 deletions
diff --git a/python/qpid/__init__.py b/python/qpid/__init__.py
index 4363f175fb..4aeccae38e 100644
--- a/python/qpid/__init__.py
+++ b/python/qpid/__init__.py
@@ -18,3 +18,26 @@
#
import spec, codec, connection, content, peer, delegate, client
+
+class Struct:
+
+ def __init__(self, type):
+ self.__dict__["type"] = type
+ self.__dict__["_values"] = {}
+
+ def _check(self, attr):
+ field = self.type.fields.byname.get(attr)
+ if field == None:
+ raise AttributeError(attr)
+ return field
+
+ def __setattr__(self, attr, value):
+ self._check(attr)
+ self._values[attr] = value
+
+ def __getattr__(self, attr):
+ field = self._check(attr)
+ return self._values.get(attr, field.default())
+
+ def __str__(self):
+ return "%s %s" % (self.type.type, self._values)
diff --git a/python/qpid/client.py b/python/qpid/client.py
index f1800204db..3efd79c389 100644
--- a/python/qpid/client.py
+++ b/python/qpid/client.py
@@ -110,7 +110,7 @@ class ClientDelegate(Delegate):
#todo: just override the params, i.e. don't require them
# all to be included in tune_params
msg.tune_ok(**self.client.tune_params)
- else:
+ else:
msg.tune_ok(*msg.frame.args)
self.client.started.set()
@@ -143,6 +143,10 @@ class ClientDelegate(Delegate):
def execution_complete(self, ch, msg):
ch.completion.complete(msg.cumulative_execution_mark)
+ def execution_result(self, ch, msg):
+ future = ch.futures[msg.command_id]
+ future.put_response(ch, msg.data)
+
def close(self, reason):
self.client.closed = True
self.client.reason = reason
diff --git a/python/qpid/codec.py b/python/qpid/codec.py
index a11486376d..3920f2c8d9 100644
--- a/python/qpid/codec.py
+++ b/python/qpid/codec.py
@@ -26,7 +26,7 @@ fields.
The unit test for this module is located in tests/codec.py
"""
-import re
+import re, qpid
from cStringIO import StringIO
from struct import *
from reference import ReferenceId
@@ -40,11 +40,12 @@ class Codec:
class that handles encoding/decoding of AMQP primitives
"""
- def __init__(self, stream):
+ def __init__(self, stream, spec):
"""
initializing the stream/fields used
"""
self.stream = stream
+ self.spec = spec
self.nwrote = 0
self.nread = 0
self.incoming_bits = []
@@ -163,7 +164,7 @@ class Codec:
# short int's valid range is [0,65535]
if (o < 0 or o > 65535):
- raise ValueError('Valid range of short int is [0,65535]')
+ raise ValueError('Valid range of short int is [0,65535]: %s' % o)
self.pack("!H", o)
@@ -255,7 +256,7 @@ class Codec:
encodes a table data structure in network byte order
"""
enc = StringIO()
- codec = Codec(enc)
+ codec = Codec(enc, self.spec)
if tbl:
for key, value in tbl.items():
if len(key) > 128:
@@ -356,3 +357,21 @@ class Codec:
def decode_uuid(self):
return self.decode_longstr()
+
+ def encode_long_struct(self, s):
+ enc = StringIO()
+ codec = Codec(enc, self.spec)
+ type = s.type
+ codec.encode_short(type.type)
+ for f in type.fields:
+ codec.encode(f.type, getattr(s, f.name))
+ codec.flush()
+ self.encode_longstr(enc.getvalue())
+
+ def decode_long_struct(self):
+ codec = Codec(StringIO(self.decode_longstr()), self.spec)
+ type = self.spec.structs[codec.decode_short()]
+ s = qpid.Struct(type)
+ for f in type.fields:
+ setattr(s, f.name, codec.decode(f.type))
+ return s
diff --git a/python/qpid/connection.py b/python/qpid/connection.py
index cdfa2c2dc0..58235117ef 100644
--- a/python/qpid/connection.py
+++ b/python/qpid/connection.py
@@ -74,7 +74,7 @@ def listen(host, port, predicate = lambda: True):
class Connection:
def __init__(self, io, spec):
- self.codec = codec.Codec(io)
+ self.codec = codec.Codec(io, spec)
self.spec = spec
self.FRAME_END = self.spec.constants.byname["frame_end"].id
@@ -95,7 +95,7 @@ class Connection:
c.encode_octet(self.spec.constants.byname[frame.type].id)
c.encode_short(frame.channel)
body = StringIO()
- enc = codec.Codec(body)
+ enc = codec.Codec(body, self.spec)
frame.encode(enc)
enc.flush()
c.encode_longstr(body.getvalue())
@@ -106,7 +106,7 @@ class Connection:
type = self.spec.constants.byid[c.decode_octet()].name
channel = c.decode_short()
body = c.decode_longstr()
- dec = codec.Codec(StringIO(body))
+ dec = codec.Codec(StringIO(body), self.spec)
frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
frame.channel = channel
end = c.decode_octet()
diff --git a/python/qpid/content.py b/python/qpid/content.py
index bcbea1697c..9391f4f1a8 100644
--- a/python/qpid/content.py
+++ b/python/qpid/content.py
@@ -48,3 +48,11 @@ class Content:
def __delitem__(self, name):
del self.properties[name]
+
+ def __str__(self):
+ if self.children:
+ return "%s [%s] %s" % (self.properties,
+ ", ".join(map(str, self.children)),
+ self.body)
+ else:
+ return "%s %s" % (self.properties, self.body)
diff --git a/python/qpid/message.py b/python/qpid/message.py
index 7e28b63037..c9ea5a8a0c 100644
--- a/python/qpid/message.py
+++ b/python/qpid/message.py
@@ -29,7 +29,7 @@ class Message:
if self.method.is_l4_command():
self.command_id = self.channel.incoming_completion.sequence.next()
#print "allocated: ", self.command_id, "to ", self.method.klass.name, "_", self.method.name
-
+
def __len__(self):
return len(self.frame.args)
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index 03c48bef90..6762f774f4 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -189,6 +189,7 @@ class Channel:
self.completion = OutgoingCompletion()
self.incoming_completion = IncomingCompletion(self)
+ self.futures = {}
# Use reliable framing if version == 0-9.
if spec.major == 0 and spec.minor == 9:
@@ -261,6 +262,7 @@ class Channel:
self.completion.reset()
self.incoming_completion.reset()
self.completion.next_command(type)
+
content = kwargs.pop("content", None)
frame = Method(type, type.arguments(*args, **kwargs))
return self.invoker(frame, content)
@@ -275,7 +277,7 @@ class Channel:
self.request(frame, self.queue_response, content)
if not frame.method.responses:
- if self.use_execution_layer and type.is_l4_command():
+ if self.use_execution_layer and frame.method_type.is_l4_command():
self.execution_flush()
self.completion.wait()
if self.closed:
@@ -287,7 +289,6 @@ class Channel:
return Message(self, resp, read_content(self.responses))
else:
return Message(self, resp)
-
except QueueClosed, e:
if self.closed:
raise Closed(self.reason)
@@ -296,6 +297,11 @@ class Channel:
# used for 0-8 and 0-10
def invoke_method(self, frame, content = None):
+ if frame.method.result:
+ cmd_id = self.completion.command_id
+ future = Future()
+ self.futures[cmd_id] = future
+
self.write(frame, content)
try:
@@ -316,6 +322,11 @@ class Channel:
return Message(self, resp, content)
else:
raise ValueError(resp)
+ elif frame.method.result:
+ if self.synchronous:
+ return future.get_response(timeout=10)
+ else:
+ return future
elif self.synchronous and not frame.method.response \
and self.use_execution_layer and frame.method.is_l4_command():
self.execution_flush()
@@ -324,7 +335,6 @@ class Channel:
raise Closed(self.reason)
if not completed:
self.close("Timed-out waiting for completion")
-
except QueueClosed, e:
if self.closed:
raise Closed(self.reason)
diff --git a/python/qpid/spec.py b/python/qpid/spec.py
index f9d305c133..8a511bcb3d 100644
--- a/python/qpid/spec.py
+++ b/python/qpid/spec.py
@@ -91,6 +91,8 @@ class Spec(Metadata):
self.classes = SpecContainer()
# methods indexed by classname_methname
self.methods = {}
+ # structs by type code
+ self.structs = {}
def post_load(self):
self.module = self.define_module("amqp%s%s" % (self.major, self.minor))
@@ -295,13 +297,18 @@ class Field(Metadata):
self.description = description
self.docs = docs
+ def default(self):
+ return Method.DEFAULTS[self.type]
+
def get_result(nd, spec):
result = nd["result"]
if not result: return None
name = result["@domain"]
if name != None: return spec.domains.byname[name]
st_nd = result["struct"]
- st = Struct(st_nd["@size"], st_nd["@type"], st_nd["@pack"])
+ st = Struct(st_nd["@size"], int(result.parent.parent["@index"])*256 +
+ int(st_nd["@type"]), st_nd["@pack"])
+ spec.structs[st.type] = st
load_fields(st_nd, st.fields, spec.domains.byname)
return st
@@ -352,7 +359,12 @@ def load(specfile, *errata):
type = nd["@type"]
if type == None:
st_nd = nd["struct"]
- type = Struct(st_nd["@size"], st_nd["@type"], st_nd["@pack"])
+ code = st_nd["@type"]
+ if code not in (None, "", "none"):
+ code = int(code)
+ type = Struct(st_nd["@size"], code, st_nd["@pack"])
+ if type.type != None:
+ spec.structs[type.type] = type
structs.append((type, st_nd))
else:
type = pythonize(type)