diff options
author | Gordon Sim <gsim@apache.org> | 2008-05-12 17:23:21 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-05-12 17:23:21 +0000 |
commit | 3a923f1e6a96e856911d3bbf49dc7af42e16c98b (patch) | |
tree | fbf3732cbddb43f09713652f8c1052f48582e7ed /python/examples/direct | |
parent | 0655ff5aceb9d53eb256a05d7beb55b1c803c8de (diff) | |
download | qpid-python-3a923f1e6a96e856911d3bbf49dc7af42e16c98b.tar.gz |
QPID-1044: Part of patch from Jonathan Robie + changes to verify scripts to keep automated testing working.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@655568 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/examples/direct')
-rwxr-xr-x | python/examples/direct/declare_queues.py | 31 | ||||
-rwxr-xr-x | python/examples/direct/direct_consumer.py | 51 | ||||
-rwxr-xr-x | python/examples/direct/direct_producer.py | 32 | ||||
-rwxr-xr-x | python/examples/direct/listener.py | 59 |
4 files changed, 104 insertions, 69 deletions
diff --git a/python/examples/direct/declare_queues.py b/python/examples/direct/declare_queues.py index 7041ce2f24..deea0a3ccc 100755 --- a/python/examples/direct/declare_queues.py +++ b/python/examples/direct/declare_queues.py @@ -8,34 +8,47 @@ sent to the queue named "message_queue". """ +# Common includes + import qpid import sys import os -from random import randint from qpid.util import connect from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 from qpid.queue import Empty #----- Initialization ----------------------------------- # Set parameters for login -host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1" -port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672 +host="127.0.0.1" +port=5672 user="guest" password="guest" -amqp_spec="" +amqp_spec="/usr/share/amqp/amqp.0-10.xml" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +# +# If AMQP_SPEC is defined, use it to locate the spec file instead of +# looking for it in the default location. + +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) try: amqp_spec = os.environ["AMQP_SPEC"] except KeyError: - amqp_spec="/usr/share/amqp/amqp.0-10.xml" + amqp_spec="/usr/share/amqp/amqp.0-10.xml" # Create a connection. -conn = Connection (connect (host,port), qpid.spec.load(amqp_spec)) -conn.start() - -session = conn.session(str(randint(1,64*1024))) +socket = connect(host, port) +connection = Connection (sock=socket, spec=qpid.spec.load(amqp_spec)) +connection.start() +session = connection.session(str(uuid4())) #----- Create a queue ------------------------------------- diff --git a/python/examples/direct/direct_consumer.py b/python/examples/direct/direct_consumer.py index 91d85cee1a..f2018bbbb8 100755 --- a/python/examples/direct/direct_consumer.py +++ b/python/examples/direct/direct_consumer.py @@ -12,7 +12,7 @@ import os from random import randint from qpid.util import connect from qpid.connection import Connection -from qpid.datatypes import Message, RangedSet +from qpid.datatypes import Message, RangedSet, uuid4 from qpid.queue import Empty @@ -20,11 +20,22 @@ from qpid.queue import Empty # Set parameters for login -host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1" -port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672 +host="127.0.0.1" +port=5672 user="guest" password="guest" -amqp_spec="" +amqp_spec="/usr/share/amqp/amqp.0-10.xml" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +# +# If AMQP_SPEC is defined, use it to locate the spec file instead of +# looking for it in the default location. + +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) try: amqp_spec = os.environ["AMQP_SPEC"] @@ -32,10 +43,10 @@ except KeyError: amqp_spec="/usr/share/amqp/amqp.0-10.xml" # Create a connection. -conn = Connection (connect (host,port), qpid.spec.load(amqp_spec)) -conn.start() - -session = conn.session(str(randint(1,64*1024))) +socket = connect(host, port) +connection = Connection (sock=socket, spec=qpid.spec.load(amqp_spec)) +connection.start() +session = connection.session(str(uuid4())) #----- Read from queue -------------------------------------------- @@ -44,16 +55,17 @@ session = conn.session(str(randint(1,64*1024))) # The consumer tag identifies the client-side queue. -consumer_tag = "consumer1" -queue = session.incoming(consumer_tag) +local_queue_name = "local_queue" +queue = session.incoming(local_queue_name) -# Call message_consume() to tell the broker to deliver messages +# Call message_subscribe() to tell the broker to deliver messages # from the AMQP queue to this local client queue. The broker will -# start delivering messages as soon as message_consume() is called. +# start delivering messages as soon as credit is allocated using +# session.message_flow(). -session.message_subscribe(queue="message_queue", destination=consumer_tag) -session.message_flow(consumer_tag, 0, 0xFFFFFFFF) # Kill these? -session.message_flow(consumer_tag, 1, 0xFFFFFFFF) # Kill these? +session.message_subscribe(queue="message_queue", destination=local_queue_name) +session.message_flow(local_queue_name, session.credit_unit.message, 0xFFFFFFFF) +session.message_flow(local_queue_name, session.credit_unit.byte, 0xFFFFFFFF) # Initialize 'final' and 'content', variables used to identify the last message. @@ -67,15 +79,6 @@ while content != final: session.message_accept(RangedSet(message.id)) print content -# Messages are not removed from the queue until they are -# acknowledged. Using cumulative=True, all messages from the session -# up to and including the one identified by the delivery tag are -# acknowledged. This is more efficient, because there are fewer -# network round-trips. - -#message.complete(cumulative=True) -# ? Is there an equivakent to the above in the new API ? - #----- Cleanup ------------------------------------------------ # Clean up before exiting so there are no open threads. diff --git a/python/examples/direct/direct_producer.py b/python/examples/direct/direct_producer.py index 7c4e30d96e..8f6a91ba18 100755 --- a/python/examples/direct/direct_producer.py +++ b/python/examples/direct/direct_producer.py @@ -9,21 +9,33 @@ import qpid import sys import os -from random import randint from qpid.util import connect from qpid.connection import Connection from qpid.datatypes import Message +from qpid.datatypes import uuid4 from qpid.queue import Empty + #----- Initialization ----------------------------------- # Set parameters for login -host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1" -port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672 +host="127.0.0.1" +port=5672 user="guest" password="guest" -amqp_spec="" +amqp_spec="/usr/share/amqp/amqp.0-10.xml" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +# +# If AMQP_SPEC is defined, use it to locate the spec file instead of +# looking for it in the default location. + +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) try: amqp_spec = os.environ["AMQP_SPEC"] @@ -31,10 +43,10 @@ except KeyError: amqp_spec="/usr/share/amqp/amqp.0-10.xml" # Create a connection. -conn = Connection (connect (host,port), qpid.spec.load(amqp_spec)) -conn.start() - -session = conn.session(str(randint(1,64*1024))) +socket = connect(host, port) +connection = Connection (sock=socket, spec=qpid.spec.load(amqp_spec)) +connection.start() +session = connection.session(str(uuid4())) #----- Publish some messages ------------------------------ @@ -42,9 +54,9 @@ session = conn.session(str(randint(1,64*1024))) props = session.delivery_properties(routing_key="routing_key") for i in range(10): - session.message_transfer("amq.direct",None, None, Message(props,"message " + str(i))) + session.message_transfer(destination="amq.direct", message=Message(props,"message " + str(i))) -session.message_transfer("amq.direct",None,None, Message(props,"That's all, folks!")) +session.message_transfer(destination="amq.direct", message=Message(props,"That's all, folks!")) #----- Cleanup -------------------------------------------- diff --git a/python/examples/direct/listener.py b/python/examples/direct/listener.py index aa60b1c501..c18ef47fb7 100755 --- a/python/examples/direct/listener.py +++ b/python/examples/direct/listener.py @@ -7,14 +7,18 @@ as a message listener. """ +# Common includes + import qpid import sys import os -from random import randint from qpid.util import connect from qpid.connection import Connection -from qpid.datatypes import Message, RangedSet -from qpid.queue import Empty +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +# Includes specific to this example + from time import sleep @@ -33,23 +37,26 @@ class Receiver: if content == "That's all, folks!": self.finalReceived = True - # Messages are not removed from the queue until they are - # acknowledged. Using cumulative=True, all messages from the session - # up to and including the one identified by the delivery tag are - # acknowledged. This is more efficient, because there are fewer - # network round-trips. - #message.complete(cumulative=True) - - #----- Initialization -------------------------------------- # Set parameters for login -host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1" -port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672 +host="127.0.0.1" +port=5672 user="guest" password="guest" -amqp_spec="" +amqp_spec="/usr/share/amqp/amqp.0-10.xml" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +# +# If AMQP_SPEC is defined, use it to locate the spec file instead of +# looking for it in the default location. + +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) try: amqp_spec = os.environ["AMQP_SPEC"] @@ -57,33 +64,33 @@ except KeyError: amqp_spec="/usr/share/amqp/amqp.0-10.xml" # Create a connection. -conn = Connection (connect (host,port), qpid.spec.load(amqp_spec)) -conn.start() - -session = conn.session(str(randint(1,64*1024))) +socket = connect(host, port) +connection = Connection (sock=socket, spec=qpid.spec.load(amqp_spec)) +connection.start() +session = connection.session(str(uuid4())) #----- Read from queue -------------------------------------------- # Now let's create a local client queue and tell it to read # incoming messages. -# The consumer tag identifies the client-side queue. +# The local_queue_name identifies the client-side queue. -consumer_tag = "consumer1" -queue = session.incoming(consumer_tag) +local_queue_name = "local_queue" +queue = session.incoming(local_queue_name) # Call message_subscribe() to tell the broker to deliver messages # from the AMQP queue to this local client queue. The broker will # start delivering messages as soon as message_subscribe() is called. -session.message_subscribe(queue="message_queue", destination=consumer_tag) -session.message_flow(consumer_tag, 0, 0xFFFFFFFF) # Kill these? -session.message_flow(consumer_tag, 1, 0xFFFFFFFF) # Kill these? +session.message_subscribe(queue="message_queue", destination=local_queue_name) +session.message_flow(local_queue_name, session.credit_unit.message, 0xFFFFFFFF) +session.message_flow(local_queue_name, session.credit_unit.byte, 0xFFFFFFFF) -receiver = Receiver () +receiver = Receiver() queue.listen (receiver.Handler) -while not receiver.isFinal (): +while not receiver.isFinal() : sleep (1) |