summaryrefslogtreecommitdiff
path: root/trunk/qpid/cpp/examples/tradedemo/topic_publisher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/cpp/examples/tradedemo/topic_publisher.cpp')
-rw-r--r--trunk/qpid/cpp/examples/tradedemo/topic_publisher.cpp271
1 files changed, 0 insertions, 271 deletions
diff --git a/trunk/qpid/cpp/examples/tradedemo/topic_publisher.cpp b/trunk/qpid/cpp/examples/tradedemo/topic_publisher.cpp
deleted file mode 100644
index e22c185bc7..0000000000
--- a/trunk/qpid/cpp/examples/tradedemo/topic_publisher.cpp
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/**
- * topic_publisher.cpp:
- *
- * This program is one of three programs designed to be used
- * together. These programs implement a publish-subscribe example
- * using the "amq.topic" exchange. In the example multiple listeners
- * can subscribe to the same queues for TTL messages.
- * The TTL messages are all ticker price data. Messages are
- * browsed and therefore shared among the multiple listeners.
- * Messages timeout using TTL so that they don't stay in the queue
- * for too long and fill it up.
- * Local exclusive LVQ are also declared for market data.
- *
- * declare_queues.cpp
- *
- * Declares several non-exclusive queues bound to the amq:topic exchange
- *
- * topic_publisher.cpp
- *
- * Sends messages to the "amq.topic" exchange, using the
- * multipart routing keys for ticker price and market data
- * Ticker messages are sent using a TTL value.
- *
- * topic_listener.cpp (this program)
- *
- * Subscribes to non-exclusive queues in NOT_ACQUIRE mode for
- * ticker price data and declares two LVQs for market data.
- *
- * Multiple listeners can be run at the same time.
- *
- */
-
-
-#include <qpid/client/Connection.h>
-#include <qpid/client/Session.h>
-#include <qpid/client/AsyncSession.h>
-#include <qpid/client/Message.h>
-#include "qpid/client/QueueOptions.h"
-
-
-#include <stdlib.h>
-#include <cstdlib>
-#include <iostream>
-#include <set>
-#include <sstream>
-
-using namespace qpid::client;
-using namespace qpid::framing;
-
-using std::stringstream;
-using std::string;
-
-class Publisher {
- private:
- Session& session;
- int ttl_time;
- unsigned long seq;
-
- unsigned short high_[6];
- unsigned short low_[6];
- unsigned long shares_[6];
- unsigned long volume_[6];
- QueueOptions args;
-
- public:
- Publisher( Session& session,
- const int ttl_time,
- const unsigned long shares[6]);
-
- virtual void publish_ticker(const std::string queue, unsigned short& curr_price);
- virtual void publish_market(const std::string queue, unsigned short& curr_price, int i);
- virtual ~Publisher() { };
-};
-
-Publisher::Publisher(Session& session, int ttl_time, const unsigned long shares[6]) :
- session(session),
- ttl_time(ttl_time),
- seq(0)
-{
- for (unsigned short i=0; i < 6; i++) {
- high_[i] = 0;
- low_[i] = 9999;
- volume_[i] = 0;
- shares_[i] = shares[i];
- }
-}
-
-
-void Publisher::publish_ticker(const std::string symbol, unsigned short& curr_price)
-{
- Message message;
-
- // Set the routing key once, we'll use the same routing key for all
- // messages.
-
- std::string routing_key = "TICKER." + symbol;
- std::cout << "Setting routing key:" << routing_key << std::endl;
- message.getDeliveryProperties().setRoutingKey(routing_key);
-
- // Randomally generate some price flucuations
- bool mvmnt;
- unsigned short change = rand() % 3;
- if (rand() % 2 == 0)
- {
- mvmnt = true;
- curr_price += change;
- }
- else
- {
- mvmnt = false;
- curr_price = (curr_price - change)>0 ? (curr_price - change) : 0;
- }
-
- // Was there change in price or no change ?
- std::string movement;
- if (!change)
- {
- movement = "] [--]";
- } else
- {
- movement = (mvmnt ? "] [UP]" : "] [DOWN]");
- }
-
- stringstream ticker_data;
- // Build up the ticker info
- ticker_data << "[TICKER] " << "Symbol:" << symbol << " \tPrice[" << curr_price << "] \t["
- << change << movement;
-
- message.setData(ticker_data.str());
- // Set TTL value so that message will timeout after a period and be purged from queues
- message.getDeliveryProperties().setTtl(ttl_time);
- // Asynchronous transfer sends messages as quickly as
- // possible without waiting for confirmation.
- async(session).messageTransfer(arg::content=message, arg::destination="amq.topic");
-
-}
-
-void Publisher::publish_market(const std::string symbol, unsigned short& curr_price, int i)
-{
- Message message;
-
- // Set the routing key
- std::string routing_key = "MRKT." + symbol;
- std::cout << "Setting routing key:" << routing_key << std::endl;
- message.getDeliveryProperties().setRoutingKey(routing_key);
-
- // Calculate the market data low/hi change, vol, market cap etc.
- if (curr_price < low_[i] || low_[i] == 0)
- {
- low_[i] = curr_price;
- }
- else if (curr_price > high_[i] || high_[i] == 9999)
- {
- high_[i] = curr_price;
- }
-
- volume_[i] += rand() % 1000; // increase the daily volume tracker
- int mkt_cap = shares_[i] * curr_price; // calculate new market cap based on current price
-
- stringstream market_data;
- // Build up the ticker info
- market_data << "[MARKET] " << "Symbol:" << symbol << "\tVolume: " << volume_[i]
- << "\tHi:" << high_[i] << "\tLo:" << low_[i] << "\tMktCap:"
- << mkt_cap <<"M\tSEQ[" << seq << "]";
-
- message.setData(market_data.str());
-
- std::string key;
- args.getLVQKey(key);
- message.getHeaders().setString(key, symbol);
-
- // Asynchronous transfer sends messages as quickly as
- // possible without waiting for confirmation.
- async(session).messageTransfer(arg::content=message, arg::destination="amq.topic");
- seq++; // This sequence number is really just to demonstrate the LVQ nature of the queue.
- // You will notice some messages don't show because they are overwritten by last value.
-
-}
-
-
-int main(int argc, char** argv) {
- unsigned int pub_cycles = argc>1 ? atoi(argv[1]) : 100;
- unsigned int ttl_time = argc>2 ? atoi(argv[2]) : 4000;
- const char* host = argc>3 ? argv[3] : "127.0.0.1";
- int port = argc>4 ? atoi(argv[4]) : 5672;
- std::cout <<"Usage: topic_publisher <pub cycles> <TTL-timeout> <host name/IP> <port>" << std::endl;
- std::cout <<"\tparameters are optional but must be in this order when used." << std::endl;
-
- // Set up the stocks symbols and their prices
- std::string symbol[6];
- unsigned short price[6];
- symbol[0] = "NYSE.RHT"; // Red Hat
- symbol[1] = "NYSE.IBM"; // IBM Corp.
- symbol[2] = "NASDAQ.MSFT"; // Microsoft
- symbol[3] = "NASDAQ.CSCO"; // Cisco Systems
- symbol[4] = "NASDAQ.YHOO"; // Yahoo
- symbol[5] = "NASDAQ.GOOG"; // Google
-
- // Rough starting values.
- price[0] = rand() % 30 +1;
- price[1] = rand() % 120 +1;
- price[2] = rand() % 20 +1;
- price[3] = rand() % 75 +1;
- price[4] = rand() % 10 +1;
- price[5] = rand() % 323 +1;
-
- // Shares oustanding in millions.
- unsigned long shares[6] = {190,1340,8890, 5860, 1390, 314};
-
-
- Connection connection;
- try {
- connection.open(host, port);
- Session session = connection.newSession();
-
- Publisher theFeed(session,ttl_time, shares);
-
- //--------- Main body of program --------------------------------------------
-
- // Print the opening values for each symbol
- std::cout << std::endl << "Opening values:" << std::endl;
- for (int i=0; i < 6; i++)
- {
- std::cout << symbol[i] << ":" << price[i] << std::endl;
- }
-
- // For the duration of the publishing cycles publish
- // ticker and market data for each symbol
- for (unsigned int j=0; j<pub_cycles; j++)
- {
- for (unsigned int i=0; i < 6; i++)
- {
- // for each symbol publish the ticker and the market data
- theFeed.publish_ticker(symbol[i], price[i]);
- theFeed.publish_market(symbol[i], price[i], i);
- }
- }
-
-
- //-----------------------------------------------------------------------------
-
- connection.close();
- return 0;
- } catch(const std::exception& error) {
- std::cout << error.what() << std::endl;
- }
- return 1;
-}
-
-