summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-05-01 10:15:35 +0000
committerGordon Sim <gsim@apache.org>2008-05-01 10:15:35 +0000
commit7ca9c6bd32cca57d3c1dab3faa09c9dd9fbe3c51 (patch)
tree33194732ba6b2da30e33e72417dfc18c7129d09e /python
parent4649905d79cb5a85f65f4097b2daecebc3080e93 (diff)
downloadqpid-python-7ca9c6bd32cca57d3c1dab3faa09c9dd9fbe3c51.tar.gz
QPID-966: applied patch from rajith; altered to use uuid as session name; updated verify scripts for automated testing;
re-enabled automated testing in c++ build git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@652469 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rwxr-xr-xpython/examples/direct/declare_queues.py31
-rwxr-xr-xpython/examples/direct/direct_consumer.py32
-rwxr-xr-xpython/examples/direct/direct_producer.py34
-rwxr-xr-xpython/examples/direct/listener.py31
-rwxr-xr-xpython/examples/fanout/fanout_consumer.py43
-rwxr-xr-xpython/examples/fanout/fanout_producer.py32
-rw-r--r--python/examples/fanout/verify2
-rw-r--r--python/examples/fanout/verify.in4
-rwxr-xr-xpython/examples/pubsub/topic_publisher.py59
-rwxr-xr-xpython/examples/pubsub/topic_subscriber.py78
-rw-r--r--python/examples/pubsub/verify2
-rw-r--r--python/examples/pubsub/verify.in2
-rwxr-xr-xpython/examples/request-response/client.py61
-rwxr-xr-xpython/examples/request-response/server.py57
-rw-r--r--python/examples/request-response/verify2
-rw-r--r--python/examples/request-response/verify.in4
16 files changed, 252 insertions, 222 deletions
diff --git a/python/examples/direct/declare_queues.py b/python/examples/direct/declare_queues.py
index f39f0c3349..7041ce2f24 100755
--- a/python/examples/direct/declare_queues.py
+++ b/python/examples/direct/declare_queues.py
@@ -10,8 +10,10 @@
import qpid
import sys
-from qpid.client import Client
-from qpid.content import Content
+import os
+from random import randint
+from qpid.util import connect
+from qpid.connection import Connection
from qpid.queue import Empty
#----- Initialization -----------------------------------
@@ -20,17 +22,20 @@ from qpid.queue import Empty
host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1"
port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672
-amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
user="guest"
password="guest"
+amqp_spec=""
-# Create a client and log in to it.
+try:
+ amqp_spec = os.environ["AMQP_SPEC"]
+except KeyError:
+ amqp_spec="/usr/share/amqp/amqp.0-10.xml"
-client = Client(host, port, qpid.spec.load(amqp_spec))
-client.start({"LOGIN": user, "PASSWORD": password})
+# Create a connection.
+conn = Connection (connect (host,port), qpid.spec.load(amqp_spec))
+conn.start()
-session = client.session()
-session.session_open()
+session = conn.session(str(randint(1,64*1024)))
#----- Create a queue -------------------------------------
@@ -38,15 +43,13 @@ session.session_open()
# on the broker. Published messages are sent to the AMQP queue,
# from which messages are delivered to consumers.
#
-# queue_bind() determines which messages are routed to a queue.
-# Route all messages with the routing key "routing_key" to
+# exchange_bind() determines which messages are routed to a queue.
+# Route all messages with the binding key "routing_key" to
# the AMQP queue named "message_queue".
session.queue_declare(queue="message_queue")
-session.queue_bind(exchange="amq.direct", queue="message_queue", routing_key="routing_key")
+session.exchange_bind(exchange="amq.direct", queue="message_queue", binding_key="routing_key")
#----- Cleanup ---------------------------------------------
-session.session_close()
-
-
+session.close(timeout=10)
diff --git a/python/examples/direct/direct_consumer.py b/python/examples/direct/direct_consumer.py
index 85c1db0a93..91d85cee1a 100755
--- a/python/examples/direct/direct_consumer.py
+++ b/python/examples/direct/direct_consumer.py
@@ -8,8 +8,11 @@
import qpid
import sys
-from qpid.client import Client
-from qpid.content import Content
+import os
+from random import randint
+from qpid.util import connect
+from qpid.connection import Connection
+from qpid.datatypes import Message, RangedSet
from qpid.queue import Empty
@@ -19,17 +22,20 @@ from qpid.queue import Empty
host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1"
port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672
-amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
user="guest"
password="guest"
+amqp_spec=""
-# Create a client and log in to it.
+try:
+ amqp_spec = os.environ["AMQP_SPEC"]
+except KeyError:
+ amqp_spec="/usr/share/amqp/amqp.0-10.xml"
-client = Client(host, port, qpid.spec.load(amqp_spec))
-client.start({"LOGIN": user, "PASSWORD": password})
+# Create a connection.
+conn = Connection (connect (host,port), qpid.spec.load(amqp_spec))
+conn.start()
-session = client.session()
-session.session_open()
+session = conn.session(str(randint(1,64*1024)))
#----- Read from queue --------------------------------------------
@@ -39,7 +45,7 @@ session.session_open()
# The consumer tag identifies the client-side queue.
consumer_tag = "consumer1"
-queue = client.queue(consumer_tag)
+queue = session.incoming(consumer_tag)
# Call message_consume() to tell the broker to deliver messages
# from the AMQP queue to this local client queue. The broker will
@@ -57,7 +63,8 @@ content = "" # Content of the last message read
message = None
while content != final:
message = queue.get(timeout=10)
- content = message.content.body
+ content = message.body
+ session.message_accept(RangedSet(message.id))
print content
# Messages are not removed from the queue until they are
@@ -66,11 +73,12 @@ while content != final:
# acknowledged. This is more efficient, because there are fewer
# network round-trips.
-message.complete(cumulative=True)
+#message.complete(cumulative=True)
+# ? Is there an equivakent to the above in the new API ?
#----- Cleanup ------------------------------------------------
# Clean up before exiting so there are no open threads.
#
-session.session_close()
+session.close(timeout=10)
diff --git a/python/examples/direct/direct_producer.py b/python/examples/direct/direct_producer.py
index 2c07bfd8e7..7c4e30d96e 100755
--- a/python/examples/direct/direct_producer.py
+++ b/python/examples/direct/direct_producer.py
@@ -8,8 +8,11 @@
import qpid
import sys
-from qpid.client import Client
-from qpid.content import Content
+import os
+from random import randint
+from qpid.util import connect
+from qpid.connection import Connection
+from qpid.datatypes import Message
from qpid.queue import Empty
#----- Initialization -----------------------------------
@@ -18,34 +21,33 @@ from qpid.queue import Empty
host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1"
port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672
-amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
user="guest"
password="guest"
+amqp_spec=""
-# Create a client and log in to it.
+try:
+ amqp_spec = os.environ["AMQP_SPEC"]
+except KeyError:
+ amqp_spec="/usr/share/amqp/amqp.0-10.xml"
-client = Client(host, port, qpid.spec.load(amqp_spec))
-client.start({"LOGIN": user, "PASSWORD": password})
+# Create a connection.
+conn = Connection (connect (host,port), qpid.spec.load(amqp_spec))
+conn.start()
-session = client.session()
-session.session_open()
+session = conn.session(str(randint(1,64*1024)))
#----- Publish some messages ------------------------------
# Create some messages and put them on the broker.
+props = session.delivery_properties(routing_key="routing_key")
for i in range(10):
- message = Content("message " + str(i))
- message["routing_key"] = "routing_key"
- session.message_transfer(destination="amq.direct", content=message)
+ session.message_transfer("amq.direct",None, None, Message(props,"message " + str(i)))
-final="That's all, folks!"
-message = Content(final)
-message["routing_key"] = "routing_key"
-session.message_transfer(destination="amq.direct", content=message)
+session.message_transfer("amq.direct",None,None, Message(props,"That's all, folks!"))
#----- Cleanup --------------------------------------------
# Clean up before exiting so there are no open threads.
-session.session_close()
+session.close(timeout=10)
diff --git a/python/examples/direct/listener.py b/python/examples/direct/listener.py
index 2dbd502fa0..aa60b1c501 100755
--- a/python/examples/direct/listener.py
+++ b/python/examples/direct/listener.py
@@ -9,8 +9,11 @@
import qpid
import sys
-from qpid.client import Client
-from qpid.content import Content
+import os
+from random import randint
+from qpid.util import connect
+from qpid.connection import Connection
+from qpid.datatypes import Message, RangedSet
from qpid.queue import Empty
from time import sleep
@@ -24,7 +27,8 @@ class Receiver:
return self.finalReceived
def Handler (self, message):
- content = message.content.body
+ content = message.body
+ session.message_accept(RangedSet(message.id))
print content
if content == "That's all, folks!":
self.finalReceived = True
@@ -34,7 +38,7 @@ class Receiver:
# up to and including the one identified by the delivery tag are
# acknowledged. This is more efficient, because there are fewer
# network round-trips.
- message.complete(cumulative=True)
+ #message.complete(cumulative=True)
#----- Initialization --------------------------------------
@@ -43,17 +47,20 @@ class Receiver:
host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1"
port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672
-amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
user="guest"
password="guest"
+amqp_spec=""
-# Create a client and log in to it.
+try:
+ amqp_spec = os.environ["AMQP_SPEC"]
+except KeyError:
+ amqp_spec="/usr/share/amqp/amqp.0-10.xml"
-client = Client(host, port, qpid.spec.load(amqp_spec))
-client.start({"LOGIN": user, "PASSWORD": password})
+# Create a connection.
+conn = Connection (connect (host,port), qpid.spec.load(amqp_spec))
+conn.start()
-session = client.session()
-session.session_open()
+session = conn.session(str(randint(1,64*1024)))
#----- Read from queue --------------------------------------------
@@ -63,7 +70,7 @@ session.session_open()
# The consumer tag identifies the client-side queue.
consumer_tag = "consumer1"
-queue = client.queue(consumer_tag)
+queue = session.incoming(consumer_tag)
# Call message_subscribe() to tell the broker to deliver messages
# from the AMQP queue to this local client queue. The broker will
@@ -85,4 +92,4 @@ while not receiver.isFinal ():
# Clean up before exiting so there are no open threads.
#
-session.session_close()
+session.close(timeout=10)
diff --git a/python/examples/fanout/fanout_consumer.py b/python/examples/fanout/fanout_consumer.py
index ef24bf35b2..b82d8045ff 100755
--- a/python/examples/fanout/fanout_consumer.py
+++ b/python/examples/fanout/fanout_consumer.py
@@ -5,21 +5,22 @@
This AMQP client reads messages from a message
queue named "message_queue".
"""
-import base64
import qpid
import sys
-from qpid.client import Client
-from qpid.content import Content
+import os
+from qpid.util import connect
+from qpid.connection import Connection
+from qpid.datatypes import Message, RangedSet, uuid4
from qpid.queue import Empty
#----- Functions -------------------------------------------
-def dump_queue(client, queue_name):
+def dump_queue(session, queue_name):
print "Messages queue: " + queue_name
consumer_tag = queue_name # Use the queue name as the consumer tag - need a unique tag
- queue = client.queue(consumer_tag)
+ queue = session.incoming(queue_name)
# Call message_subscribe() to tell the broker to deliver messages
# from the AMQP queue to a local client queue. The broker will
@@ -37,7 +38,8 @@ def dump_queue(client, queue_name):
while True:
try:
message = queue.get(timeout=10)
- content = message.content.body
+ content = message.body
+ session.message_accept(RangedSet(message.id))
print "Response: " + content
except Empty:
print "No more messages!"
@@ -53,8 +55,8 @@ def dump_queue(client, queue_name):
# by the delivery tag are acknowledged. This is more efficient,
# because there are fewer network round-trips.
- if message != 0:
- message.complete(cumulative=True)
+ #if message != 0:
+ # message.complete(cumulative=True)
#----- Initialization --------------------------------------
@@ -63,36 +65,39 @@ def dump_queue(client, queue_name):
host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1"
port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672
-amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
user="guest"
password="guest"
+amqp_spec=""
-# Create a client and log in to it.
+try:
+ amqp_spec = os.environ["AMQP_SPEC"]
+except KeyError:
+ amqp_spec="/usr/share/amqp/amqp.0-10.xml"
-client = Client(host, port, qpid.spec.load(amqp_spec))
-client.start({"LOGIN": user, "PASSWORD": password})
+# Create a connection.
+conn = Connection (connect (host,port), qpid.spec.load(amqp_spec))
+conn.start()
-session = client.session()
-session_info = session.session_open()
-session_id = session_info.session_id
+session_id = str(uuid4())
+session = conn.session(session_id)
#----- Main Body -- ----------------------------------------
# Make a unique queue name for my queue from the session ID.
-my_queue = base64.urlsafe_b64encode(session_id)
+my_queue = session_id
session.queue_declare(queue=my_queue)
# Bind my queue to the fanout exchange. No routing key is required
# the fanout exchange copies messages unconditionally to every
# bound queue
-session.queue_bind(queue=my_queue, exchange="amq.fanout")
+session.exchange_bind(queue=my_queue, exchange="amq.fanout")
# Dump the messages on the queue.
-dump_queue(client, my_queue)
+dump_queue(session, my_queue)
#----- Cleanup ------------------------------------------------
# Clean up before exiting so there are no open threads.
#
-session.session_close()
+session.close(timeout=10)
diff --git a/python/examples/fanout/fanout_producer.py b/python/examples/fanout/fanout_producer.py
index 9864c776c1..1b5ea6995e 100755
--- a/python/examples/fanout/fanout_producer.py
+++ b/python/examples/fanout/fanout_producer.py
@@ -5,11 +5,13 @@
Publishes messages to an AMQP direct exchange, using
the routing key "routing_key"
"""
-
import qpid
import sys
-from qpid.client import Client
-from qpid.content import Content
+import os
+from random import randint
+from qpid.util import connect
+from qpid.connection import Connection
+from qpid.datatypes import Message
from qpid.queue import Empty
#----- Initialization -----------------------------------
@@ -18,32 +20,32 @@ from qpid.queue import Empty
host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1"
port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672
-amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
user="guest"
password="guest"
+amqp_spec=""
-# Create a client and log in to it.
+try:
+ amqp_spec = os.environ["AMQP_SPEC"]
+except KeyError:
+ amqp_spec="/usr/share/amqp/amqp.0-10.xml"
-client = Client(host, port, qpid.spec.load(amqp_spec))
-client.start({"LOGIN": user, "PASSWORD": password})
+# Create a connection.
+conn = Connection (connect (host,port), qpid.spec.load(amqp_spec))
+conn.start()
-session = client.session()
-session.session_open()
+session = conn.session(str(randint(1,64*1024)))
#----- Publish some messages ------------------------------
# Create some messages and put them on the broker.
for i in range(10):
- message = Content(body="message " + str(i))
- session.message_transfer(destination="amq.fanout", content=message)
+ session.message_transfer("amq.fanout", None, None ,Message("message " + str(i)))
-final="That's all, folks!"
-message=Content(final)
-session.message_transfer(destination="amq.fanout", content=message)
+session.message_transfer("amq.fanout", None, None, Message("That's all, folks!"))
#----- Cleanup --------------------------------------------
# Clean up before exiting so there are no open threads.
-session.session_close()
+session.close(timeout=10)
diff --git a/python/examples/fanout/verify b/python/examples/fanout/verify
index 7650853e11..6a3132a94f 100644
--- a/python/examples/fanout/verify
+++ b/python/examples/fanout/verify
@@ -2,4 +2,4 @@
background "Subscribed" ./fanout_consumer.py
background "Subscribed" ./fanout_consumer.py
clients ./fanout_producer.py
-outputs ./fanout_producer.py.out "./fanout_consumer.py.out | remove_uuid64" "./fanout_consumer.pyX.out | remove_uuid64"
+outputs ./fanout_producer.py.out "./fanout_consumer.py.out | remove_uuid" "./fanout_consumer.pyX.out | remove_uuid"
diff --git a/python/examples/fanout/verify.in b/python/examples/fanout/verify.in
index d5067b3850..30dfeb9e69 100644
--- a/python/examples/fanout/verify.in
+++ b/python/examples/fanout/verify.in
@@ -1,5 +1,5 @@
==== fanout_producer.py.out
-==== fanout_consumer.py.out | remove_uuid64
+==== fanout_consumer.py.out | remove_uuid
Messages queue:
Subscribed to queue
Response: message 0
@@ -14,7 +14,7 @@ Response: message 8
Response: message 9
Response: That's all, folks!
No more messages!
-==== fanout_consumer.pyX.out | remove_uuid64
+==== fanout_consumer.pyX.out | remove_uuid
Messages queue:
Subscribed to queue
Response: message 0
diff --git a/python/examples/pubsub/topic_publisher.py b/python/examples/pubsub/topic_publisher.py
index e302d58ad4..b79896eaf6 100755
--- a/python/examples/pubsub/topic_publisher.py
+++ b/python/examples/pubsub/topic_publisher.py
@@ -9,8 +9,11 @@
import qpid
import sys
-from qpid.client import Client
-from qpid.content import Content
+import os
+from random import randint
+from qpid.util import connect
+from qpid.connection import Connection
+from qpid.datatypes import Message
from qpid.queue import Empty
#----- Initialization -----------------------------------
@@ -18,18 +21,20 @@ from qpid.queue import Empty
# Set parameters for login.
host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1"
port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672
-amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
user="guest"
password="guest"
+amqp_spec=""
-# Create a client and log in to it.
+try:
+ amqp_spec = os.environ["AMQP_SPEC"]
+except KeyError:
+ amqp_spec="/usr/share/amqp/amqp.0-10.xml"
-spec = qpid.spec.load(amqp_spec)
-client = Client(host, port, spec)
-client.start({"LOGIN": user, "PASSWORD": password})
+# Create a connection.
+conn = Connection (connect (host,port), qpid.spec.load(amqp_spec))
+conn.start()
-session = client.session()
-session.session_open()
+session = conn.session(str(randint(1,64*1024)))
#----- Publish some messages ------------------------------
@@ -37,44 +42,30 @@ session.session_open()
# topic exchange. The routing keys are "usa.news", "usa.weather",
# "europe.news", and "europe.weather".
+def send_msg(routing_key):
+ props = session.delivery_properties(routing_key=routing_key)
+ for i in range(5):
+ session.message_transfer("amq.topic", None, None, Message(props,"message " + str(i)))
# usa.news
-
-for i in range(5):
- message = Content("message " + str(i))
- message["routing_key"] = "usa.news"
- session.message_transfer(destination="amq.topic", content=message)
+send_msg("usa.news")
# usa.weather
-
-for i in range(5):
- message = Content("message " + str(i))
- message["routing_key"] = "usa.weather"
- session.message_transfer(destination="amq.topic", content=message)
+send_msg("usa.weather")
# europe.news
-
-for i in range(5):
- message = Content("message " + str(i))
- message["routing_key"] = "europe.news"
- session.message_transfer(destination="amq.topic", content=message)
+send_msg("europe.news")
# europe.weather
-
-for i in range(5):
- message = Content("message " + str(i))
- message["routing_key"] = "europe.weather"
- session.message_transfer(destination="amq.topic", content=message)
+send_msg("europe.weather")
# Signal termination
-
-message = Content("That's all, folks!")
-message["routing_key"] = "control"
-session.message_transfer(destination="amq.topic", content=message)
+props = session.delivery_properties(routing_key="control")
+session.message_transfer("amq.topic",None, None, Message(props,"That's all, folks!"))
#----- Cleanup --------------------------------------------
# Clean up before exiting so there are no open threads.
-session.session_close()
+session.close(timeout=10)
diff --git a/python/examples/pubsub/topic_subscriber.py b/python/examples/pubsub/topic_subscriber.py
index a5c05ba177..6908be5471 100755
--- a/python/examples/pubsub/topic_subscriber.py
+++ b/python/examples/pubsub/topic_subscriber.py
@@ -3,24 +3,25 @@
topic_subscriber.py
This subscriber creates private queues and binds them
- to the topics "usa.#", "europe.#", "#.news", and "#.weather".
+ to the topics 'usa.#', 'europe.#', '#.news', and '#.weather'.
"""
-import base64
-import sys
import qpid
-from qpid.client import Client
-from qpid.content import Content
+import sys
+import os
+from qpid.util import connect
+from qpid.connection import Connection
+from qpid.datatypes import Message, RangedSet, uuid4
from qpid.queue import Empty
#----- Functions -------------------------------------------
-def dump_queue(client, queue_name):
+def dump_queue(queue_name):
print "Messages queue: " + queue_name
consumer_tag = queue_name # Use the queue name as the consumer tag - need a unique tag
- queue = client.queue(consumer_tag)
+ queue = session.incoming(consumer_tag)
# Call message_subscribe() to tell the broker to deliver messages
# from the AMQP queue to a local client queue. The broker will
@@ -37,11 +38,12 @@ def dump_queue(client, queue_name):
while content != final:
try:
message = queue.get()
- content = message.content.body
+ content = message.body
+ session.message_accept(RangedSet(message.id))
print content
except Empty:
- if message != 0:
- message.complete(cumulative=True)
+ #if message != 0:
+ # message.complete(cumulative=True)
print "No more messages!"
return
@@ -52,8 +54,8 @@ def dump_queue(client, queue_name):
# by the delivery tag are acknowledged. This is more efficient,
# because there are fewer network round-trips.
- if message != 0:
- message.complete(cumulative=True)
+ #if message != 0:
+ # message.complete(cumulative=True)
#----- Initialization --------------------------------------
@@ -62,27 +64,29 @@ def dump_queue(client, queue_name):
host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1"
port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672
-amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
user="guest"
password="guest"
+amqp_spec=""
-# Create a client and log in to it.
+try:
+ amqp_spec = os.environ["AMQP_SPEC"]
+except KeyError:
+ amqp_spec="/usr/share/amqp/amqp.0-10.xml"
-spec = qpid.spec.load(amqp_spec)
-client = Client(host, port, spec)
-client.start({"LOGIN": user, "PASSWORD": password})
+# Create a connection.
+conn = Connection (connect (host,port), qpid.spec.load(amqp_spec))
+conn.start()
-session = client.session()
-session_info = session.session_open()
-session_id = session_info.session_id
+session_id = str(uuid4())
+session = conn.session(session_id)
#----- Main Body -- ----------------------------------------
-news = "news" + base64.urlsafe_b64encode(session_id)
-weather = "weather" + base64.urlsafe_b64encode(session_id)
-usa = "usa" + base64.urlsafe_b64encode(session_id)
-europe = "europe" + base64.urlsafe_b64encode(session_id)
+news = "news" + session_id
+weather = "weather" + session_id
+usa = "usa" + session_id
+europe = "europe" + session_id
session.queue_declare(queue=news, exclusive=True)
session.queue_declare(queue=weather, exclusive=True)
@@ -94,17 +98,17 @@ session.queue_declare(queue=europe, exclusive=True)
# The '#' symbol matches one component of a multipart name, e.g. "#.news" matches
# "europe.news" or "usa.news".
-session.queue_bind(exchange="amq.topic", queue=news, routing_key="#.news")
-session.queue_bind(exchange="amq.topic", queue=weather, routing_key="#.weather")
-session.queue_bind(exchange="amq.topic", queue=usa, routing_key="usa.#")
-session.queue_bind(exchange="amq.topic", queue=europe, routing_key="europe.#")
+session.exchange_bind(exchange="amq.topic", queue=news, binding_key="#.news")
+session.exchange_bind(exchange="amq.topic", queue=weather, binding_key="#.weather")
+session.exchange_bind(exchange="amq.topic", queue=usa, binding_key="usa.#")
+session.exchange_bind(exchange="amq.topic", queue=europe, binding_key="europe.#")
# Bind each queue to the control queue so we know when to stop
-session.queue_bind(exchange="amq.topic", queue=news, routing_key="control")
-session.queue_bind(exchange="amq.topic", queue=weather, routing_key="control")
-session.queue_bind(exchange="amq.topic", queue=usa, routing_key="control")
-session.queue_bind(exchange="amq.topic", queue=europe, routing_key="control")
+session.exchange_bind(exchange="amq.topic", queue=news, binding_key="control")
+session.exchange_bind(exchange="amq.topic", queue=weather, binding_key="control")
+session.exchange_bind(exchange="amq.topic", queue=usa, binding_key="control")
+session.exchange_bind(exchange="amq.topic", queue=europe, binding_key="control")
# Remind the user to start the topic producer
@@ -113,13 +117,13 @@ sys.stdout.flush()
# Call dump_queue to print messages from each queue
-dump_queue(client, news)
-dump_queue(client, weather)
-dump_queue(client, usa)
-dump_queue(client, europe)
+dump_queue(news)
+dump_queue(weather)
+dump_queue(usa)
+dump_queue(europe)
#----- Cleanup ------------------------------------------------
# Clean up before exiting so there are no open threads.
-session.session_close()
+session.close(timeout=10)
diff --git a/python/examples/pubsub/verify b/python/examples/pubsub/verify
index bef233b4ff..963d2e32e1 100644
--- a/python/examples/pubsub/verify
+++ b/python/examples/pubsub/verify
@@ -1,4 +1,4 @@
# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify
background "Queues created" ./topic_subscriber.py
clients ./topic_publisher.py
-outputs ./topic_publisher.py.out "topic_subscriber.py.out | remove_uuid64 | sort"
+outputs ./topic_publisher.py.out "topic_subscriber.py.out | remove_uuid | sort"
diff --git a/python/examples/pubsub/verify.in b/python/examples/pubsub/verify.in
index 69de08d17c..2f6da09ec5 100644
--- a/python/examples/pubsub/verify.in
+++ b/python/examples/pubsub/verify.in
@@ -1,5 +1,5 @@
==== topic_publisher.py.out
-==== topic_subscriber.py.out | remove_uuid64 | sort
+==== topic_subscriber.py.out | remove_uuid | sort
message 0
message 0
message 0
diff --git a/python/examples/request-response/client.py b/python/examples/request-response/client.py
index 6561bb6fee..8f7d430d1b 100755
--- a/python/examples/request-response/client.py
+++ b/python/examples/request-response/client.py
@@ -6,22 +6,22 @@
"""
-import base64
-
import qpid
import sys
-from qpid.client import Client
-from qpid.content import Content
+import os
+from qpid.util import connect
+from qpid.connection import Connection
+from qpid.datatypes import Message, RangedSet, uuid4
from qpid.queue import Empty
#----- Functions -------------------------------------------
-def dump_queue(client, queue_name):
+def dump_queue(queue_name):
print "Messages queue: " + queue_name
consumer_tag = queue_name # Use the queue name as the consumer tag - need a unique tag
- queue = client.queue(consumer_tag)
+ queue = session.incoming(consumer_tag)
# Call message_subscribe() to tell the broker to deliver messages
# from the AMQP queue to a local client queue. The broker will
@@ -36,7 +36,8 @@ def dump_queue(client, queue_name):
while True:
try:
message = queue.get(timeout=10)
- content = message.content.body
+ content = message.body
+ session.message_accept(RangedSet(message.id))
print "Response: " + content
except Empty:
print "No more messages!"
@@ -52,8 +53,8 @@ def dump_queue(client, queue_name):
# by the delivery tag are acknowledged. This is more efficient,
# because there are fewer network round-trips.
- if message != 0:
- message.complete(cumulative=True)
+ #if message != 0:
+ # message.complete(cumulative=True)
#----- Initialization --------------------------------------
@@ -62,21 +63,21 @@ def dump_queue(client, queue_name):
host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1"
port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672
-amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
user="guest"
password="guest"
+amqp_spec=""
-# Create a client and log in to it.
-
-spec = qpid.spec.load(amqp_spec)
-client = Client(host, port, spec)
-client.start({"LOGIN": user, "PASSWORD": password})
+try:
+ amqp_spec = os.environ["AMQP_SPEC"]
+except KeyError:
+ amqp_spec="/usr/share/amqp/amqp.0-10.xml"
-# Open the session. Save the session id.
+# Create a connection.
+conn = Connection (connect (host,port), qpid.spec.load(amqp_spec))
+conn.start()
-session = client.session()
-session_info = session.session_open()
-session_id = session_info.session_id
+session_id = str(uuid4())
+session = conn.session(session_id)
#----- Main Body -- ----------------------------------------
@@ -84,9 +85,9 @@ session_id = session_info.session_id
# same string as the name of the queue and the name of the routing
# key.
-replyTo = "ReplyTo:" + base64.urlsafe_b64encode(session_id)
+replyTo = "ReplyTo:" + session_id
session.queue_declare(queue=replyTo, exclusive=True)
-session.queue_bind(exchange="amq.direct", queue=replyTo, routing_key=replyTo)
+session.exchange_bind(exchange="amq.direct", queue=replyTo, binding_key=replyTo)
# Send some messages to the server's request queue
@@ -95,22 +96,20 @@ lines = ["Twas brilling, and the slithy toves",
"All mimsy were the borogroves,",
"And the mome raths outgrabe."]
-for l in lines:
- print "Request: " + l
- request=Content(l)
- request["routing_key"] = "request"
- request["reply_to"] = client.spec.struct("reply_to")
- request["reply_to"]["exchange_name"] = "amq.direct"
- request["reply_to"]["routing_key"] = replyTo
- session.message_transfer(destination="amq.direct", content=request)
+for ln in lines:
+ print "Request: " + ln
+ mp = session.message_properties()
+ mp.reply_to = session.reply_to("amq.direct", replyTo)
+ dp = session.delivery_properties(routing_key="request")
+ session.message_transfer("amq.direct", None, None, Message(mp,dp,ln))
# Now see what messages the server sent to our replyTo queue
-dump_queue(client, replyTo)
+dump_queue(replyTo)
#----- Cleanup ------------------------------------------------
# Clean up before exiting so there are no open threads.
-session.session_close()
+session.close(timeout=10)
diff --git a/python/examples/request-response/server.py b/python/examples/request-response/server.py
index 04b147d003..71c3161495 100755
--- a/python/examples/request-response/server.py
+++ b/python/examples/request-response/server.py
@@ -4,52 +4,61 @@
Server for a client/server example
"""
-
import qpid
import sys
-from qpid.client import Client
-from qpid.content import Content
+import os
+from random import randint
+from qpid.util import connect
+from qpid.connection import Connection
+from qpid.datatypes import Message, RangedSet, uuid4
from qpid.queue import Empty
#----- Functions -------------------------------------------
+def getProperty(msg, name):
+ for h in msg.headers:
+ if hasattr(h, name): return getattr(h, name)
+ return None
def respond(session, request):
# The routing key for the response is the request's reply-to
# property. The body for the response is the request's body,
# converted to upper case.
-
- response=Content(request.body.upper())
- response["routing_key"] = request["reply_to"]["routing_key"]
- session.message_transfer(destination=request["reply_to"]["exchange_name"], content=response)
+ reply_to = getProperty(request,"reply_to")
+ if reply_to == None:
+ raise Exception("reply to property needs to be there")
+
+ props = session.delivery_properties(routing_key=reply_to["routing_key"])
+ session.message_transfer(reply_to["exchange"],None, None, Message(props,request.body.upper()))
#----- Initialization --------------------------------------
# Set parameters for login
-
host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1"
port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672
-amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
user="guest"
password="guest"
-
-# Create a client and log in to it.
-
-client = Client(host, port, qpid.spec.load(amqp_spec))
-client.start({"LOGIN": user, "PASSWORD": password})
-
-# Create a session and open it.
-
-session = client.session()
-session.session_open()
+amqp_spec=""
+
+try:
+ amqp_spec = os.environ["AMQP_SPEC"]
+except KeyError:
+ amqp_spec="/usr/share/amqp/amqp.0-10.xml"
+
+# Create a connection.
+conn = Connection (connect (host,port), qpid.spec.load(amqp_spec))
+conn.start()
+
+session_id = str(uuid4())
+session = conn.session(session_id)
#----- Main Body -- ----------------------------------------
# Create a request queue and subscribe to it
session.queue_declare(queue="request", exclusive=True)
-session.queue_bind(exchange="amq.direct", queue="request", routing_key="request")
+session.exchange_bind(exchange="amq.direct", queue="request", binding_key="request")
dest = "request_destination"
@@ -66,7 +75,7 @@ sys.stdout.flush()
# Respond to each request
-queue = client.queue(dest)
+queue = session.incoming(dest)
# If we get a message, send it back to the user (as indicated in the
# ReplyTo property)
@@ -74,8 +83,8 @@ queue = client.queue(dest)
while True:
try:
request = queue.get(timeout=100)
- respond(session, request.content)
- request.complete()
+ respond(session, request)
+ session.message_accept(RangedSet(request.id))
except Empty:
print "No more messages!"
break;
@@ -85,4 +94,4 @@ while True:
# Clean up before exiting so there are no open threads.
-session.session_close()
+session.close(timeout=10)
diff --git a/python/examples/request-response/verify b/python/examples/request-response/verify
index 2a2d479077..cf8151d4e4 100644
--- a/python/examples/request-response/verify
+++ b/python/examples/request-response/verify
@@ -2,4 +2,4 @@
background "Request server running" ./server.py
clients ./client.py
kill %% # Must kill the server.
-outputs "./client.py.out | remove_uuid64" " server.py.out | remove_uuid64"
+outputs "./client.py.out | remove_uuid" " server.py.out | remove_uuid"
diff --git a/python/examples/request-response/verify.in b/python/examples/request-response/verify.in
index f681253b3c..8d7f732ec8 100644
--- a/python/examples/request-response/verify.in
+++ b/python/examples/request-response/verify.in
@@ -1,4 +1,4 @@
-==== client.py.out | remove_uuid64
+==== client.py.out | remove_uuid
Request: Twas brilling, and the slithy toves
Request: Did gyre and gimble in the wabe.
Request: All mimsy were the borogroves,
@@ -9,6 +9,6 @@ Response: DID GYRE AND GIMBLE IN THE WABE.
Response: ALL MIMSY WERE THE BOROGROVES,
Response: AND THE MOME RATHS OUTGRABE.
No more messages!
-==== server.py.out | remove_uuid64
+==== server.py.out | remove_uuid
Request server running - run your client now.
(Times out after 100 seconds ...)