summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-11-07 12:27:56 +0000
committerKeith Wall <kwall@apache.org>2012-11-07 12:27:56 +0000
commit016d15ba1964e51f145320f15b1f796006b66dce (patch)
tree292514b827ec73c49578edbc795e3c188eeb453f
parentf82467cf38d4fbc45cbaa06906420e84ccbb088b (diff)
downloadqpid-python-016d15ba1964e51f145320f15b1f796006b66dce.tar.gz
QPID-4422: Python Client (0-8..0-9) now allows "instance" client property to be passed in order to allow re-subscribing to durable subscriptions. Centralised the creation of client properties such that this is only done in one place across all protocols. Also increased Python Client (0-8..0-9)'s diagnostic logging.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1406584 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--python/qpid/client.py10
-rw-r--r--python/qpid/connection08.py41
-rw-r--r--python/qpid/delegates.py20
-rw-r--r--python/qpid/messaging/driver.py19
-rw-r--r--python/qpid/testlib.py6
-rw-r--r--python/qpid/tests/__init__.py1
-rw-r--r--python/qpid/tests/util.py46
-rw-r--r--python/qpid/util.py20
-rw-r--r--tests/src/py/qpid_tests/broker_0_8/basic.py45
-rw-r--r--tests/src/py/qpid_tests/broker_0_9/__init__.py2
-rw-r--r--tests/src/py/qpid_tests/broker_0_9/messageheader.py35
11 files changed, 188 insertions, 57 deletions
diff --git a/python/qpid/client.py b/python/qpid/client.py
index 5a877bb8d6..4d42a8b20f 100644
--- a/python/qpid/client.py
+++ b/python/qpid/client.py
@@ -18,13 +18,14 @@
#
"""
-An AQMP client implementation that uses a custom delegate for
+An AMQP client implementation that uses a custom delegate for
interacting with the server.
"""
import os, threading
from peer import Peer, Channel, Closed
from delegate import Delegate
+from util import get_client_properties_with_defaults
from connection08 import Connection, Frame, connect
from spec08 import load
from queue import Queue
@@ -76,12 +77,12 @@ class Client:
self.lock.release()
return q
- def start(self, response, mechanism="AMQPLAIN", locale="en_US", tune_params=None):
+ def start(self, response, mechanism="AMQPLAIN", locale="en_US", tune_params=None, client_properties=None):
self.mechanism = mechanism
self.response = response
self.locale = locale
self.tune_params = tune_params
-
+ self.client_properties=get_client_properties_with_defaults(provided_client_properties=client_properties)
self.socket = connect(self.host, self.port)
self.conn = Connection(self.socket, self.spec)
self.peer = Peer(self.conn, ClientDelegate(self), Session)
@@ -128,7 +129,8 @@ class ClientDelegate(Delegate):
def connection_start(self, ch, msg):
msg.start_ok(mechanism=self.client.mechanism,
response=self.client.response,
- locale=self.client.locale)
+ locale=self.client.locale,
+ client_properties=self.client.client_properties)
def connection_tune(self, ch, msg):
if self.client.tune_params:
diff --git a/python/qpid/connection08.py b/python/qpid/connection08.py
index 654148dad2..0045e122ea 100644
--- a/python/qpid/connection08.py
+++ b/python/qpid/connection08.py
@@ -28,6 +28,9 @@ from cStringIO import StringIO
from codec import EOF
from compat import SHUT_RDWR
from exceptions import VersionError
+from logging import getLogger, DEBUG
+
+log = getLogger("qpid.connection08")
class SockIO:
@@ -35,7 +38,8 @@ class SockIO:
self.sock = sock
def write(self, buf):
-# print "OUT: %r" % buf
+ if log.isEnabledFor(DEBUG):
+ log.debug("OUT: %r", buf)
self.sock.sendall(buf)
def read(self, n):
@@ -47,8 +51,9 @@ class SockIO:
break
if len(s) == 0:
break
-# print "IN: %r" % s
data += s
+ if log.isEnabledFor(DEBUG):
+ log.debug("IN: %r", data)
return data
def flush(self):
@@ -120,19 +125,25 @@ class Connection:
(self.spec.major, self.spec.minor, major, minor))
else:
raise FramingError("unknown frame type: %s" % tid)
- channel = c.decode_short()
- body = c.decode_longstr()
- dec = codec.Codec(StringIO(body), self.spec)
- frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
- frame.channel = channel
- end = c.decode_octet()
- if end != self.FRAME_END:
- garbage = ""
- while end != self.FRAME_END:
- garbage += chr(end)
- end = c.decode_octet()
- raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage)
- return frame
+ try:
+ channel = c.decode_short()
+ body = c.decode_longstr()
+ dec = codec.Codec(StringIO(body), self.spec)
+ frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
+ frame.channel = channel
+ end = c.decode_octet()
+ if end != self.FRAME_END:
+ garbage = ""
+ while end != self.FRAME_END:
+ garbage += chr(end)
+ end = c.decode_octet()
+ raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage)
+ return frame
+ except EOF:
+ # An EOF caught here can indicate an error decoding the frame,
+ # rather than that a disconnection occurred,so it's worth logging it.
+ log.exception("Error occurred when reading frame with tid %s" % tid)
+ raise
def write_0_9(self, frame):
self.write_8_0(frame)
diff --git a/python/qpid/delegates.py b/python/qpid/delegates.py
index 5e44a3a6dc..ae7ed7f988 100644
--- a/python/qpid/delegates.py
+++ b/python/qpid/delegates.py
@@ -18,7 +18,7 @@
#
import os, connection, session
-from util import notify
+from util import notify, get_client_properties_with_defaults
from datatypes import RangedSet
from exceptions import VersionError, Closed
from logging import getLogger
@@ -137,24 +137,12 @@ class Server(Delegate):
class Client(Delegate):
- ppid = 0
- try:
- ppid = os.getppid()
- except:
- pass
-
- PROPERTIES = {"product": "qpid python client",
- "version": "development",
- "platform": os.name,
- "qpid.client_process": os.path.basename(sys.argv[0]),
- "qpid.client_pid": os.getpid(),
- "qpid.client_ppid": ppid}
-
def __init__(self, connection, username=None, password=None,
mechanism=None, heartbeat=None, **kwargs):
Delegate.__init__(self, connection)
- self.client_properties=Client.PROPERTIES.copy()
- self.client_properties.update(kwargs.get("client_properties",{}))
+ provided_client_properties = kwargs.get("client_properties")
+ self.client_properties=get_client_properties_with_defaults(provided_client_properties)
+
##
## self.acceptableMechanisms is the list of SASL mechanisms that the client is willing to
## use. If it's None, then any mechanism is acceptable.
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
index 3cb62d75c9..2bd638f327 100644
--- a/python/qpid/messaging/driver.py
+++ b/python/qpid/messaging/driver.py
@@ -31,7 +31,7 @@ from qpid.messaging.exceptions import *
from qpid.messaging.message import get_codec, Disposition, Message
from qpid.ops import *
from qpid.selector import Selector
-from qpid.util import URL, default
+from qpid.util import URL, default,get_client_properties_with_defaults
from qpid.validator import And, Context, List, Map, Types, Values
from threading import Condition, Thread
@@ -90,20 +90,6 @@ SUBJECT_DEFAULTS = {
"topic": "#"
}
-# XXX
-ppid = 0
-try:
- ppid = os.getppid()
-except:
- pass
-
-CLIENT_PROPERTIES = {"product": "qpid python client",
- "version": "development",
- "platform": os.name,
- "qpid.client_process": os.path.basename(sys.argv[0]),
- "qpid.client_pid": os.getpid(),
- "qpid.client_ppid": ppid}
-
def noop(): pass
def sync_noop(): pass
@@ -710,8 +696,7 @@ class Engine:
except sasl.SASLError, e:
raise AuthenticationFailure(text=str(e))
- client_properties = CLIENT_PROPERTIES.copy()
- client_properties.update(self.connection.client_properties)
+ client_properties = get_client_properties_with_defaults(provided_client_properties=self.connection.client_properties);
self.write_op(ConnectionStartOk(client_properties=client_properties,
mechanism=mech, response=initial))
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index f9796982f5..2b283f3998 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -73,7 +73,7 @@ class TestBase(unittest.TestCase):
else:
self.client.close()
- def connect(self, host=None, port=None, user=None, password=None, tune_params=None):
+ def connect(self, host=None, port=None, user=None, password=None, tune_params=None, client_properties=None):
"""Create a new connction, return the Client object"""
host = host or self.config.broker.host
port = port or self.config.broker.port or 5672
@@ -82,9 +82,9 @@ class TestBase(unittest.TestCase):
client = qpid.client.Client(host, port)
try:
if client.spec.major == 8 and client.spec.minor == 0:
- client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params)
+ client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params, client_properties=client_properties)
else:
- client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params)
+ client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params, client_properties=client_properties)
except qpid.client.Closed, e:
if isinstance(e.args[0], VersionError):
raise Skipped(e.args[0])
diff --git a/python/qpid/tests/__init__.py b/python/qpid/tests/__init__.py
index 101a0c3759..dc9988515e 100644
--- a/python/qpid/tests/__init__.py
+++ b/python/qpid/tests/__init__.py
@@ -37,6 +37,7 @@ import qpid.tests.datatypes
import qpid.tests.connection
import qpid.tests.spec010
import qpid.tests.codec010
+import qpid.tests.util
class TestTestsXXX(Test):
diff --git a/python/qpid/tests/util.py b/python/qpid/tests/util.py
new file mode 100644
index 0000000000..9777443720
--- /dev/null
+++ b/python/qpid/tests/util.py
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from unittest import TestCase
+from qpid.util import get_client_properties_with_defaults
+
+class UtilTest (TestCase):
+
+ def test_get_spec_recommended_client_properties(self):
+ client_properties = get_client_properties_with_defaults(provided_client_properties={"mykey":"myvalue"})
+ self.assertTrue("product" in client_properties)
+ self.assertTrue("version" in client_properties)
+ self.assertTrue("platform" in client_properties)
+
+ def test_get_client_properties_with_provided_value(self):
+ client_properties = get_client_properties_with_defaults(provided_client_properties={"mykey":"myvalue"})
+ self.assertTrue("product" in client_properties)
+ self.assertTrue("mykey" in client_properties)
+ self.assertEqual("myvalue", client_properties["mykey"])
+
+ def test_get_client_properties_with_no_provided_values(self):
+ client_properties = get_client_properties_with_defaults(provided_client_properties=None)
+ self.assertTrue("product" in client_properties)
+
+ client_properties = get_client_properties_with_defaults()
+ self.assertTrue("product" in client_properties)
+
+ def test_get_client_properties_with_provided_value_that_overrides_default(self):
+ client_properties = get_client_properties_with_defaults(provided_client_properties={"version":"myversion"})
+ self.assertEqual("myversion", client_properties["version"])
+
diff --git a/python/qpid/util.py b/python/qpid/util.py
index 5b1a876c5e..8da17ce0c6 100644
--- a/python/qpid/util.py
+++ b/python/qpid/util.py
@@ -17,7 +17,7 @@
# under the License.
#
-import os, socket, time, textwrap, re
+import os, socket, time, textwrap, re, sys
try:
from ssl import wrap_socket as ssl
@@ -42,6 +42,24 @@ except ImportError:
def close(self):
self.sock.close()
+def get_client_properties_with_defaults(provided_client_properties={}):
+ ppid = 0
+ try:
+ ppid = os.getppid()
+ except:
+ pass
+
+ client_properties = {"product": "qpid python client",
+ "version": "development",
+ "platform": os.name,
+ "qpid.client_process": os.path.basename(sys.argv[0]),
+ "qpid.client_pid": os.getpid(),
+ "qpid.client_ppid": ppid}
+
+ if provided_client_properties:
+ client_properties.update(provided_client_properties)
+ return client_properties
+
def connect(host, port):
for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
diff --git a/tests/src/py/qpid_tests/broker_0_8/basic.py b/tests/src/py/qpid_tests/broker_0_8/basic.py
index d5837fc19c..f04d581750 100644
--- a/tests/src/py/qpid_tests/broker_0_8/basic.py
+++ b/tests/src/py/qpid_tests/broker_0_8/basic.py
@@ -79,6 +79,51 @@ class BasicTests(TestBase):
except Closed, e:
self.assertChannelException(403, e.args[0])
+ def test_reconnect_to_durable_subscription(self):
+ try:
+ publisherchannel = self.channel
+ my_id = "my_id"
+ consumer_connection_properties_with_instance = {"instance": my_id}
+ queue_for_subscription = "queue_for_subscription_%s" % my_id
+ topic_name = "my_topic_name"
+ test_message = self.uniqueString()
+
+ durable_subscription_client = self.connect(client_properties=consumer_connection_properties_with_instance)
+ consumerchannel = durable_subscription_client.channel(1)
+ consumerchannel.channel_open()
+
+ self._declare_and_bind_exclusive_queue_on_topic_exchange(consumerchannel, queue_for_subscription, topic_name)
+
+ # disconnect
+ durable_subscription_client.close()
+
+ # send message to topic
+ publisherchannel.basic_publish(routing_key=topic_name, exchange="amq.topic", content=Content(test_message))
+
+ # reconnect and consume message
+ durable_subscription_client = self.connect(client_properties=consumer_connection_properties_with_instance)
+ consumerchannel = durable_subscription_client.channel(1)
+ consumerchannel.channel_open()
+
+ self._declare_and_bind_exclusive_queue_on_topic_exchange(consumerchannel, queue_for_subscription, topic_name)
+
+ # Create consumer and consume the message that was sent whilst subscriber was disconnected. By convention we
+ # declare the consumer as exclusive to forbid concurrent access.
+ subscription = consumerchannel.basic_consume(queue=queue_for_subscription, exclusive=True)
+ queue = durable_subscription_client.queue(subscription.consumer_tag)
+
+ # consume and verify message content
+ msg = queue.get(timeout=1)
+ self.assertEqual(test_message, msg.content.body)
+ consumerchannel.basic_ack(delivery_tag=msg.delivery_tag)
+ finally:
+ publisherchannel.queue_delete(queue=queue_for_subscription)
+ durable_subscription_client.close()
+
+ def _declare_and_bind_exclusive_queue_on_topic_exchange(self, channel, queue, topic_name):
+ channel.queue_declare(queue=queue, exclusive=True, auto_delete=False, durable=True)
+ channel.queue_bind(exchange="amq.topic", queue=queue, routing_key=topic_name)
+
def test_consume_queue_errors(self):
"""
Test error conditions associated with the queue field of the consume method:
diff --git a/tests/src/py/qpid_tests/broker_0_9/__init__.py b/tests/src/py/qpid_tests/broker_0_9/__init__.py
index d9f2ed7dbb..6b46b96b1d 100644
--- a/tests/src/py/qpid_tests/broker_0_9/__init__.py
+++ b/tests/src/py/qpid_tests/broker_0_9/__init__.py
@@ -19,4 +19,4 @@
# under the License.
#
-import query, queue
+import query, queue, messageheader
diff --git a/tests/src/py/qpid_tests/broker_0_9/messageheader.py b/tests/src/py/qpid_tests/broker_0_9/messageheader.py
new file mode 100644
index 0000000000..3526cf37af
--- /dev/null
+++ b/tests/src/py/qpid_tests/broker_0_9/messageheader.py
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.testlib import TestBase
+
+class MessageHeaderTests(TestBase):
+ """Verify that messages with headers work as expected"""
+
+ def test_message_with_integer_header(self):
+ props={"headers":{"one":1, "zero":0}}
+ self.queue_declare(queue="q")
+ q = self.consume("q")
+ self.assertPublishGet(q, routing_key="q", properties=props)
+
+ def test_message_with_string_header(self):
+ props={"headers":{"mystr":"hello world", "myempty":""}}
+ self.queue_declare(queue="q")
+ q = self.consume("q")
+ self.assertPublishGet(q, routing_key="q", properties=props)