diff options
Diffstat (limited to 'trunk/qpid/cpp/examples/tradedemo/topic_publisher.cpp')
-rw-r--r-- | trunk/qpid/cpp/examples/tradedemo/topic_publisher.cpp | 271 |
1 files changed, 0 insertions, 271 deletions
diff --git a/trunk/qpid/cpp/examples/tradedemo/topic_publisher.cpp b/trunk/qpid/cpp/examples/tradedemo/topic_publisher.cpp deleted file mode 100644 index e22c185bc7..0000000000 --- a/trunk/qpid/cpp/examples/tradedemo/topic_publisher.cpp +++ /dev/null @@ -1,271 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/** - * topic_publisher.cpp: - * - * This program is one of three programs designed to be used - * together. These programs implement a publish-subscribe example - * using the "amq.topic" exchange. In the example multiple listeners - * can subscribe to the same queues for TTL messages. - * The TTL messages are all ticker price data. Messages are - * browsed and therefore shared among the multiple listeners. - * Messages timeout using TTL so that they don't stay in the queue - * for too long and fill it up. - * Local exclusive LVQ are also declared for market data. - * - * declare_queues.cpp - * - * Declares several non-exclusive queues bound to the amq:topic exchange - * - * topic_publisher.cpp - * - * Sends messages to the "amq.topic" exchange, using the - * multipart routing keys for ticker price and market data - * Ticker messages are sent using a TTL value. - * - * topic_listener.cpp (this program) - * - * Subscribes to non-exclusive queues in NOT_ACQUIRE mode for - * ticker price data and declares two LVQs for market data. - * - * Multiple listeners can be run at the same time. - * - */ - - -#include <qpid/client/Connection.h> -#include <qpid/client/Session.h> -#include <qpid/client/AsyncSession.h> -#include <qpid/client/Message.h> -#include "qpid/client/QueueOptions.h" - - -#include <stdlib.h> -#include <cstdlib> -#include <iostream> -#include <set> -#include <sstream> - -using namespace qpid::client; -using namespace qpid::framing; - -using std::stringstream; -using std::string; - -class Publisher { - private: - Session& session; - int ttl_time; - unsigned long seq; - - unsigned short high_[6]; - unsigned short low_[6]; - unsigned long shares_[6]; - unsigned long volume_[6]; - QueueOptions args; - - public: - Publisher( Session& session, - const int ttl_time, - const unsigned long shares[6]); - - virtual void publish_ticker(const std::string queue, unsigned short& curr_price); - virtual void publish_market(const std::string queue, unsigned short& curr_price, int i); - virtual ~Publisher() { }; -}; - -Publisher::Publisher(Session& session, int ttl_time, const unsigned long shares[6]) : - session(session), - ttl_time(ttl_time), - seq(0) -{ - for (unsigned short i=0; i < 6; i++) { - high_[i] = 0; - low_[i] = 9999; - volume_[i] = 0; - shares_[i] = shares[i]; - } -} - - -void Publisher::publish_ticker(const std::string symbol, unsigned short& curr_price) -{ - Message message; - - // Set the routing key once, we'll use the same routing key for all - // messages. - - std::string routing_key = "TICKER." + symbol; - std::cout << "Setting routing key:" << routing_key << std::endl; - message.getDeliveryProperties().setRoutingKey(routing_key); - - // Randomally generate some price flucuations - bool mvmnt; - unsigned short change = rand() % 3; - if (rand() % 2 == 0) - { - mvmnt = true; - curr_price += change; - } - else - { - mvmnt = false; - curr_price = (curr_price - change)>0 ? (curr_price - change) : 0; - } - - // Was there change in price or no change ? - std::string movement; - if (!change) - { - movement = "] [--]"; - } else - { - movement = (mvmnt ? "] [UP]" : "] [DOWN]"); - } - - stringstream ticker_data; - // Build up the ticker info - ticker_data << "[TICKER] " << "Symbol:" << symbol << " \tPrice[" << curr_price << "] \t[" - << change << movement; - - message.setData(ticker_data.str()); - // Set TTL value so that message will timeout after a period and be purged from queues - message.getDeliveryProperties().setTtl(ttl_time); - // Asynchronous transfer sends messages as quickly as - // possible without waiting for confirmation. - async(session).messageTransfer(arg::content=message, arg::destination="amq.topic"); - -} - -void Publisher::publish_market(const std::string symbol, unsigned short& curr_price, int i) -{ - Message message; - - // Set the routing key - std::string routing_key = "MRKT." + symbol; - std::cout << "Setting routing key:" << routing_key << std::endl; - message.getDeliveryProperties().setRoutingKey(routing_key); - - // Calculate the market data low/hi change, vol, market cap etc. - if (curr_price < low_[i] || low_[i] == 0) - { - low_[i] = curr_price; - } - else if (curr_price > high_[i] || high_[i] == 9999) - { - high_[i] = curr_price; - } - - volume_[i] += rand() % 1000; // increase the daily volume tracker - int mkt_cap = shares_[i] * curr_price; // calculate new market cap based on current price - - stringstream market_data; - // Build up the ticker info - market_data << "[MARKET] " << "Symbol:" << symbol << "\tVolume: " << volume_[i] - << "\tHi:" << high_[i] << "\tLo:" << low_[i] << "\tMktCap:" - << mkt_cap <<"M\tSEQ[" << seq << "]"; - - message.setData(market_data.str()); - - std::string key; - args.getLVQKey(key); - message.getHeaders().setString(key, symbol); - - // Asynchronous transfer sends messages as quickly as - // possible without waiting for confirmation. - async(session).messageTransfer(arg::content=message, arg::destination="amq.topic"); - seq++; // This sequence number is really just to demonstrate the LVQ nature of the queue. - // You will notice some messages don't show because they are overwritten by last value. - -} - - -int main(int argc, char** argv) { - unsigned int pub_cycles = argc>1 ? atoi(argv[1]) : 100; - unsigned int ttl_time = argc>2 ? atoi(argv[2]) : 4000; - const char* host = argc>3 ? argv[3] : "127.0.0.1"; - int port = argc>4 ? atoi(argv[4]) : 5672; - std::cout <<"Usage: topic_publisher <pub cycles> <TTL-timeout> <host name/IP> <port>" << std::endl; - std::cout <<"\tparameters are optional but must be in this order when used." << std::endl; - - // Set up the stocks symbols and their prices - std::string symbol[6]; - unsigned short price[6]; - symbol[0] = "NYSE.RHT"; // Red Hat - symbol[1] = "NYSE.IBM"; // IBM Corp. - symbol[2] = "NASDAQ.MSFT"; // Microsoft - symbol[3] = "NASDAQ.CSCO"; // Cisco Systems - symbol[4] = "NASDAQ.YHOO"; // Yahoo - symbol[5] = "NASDAQ.GOOG"; // Google - - // Rough starting values. - price[0] = rand() % 30 +1; - price[1] = rand() % 120 +1; - price[2] = rand() % 20 +1; - price[3] = rand() % 75 +1; - price[4] = rand() % 10 +1; - price[5] = rand() % 323 +1; - - // Shares oustanding in millions. - unsigned long shares[6] = {190,1340,8890, 5860, 1390, 314}; - - - Connection connection; - try { - connection.open(host, port); - Session session = connection.newSession(); - - Publisher theFeed(session,ttl_time, shares); - - //--------- Main body of program -------------------------------------------- - - // Print the opening values for each symbol - std::cout << std::endl << "Opening values:" << std::endl; - for (int i=0; i < 6; i++) - { - std::cout << symbol[i] << ":" << price[i] << std::endl; - } - - // For the duration of the publishing cycles publish - // ticker and market data for each symbol - for (unsigned int j=0; j<pub_cycles; j++) - { - for (unsigned int i=0; i < 6; i++) - { - // for each symbol publish the ticker and the market data - theFeed.publish_ticker(symbol[i], price[i]); - theFeed.publish_market(symbol[i], price[i], i); - } - } - - - //----------------------------------------------------------------------------- - - connection.close(); - return 0; - } catch(const std::exception& error) { - std::cout << error.what() << std::endl; - } - return 1; -} - - |