summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-03-05 14:39:40 +0000
committerRafael H. Schloming <rhs@apache.org>2008-03-05 14:39:40 +0000
commit86779be122dea590bc1e5201c58777ea3e362a95 (patch)
tree5867b18efe04c62c99e1ca14d177b0eda894bd82
parent00f2ca6cf33f77e44b94db2701830f8c9bcd794e (diff)
downloadqpid-python-86779be122dea590bc1e5201c58777ea3e362a95.tar.gz
added incoming queues for messages; altered session dispatch to send entire assembly to a single handler; added logging switch for hello-010-world
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@633861 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xpython/hello-010-world16
-rw-r--r--python/qpid/assembler.py6
-rw-r--r--python/qpid/datatypes.py8
-rw-r--r--python/qpid/session.py79
-rw-r--r--python/qpid/spec010.py32
-rwxr-xr-xpython/server0106
-rw-r--r--python/tests/connection010.py24
7 files changed, 124 insertions, 47 deletions
diff --git a/python/hello-010-world b/python/hello-010-world
index 1b4c9aed33..2b762b1296 100755
--- a/python/hello-010-world
+++ b/python/hello-010-world
@@ -1,13 +1,18 @@
#!/usr/bin/env python
-import logging
+import sys, logging
from qpid.connection010 import Connection
from qpid.spec010 import load
from qpid.util import connect
from qpid.datatypes import Message
+if "-v" in sys.argv:
+ level = logging.DEBUG
+else:
+ level = logging.WARN
+
format = "%(asctime)s %(name)-12s %(levelname)-8s %(message)s"
-logging.basicConfig(level=logging.DEBUG, format=format, datefmt='%H:%M:%S')
+logging.basicConfig(level=level, format=format, datefmt='%H:%M:%S')
spec = load("../specs/amqp.0-10.xml")
conn = Connection(connect("0.0.0.0", spec.port), spec)
@@ -18,10 +23,15 @@ ssn = conn.session("my-session", timeout=10)
ssn.queue_declare("asdf")
ssn.message_transfer("this", None, None, Message("testing..."))
-ssn.message_transfer("is")
+ssn.message_transfer("is", None, None, Message("more testing..."))
ssn.message_transfer("a")
ssn.message_transfer("test")
+print ssn.incoming("this").get()
+print ssn.incoming("is").get()
+print ssn.incoming("a").get()
+print ssn.incoming("test").get()
+
print ssn.queue_query("testing")
ssn.close(timeout=10)
diff --git a/python/qpid/assembler.py b/python/qpid/assembler.py
index aac8b80cb4..fe78baaceb 100644
--- a/python/qpid/assembler.py
+++ b/python/qpid/assembler.py
@@ -46,7 +46,9 @@ class Segment:
def decode_command(self, spec):
sc = StringCodec(spec, self.payload)
- return sc.read_command()
+ cmd = sc.read_command()
+ cmd.id = self.id
+ return cmd
def decode_header(self, spec):
sc = StringCodec(spec, self.payload)
@@ -56,7 +58,7 @@ class Segment:
return values
def decode_body(self, spec):
- return self
+ return self.payload
def __str__(self):
return "%s%s %s %s %s %r" % (int(self.first), int(self.last), self.type,
diff --git a/python/qpid/datatypes.py b/python/qpid/datatypes.py
index 649c8f4d76..efd1fdd4ff 100644
--- a/python/qpid/datatypes.py
+++ b/python/qpid/datatypes.py
@@ -43,6 +43,14 @@ class Message:
else:
self.headers = None
+ def __repr__(self):
+ args = []
+ if self.headers:
+ args.extend(self.headers)
+ if self.body:
+ args.append(self.body)
+ return "Message(%s)" % ", ".join(map(repr, args))
+
class Range:
def __init__(self, lower, upper):
diff --git a/python/qpid/session.py b/python/qpid/session.py
index 2e5f47b63e..334902bbf3 100644
--- a/python/qpid/session.py
+++ b/python/qpid/session.py
@@ -17,11 +17,14 @@
# under the License.
#
-from threading import Event
+from threading import Event, RLock
from invoker import Invoker
from datatypes import RangeSet, Struct, Future
from codec010 import StringCodec
from assembler import Segment
+from queue import Queue
+from datatypes import Message
+from logging import getLogger
class SessionDetached(Exception): pass
@@ -46,6 +49,20 @@ class Session(Invoker):
self.delegate = delegate(self)
self.send_id = True
self.results = {}
+ self.lock = RLock()
+ self._incoming = {}
+ self.assembly = None
+
+ def incoming(self, destination):
+ self.lock.acquire()
+ try:
+ queue = self._incoming.get(destination)
+ if queue == None:
+ queue = Queue()
+ self._incoming[destination] = queue
+ return queue
+ finally:
+ self.lock.release()
def close(self, timeout=None):
self.channel.session_detach(self.name)
@@ -106,19 +123,37 @@ class Session(Invoker):
def received(self, seg):
self.receiver.received(seg)
- if seg.type == self.spec["segment_type.command"].value:
- cmd = seg.decode(self.spec)
- attr = cmd.type.qname.replace(".", "_")
- result = getattr(self.delegate, attr)(cmd)
- if cmd.type.result:
- self.execution_result(seg.id, result)
- elif seg.type == self.spec["segment_type.header"].value:
- self.delegate.header(seg.decode(self.spec))
- elif seg.type == self.spec["segment_type.body"].value:
- self.delegate.body(seg.decode(self.spec))
- else:
- raise ValueError("unknown segment type: %s" % seg.type)
- self.receiver.completed(seg)
+ if seg.first:
+ assert self.assembly == None
+ self.assembly = []
+ self.assembly.append(seg)
+ if seg.last:
+ self.dispatch(self.assembly)
+ self.assembly = None
+
+ def dispatch(self, assembly):
+ cmd = assembly.pop(0).decode(self.spec)
+ args = []
+
+ for st in cmd.type.segments:
+ if assembly:
+ seg = assembly[0]
+ if seg.type == st.segment_type:
+ args.append(seg.decode(self.spec))
+ assembly.pop(0)
+ continue
+ args.append(None)
+
+ assert len(assembly) == 0
+
+ attr = cmd.type.qname.replace(".", "_")
+ result = getattr(self.delegate, attr)(cmd, *args)
+
+ if cmd.type.result:
+ self.execution_result(cmd.id, result)
+
+ for seg in assembly:
+ self.receiver.completed(seg)
def send(self, seg):
self.sender.send(seg)
@@ -196,13 +231,13 @@ class Delegate:
future = self.session.results[er.command_id]
future.set(er.value)
-class Client(Delegate):
+msg = getLogger("qpid.ssn.msg")
- def message_transfer(self, cmd):
- print "TRANSFER:", cmd
-
- def header(self, hdr):
- print "HEADER:", hdr
+class Client(Delegate):
- def body(self, seg):
- print "BODY:", seg
+ def message_transfer(self, cmd, headers, body):
+ m = Message(body)
+ m.headers = headers
+ messages = self.session.incoming(cmd.destination)
+ messages.put(m)
+ msg.debug("RECV: %s", m)
diff --git a/python/qpid/spec010.py b/python/qpid/spec010.py
index e6b7946e17..c3f3e6ad57 100644
--- a/python/qpid/spec010.py
+++ b/python/qpid/spec010.py
@@ -194,7 +194,7 @@ class Composite(Type, Coded):
result[f.name] = a
for k, v in kwargs.items():
- f = self.named.get(k, None)
+ f = self.named.get(k)
if f == None:
raise TypeError("%s got an unexpected keyword argument '%s'" %
(self.name, k))
@@ -232,7 +232,7 @@ class Composite(Type, Coded):
flags = 0
for i in range(len(self.fields)):
f = self.fields[i]
- if f.type.is_present(values.get(f.name, None)):
+ if f.type.is_present(values.get(f.name)):
flags |= (0x1 << i)
for i in range(self.pack):
codec.write_uint8((flags >> 8*i) & 0xFF)
@@ -272,7 +272,10 @@ class Struct(Composite):
for f in self.fields])
return "%s {\n %s\n}" % (self.qname, fields)
-class Segment(Node):
+class Segment:
+
+ def __init__(self):
+ self.segment_type = None
def register(self, node):
self.spec = node.spec
@@ -284,7 +287,7 @@ class Instruction(Composite, Segment):
def __init__(self, name, code, children):
Composite.__init__(self, name, code, 0, 2, children)
- self.segment_type = None
+ Segment.__init__(self)
self.track = None
self.handlers = []
@@ -337,11 +340,17 @@ class Command(Instruction):
self.header.encode(codec, cmd)
Instruction.encode(self, codec, cmd)
-class Header(Segment):
+class Header(Segment, Node):
def __init__(self, children):
+ Segment.__init__(self)
+ Node.__init__(self, children)
self.entries = []
- Segment.__init__(self, children)
+
+ def register(self, node):
+ Segment.register(self, node)
+ self.segment_type = self.spec["segment_type.header"].value
+ Node.register(self)
class Entry(Lookup):
@@ -356,7 +365,16 @@ class Entry(Lookup):
def resolve(self):
self.type = self.lookup(self.type)
-class Body(Segment):
+class Body(Segment, Node):
+
+ def __init__(self, children):
+ Segment.__init__(self)
+ Node.__init__(self, children)
+
+ def register(self, node):
+ Segment.register(self, node)
+ self.segment_type = self.spec["segment_type.body"].value
+ Node.register(self)
def resolve(self): pass
diff --git a/python/server010 b/python/server010
index b0e13d1e9f..6d89ee5ea0 100755
--- a/python/server010
+++ b/python/server010
@@ -5,6 +5,7 @@ from qpid.connection010 import Connection
from qpid.util import connect, listen
from qpid.spec010 import load
from qpid.session import Client
+from qpid.datatypes import Message
spec = load("../specs/amqp.0-10.xml")
@@ -27,6 +28,11 @@ class SessionDelegate(Client):
def queue_query(self, qq):
return qq.type.result.type.new((qq.queue,), {})
+ def message_transfer(self, cmd, header, body):
+ m = Message(body)
+ m.header = header
+ self.session.message_transfer(cmd.destination, cmd.accept_mode, cmd.acquire_mode, m)
+
server = Server()
for s in listen("0.0.0.0", spec.port):
diff --git a/python/tests/connection010.py b/python/tests/connection010.py
index 5e4bf983da..8adf20fd78 100644
--- a/python/tests/connection010.py
+++ b/python/tests/connection010.py
@@ -50,11 +50,8 @@ class TestSession(Delegate):
def queue_query(self, qq):
return qq.type.result.type.new((qq.queue,), {})
- def message_transfer(self, cmd):
- self.queue.put(cmd)
-
- def body(self, body):
- self.queue.put(body)
+ def message_transfer(self, cmd, header, body):
+ self.queue.put((cmd, header, body))
class ConnectionTest(TestCase):
@@ -88,8 +85,8 @@ class ConnectionTest(TestCase):
c = Connection(connect("0.0.0.0", PORT), self.spec)
c.start(10)
- ssn1 = c.session("test1")
- ssn2 = c.session("test2")
+ ssn1 = c.session("test1", timeout=10)
+ ssn2 = c.session("test2", timeout=10)
assert ssn1 == c.sessions["test1"]
assert ssn2 == c.sessions["test2"]
@@ -110,7 +107,7 @@ class ConnectionTest(TestCase):
assert ssn2 not in c.attached.values()
assert ssn2 not in c.sessions.values()
- ssn = c.session("session")
+ ssn = c.session("session", timeout=10)
assert ssn.channel != None
assert ssn in c.sessions.values()
@@ -121,16 +118,17 @@ class ConnectionTest(TestCase):
ssn.message_transfer(d)
for d in destinations:
- cmd = self.queue.get(10)
+ cmd, header, body = self.queue.get(10)
assert cmd.destination == d
+ assert header == None
+ assert body == None
msg = Message("this is a test")
ssn.message_transfer("four", message=msg)
- cmd = self.queue.get(10)
+ cmd, header, body = self.queue.get(10)
assert cmd.destination == "four"
- body = self.queue.get(10)
- assert body.payload == msg.body
- assert body.last
+ assert header == None
+ assert body == msg.body
qq = ssn.queue_query("asdf")
assert qq.queue == "asdf"