diff options
Diffstat (limited to 'qpid')
-rwxr-xr-x | qpid/bin/verify | 6 | ||||
-rw-r--r-- | qpid/cpp/examples/Makefile.am | 1 | ||||
-rw-r--r-- | qpid/cpp/examples/examples/fanout/Makefile | 2 | ||||
-rw-r--r-- | qpid/cpp/examples/examples/fanout/declare_queues.cpp | 80 | ||||
-rw-r--r-- | qpid/cpp/examples/examples/fanout/listener.cpp | 16 | ||||
-rw-r--r-- | qpid/cpp/examples/examples/fanout/verify | 7 | ||||
-rw-r--r-- | qpid/cpp/examples/examples/fanout/verify.in | 34 | ||||
-rw-r--r-- | qpid/cpp/examples/examples/fanout/verify_cpp_python | 6 | ||||
-rw-r--r-- | qpid/cpp/examples/examples/fanout/verify_cpp_python.in | 43 | ||||
-rw-r--r-- | qpid/cpp/examples/examples/fanout/verify_python_cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/examples/examples/fanout/verify_python_cpp.in | 20 | ||||
-rwxr-xr-x | qpid/python/examples/fanout/declare_queues.py | 52 | ||||
-rwxr-xr-x | qpid/python/examples/fanout/fanout_consumer.py | 92 | ||||
-rw-r--r-- | qpid/python/examples/fanout/verify | 6 | ||||
-rw-r--r-- | qpid/python/examples/fanout/verify.in | 43 |
15 files changed, 202 insertions, 212 deletions
diff --git a/qpid/bin/verify b/qpid/bin/verify index 844e99b765..96fe1df9b3 100755 --- a/qpid/bin/verify +++ b/qpid/bin/verify @@ -19,7 +19,11 @@ trap cleanup EXIT ARGS="${QPID_HOST:-localhost} $QPID_PORT" -outfile() { echo $1.out; } +outfile() { + file=$1 + while [ -f $file.out ]; do file="${file}X"; done + echo $file.out + } fail() { test -n "$*" && echo $* 1>&2 ; FAIL=1; return 1; } diff --git a/qpid/cpp/examples/Makefile.am b/qpid/cpp/examples/Makefile.am index 92625550e2..9b41b130e9 100644 --- a/qpid/cpp/examples/Makefile.am +++ b/qpid/cpp/examples/Makefile.am @@ -6,7 +6,6 @@ nobase_pkgdata_DATA= \ examples/request-response/server.cpp \ examples/request-response/Makefile \ examples/fanout/Makefile \ - examples/fanout/declare_queues.cpp \ examples/fanout/listener.cpp \ examples/fanout/fanout_producer.cpp \ examples/pub-sub/Makefile \ diff --git a/qpid/cpp/examples/examples/fanout/Makefile b/qpid/cpp/examples/examples/fanout/Makefile index b58026f337..7963af7ddf 100644 --- a/qpid/cpp/examples/examples/fanout/Makefile +++ b/qpid/cpp/examples/examples/fanout/Makefile @@ -2,7 +2,7 @@ CXX=g++ CXXFLAGS= LDFLAGS=-lqpidclient -PROGRAMS=declare_queues fanout_producer listener +PROGRAMS=fanout_producer listener all: $(PROGRAMS) diff --git a/qpid/cpp/examples/examples/fanout/declare_queues.cpp b/qpid/cpp/examples/examples/fanout/declare_queues.cpp deleted file mode 100644 index c76ed54730..0000000000 --- a/qpid/cpp/examples/examples/fanout/declare_queues.cpp +++ /dev/null @@ -1,80 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/** - * declare_queues.cpp (this program): - * - * This program is one of three programs designed to be used - * together. These programs use the "amq.fanout" exchange. - * - * fanout_producer.cpp: - * - * Publishes to a broker, specifying a routing key. - * - * listener.cpp - * - * Reads from a queue on the broker using a message listener. - * - */ - -#include <qpid/client/Connection.h> -#include <qpid/client/Session.h> - -#include <unistd.h> -#include <cstdlib> -#include <iostream> - -using namespace qpid::client; -using namespace qpid::framing; - -using std::string; - - -int main(int argc, char** argv) { - const char* host = argc>1 ? argv[1] : "127.0.0.1"; - int port = argc>2 ? atoi(argv[2]) : 5672; - Connection connection; - Message msg; - try { - connection.open(host, port); - Session session = connection.newSession(); - - - //--------- Main body of program -------------------------------------------- - - // Create and bind a queue named "message_queue". - session.queueDeclare(arg::queue="message_queue"); - session.queueBind(arg::queue="message_queue", arg::exchange="amq.fanout"); - - - //----------------------------------------------------------------------------- - - connection.close(); - return 0; - } catch(const std::exception& error) { - std::cout << error.what() << std::endl; - } - return 1; - -} - - - diff --git a/qpid/cpp/examples/examples/fanout/listener.cpp b/qpid/cpp/examples/examples/fanout/listener.cpp index 91b5123c68..5295e10f34 100644 --- a/qpid/cpp/examples/examples/fanout/listener.cpp +++ b/qpid/cpp/examples/examples/fanout/listener.cpp @@ -69,12 +69,24 @@ int main(int argc, char** argv) { //--------- Main body of program -------------------------------------------- + // Unique name for private queue: + std::string myQueue=session.getId().str(); + // Declear my queue. + session.queueDeclare(arg::queue=myQueue, arg::exclusive=true, + arg::autoDelete=true); + // Bind my queue to the fanout exchange. + // Note no routingKey required, the fanout exchange delivers + // all messages to all bound queues unconditionally. + session.queueBind(arg::exchange="amq.fanout", arg::queue=myQueue); + + // Create a listener and subscribe it to my queue. SubscriptionManager subscriptions(session); - // Create a listener and subscribe it to the queue named "message_queue" Listener listener(subscriptions); - subscriptions.subscribe(listener, "message_queue"); + subscriptions.subscribe(listener, myQueue); + // Deliver messages until the subscription is cancelled // by Listener::received() + std::cout << "Listening" << std::endl; subscriptions.run(); //--------------------------------------------------------------------------- diff --git a/qpid/cpp/examples/examples/fanout/verify b/qpid/cpp/examples/examples/fanout/verify index 1e1e8121d2..ace4a6dfee 100644 --- a/qpid/cpp/examples/examples/fanout/verify +++ b/qpid/cpp/examples/examples/fanout/verify @@ -1,3 +1,6 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -clients ./declare_queues ./fanout_producer ./listener -outputs ./declare_queues.out ./fanout_producer.out ./listener.out +background "Listening" ./listener +background "Listening" ./listener +background "Listening" ./listener +clients ./fanout_producer +outputs ./fanout_producer.out "./listener.out | remove_uuid" "./listenerX.out | remove_uuid" "./listenerXX.out | remove_uuid" diff --git a/qpid/cpp/examples/examples/fanout/verify.in b/qpid/cpp/examples/examples/fanout/verify.in index 23d08f38dd..8f8612ce67 100644 --- a/qpid/cpp/examples/examples/fanout/verify.in +++ b/qpid/cpp/examples/examples/fanout/verify.in @@ -1,6 +1,6 @@ -==== declare_queues.out ==== fanout_producer.out -==== listener.out +==== listener.out | remove_uuid +Listening Message: Message 0 Message: Message 1 Message: Message 2 @@ -12,4 +12,32 @@ Message: Message 7 Message: Message 8 Message: Message 9 Message: That's all, folks! -Shutting down listener for message_queue +Shutting down listener for +==== listenerX.out | remove_uuid +Listening +Message: Message 0 +Message: Message 1 +Message: Message 2 +Message: Message 3 +Message: Message 4 +Message: Message 5 +Message: Message 6 +Message: Message 7 +Message: Message 8 +Message: Message 9 +Message: That's all, folks! +Shutting down listener for +==== listenerXX.out | remove_uuid +Listening +Message: Message 0 +Message: Message 1 +Message: Message 2 +Message: Message 3 +Message: Message 4 +Message: Message 5 +Message: Message 6 +Message: Message 7 +Message: Message 8 +Message: Message 9 +Message: That's all, folks! +Shutting down listener for diff --git a/qpid/cpp/examples/examples/fanout/verify_cpp_python b/qpid/cpp/examples/examples/fanout/verify_cpp_python index f53784ef1c..e840e68f91 100644 --- a/qpid/cpp/examples/examples/fanout/verify_cpp_python +++ b/qpid/cpp/examples/examples/fanout/verify_cpp_python @@ -1,5 +1,7 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify py=$PYTHON_EXAMPLES/fanout -clients ./declare_queues ./fanout_producer $py/fanout_consumer.py -outputs ./declare_queues.out ./fanout_producer.out $py/fanout_consumer.py.out +background "Subscribed" $py/fanout_consumer.py +background "Subscribed" $py/fanout_consumer.py +clients ./fanout_producer +outputs ./fanout_producer.out "$py/fanout_consumer.py.out | remove_uuid64" "$py/fanout_consumer.pyX.out | remove_uuid64" diff --git a/qpid/cpp/examples/examples/fanout/verify_cpp_python.in b/qpid/cpp/examples/examples/fanout/verify_cpp_python.in index cb9e52cfcc..fac2b365d3 100644 --- a/qpid/cpp/examples/examples/fanout/verify_cpp_python.in +++ b/qpid/cpp/examples/examples/fanout/verify_cpp_python.in @@ -1,14 +1,31 @@ -==== declare_queues.out ==== fanout_producer.out -==== fanout_consumer.py.out -Message 0 -Message 1 -Message 2 -Message 3 -Message 4 -Message 5 -Message 6 -Message 7 -Message 8 -Message 9 -That's all, folks! +==== fanout_consumer.py.out | remove_uuid64 +Messages queue: +Subscribed to queue +Response: Message 0 +Response: Message 1 +Response: Message 2 +Response: Message 3 +Response: Message 4 +Response: Message 5 +Response: Message 6 +Response: Message 7 +Response: Message 8 +Response: Message 9 +Response: That's all, folks! +No more messages! +==== fanout_consumer.pyX.out | remove_uuid64 +Messages queue: +Subscribed to queue +Response: Message 0 +Response: Message 1 +Response: Message 2 +Response: Message 3 +Response: Message 4 +Response: Message 5 +Response: Message 6 +Response: Message 7 +Response: Message 8 +Response: Message 9 +Response: That's all, folks! +No more messages! diff --git a/qpid/cpp/examples/examples/fanout/verify_python_cpp b/qpid/cpp/examples/examples/fanout/verify_python_cpp index 00a0727352..d9b3361523 100644 --- a/qpid/cpp/examples/examples/fanout/verify_python_cpp +++ b/qpid/cpp/examples/examples/fanout/verify_python_cpp @@ -1,5 +1,7 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify py=$PYTHON_EXAMPLES/fanout -clients $py/declare_queues.py $py/fanout_producer.py ./listener -outputs $py/declare_queues.py.out $py/fanout_producer.py.out ./listener.out +background "Listening" ./listener +background "Listening" ./listener +clients $py/fanout_producer.py +outputs $py/fanout_producer.py.out "./listener.out | remove_uuid" "./listenerX.out | remove_uuid" diff --git a/qpid/cpp/examples/examples/fanout/verify_python_cpp.in b/qpid/cpp/examples/examples/fanout/verify_python_cpp.in index 87a1694e6c..8f9e959053 100644 --- a/qpid/cpp/examples/examples/fanout/verify_python_cpp.in +++ b/qpid/cpp/examples/examples/fanout/verify_python_cpp.in @@ -1,6 +1,6 @@ -==== declare_queues.py.out ==== fanout_producer.py.out -==== listener.out +==== listener.out | remove_uuid +Listening Message: message 0 Message: message 1 Message: message 2 @@ -12,4 +12,18 @@ Message: message 7 Message: message 8 Message: message 9 Message: That's all, folks! -Shutting down listener for message_queue +Shutting down listener for +==== listenerX.out | remove_uuid +Listening +Message: message 0 +Message: message 1 +Message: message 2 +Message: message 3 +Message: message 4 +Message: message 5 +Message: message 6 +Message: message 7 +Message: message 8 +Message: message 9 +Message: That's all, folks! +Shutting down listener for diff --git a/qpid/python/examples/fanout/declare_queues.py b/qpid/python/examples/fanout/declare_queues.py deleted file mode 100755 index 52f23f4f9a..0000000000 --- a/qpid/python/examples/fanout/declare_queues.py +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env python -""" - declare_queues.py - - Creates and binds a queue on an AMQP direct exchange. - - All messages using the routing key "routing_key" are - sent to the queue named "message_queue". -""" - -import qpid -import sys -from qpid.client import Client -from qpid.content import Content -from qpid.queue import Empty - -#----- Initialization ----------------------------------- - -# Set parameters for login - -host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1" -port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672 -amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" -user="guest" -password="guest" - -# Create a client and log in to it. - -client = Client(host, port, qpid.spec.load(amqp_spec)) -client.start({"LOGIN": user, "PASSWORD": password}) - -session = client.session() -session.session_open() - -#----- Create a queue ------------------------------------- - -# queue_declare() creates an AMQP queue, which is held -# on the broker. Published messages are sent to the AMQP queue, -# from which messages are delivered to consumers. -# -# queue_bind() determines which messages are routed to a queue. -# Route all messages with the routing key "routing_key" to -# the AMQP queue named "message_queue". - -session.queue_declare(queue="message_queue") -session.queue_bind(exchange="amq.fanout", queue="message_queue") - -#----- Cleanup --------------------------------------------- - -# Clean up before exiting so there are no open threads. - -session.session_close() diff --git a/qpid/python/examples/fanout/fanout_consumer.py b/qpid/python/examples/fanout/fanout_consumer.py index b91ea35c0d..ef24bf35b2 100755 --- a/qpid/python/examples/fanout/fanout_consumer.py +++ b/qpid/python/examples/fanout/fanout_consumer.py @@ -5,13 +5,57 @@ This AMQP client reads messages from a message queue named "message_queue". """ - +import base64 import qpid import sys from qpid.client import Client from qpid.content import Content from qpid.queue import Empty +#----- Functions ------------------------------------------- + +def dump_queue(client, queue_name): + + print "Messages queue: " + queue_name + + consumer_tag = queue_name # Use the queue name as the consumer tag - need a unique tag + queue = client.queue(consumer_tag) + + # Call message_subscribe() to tell the broker to deliver messages + # from the AMQP queue to a local client queue. The broker will + # start delivering messages as soon as message_subscribe() is called. + + session.message_subscribe(queue=queue_name, destination=consumer_tag) + session.message_flow(consumer_tag, 0, 0xFFFFFFFF) + session.message_flow(consumer_tag, 1, 0xFFFFFFFF) + + print "Subscribed to queue " + queue_name + sys.stdout.flush() + + message = 0 + + while True: + try: + message = queue.get(timeout=10) + content = message.content.body + print "Response: " + content + except Empty: + print "No more messages!" + break + except: + print "Unexpected exception!" + break + + + # Messages are not removed from the queue until they + # are acknowledged. Using cumulative=True, all messages + # in the session up to and including the one identified + # by the delivery tag are acknowledged. This is more efficient, + # because there are fewer network round-trips. + + if message != 0: + message.complete(cumulative=True) + #----- Initialization -------------------------------------- @@ -29,44 +73,22 @@ client = Client(host, port, qpid.spec.load(amqp_spec)) client.start({"LOGIN": user, "PASSWORD": password}) session = client.session() -session.session_open() - -#----- Read from queue -------------------------------------------- - -# Now let's create a local client queue and tell it to read -# incoming messages. - -# The consumer tag identifies the client-side queue. - -consumer_tag = "consumer1" -queue = client.queue(consumer_tag) - -# Call message_subscribe() to tell the broker to deliver messages -# from the AMQP queue to this local client queue. The broker will -# start delivering messages as soon as message_subscribe() is called. - -session.message_subscribe(queue="message_queue", destination=consumer_tag) -session.message_flow(consumer_tag, 0, 0xFFFFFFFF) -session.message_flow(consumer_tag, 1, 0xFFFFFFFF) - -# Initialize 'final' and 'content', variables used to identify the last message. +session_info = session.session_open() +session_id = session_info.session_id -final = "That's all, folks!" # In a message body, signals the last message -content = "" # Content of the last message read +#----- Main Body -- ---------------------------------------- -message = None -while content != final: - message = queue.get(timeout=10) - content = message.content.body - print content +# Make a unique queue name for my queue from the session ID. +my_queue = base64.urlsafe_b64encode(session_id) +session.queue_declare(queue=my_queue) -# Messages are not removed from the queue until they are -# acknowledged. Using cumulative=True, all messages from the session -# up to and including the one identified by the delivery tag are -# acknowledged. This is more efficient, because there are fewer -# network round-trips. +# Bind my queue to the fanout exchange. No routing key is required +# the fanout exchange copies messages unconditionally to every +# bound queue +session.queue_bind(queue=my_queue, exchange="amq.fanout") -message.complete(cumulative=True) +# Dump the messages on the queue. +dump_queue(client, my_queue) #----- Cleanup ------------------------------------------------ diff --git a/qpid/python/examples/fanout/verify b/qpid/python/examples/fanout/verify index f136ccd39b..7650853e11 100644 --- a/qpid/python/examples/fanout/verify +++ b/qpid/python/examples/fanout/verify @@ -1,3 +1,5 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -clients ./declare_queues.py ./fanout_producer.py ./fanout_consumer.py -outputs ./declare_queues.py.out ./fanout_producer.py.out ./fanout_consumer.py.out +background "Subscribed" ./fanout_consumer.py +background "Subscribed" ./fanout_consumer.py +clients ./fanout_producer.py +outputs ./fanout_producer.py.out "./fanout_consumer.py.out | remove_uuid64" "./fanout_consumer.pyX.out | remove_uuid64" diff --git a/qpid/python/examples/fanout/verify.in b/qpid/python/examples/fanout/verify.in index a5f57f0b4b..d5067b3850 100644 --- a/qpid/python/examples/fanout/verify.in +++ b/qpid/python/examples/fanout/verify.in @@ -1,14 +1,31 @@ -==== declare_queues.py.out ==== fanout_producer.py.out -==== fanout_consumer.py.out -message 0 -message 1 -message 2 -message 3 -message 4 -message 5 -message 6 -message 7 -message 8 -message 9 -That's all, folks! +==== fanout_consumer.py.out | remove_uuid64 +Messages queue: +Subscribed to queue +Response: message 0 +Response: message 1 +Response: message 2 +Response: message 3 +Response: message 4 +Response: message 5 +Response: message 6 +Response: message 7 +Response: message 8 +Response: message 9 +Response: That's all, folks! +No more messages! +==== fanout_consumer.pyX.out | remove_uuid64 +Messages queue: +Subscribed to queue +Response: message 0 +Response: message 1 +Response: message 2 +Response: message 3 +Response: message 4 +Response: message 5 +Response: message 6 +Response: message 7 +Response: message 8 +Response: message 9 +Response: That's all, folks! +No more messages! |