summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/qpid/client.py31
-rw-r--r--python/qpid/codec.py27
-rw-r--r--python/qpid/peer.py27
-rw-r--r--python/qpid/reference.py117
-rw-r--r--python/qpid/testlib.py4
-rw-r--r--python/tests/message.py64
6 files changed, 256 insertions, 14 deletions
diff --git a/python/qpid/client.py b/python/qpid/client.py
index 20d1093878..ea6aa7901a 100644
--- a/python/qpid/client.py
+++ b/python/qpid/client.py
@@ -28,6 +28,7 @@ from delegate import Delegate
from connection import Connection, Frame, connect
from spec import load
from queue import Queue
+from reference import ReferenceId, References
class Client:
@@ -69,13 +70,14 @@ class Client:
self.lock.release()
return q
- def start(self, response, mechanism="AMQPLAIN", locale="en_US"):
+ def start(self, response, mechanism="AMQPLAIN", locale="en_US", tune_params=None):
self.mechanism = mechanism
self.response = response
self.locale = locale
+ self.tune_params = tune_params
self.conn = Connection(connect(self.host, self.port), self.spec)
- self.peer = Peer(self.conn, ClientDelegate(self))
+ self.peer = Peer(self.conn, ClientDelegate(self), self.opened)
self.conn.init()
self.peer.start()
@@ -85,6 +87,9 @@ class Client:
def channel(self, id):
return self.peer.channel(id)
+ def opened(self, ch):
+ ch.references = References()
+
class ClientDelegate(Delegate):
def __init__(self, client):
@@ -97,9 +102,29 @@ class ClientDelegate(Delegate):
locale=self.client.locale)
def connection_tune(self, ch, msg):
- msg.tune_ok(*msg.frame.args)
+ if self.client.tune_params:
+ #todo: just override the params, i.e. don't require them
+ # all to be included in tune_params
+ msg.tune_ok(**self.client.tune_params)
+ else:
+ msg.tune_ok(*msg.frame.args)
self.client.started.set()
+ def message_transfer(self, ch, msg):
+ if isinstance(msg.body, ReferenceId):
+ self.client.queue(msg.destination).put(ch.references.get(msg.body.id))
+ else:
+ self.client.queue(msg.destination).put(msg)
+
+ def message_open(self, ch, msg):
+ ch.references.open(msg.reference)
+
+ def message_close(self, ch, msg):
+ ch.references.close(msg.reference)
+
+ def message_append(self, ch, msg):
+ ch.references.get(msg.reference).append(msg.bytes)
+
def basic_deliver(self, ch, msg):
self.client.queue(msg.consumer_tag).put(msg)
diff --git a/python/qpid/codec.py b/python/qpid/codec.py
index 205405894a..3c1e73c2e6 100644
--- a/python/qpid/codec.py
+++ b/python/qpid/codec.py
@@ -26,6 +26,7 @@ fields.
from cStringIO import StringIO
from struct import *
+from reference import ReferenceId
class EOF(Exception):
pass
@@ -195,14 +196,24 @@ class Codec:
return self.decode_longlong()
def encode_content(self, s):
- # XXX
- self.encode_octet(0)
- self.encode_longstr(s)
-
- def decode_content(self):
- # XXX
- self.decode_octet()
- return self.decode_longstr()
+ # content can be passed as a string in which case it is assumed to
+ # be inline data, or as an instance of ReferenceId indicating it is
+ # a reference id
+ if isinstance(s, ReferenceId):
+ self.encode_octet(1)
+ self.encode_longstr(s.id)
+ else:
+ self.encode_octet(0)
+ self.encode_longstr(s)
+
+ def decode_content(self):
+ # return a string for inline data and a ReferenceId instance for
+ # references
+ type = self.decode_octet()
+ if type == 0:
+ return self.decode_longstr()
+ else:
+ return ReferenceId(self.decode_longstr())
def test(type, value):
if isinstance(value, (list, tuple)):
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index b5c655dc2a..6c8c6647c9 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -50,13 +50,14 @@ class Sequence:
class Peer:
- def __init__(self, conn, delegate):
+ def __init__(self, conn, delegate, channel_callback=None):
self.conn = conn
self.delegate = delegate
self.outgoing = Queue(0)
self.work = Queue(0)
self.channels = {}
self.lock = thread.allocate_lock()
+ self.channel_callback = channel_callback #notified when channels are created
def channel(self, id):
self.lock.acquire()
@@ -66,6 +67,8 @@ class Peer:
except KeyError:
ch = Channel(id, self.outgoing, self.conn.spec)
self.channels[id] = ch
+ if self.channel_callback:
+ self.channel_callback(ch)
finally:
self.lock.release()
return ch
@@ -177,6 +180,7 @@ class Channel:
# XXX: better switch
self.reliable = False
+ self.synchronous = True
def close(self, reason):
if self.closed:
@@ -238,6 +242,12 @@ class Channel:
content = kwargs.pop("content", None)
frame = Method(type, type.arguments(*args, **kwargs))
if self.reliable:
+ if not self.synchronous:
+ future = Future()
+ self.request(frame, future.put_response, content)
+ if not frame.method.responses: return None
+ else: return future
+
self.request(frame, self.queue_response, content)
if not frame.method.responses:
return None
@@ -304,3 +314,18 @@ def read_content(queue):
buf.write(content)
read += len(content)
return Content(buf.getvalue(), children, header.properties.copy())
+
+class Future:
+ def __init__(self):
+ self.completed = threading.Event()
+
+ def put_response(self, channel, response):
+ self.response = response
+ self.completed.set()
+
+ def get_response(self, timeout=None):
+ self.completed.wait(timeout)
+ return self.response
+
+ def is_complete(self):
+ return self.completed.isSet()
diff --git a/python/qpid/reference.py b/python/qpid/reference.py
new file mode 100644
index 0000000000..d357560390
--- /dev/null
+++ b/python/qpid/reference.py
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Support for amqp 'reference' content (as opposed to inline content)
+"""
+
+import threading
+from queue import Queue, Closed
+
+class NotOpened(Exception): pass
+
+class AlreadyOpened(Exception): pass
+
+"""
+A representation of a reference id; can be passed wherever amqp
+content is required in place of inline data
+"""
+class ReferenceId:
+
+ def __init__(self, id):
+ self.id = id
+
+"""
+Holds content received through 'reference api'. Instances of this
+class will be placed in the consumers queue on receiving a transfer
+(assuming the reference has been opened). Data can be retrieved in
+chunks (as append calls are received) or in full (after reference has
+been closed signalling data s complete).
+"""
+
+class Reference:
+
+ def __init__(self, id):
+ self.id = id
+ self.chunks = Queue(0)
+
+ def close(self):
+ self.chunks.close()
+
+ def append(self, bytes):
+ self.chunks.put(bytes)
+
+ def get_chunk(self):
+ return self.chunks.get()
+
+ def get_complete(self):
+ data = ""
+ for chunk in self:
+ data += chunk
+ return data
+
+ def next(self):
+ try:
+ return self.get_chunk()
+ except Closed, e:
+ raise StopIteration
+
+ def __iter__(self):
+ return self
+
+"""
+Manages a set of opened references. New references can be opened and
+existing references can be retrieved or closed.
+"""
+class References:
+
+ def __init__(self):
+ self.map = {}
+ self.lock = threading.Lock()
+
+ def get(self, id):
+ self.lock.acquire()
+ try:
+ try:
+ ref = self.map[id]
+ except KeyError:
+ raise NotOpened()
+ finally:
+ self.lock.release()
+ return ref
+
+ def open(self, id):
+ self.lock.acquire()
+ try:
+ if id in self.map: raise AlreadyOpened()
+ self.map[id] = Reference(id)
+ finally:
+ self.lock.release()
+
+
+ def close(self, id):
+ self.get(id).close()
+ self.lock.acquire()
+ try:
+ del map[id]
+ finally:
+ self.lock.release()
+
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index 708ebcdcb9..dcbf0ed91c 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -145,7 +145,7 @@ Options:
print "======================================="
return result.wasSuccessful()
- def connect(self, host=None, port=None, spec=None, user=None, password=None):
+ def connect(self, host=None, port=None, spec=None, user=None, password=None, tune_params=None):
"""Connect to the broker, returns a qpid.client.Client"""
host = host or self.host
port = port or self.port
@@ -153,7 +153,7 @@ Options:
user = user or self.user
password = password or self.password
client = qpid.client.Client(host, port, spec)
- client.start({"LOGIN": user, "PASSWORD": password})
+ client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params)
return client
diff --git a/python/tests/message.py b/python/tests/message.py
index a7c8f875eb..10d0f51448 100644
--- a/python/tests/message.py
+++ b/python/tests/message.py
@@ -20,6 +20,7 @@ from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.content import Content
from qpid.testlib import testrunner, TestBase
+from qpid.reference import Reference, ReferenceId
class MessageTests(TestBase):
"""Tests for 'methods' on the amqp message 'class'"""
@@ -413,3 +414,66 @@ class MessageTests(TestBase):
reply = channel.message_get(no_ack=True)
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "get-empty")
+
+ def test_reference_simple(self):
+ """
+ Test basic ability to handle references
+ """
+ channel = self.channel
+ channel.queue_declare(queue="ref_queue", exclusive=True)
+ channel.message_consume(queue="ref_queue", destination="c1")
+ queue = self.client.queue("c1")
+
+ refId = "myref"
+ channel.message_open(reference=refId)
+ channel.message_append(reference=refId, bytes="abcd")
+ channel.synchronous = False
+ ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId))
+ channel.synchronous = True
+
+ channel.message_append(reference=refId, bytes="efgh")
+ channel.message_append(reference=refId, bytes="ijkl")
+ channel.message_close(reference=refId)
+
+ #first, wait for the ok for the transfer
+ ack.get_response(timeout=1)
+
+ msg = queue.get(timeout=1)
+ if isinstance(msg, Reference):
+ #should we force broker to deliver as reference by frame
+ #size limit? or test that separately? for compliance,
+ #allowing either seems best for now...
+ data = msg.get_complete()
+ else:
+ data = msg.body
+ self.assertEquals("abcdefghijkl", data)
+
+
+ def test_reference_large(self):
+ """
+ Test basic ability to handle references whose content exceeds max frame size
+ """
+ channel = self.channel
+ self.queue_declare(queue="ref_queue")
+
+ #generate a big data string (> max frame size of consumer):
+ data = "0123456789"
+ for i in range(0, 10):
+ data += data
+ #send it inline
+ channel.synchronous = False
+ ack = channel.message_transfer(routing_key="ref_queue", body=data)
+ channel.synchronous = True
+ #first, wait for the ok for the transfer
+ ack.get_response(timeout=1)
+
+ #create a new connection for consumer, with specific max frame size (< data)
+ other = self.connect(tune_params={"channel_max":10, "frame_max":5120, "heartbeat":0})
+ ch2 = other.channel(1)
+ ch2.channel_open()
+ ch2.message_consume(queue="ref_queue", destination="c1")
+ queue = other.queue("c1")
+
+ msg = queue.get(timeout=1)
+ self.assertTrue(isinstance(msg, Reference))
+ self.assertEquals(data, msg.get_complete())