diff options
Diffstat (limited to 'python/examples/pubsub/topic_subscriber.py')
-rwxr-xr-x | python/examples/pubsub/topic_subscriber.py | 78 |
1 files changed, 41 insertions, 37 deletions
diff --git a/python/examples/pubsub/topic_subscriber.py b/python/examples/pubsub/topic_subscriber.py index a5c05ba177..6908be5471 100755 --- a/python/examples/pubsub/topic_subscriber.py +++ b/python/examples/pubsub/topic_subscriber.py @@ -3,24 +3,25 @@ topic_subscriber.py This subscriber creates private queues and binds them - to the topics "usa.#", "europe.#", "#.news", and "#.weather". + to the topics 'usa.#', 'europe.#', '#.news', and '#.weather'. """ -import base64 -import sys import qpid -from qpid.client import Client -from qpid.content import Content +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 from qpid.queue import Empty #----- Functions ------------------------------------------- -def dump_queue(client, queue_name): +def dump_queue(queue_name): print "Messages queue: " + queue_name consumer_tag = queue_name # Use the queue name as the consumer tag - need a unique tag - queue = client.queue(consumer_tag) + queue = session.incoming(consumer_tag) # Call message_subscribe() to tell the broker to deliver messages # from the AMQP queue to a local client queue. The broker will @@ -37,11 +38,12 @@ def dump_queue(client, queue_name): while content != final: try: message = queue.get() - content = message.content.body + content = message.body + session.message_accept(RangedSet(message.id)) print content except Empty: - if message != 0: - message.complete(cumulative=True) + #if message != 0: + # message.complete(cumulative=True) print "No more messages!" return @@ -52,8 +54,8 @@ def dump_queue(client, queue_name): # by the delivery tag are acknowledged. This is more efficient, # because there are fewer network round-trips. - if message != 0: - message.complete(cumulative=True) + #if message != 0: + # message.complete(cumulative=True) #----- Initialization -------------------------------------- @@ -62,27 +64,29 @@ def dump_queue(client, queue_name): host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1" port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672 -amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" user="guest" password="guest" +amqp_spec="" -# Create a client and log in to it. +try: + amqp_spec = os.environ["AMQP_SPEC"] +except KeyError: + amqp_spec="/usr/share/amqp/amqp.0-10.xml" -spec = qpid.spec.load(amqp_spec) -client = Client(host, port, spec) -client.start({"LOGIN": user, "PASSWORD": password}) +# Create a connection. +conn = Connection (connect (host,port), qpid.spec.load(amqp_spec)) +conn.start() -session = client.session() -session_info = session.session_open() -session_id = session_info.session_id +session_id = str(uuid4()) +session = conn.session(session_id) #----- Main Body -- ---------------------------------------- -news = "news" + base64.urlsafe_b64encode(session_id) -weather = "weather" + base64.urlsafe_b64encode(session_id) -usa = "usa" + base64.urlsafe_b64encode(session_id) -europe = "europe" + base64.urlsafe_b64encode(session_id) +news = "news" + session_id +weather = "weather" + session_id +usa = "usa" + session_id +europe = "europe" + session_id session.queue_declare(queue=news, exclusive=True) session.queue_declare(queue=weather, exclusive=True) @@ -94,17 +98,17 @@ session.queue_declare(queue=europe, exclusive=True) # The '#' symbol matches one component of a multipart name, e.g. "#.news" matches # "europe.news" or "usa.news". -session.queue_bind(exchange="amq.topic", queue=news, routing_key="#.news") -session.queue_bind(exchange="amq.topic", queue=weather, routing_key="#.weather") -session.queue_bind(exchange="amq.topic", queue=usa, routing_key="usa.#") -session.queue_bind(exchange="amq.topic", queue=europe, routing_key="europe.#") +session.exchange_bind(exchange="amq.topic", queue=news, binding_key="#.news") +session.exchange_bind(exchange="amq.topic", queue=weather, binding_key="#.weather") +session.exchange_bind(exchange="amq.topic", queue=usa, binding_key="usa.#") +session.exchange_bind(exchange="amq.topic", queue=europe, binding_key="europe.#") # Bind each queue to the control queue so we know when to stop -session.queue_bind(exchange="amq.topic", queue=news, routing_key="control") -session.queue_bind(exchange="amq.topic", queue=weather, routing_key="control") -session.queue_bind(exchange="amq.topic", queue=usa, routing_key="control") -session.queue_bind(exchange="amq.topic", queue=europe, routing_key="control") +session.exchange_bind(exchange="amq.topic", queue=news, binding_key="control") +session.exchange_bind(exchange="amq.topic", queue=weather, binding_key="control") +session.exchange_bind(exchange="amq.topic", queue=usa, binding_key="control") +session.exchange_bind(exchange="amq.topic", queue=europe, binding_key="control") # Remind the user to start the topic producer @@ -113,13 +117,13 @@ sys.stdout.flush() # Call dump_queue to print messages from each queue -dump_queue(client, news) -dump_queue(client, weather) -dump_queue(client, usa) -dump_queue(client, europe) +dump_queue(news) +dump_queue(weather) +dump_queue(usa) +dump_queue(europe) #----- Cleanup ------------------------------------------------ # Clean up before exiting so there are no open threads. -session.session_close() +session.close(timeout=10) |