diff options
author | Alan Conway <aconway@apache.org> | 2007-03-19 19:39:55 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-03-19 19:39:55 +0000 |
commit | a96bf8ba7ce40d12ee4b3f85002133e1738225a4 (patch) | |
tree | 13db6eefd1120c228c11ff7d94a500bbbd4d1289 | |
parent | 27e6ef93eea10d1aeb7ca6a6a37926aa5f85c380 (diff) | |
download | qpid-python-a96bf8ba7ce40d12ee4b3f85002133e1738225a4.tar.gz |
Merged revisions 504590 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9
........
r504590 | gsim | 2007-02-07 10:36:01 -0500 (Wed, 07 Feb 2007) | 6 lines
Added support for receiving and sending of references
Added asynchronous mode to channels (responses can be tracked via a future, rather than blocking on each request)
Added ability to override server suggested connection tune params
Added two tests for reference functionality (more to follow)
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@520061 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | python/qpid/client.py | 31 | ||||
-rw-r--r-- | python/qpid/codec.py | 27 | ||||
-rw-r--r-- | python/qpid/peer.py | 27 | ||||
-rw-r--r-- | python/qpid/reference.py | 117 | ||||
-rw-r--r-- | python/qpid/testlib.py | 4 | ||||
-rw-r--r-- | python/tests/message.py | 64 |
6 files changed, 256 insertions, 14 deletions
diff --git a/python/qpid/client.py b/python/qpid/client.py index 20d1093878..ea6aa7901a 100644 --- a/python/qpid/client.py +++ b/python/qpid/client.py @@ -28,6 +28,7 @@ from delegate import Delegate from connection import Connection, Frame, connect from spec import load from queue import Queue +from reference import ReferenceId, References class Client: @@ -69,13 +70,14 @@ class Client: self.lock.release() return q - def start(self, response, mechanism="AMQPLAIN", locale="en_US"): + def start(self, response, mechanism="AMQPLAIN", locale="en_US", tune_params=None): self.mechanism = mechanism self.response = response self.locale = locale + self.tune_params = tune_params self.conn = Connection(connect(self.host, self.port), self.spec) - self.peer = Peer(self.conn, ClientDelegate(self)) + self.peer = Peer(self.conn, ClientDelegate(self), self.opened) self.conn.init() self.peer.start() @@ -85,6 +87,9 @@ class Client: def channel(self, id): return self.peer.channel(id) + def opened(self, ch): + ch.references = References() + class ClientDelegate(Delegate): def __init__(self, client): @@ -97,9 +102,29 @@ class ClientDelegate(Delegate): locale=self.client.locale) def connection_tune(self, ch, msg): - msg.tune_ok(*msg.frame.args) + if self.client.tune_params: + #todo: just override the params, i.e. don't require them + # all to be included in tune_params + msg.tune_ok(**self.client.tune_params) + else: + msg.tune_ok(*msg.frame.args) self.client.started.set() + def message_transfer(self, ch, msg): + if isinstance(msg.body, ReferenceId): + self.client.queue(msg.destination).put(ch.references.get(msg.body.id)) + else: + self.client.queue(msg.destination).put(msg) + + def message_open(self, ch, msg): + ch.references.open(msg.reference) + + def message_close(self, ch, msg): + ch.references.close(msg.reference) + + def message_append(self, ch, msg): + ch.references.get(msg.reference).append(msg.bytes) + def basic_deliver(self, ch, msg): self.client.queue(msg.consumer_tag).put(msg) diff --git a/python/qpid/codec.py b/python/qpid/codec.py index 205405894a..3c1e73c2e6 100644 --- a/python/qpid/codec.py +++ b/python/qpid/codec.py @@ -26,6 +26,7 @@ fields. from cStringIO import StringIO from struct import * +from reference import ReferenceId class EOF(Exception): pass @@ -195,14 +196,24 @@ class Codec: return self.decode_longlong() def encode_content(self, s): - # XXX - self.encode_octet(0) - self.encode_longstr(s) - - def decode_content(self): - # XXX - self.decode_octet() - return self.decode_longstr() + # content can be passed as a string in which case it is assumed to + # be inline data, or as an instance of ReferenceId indicating it is + # a reference id + if isinstance(s, ReferenceId): + self.encode_octet(1) + self.encode_longstr(s.id) + else: + self.encode_octet(0) + self.encode_longstr(s) + + def decode_content(self): + # return a string for inline data and a ReferenceId instance for + # references + type = self.decode_octet() + if type == 0: + return self.decode_longstr() + else: + return ReferenceId(self.decode_longstr()) def test(type, value): if isinstance(value, (list, tuple)): diff --git a/python/qpid/peer.py b/python/qpid/peer.py index b5c655dc2a..6c8c6647c9 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -50,13 +50,14 @@ class Sequence: class Peer: - def __init__(self, conn, delegate): + def __init__(self, conn, delegate, channel_callback=None): self.conn = conn self.delegate = delegate self.outgoing = Queue(0) self.work = Queue(0) self.channels = {} self.lock = thread.allocate_lock() + self.channel_callback = channel_callback #notified when channels are created def channel(self, id): self.lock.acquire() @@ -66,6 +67,8 @@ class Peer: except KeyError: ch = Channel(id, self.outgoing, self.conn.spec) self.channels[id] = ch + if self.channel_callback: + self.channel_callback(ch) finally: self.lock.release() return ch @@ -177,6 +180,7 @@ class Channel: # XXX: better switch self.reliable = False + self.synchronous = True def close(self, reason): if self.closed: @@ -238,6 +242,12 @@ class Channel: content = kwargs.pop("content", None) frame = Method(type, type.arguments(*args, **kwargs)) if self.reliable: + if not self.synchronous: + future = Future() + self.request(frame, future.put_response, content) + if not frame.method.responses: return None + else: return future + self.request(frame, self.queue_response, content) if not frame.method.responses: return None @@ -304,3 +314,18 @@ def read_content(queue): buf.write(content) read += len(content) return Content(buf.getvalue(), children, header.properties.copy()) + +class Future: + def __init__(self): + self.completed = threading.Event() + + def put_response(self, channel, response): + self.response = response + self.completed.set() + + def get_response(self, timeout=None): + self.completed.wait(timeout) + return self.response + + def is_complete(self): + return self.completed.isSet() diff --git a/python/qpid/reference.py b/python/qpid/reference.py new file mode 100644 index 0000000000..d357560390 --- /dev/null +++ b/python/qpid/reference.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Support for amqp 'reference' content (as opposed to inline content) +""" + +import threading +from queue import Queue, Closed + +class NotOpened(Exception): pass + +class AlreadyOpened(Exception): pass + +""" +A representation of a reference id; can be passed wherever amqp +content is required in place of inline data +""" +class ReferenceId: + + def __init__(self, id): + self.id = id + +""" +Holds content received through 'reference api'. Instances of this +class will be placed in the consumers queue on receiving a transfer +(assuming the reference has been opened). Data can be retrieved in +chunks (as append calls are received) or in full (after reference has +been closed signalling data s complete). +""" + +class Reference: + + def __init__(self, id): + self.id = id + self.chunks = Queue(0) + + def close(self): + self.chunks.close() + + def append(self, bytes): + self.chunks.put(bytes) + + def get_chunk(self): + return self.chunks.get() + + def get_complete(self): + data = "" + for chunk in self: + data += chunk + return data + + def next(self): + try: + return self.get_chunk() + except Closed, e: + raise StopIteration + + def __iter__(self): + return self + +""" +Manages a set of opened references. New references can be opened and +existing references can be retrieved or closed. +""" +class References: + + def __init__(self): + self.map = {} + self.lock = threading.Lock() + + def get(self, id): + self.lock.acquire() + try: + try: + ref = self.map[id] + except KeyError: + raise NotOpened() + finally: + self.lock.release() + return ref + + def open(self, id): + self.lock.acquire() + try: + if id in self.map: raise AlreadyOpened() + self.map[id] = Reference(id) + finally: + self.lock.release() + + + def close(self, id): + self.get(id).close() + self.lock.acquire() + try: + del map[id] + finally: + self.lock.release() + diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index 708ebcdcb9..dcbf0ed91c 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -145,7 +145,7 @@ Options: print "=======================================" return result.wasSuccessful() - def connect(self, host=None, port=None, spec=None, user=None, password=None): + def connect(self, host=None, port=None, spec=None, user=None, password=None, tune_params=None): """Connect to the broker, returns a qpid.client.Client""" host = host or self.host port = port or self.port @@ -153,7 +153,7 @@ Options: user = user or self.user password = password or self.password client = qpid.client.Client(host, port, spec) - client.start({"LOGIN": user, "PASSWORD": password}) + client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params) return client diff --git a/python/tests/message.py b/python/tests/message.py index a7c8f875eb..10d0f51448 100644 --- a/python/tests/message.py +++ b/python/tests/message.py @@ -20,6 +20,7 @@ from qpid.client import Client, Closed from qpid.queue import Empty from qpid.content import Content from qpid.testlib import testrunner, TestBase +from qpid.reference import Reference, ReferenceId class MessageTests(TestBase): """Tests for 'methods' on the amqp message 'class'""" @@ -413,3 +414,66 @@ class MessageTests(TestBase): reply = channel.message_get(no_ack=True) self.assertEqual(reply.method.klass.name, "message") self.assertEqual(reply.method.name, "get-empty") + + def test_reference_simple(self): + """ + Test basic ability to handle references + """ + channel = self.channel + channel.queue_declare(queue="ref_queue", exclusive=True) + channel.message_consume(queue="ref_queue", destination="c1") + queue = self.client.queue("c1") + + refId = "myref" + channel.message_open(reference=refId) + channel.message_append(reference=refId, bytes="abcd") + channel.synchronous = False + ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId)) + channel.synchronous = True + + channel.message_append(reference=refId, bytes="efgh") + channel.message_append(reference=refId, bytes="ijkl") + channel.message_close(reference=refId) + + #first, wait for the ok for the transfer + ack.get_response(timeout=1) + + msg = queue.get(timeout=1) + if isinstance(msg, Reference): + #should we force broker to deliver as reference by frame + #size limit? or test that separately? for compliance, + #allowing either seems best for now... + data = msg.get_complete() + else: + data = msg.body + self.assertEquals("abcdefghijkl", data) + + + def test_reference_large(self): + """ + Test basic ability to handle references whose content exceeds max frame size + """ + channel = self.channel + self.queue_declare(queue="ref_queue") + + #generate a big data string (> max frame size of consumer): + data = "0123456789" + for i in range(0, 10): + data += data + #send it inline + channel.synchronous = False + ack = channel.message_transfer(routing_key="ref_queue", body=data) + channel.synchronous = True + #first, wait for the ok for the transfer + ack.get_response(timeout=1) + + #create a new connection for consumer, with specific max frame size (< data) + other = self.connect(tune_params={"channel_max":10, "frame_max":5120, "heartbeat":0}) + ch2 = other.channel(1) + ch2.channel_open() + ch2.message_consume(queue="ref_queue", destination="c1") + queue = other.queue("c1") + + msg = queue.get(timeout=1) + self.assertTrue(isinstance(msg, Reference)) + self.assertEquals(data, msg.get_complete()) |