diff options
-rw-r--r-- | cpp/src/qpid/broker/XmlExchange.cpp | 152 | ||||
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 8 | ||||
-rw-r--r-- | python/examples/xml-exchange/declare_queues.py | 82 | ||||
-rw-r--r-- | python/examples/xml-exchange/listener.py | 98 | ||||
-rw-r--r-- | python/examples/xml-exchange/verify | 3 | ||||
-rw-r--r-- | python/examples/xml-exchange/verify.in | 14 | ||||
-rw-r--r-- | python/examples/xml-exchange/xml_consumer.py | 89 | ||||
-rw-r--r-- | python/examples/xml-exchange/xml_producer.py | 84 |
8 files changed, 464 insertions, 66 deletions
diff --git a/cpp/src/qpid/broker/XmlExchange.cpp b/cpp/src/qpid/broker/XmlExchange.cpp index 8577e9211c..1d8f2ae8d8 100644 --- a/cpp/src/qpid/broker/XmlExchange.cpp +++ b/cpp/src/qpid/broker/XmlExchange.cpp @@ -77,34 +77,38 @@ XmlExchange::XmlExchange(const std::string& _name, bool _durable, bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* bindingArguments) { - RWlock::ScopedWlock l(lock); - XmlBinding::vector& bindings(bindingsMap[routingKey]); - XmlBinding::vector::iterator i; - string queryText = bindingArguments->getString("xquery"); - for (i = bindings.begin(); i != bindings.end(); i++) - if ((*i)->queue == queue) - break; + try { + RWlock::ScopedWlock l(lock); + XmlBinding::vector& bindings(bindingsMap[routingKey]); + XmlBinding::vector::iterator i; - if (i == bindings.end()) { + for (i = bindings.begin(); i != bindings.end(); i++) + if ((*i)->queue == queue) + break; + + if (i == bindings.end()) { - try { Query query(xqilla.parse(X(queryText.c_str()))); XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, query)); XmlBinding::vector bindings(1, binding); - bindingsMap[routingKey] = bindings; - } - catch (XQException& e) { - throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText)); + bindingsMap[routingKey] = bindings; + QPID_LOG(trace, "Bound successfully with query: " << queryText ); + + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_bindings (); + } + return true; + } else{ + return false; } - - if (mgmtExchange.get() != 0) { - mgmtExchange->inc_bindings (); - } - return true; - } else{ - return false; + } + catch (XQException& e) { + throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText)); + } + catch (...) { + throw InternalErrorException(QPID_MSG("Unexpected error - Could not parse xquery:"+ queryText)); } } @@ -138,22 +142,31 @@ bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::F // Hack alert - the following code does not work for really large messages string msgContent; - msg.getMessage().getFrames().getContent(msgContent); - - boost::scoped_ptr<DynamicContext> context(query->createDynamicContext()); try { - XERCES_CPP_NAMESPACE::MemBufInputSource xml((XMLByte*)msgContent.c_str(), msgContent.length(), "input" ); + msg.getMessage().getFrames().getContent(msgContent); + + QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]"); + QPID_LOG(trace, "matches: message content is [" << msgContent << "]"); + + boost::scoped_ptr<DynamicContext> context(query->createDynamicContext()); + if (!context.get()) { + throw InternalErrorException(QPID_MSG("Query context looks munged ...")); + } + + XERCES_CPP_NAMESPACE::MemBufInputSource xml((XMLByte*) msgContent.c_str(), msgContent.length(), "input" ); Sequence seq(context->parseDocument(xml)); - FieldTable::ValueMap::const_iterator v = args->begin(); - for(; v != args->end(); ++v) { - // ### TODO: Do types properly - if (v->second->convertsTo<std::string>()) { - QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str()); - Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get()); - context->setExternalVariable(X(v->first.c_str()), value); - } + if (args) { + FieldTable::ValueMap::const_iterator v = args->begin(); + for(; v != args->end(); ++v) { + // ### TODO: Do types properly + if (v->second->convertsTo<std::string>()) { + QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str()); + Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get()); + context->setExternalVariable(X(v->first.c_str()), value); + } + } } if(!seq.isEmpty() && seq.first()->isNode()) { @@ -166,45 +179,58 @@ bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::F } catch (XQException& e) { QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent); + return 0; + } + catch (...) { + QPID_LOG(warning, "Unexpected error routing message: " << msgContent); + return 0; } return 0; } void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* args) { - RWlock::ScopedRlock l(lock); - XmlBinding::vector& bindings(bindingsMap[routingKey]); - XmlBinding::vector::iterator i; - int count(0); - - for (i = bindings.begin(); i != bindings.end(); i++, count++) { - - if (matches((*i)->xquery, msg, args)) { - msg.deliverTo((*i)->queue); - - if ((*i)->mgmtBinding.get() != 0) - (*i)->mgmtBinding->inc_msgMatched (); - } - } - - if(!count){ - QPID_LOG(warning, "XMLExchange " << getName() << " could not route message with query " << routingKey); - if (mgmtExchange.get() != 0) { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - } - else { - if (mgmtExchange.get() != 0) { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } + try { + RWlock::ScopedRlock l(lock); + XmlBinding::vector& bindings(bindingsMap[routingKey]); + XmlBinding::vector::iterator i; + int count(0); + + for (i = bindings.begin(); i != bindings.end(); i++) { + + if ((*i)->xquery && matches((*i)->xquery, msg, args)) { // Overly defensive? There should always be a query ... + msg.deliverTo((*i)->queue); + count++; + QPID_LOG(trace, "Delivered to queue" ); + + if ((*i)->mgmtBinding.get() != 0) + (*i)->mgmtBinding->inc_msgMatched (); + } + + if(!count){ + QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey); + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + } + else { + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } + + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + } + } } - - if (mgmtExchange.get() != 0) { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); + catch (...) { + QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing message with query " << routingKey); } + } diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index 37b37d9077..2bbe5a122f 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -88,10 +88,12 @@ void Dispatcher::run() } } } - } catch (const ClosedException&) { - //ignore it and return + session.sync(); // Make sure all our acks are received before returning. + } + catch (const ClosedException&) {} //ignore it and return + catch (const std::exception& e) { + QPID_LOG(error, "Exception in client dispatch thread: " << e.what()); } - session.sync(); // Make sure all our acks are received before returning. } void Dispatcher::stop() diff --git a/python/examples/xml-exchange/declare_queues.py b/python/examples/xml-exchange/declare_queues.py new file mode 100644 index 0000000000..d3bf4d359e --- /dev/null +++ b/python/examples/xml-exchange/declare_queues.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python +""" + declare_queues.py + + Creates and binds a queue on an AMQP direct exchange. + + All messages using the routing key "routing_key" are + sent to the queue named "message_queue". +""" + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +#----- Initialization ----------------------------------- + + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" +amqp_spec="/usr/share/amqp/amqp.0-10.xml" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +# +# If AMQP_SPEC is defined, use it to locate the spec file instead of +# looking for it in the default location. + +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +try: + amqp_spec = os.environ["AMQP_SPEC"] +except KeyError: + amqp_spec="/usr/share/amqp/amqp.0-10.xml" + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, spec=qpid.spec.load(amqp_spec)) +connection.start() +session = connection.session(str(uuid4())) + +#----- Create a queue ------------------------------------- + +# queue_declare() creates an AMQP queue, which is held +# on the broker. Published messages are sent to the AMQP queue, +# from which messages are delivered to consumers. +# +# queue_bind() determines which messages are routed to a queue. +# Route all messages with the routing key "routing_key" to +# the AMQP queue named "message_queue". + +session.exchange_declare(exchange="xml", type="xml") +session.queue_declare(queue="message_queue") + +binding = {} +binding["xquery"] = """ + let $w := ./weather + return $w/station = 'Raleigh-Durham International Airport (KRDU)' + and $w/temperature_f > 50 + and $w/temperature_f - $w/dewpoint > 5 + and $w/wind_speed_mph > 7 + and $w/wind_speed_mph < 20 """ + + +session.exchange_bind(exchange="xml", queue="message_queue", binding_key="weather", arguments=binding) + + +#----- Cleanup --------------------------------------------- + +session.close() + + diff --git a/python/examples/xml-exchange/listener.py b/python/examples/xml-exchange/listener.py new file mode 100644 index 0000000000..cdc00a7015 --- /dev/null +++ b/python/examples/xml-exchange/listener.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python +""" + listener.py + + This AMQP client reads messages from a message + queue named "message_queue". It is implemented + as a message listener. +""" + + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +# + +from time import sleep + + +#----- Message Receive Handler ----------------------------- +class Receiver: + def __init__ (self): + self.finalReceived = False + + def isFinal (self): + return self.finalReceived + + def Handler (self, message): + content = message.body + session.message_accept(RangedSet(message.id)) + print content + +#----- Initialization -------------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" +amqp_spec="/usr/share/amqp/amqp.0-10.xml" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +# +# If AMQP_SPEC is defined, use it to locate the spec file instead of +# looking for it in the default location. + +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +try: + amqp_spec = os.environ["AMQP_SPEC"] +except KeyError: + amqp_spec="/usr/share/amqp/amqp.0-10.xml" + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, spec=qpid.spec.load(amqp_spec)) +connection.start() +session = connection.session(str(uuid4())) + +#----- Read from queue -------------------------------------------- + +# Now let's create a local client queue and tell it to read +# incoming messages. + +# The consumer tag identifies the client-side queue. + +local_queue_name = "local_queue" +local_queue = session.incoming(local_queue_name) + +# Call message_subscribe() to tell the broker to deliver messages +# from the AMQP queue to this local client queue. The broker will +# start delivering messages as soon as message_subscribe() is called. + +session.message_subscribe(queue="message_queue", destination=local_queue_name) +session.message_flow(local_queue_name, session.credit_unit.message, 0xFFFFFFFF) +session.message_flow(local_queue_name, session.credit_unit.byte, 0xFFFFFFFF) + +receiver = Receiver () +local_queue.listen (receiver.Handler) + +sleep (10) + + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. +# + +session.close() diff --git a/python/examples/xml-exchange/verify b/python/examples/xml-exchange/verify new file mode 100644 index 0000000000..01d81a18a1 --- /dev/null +++ b/python/examples/xml-exchange/verify @@ -0,0 +1,3 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +clients ./declare_queues.py ./direct_producer.py ./direct_consumer.py +outputs ./declare_queues.py.out ./direct_producer.py.out ./direct_consumer.py.out diff --git a/python/examples/xml-exchange/verify.in b/python/examples/xml-exchange/verify.in new file mode 100644 index 0000000000..5e691619d9 --- /dev/null +++ b/python/examples/xml-exchange/verify.in @@ -0,0 +1,14 @@ +==== declare_queues.py.out +==== direct_producer.py.out +==== direct_consumer.py.out +message 0 +message 1 +message 2 +message 3 +message 4 +message 5 +message 6 +message 7 +message 8 +message 9 +That's all, folks! diff --git a/python/examples/xml-exchange/xml_consumer.py b/python/examples/xml-exchange/xml_consumer.py new file mode 100644 index 0000000000..c210a4342c --- /dev/null +++ b/python/examples/xml-exchange/xml_consumer.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python +""" + direct_consumer.py + + This AMQP client reads messages from a message + queue named "message_queue". +""" + +import qpid +import sys +import os +from random import randint +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + + +#----- Initialization -------------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" +amqp_spec="/usr/share/amqp/amqp.0-10.xml" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +# +# If AMQP_SPEC is defined, use it to locate the spec file instead of +# looking for it in the default location. + +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +try: + amqp_spec = os.environ["AMQP_SPEC"] +except KeyError: + amqp_spec="/usr/share/amqp/amqp.0-10.xml" + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, spec=qpid.spec.load(amqp_spec)) +connection.start() +session = connection.session(str(uuid4())) + + +#----- Read from queue -------------------------------------------- + +# Now let's create a local client queue and tell it to read +# incoming messages. + +# The consumer tag identifies the client-side queue. + +local_queue_name = "local_queue" +local_queue = session.incoming(local_queue_name) + +# Call message_consume() to tell the broker to deliver messages +# from the AMQP queue to this local client queue. The broker will +# start delivering messages as soon as message_consume() is called. + +session.message_subscribe(queue="message_queue", destination=local_queue_name) +session.message_flow(local_queue_name, session.credit_unit.message, 0xFFFFFFFF) +session.message_flow(local_queue_name, session.credit_unit.byte, 0xFFFFFFFF) + +# Initialize 'final' and 'content', variables used to identify the last message. + +message = None +while True: + try: + message = local_queue.get(timeout=10) + session.message_accept(RangedSet(message.id)) + content = message.body + print content + except Empty: + print "No more messages!" + break + + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. +# + +session.close() diff --git a/python/examples/xml-exchange/xml_producer.py b/python/examples/xml-exchange/xml_producer.py new file mode 100644 index 0000000000..9e609ed132 --- /dev/null +++ b/python/examples/xml-exchange/xml_producer.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python +""" + xml_producer.py + + Publishes messages to an XML exchange, using + the routing key "weather" +""" + + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +#----- Functions ---------------------------------------- + +# Data for weather reports + +station = ("Raleigh-Durham International Airport (KRDU)", + "New Bern, Craven County Regional Airport (KEWN)", + "Boone, Watauga County Hospital Heliport (KTNB)", + "Hatteras, Mitchell Field (KHSE)") +wind_speed_mph = ( 0, 2, 5, 10, 16, 22, 28, 35, 42, 51, 61, 70, 80 ) +temperature_f = ( 30, 40, 50, 60, 70, 80, 90, 100 ) +dewpoint = ( 35, 40, 45, 50 ) + +def pick_one(list, i): + return str( list [ i % len(list)] ) + +def report(i): + return "<weather>" + "<station>" + pick_one(station,i)+ "</station>" + "<wind_speed_mph>" + pick_one(wind_speed_mph,i) + "</wind_speed_mph>" + "<temperature_f>" + pick_one(temperature_f,i) + "</temperature_f>" + "<dewpoint>" + pick_one(dewpoint,i) + "</dewpoint>" + "</weather>" + + +#----- Initialization ----------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" +amqp_spec="/usr/share/amqp/amqp.0-10.xml" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +# +# If AMQP_SPEC is defined, use it to locate the spec file instead of +# looking for it in the default location. + +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +try: + amqp_spec = os.environ["AMQP_SPEC"] +except KeyError: + amqp_spec="/usr/share/amqp/amqp.0-10.xml" + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, spec=qpid.spec.load(amqp_spec)) +connection.start() +session = connection.session(str(uuid4())) + +#----- Publish some messages ------------------------------ + +# Create some messages and put them on the broker. + +props = session.delivery_properties(routing_key="weather") + +for i in range(10): + print report(i) + session.message_transfer(destination="xml", message=Message(props, report(i))) + + +#----- Cleanup -------------------------------------------- + +# Clean up before exiting so there are no open threads. + +session.close() |