diff options
Diffstat (limited to 'python/examples/request-response/client.py')
-rwxr-xr-x | python/examples/request-response/client.py | 61 |
1 files changed, 30 insertions, 31 deletions
diff --git a/python/examples/request-response/client.py b/python/examples/request-response/client.py index 6561bb6fee..8f7d430d1b 100755 --- a/python/examples/request-response/client.py +++ b/python/examples/request-response/client.py @@ -6,22 +6,22 @@ """ -import base64 - import qpid import sys -from qpid.client import Client -from qpid.content import Content +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 from qpid.queue import Empty #----- Functions ------------------------------------------- -def dump_queue(client, queue_name): +def dump_queue(queue_name): print "Messages queue: " + queue_name consumer_tag = queue_name # Use the queue name as the consumer tag - need a unique tag - queue = client.queue(consumer_tag) + queue = session.incoming(consumer_tag) # Call message_subscribe() to tell the broker to deliver messages # from the AMQP queue to a local client queue. The broker will @@ -36,7 +36,8 @@ def dump_queue(client, queue_name): while True: try: message = queue.get(timeout=10) - content = message.content.body + content = message.body + session.message_accept(RangedSet(message.id)) print "Response: " + content except Empty: print "No more messages!" @@ -52,8 +53,8 @@ def dump_queue(client, queue_name): # by the delivery tag are acknowledged. This is more efficient, # because there are fewer network round-trips. - if message != 0: - message.complete(cumulative=True) + #if message != 0: + # message.complete(cumulative=True) #----- Initialization -------------------------------------- @@ -62,21 +63,21 @@ def dump_queue(client, queue_name): host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1" port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672 -amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" user="guest" password="guest" +amqp_spec="" -# Create a client and log in to it. - -spec = qpid.spec.load(amqp_spec) -client = Client(host, port, spec) -client.start({"LOGIN": user, "PASSWORD": password}) +try: + amqp_spec = os.environ["AMQP_SPEC"] +except KeyError: + amqp_spec="/usr/share/amqp/amqp.0-10.xml" -# Open the session. Save the session id. +# Create a connection. +conn = Connection (connect (host,port), qpid.spec.load(amqp_spec)) +conn.start() -session = client.session() -session_info = session.session_open() -session_id = session_info.session_id +session_id = str(uuid4()) +session = conn.session(session_id) #----- Main Body -- ---------------------------------------- @@ -84,9 +85,9 @@ session_id = session_info.session_id # same string as the name of the queue and the name of the routing # key. -replyTo = "ReplyTo:" + base64.urlsafe_b64encode(session_id) +replyTo = "ReplyTo:" + session_id session.queue_declare(queue=replyTo, exclusive=True) -session.queue_bind(exchange="amq.direct", queue=replyTo, routing_key=replyTo) +session.exchange_bind(exchange="amq.direct", queue=replyTo, binding_key=replyTo) # Send some messages to the server's request queue @@ -95,22 +96,20 @@ lines = ["Twas brilling, and the slithy toves", "All mimsy were the borogroves,", "And the mome raths outgrabe."] -for l in lines: - print "Request: " + l - request=Content(l) - request["routing_key"] = "request" - request["reply_to"] = client.spec.struct("reply_to") - request["reply_to"]["exchange_name"] = "amq.direct" - request["reply_to"]["routing_key"] = replyTo - session.message_transfer(destination="amq.direct", content=request) +for ln in lines: + print "Request: " + ln + mp = session.message_properties() + mp.reply_to = session.reply_to("amq.direct", replyTo) + dp = session.delivery_properties(routing_key="request") + session.message_transfer("amq.direct", None, None, Message(mp,dp,ln)) # Now see what messages the server sent to our replyTo queue -dump_queue(client, replyTo) +dump_queue(replyTo) #----- Cleanup ------------------------------------------------ # Clean up before exiting so there are no open threads. -session.session_close() +session.close(timeout=10) |