summaryrefslogtreecommitdiff
path: root/python/examples/direct/direct_consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/examples/direct/direct_consumer.py')
-rwxr-xr-xpython/examples/direct/direct_consumer.py32
1 files changed, 20 insertions, 12 deletions
diff --git a/python/examples/direct/direct_consumer.py b/python/examples/direct/direct_consumer.py
index 85c1db0a93..91d85cee1a 100755
--- a/python/examples/direct/direct_consumer.py
+++ b/python/examples/direct/direct_consumer.py
@@ -8,8 +8,11 @@
import qpid
import sys
-from qpid.client import Client
-from qpid.content import Content
+import os
+from random import randint
+from qpid.util import connect
+from qpid.connection import Connection
+from qpid.datatypes import Message, RangedSet
from qpid.queue import Empty
@@ -19,17 +22,20 @@ from qpid.queue import Empty
host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1"
port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672
-amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
user="guest"
password="guest"
+amqp_spec=""
-# Create a client and log in to it.
+try:
+ amqp_spec = os.environ["AMQP_SPEC"]
+except KeyError:
+ amqp_spec="/usr/share/amqp/amqp.0-10.xml"
-client = Client(host, port, qpid.spec.load(amqp_spec))
-client.start({"LOGIN": user, "PASSWORD": password})
+# Create a connection.
+conn = Connection (connect (host,port), qpid.spec.load(amqp_spec))
+conn.start()
-session = client.session()
-session.session_open()
+session = conn.session(str(randint(1,64*1024)))
#----- Read from queue --------------------------------------------
@@ -39,7 +45,7 @@ session.session_open()
# The consumer tag identifies the client-side queue.
consumer_tag = "consumer1"
-queue = client.queue(consumer_tag)
+queue = session.incoming(consumer_tag)
# Call message_consume() to tell the broker to deliver messages
# from the AMQP queue to this local client queue. The broker will
@@ -57,7 +63,8 @@ content = "" # Content of the last message read
message = None
while content != final:
message = queue.get(timeout=10)
- content = message.content.body
+ content = message.body
+ session.message_accept(RangedSet(message.id))
print content
# Messages are not removed from the queue until they are
@@ -66,11 +73,12 @@ while content != final:
# acknowledged. This is more efficient, because there are fewer
# network round-trips.
-message.complete(cumulative=True)
+#message.complete(cumulative=True)
+# ? Is there an equivakent to the above in the new API ?
#----- Cleanup ------------------------------------------------
# Clean up before exiting so there are no open threads.
#
-session.session_close()
+session.close(timeout=10)