diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2009-02-09 14:19:23 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2009-02-09 14:19:23 +0000 |
commit | c70ea4bfdc185dff06527e0cca3ee9fbe0a4dcc5 (patch) | |
tree | 27a04a9996c7b4f7994f649490fda5d1d064c4cb | |
parent | f5978ac5b26366ce16f0934158a4ec790899ec9d (diff) | |
download | qpid-python-c70ea4bfdc185dff06527e0cca3ee9fbe0a4dcc5.tar.gz |
QPID-1595 from william -- trade demo
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@742515 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/configure.ac | 1 | ||||
-rw-r--r-- | qpid/cpp/examples/Makefile.am | 2 | ||||
-rw-r--r-- | qpid/cpp/examples/tradedemo/Makefile.am | 53 | ||||
-rw-r--r-- | qpid/cpp/examples/tradedemo/declare_queues.cpp | 98 | ||||
-rw-r--r-- | qpid/cpp/examples/tradedemo/topic_listener.cpp | 184 | ||||
-rw-r--r-- | qpid/cpp/examples/tradedemo/topic_publisher.cpp | 271 |
6 files changed, 608 insertions, 1 deletions
diff --git a/qpid/cpp/configure.ac b/qpid/cpp/configure.ac index 4fe143bedf..90b943f047 100644 --- a/qpid/cpp/configure.ac +++ b/qpid/cpp/configure.ac @@ -380,6 +380,7 @@ AC_CONFIG_FILES([ examples/failover/Makefile examples/xml-exchange/Makefile examples/qmf-console/Makefile + examples/tradedemo/Makefile managementgen/Makefile etc/Makefile src/Makefile diff --git a/qpid/cpp/examples/Makefile.am b/qpid/cpp/examples/Makefile.am index 29a101425c..255c86a7be 100644 --- a/qpid/cpp/examples/Makefile.am +++ b/qpid/cpp/examples/Makefile.am @@ -16,7 +16,7 @@ # specific language governing permissions and limitations # under the License. # -SUBDIRS = direct fanout pub-sub request-response failover qmf-console +SUBDIRS = direct fanout pub-sub request-response failover qmf-console tradedemo if HAVE_XML SUBDIRS += xml-exchange broker_args = "--no-module-dir --data-dir \"\" --auth no --load-module $(top_builddir)/src/.libs/xml.so" diff --git a/qpid/cpp/examples/tradedemo/Makefile.am b/qpid/cpp/examples/tradedemo/Makefile.am new file mode 100644 index 0000000000..b38160947a --- /dev/null +++ b/qpid/cpp/examples/tradedemo/Makefile.am @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +examplesdir=$(pkgdatadir)/examples/pub-sub + +include $(top_srcdir)/examples/makedist.mk + +noinst_PROGRAMS=topic_listener topic_publisher declare_queues + +topic_listener_SOURCES=topic_listener.cpp +topic_listener_LDADD=$(CLIENT_LIB) + +topic_publisher_SOURCES=topic_publisher.cpp +topic_publisher_LDADD=$(CLIENT_LIB) + +declare_queues_SOURCES=declare_queues.cpp +declare_queues_LDADD=$(CLIENT_LIB) + + +examples_DATA= \ + topic_listener.cpp \ + topic_publisher.cpp \ + declare_queues.cpp \ + $(MAKEDIST) + +EXTRA_DIST= \ + $(examples_DATA) \ + verify \ + verify.in \ + verify_cpp_python \ + verify_cpp_python.in \ + verify_python_cpp \ + verify_python_cpp.in + + + + + diff --git a/qpid/cpp/examples/tradedemo/declare_queues.cpp b/qpid/cpp/examples/tradedemo/declare_queues.cpp new file mode 100644 index 0000000000..b1f2cc3510 --- /dev/null +++ b/qpid/cpp/examples/tradedemo/declare_queues.cpp @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * topic_publisher.cpp: + * + * This program is one of three programs designed to be used + * together. These programs implement a publish-subscribe example + * using the "amq.topic" exchange. In the example multiple listeners + * can subscribe to the same queues for TTL messages. + * The TTL messages are all ticker price data. Messages are + * browsed and therefore shared among the multiple listeners. + * Messages timeout using TTL so that they don't stay in the queue + * for too long and fill it up. + * Local exclusive LVQ are also declared for market data. + * + * declare_queues.cpp + * + * Declares several non-exclusive queues bound to the amq:topic exchange + * + * topic_publisher.cpp + * + * Sends messages to the "amq.topic" exchange, using the + * multipart routing keys for ticker price and market data + * Ticker messages are sent using a TTL value. + * + * topic_listener.cpp (this program) + * + * Subscribes to non-exclusive queues in NOT_ACQUIRE mode for + * ticker price data and declares two LVQs for market data. + * + * Multiple listeners can be run at the same time. + * + */ + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> + + +using namespace qpid::client; +using namespace qpid::framing; + + +int main(int argc, char** argv) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + Connection connection; + + try { + connection.open(host, port); + Session session = connection.newSession(); + + + //--------- Main body of program -------------------------------------------- + + // Create a queue named "message_queue", and route all messages whose + // routing key is "routing_key" to this newly created queue. + + session.queueDeclare(arg::queue="TICKER.NYSE", arg::exclusive=false); + session.exchangeBind(arg::exchange="amq.topic", arg::queue="TICKER.NYSE", arg::bindingKey="TICKER.NYSE.#"); + std::cout << "Declared queue Ticker NYSE non-exclusive with amq:topic binding TICKER.NYSE.#" << std::endl; + session.queueDeclare(arg::queue="TICKER.NASDAQ", arg::exclusive=false); + session.exchangeBind(arg::exchange="amq.topic", arg::queue="TICKER.NASDAQ", arg::bindingKey="TICKER.NASDAQ.#"); + std::cout << "Declared queue Ticker NASDAQ non-exclusive with amq:topic binding TICKER.NASDAQ.#" << std::endl; + + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; + +} + + + diff --git a/qpid/cpp/examples/tradedemo/topic_listener.cpp b/qpid/cpp/examples/tradedemo/topic_listener.cpp new file mode 100644 index 0000000000..569e558147 --- /dev/null +++ b/qpid/cpp/examples/tradedemo/topic_listener.cpp @@ -0,0 +1,184 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * topic_publisher.cpp: + * + * This program is one of three programs designed to be used + * together. These programs implement a publish-subscribe example + * using the "amq.topic" exchange. In the example multiple listeners + * can subscribe to the same queues for TTL messages. + * The TTL messages are all ticker price data. Messages are + * browsed and therefore shared among the multiple listeners. + * Messages timeout using TTL so that they don't stay in the queue + * for too long and fill it up. + * Local exclusive LVQ are also declared for market data. + * + * declare_queues.cpp + * + * Declares several non-exclusive queues bound to the amq:topic exchange + * + * topic_publisher.cpp + * + * Sends messages to the "amq.topic" exchange, using the + * multipart routing keys for ticker price and market data + * Ticker messages are sent using a TTL value. + * + * topic_listener.cpp (this program) + * + * Subscribes to non-exclusive queues in NOT_ACQUIRE mode for + * ticker price data and declares two LVQs for market data. + * + * Multiple listeners can be run at the same time. + * + */ + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> +#include <qpid/client/Message.h> +#include <qpid/client/MessageListener.h> +#include <qpid/client/SubscriptionManager.h> +#include "qpid/client/QueueOptions.h" + +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +using namespace qpid::client; +using namespace qpid::framing; + + +class Listener : public MessageListener { + private: + Session& session; + SubscriptionManager subscriptions; + public: + Listener(Session& session); + virtual void subscribeTTLQueue(std::string queue); + virtual void subscribeLVQQueue(std::string queue); + virtual void received(Message& message); + virtual void listen(); + ~Listener() { }; +}; + + +/* + * Listener::Listener + * + * Subscribe to the queue, route it to a client destination for the + * listener. (The destination name merely identifies the destination + * in the listener, you can use any name as long as you use the same + * name for the listener). + */ + +Listener::Listener(Session& session) : + session(session), + subscriptions(session) +{ +} + + +void Listener::subscribeTTLQueue(std::string queue) { + + /* + * Subscribe to the queue using the subscription manager. + * The queues were declared elsewhere alog with their bindings. + */ + + std::cout << "Subscribing to queue " << queue << std::endl; + subscriptions.subscribe(*this, queue); + // Will not acquire messages but instead browse them. + subscriptions.setAcquireMode(message::ACQUIRE_MODE_NOT_ACQUIRED); +} + +void Listener::subscribeLVQQueue(std::string queue) { + + /* + * Declare and subscribe to the queue using the subscription manager. + */ + + QueueOptions qo; + qo.setOrdering(LVQ); + std::string binding = queue + ".#"; + queue += session.getId().getName(); + session.queueDeclare(arg::queue=queue, arg::exclusive=true, arg::arguments=qo); + session.exchangeBind(arg::exchange="amq.topic", arg::queue=queue, arg::bindingKey=binding); + std::cout << "Declared queue " << queue << " non-exclusive with amq:topic binding " << binding << std::endl; + std::cout << "Subscribing to queue " << queue << std::endl; + subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE)); + +} + +void Listener::received(Message& message) { + // If you want to see the destination you can swap the following lines. + // std::cout << message.getDestination() << "\t" << message.getData() << std::endl; + std::cout << message.getData() << std::endl; + +} + +void Listener::listen() { + // Receive messages + subscriptions.run(); +} + +int main(int argc, char** argv) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + + Connection connection; + try { + connection.open(host, port); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + + + // Create a listener for the session + + Listener listener(session); + + // Subscribe to messages on the queues we are interested in + + listener.subscribeTTLQueue("TICKER.NASDAQ"); + listener.subscribeTTLQueue("TICKER.NYSE"); + + listener.subscribeLVQQueue("MRKT.NASDAQ"); + listener.subscribeLVQQueue("MRKT.NYSE"); + + std::cout << "Starting Listener <Ctrl>-C to exit." << std::endl; + std::cout << "Listening for messages ..." << std::endl; + + // Give up control and receive messages + listener.listen(); + + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/qpid/cpp/examples/tradedemo/topic_publisher.cpp b/qpid/cpp/examples/tradedemo/topic_publisher.cpp new file mode 100644 index 0000000000..1670a3433b --- /dev/null +++ b/qpid/cpp/examples/tradedemo/topic_publisher.cpp @@ -0,0 +1,271 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * topic_publisher.cpp: + * + * This program is one of three programs designed to be used + * together. These programs implement a publish-subscribe example + * using the "amq.topic" exchange. In the example multiple listeners + * can subscribe to the same queues for TTL messages. + * The TTL messages are all ticker price data. Messages are + * browsed and therefore shared among the multiple listeners. + * Messages timeout using TTL so that they don't stay in the queue + * for too long and fill it up. + * Local exclusive LVQ are also declared for market data. + * + * declare_queues.cpp + * + * Declares several non-exclusive queues bound to the amq:topic exchange + * + * topic_publisher.cpp + * + * Sends messages to the "amq.topic" exchange, using the + * multipart routing keys for ticker price and market data + * Ticker messages are sent using a TTL value. + * + * topic_listener.cpp (this program) + * + * Subscribes to non-exclusive queues in NOT_ACQUIRE mode for + * ticker price data and declares two LVQs for market data. + * + * Multiple listeners can be run at the same time. + * + */ + + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> +#include <qpid/client/AsyncSession.h> +#include <qpid/client/Message.h> +#include "qpid/client/QueueOptions.h" + + +#include <unistd.h> +#include <cstdlib> +#include <iostream> +#include <set> +#include <sstream> + +using namespace qpid::client; +using namespace qpid::framing; + +using std::stringstream; +using std::string; + +class Publisher { + private: + Session& session; + int ttl_time; + unsigned long seq; + + unsigned short high_[6]; + unsigned short low_[6]; + unsigned long shares_[6]; + unsigned long volume_[6]; + QueueOptions args; + + public: + Publisher( Session& session, + const int ttl_time, + const unsigned long shares[6]); + + virtual void publish_ticker(const std::string queue, unsigned short& curr_price); + virtual void publish_market(const std::string queue, unsigned short& curr_price, int i); + ~Publisher() { }; +}; + +Publisher::Publisher(Session& session, int ttl_time, const unsigned long shares[6]) : + session(session), + ttl_time(ttl_time), + seq(0) +{ + for (unsigned short i=0; i < 6; i++) { + high_[i] = 0; + low_[i] = 9999; + volume_[i] = 0; + shares_[i] = shares[i]; + } +} + + +void Publisher::publish_ticker(const std::string symbol, unsigned short& curr_price) +{ + Message message; + + // Set the routing key once, we'll use the same routing key for all + // messages. + + std::string routing_key = "TICKER." + symbol; + std::cout << "Setting routing key:" << routing_key << std::endl; + message.getDeliveryProperties().setRoutingKey(routing_key); + + // Randomally generate some price flucuations + bool mvmnt; + unsigned short change = rand() % 3; + if (rand() % 2 == 0) + { + mvmnt = true; + curr_price += change; + } + else + { + mvmnt = false; + curr_price = (curr_price - change)>0 ? (curr_price - change) : 0; + } + + // Was there change in price or no change ? + std::string movement; + if (!change) + { + movement = "] [--]"; + } else + { + movement = (mvmnt ? "] [UP]" : "] [DOWN]"); + } + + stringstream ticker_data; + // Build up the ticker info + ticker_data << "[TICKER] " << "Symbol:" << symbol << " \tPrice[" << curr_price << "] \t[" + << change << movement; + + message.setData(ticker_data.str()); + // Set TTL value so that message will timeout after a period and be purged from queues + message.getDeliveryProperties().setTtl(ttl_time); + // Asynchronous transfer sends messages as quickly as + // possible without waiting for confirmation. + async(session).messageTransfer(arg::content=message, arg::destination="amq.topic"); + +} + +void Publisher::publish_market(const std::string symbol, unsigned short& curr_price, int i) +{ + Message message; + + // Set the routing key + std::string routing_key = "MRKT." + symbol; + std::cout << "Setting routing key:" << routing_key << std::endl; + message.getDeliveryProperties().setRoutingKey(routing_key); + + // Calculate the market data low/hi change, vol, market cap etc. + if (curr_price < low_[i] || low_[i] == 0) + { + low_[i] = curr_price; + } + else if (curr_price > high_[i] || high_[i] == 9999) + { + high_[i] = curr_price; + } + + volume_[i] += rand() % 1000; // increase the daily volume tracker + int mkt_cap = shares_[i] * curr_price; // calculate new market cap based on current price + + stringstream market_data; + // Build up the ticker info + market_data << "[MARKET] " << "Symbol:" << symbol << "\tVolume: " << volume_[i] + << "\tHi:" << high_[i] << "\tLo:" << low_[i] << "\tMktCap:" + << mkt_cap <<"M\tSEQ[" << seq << "]"; + + message.setData(market_data.str()); + + std::string key; + args.getLVQKey(key); + message.getHeaders().setString(key, symbol); + + // Asynchronous transfer sends messages as quickly as + // possible without waiting for confirmation. + async(session).messageTransfer(arg::content=message, arg::destination="amq.topic"); + seq++; // This sequence number is really just to demonstrate the LVQ nature of the queue. + // You will notice some messages don't show because they are overwritten by last value. + +} + + +int main(int argc, char** argv) { + unsigned int pub_cycles = argc>1 ? atoi(argv[1]) : 100; + unsigned int ttl_time = argc>2 ? atoi(argv[2]) : 4000; + const char* host = argc>3 ? argv[3] : "127.0.0.1"; + int port = argc>4 ? atoi(argv[4]) : 5672; + std::cout <<"Usage: topic_publisher <pub cycles> <TTL-timeout> <host name/IP> <port>" << std::endl; + std::cout <<"\tparameters are optional but must be in this order when used." << std::endl; + + // Set up the stocks symbols and their prices + std::string symbol[6]; + unsigned short price[6]; + symbol[0] = "NYSE.RHT"; // Red Hat + symbol[1] = "NYSE.IBM"; // IBM Corp. + symbol[2] = "NASDAQ.MSFT"; // Microsoft + symbol[3] = "NASDAQ.CSCO"; // Cisco Systems + symbol[4] = "NASDAQ.YHOO"; // Yahoo + symbol[5] = "NASDAQ.GOOG"; // Google + + // Rough starting values. + price[0] = rand() % 30 +1; + price[1] = rand() % 120 +1; + price[2] = rand() % 20 +1; + price[3] = rand() % 75 +1; + price[4] = rand() % 10 +1; + price[5] = rand() % 323 +1; + + // Shares oustanding in millions. + unsigned long shares[6] = {190,1340,8890, 5860, 1390, 314}; + + + Connection connection; + try { + connection.open(host, port); + Session session = connection.newSession(); + + Publisher theFeed(session,ttl_time, shares); + + //--------- Main body of program -------------------------------------------- + + // Print the opening values for each symbol + std::cout << std::endl << "Opening values:" << std::endl; + for (int i=0; i < 6; i++) + { + std::cout << symbol[i] << ":" << price[i] << std::endl; + } + + // For the duration of the publishing cycles publish + // ticker and market data for each symbol + for (unsigned int j=0; j<pub_cycles; j++) + { + for (unsigned int i=0; i < 6; i++) + { + // for each symbol publish the ticker and the market data + theFeed.publish_ticker(symbol[i], price[i]); + theFeed.publish_market(symbol[i], price[i], i); + } + } + + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + |