summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-02-15 21:00:44 +0000
committerAlan Conway <aconway@apache.org>2008-02-15 21:00:44 +0000
commita8c96aae65caa95f790fb8f7011731cf5d29454a (patch)
tree2d9427eb7cf5556a57c08d7ceca47feb0a5b5d21 /python
parent2895d73ed993c4f9d9e32116bf7c2ea0d3d089e8 (diff)
downloadqpid-python-a8c96aae65caa95f790fb8f7011731cf5d29454a.tar.gz
Updated c++ and python fanout examples and verify scripts.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@628169 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rwxr-xr-xpython/examples/fanout/declare_queues.py52
-rwxr-xr-xpython/examples/fanout/fanout_consumer.py92
-rw-r--r--python/examples/fanout/verify6
-rw-r--r--python/examples/fanout/verify.in43
4 files changed, 91 insertions, 102 deletions
diff --git a/python/examples/fanout/declare_queues.py b/python/examples/fanout/declare_queues.py
deleted file mode 100755
index 52f23f4f9a..0000000000
--- a/python/examples/fanout/declare_queues.py
+++ /dev/null
@@ -1,52 +0,0 @@
-#!/usr/bin/env python
-"""
- declare_queues.py
-
- Creates and binds a queue on an AMQP direct exchange.
-
- All messages using the routing key "routing_key" are
- sent to the queue named "message_queue".
-"""
-
-import qpid
-import sys
-from qpid.client import Client
-from qpid.content import Content
-from qpid.queue import Empty
-
-#----- Initialization -----------------------------------
-
-# Set parameters for login
-
-host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1"
-port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672
-amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
-user="guest"
-password="guest"
-
-# Create a client and log in to it.
-
-client = Client(host, port, qpid.spec.load(amqp_spec))
-client.start({"LOGIN": user, "PASSWORD": password})
-
-session = client.session()
-session.session_open()
-
-#----- Create a queue -------------------------------------
-
-# queue_declare() creates an AMQP queue, which is held
-# on the broker. Published messages are sent to the AMQP queue,
-# from which messages are delivered to consumers.
-#
-# queue_bind() determines which messages are routed to a queue.
-# Route all messages with the routing key "routing_key" to
-# the AMQP queue named "message_queue".
-
-session.queue_declare(queue="message_queue")
-session.queue_bind(exchange="amq.fanout", queue="message_queue")
-
-#----- Cleanup ---------------------------------------------
-
-# Clean up before exiting so there are no open threads.
-
-session.session_close()
diff --git a/python/examples/fanout/fanout_consumer.py b/python/examples/fanout/fanout_consumer.py
index b91ea35c0d..ef24bf35b2 100755
--- a/python/examples/fanout/fanout_consumer.py
+++ b/python/examples/fanout/fanout_consumer.py
@@ -5,13 +5,57 @@
This AMQP client reads messages from a message
queue named "message_queue".
"""
-
+import base64
import qpid
import sys
from qpid.client import Client
from qpid.content import Content
from qpid.queue import Empty
+#----- Functions -------------------------------------------
+
+def dump_queue(client, queue_name):
+
+ print "Messages queue: " + queue_name
+
+ consumer_tag = queue_name # Use the queue name as the consumer tag - need a unique tag
+ queue = client.queue(consumer_tag)
+
+ # Call message_subscribe() to tell the broker to deliver messages
+ # from the AMQP queue to a local client queue. The broker will
+ # start delivering messages as soon as message_subscribe() is called.
+
+ session.message_subscribe(queue=queue_name, destination=consumer_tag)
+ session.message_flow(consumer_tag, 0, 0xFFFFFFFF)
+ session.message_flow(consumer_tag, 1, 0xFFFFFFFF)
+
+ print "Subscribed to queue " + queue_name
+ sys.stdout.flush()
+
+ message = 0
+
+ while True:
+ try:
+ message = queue.get(timeout=10)
+ content = message.content.body
+ print "Response: " + content
+ except Empty:
+ print "No more messages!"
+ break
+ except:
+ print "Unexpected exception!"
+ break
+
+
+ # Messages are not removed from the queue until they
+ # are acknowledged. Using cumulative=True, all messages
+ # in the session up to and including the one identified
+ # by the delivery tag are acknowledged. This is more efficient,
+ # because there are fewer network round-trips.
+
+ if message != 0:
+ message.complete(cumulative=True)
+
#----- Initialization --------------------------------------
@@ -29,44 +73,22 @@ client = Client(host, port, qpid.spec.load(amqp_spec))
client.start({"LOGIN": user, "PASSWORD": password})
session = client.session()
-session.session_open()
-
-#----- Read from queue --------------------------------------------
-
-# Now let's create a local client queue and tell it to read
-# incoming messages.
-
-# The consumer tag identifies the client-side queue.
-
-consumer_tag = "consumer1"
-queue = client.queue(consumer_tag)
-
-# Call message_subscribe() to tell the broker to deliver messages
-# from the AMQP queue to this local client queue. The broker will
-# start delivering messages as soon as message_subscribe() is called.
-
-session.message_subscribe(queue="message_queue", destination=consumer_tag)
-session.message_flow(consumer_tag, 0, 0xFFFFFFFF)
-session.message_flow(consumer_tag, 1, 0xFFFFFFFF)
-
-# Initialize 'final' and 'content', variables used to identify the last message.
+session_info = session.session_open()
+session_id = session_info.session_id
-final = "That's all, folks!" # In a message body, signals the last message
-content = "" # Content of the last message read
+#----- Main Body -- ----------------------------------------
-message = None
-while content != final:
- message = queue.get(timeout=10)
- content = message.content.body
- print content
+# Make a unique queue name for my queue from the session ID.
+my_queue = base64.urlsafe_b64encode(session_id)
+session.queue_declare(queue=my_queue)
-# Messages are not removed from the queue until they are
-# acknowledged. Using cumulative=True, all messages from the session
-# up to and including the one identified by the delivery tag are
-# acknowledged. This is more efficient, because there are fewer
-# network round-trips.
+# Bind my queue to the fanout exchange. No routing key is required
+# the fanout exchange copies messages unconditionally to every
+# bound queue
+session.queue_bind(queue=my_queue, exchange="amq.fanout")
-message.complete(cumulative=True)
+# Dump the messages on the queue.
+dump_queue(client, my_queue)
#----- Cleanup ------------------------------------------------
diff --git a/python/examples/fanout/verify b/python/examples/fanout/verify
index f136ccd39b..7650853e11 100644
--- a/python/examples/fanout/verify
+++ b/python/examples/fanout/verify
@@ -1,3 +1,5 @@
# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify
-clients ./declare_queues.py ./fanout_producer.py ./fanout_consumer.py
-outputs ./declare_queues.py.out ./fanout_producer.py.out ./fanout_consumer.py.out
+background "Subscribed" ./fanout_consumer.py
+background "Subscribed" ./fanout_consumer.py
+clients ./fanout_producer.py
+outputs ./fanout_producer.py.out "./fanout_consumer.py.out | remove_uuid64" "./fanout_consumer.pyX.out | remove_uuid64"
diff --git a/python/examples/fanout/verify.in b/python/examples/fanout/verify.in
index a5f57f0b4b..d5067b3850 100644
--- a/python/examples/fanout/verify.in
+++ b/python/examples/fanout/verify.in
@@ -1,14 +1,31 @@
-==== declare_queues.py.out
==== fanout_producer.py.out
-==== fanout_consumer.py.out
-message 0
-message 1
-message 2
-message 3
-message 4
-message 5
-message 6
-message 7
-message 8
-message 9
-That's all, folks!
+==== fanout_consumer.py.out | remove_uuid64
+Messages queue:
+Subscribed to queue
+Response: message 0
+Response: message 1
+Response: message 2
+Response: message 3
+Response: message 4
+Response: message 5
+Response: message 6
+Response: message 7
+Response: message 8
+Response: message 9
+Response: That's all, folks!
+No more messages!
+==== fanout_consumer.pyX.out | remove_uuid64
+Messages queue:
+Subscribed to queue
+Response: message 0
+Response: message 1
+Response: message 2
+Response: message 3
+Response: message 4
+Response: message 5
+Response: message 6
+Response: message 7
+Response: message 8
+Response: message 9
+Response: That's all, folks!
+No more messages!