diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/include/qpid/messaging/Address.h | 17 | ||||
-rw-r--r-- | cpp/include/qpid/messaging/Connection.h | 27 | ||||
-rw-r--r-- | cpp/src/CMakeLists.txt | 3 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 8 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 148 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.h | 21 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/AddressParser.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Connection.cpp | 37 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/ConnectionImpl.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/qpid_recv.cpp | 8 | ||||
-rw-r--r-- | cpp/src/tests/qpid_send.cpp | 14 | ||||
-rw-r--r-- | cpp/src/tests/qpid_stream.cpp | 31 |
13 files changed, 194 insertions, 125 deletions
diff --git a/cpp/include/qpid/messaging/Address.h b/cpp/include/qpid/messaging/Address.h index 745949792d..f3ca30bcd4 100644 --- a/cpp/include/qpid/messaging/Address.h +++ b/cpp/include/qpid/messaging/Address.h @@ -79,6 +79,11 @@ class AddressImpl; * nide when a sender or receiver is cancelled. Can be one of <i>always</i>, * <i>never</i>, <i>sender</i> or <i>receiver</i>.</td></tr> * + * <tr valign=top><td>reliability</td><td>indicates the level of + * reliability expected. Can be one of unreliable, at-most-once, + * at-least-once or exactly-once (the latter is not yet correctly + * supported).</td></tr> + * * <tr valign=top><td>node-properties</td><td>A nested map of properties of the addressed * entity or 'node'. These can be used when automatically creating it, * or to assert certain properties. @@ -109,16 +114,14 @@ class AddressImpl; * receiver does not want to receiver messages published to the topic * that originate from a sender on the same connection</td></tr> * - * <tr valign=top><td>browse</td><td>(only relevant for queues) specifies that the receiver - * does not wish to consume the messages, but merely browse them</td></tr> + * <tr valign=top><td>mode</td><td>(only relevant for queues) + * indicates whether the subscribe should consume (the default) or + * merely browse the messages. Valid values are 'consume' and + * 'browse'</td></tr> * * <tr valign=top><td>durable</td><td>(only relevant for topics at present) specifies that a * durable subscription is required</td></tr> * - * <tr valign=top><td>reliability</td><td>indicates the level of reliability that the receiver - * expects. Can be one of unreliable, at-most-once, at-least-once or - * exactly-once (the latter is not yet correctly supported).</td></tr> - * * <tr valign=top><td>filter</td><td>(only relevant for topics at present) allows bindings to * be created for the queue that match the given criteria (or list of * criteria).</td></tr> @@ -133,7 +136,7 @@ class AddressImpl; * <li>exclusive, which requests an exclusive subscription and * is only relevant for queues</li> * - * <li>x-queue-arguments, which ais only relevant for topics and + * <li>x-queue-arguments, which is only relevant for topics and * allows arguments to the queue-declare for the subscription * queue to be specified</li> * </ul> diff --git a/cpp/include/qpid/messaging/Connection.h b/cpp/include/qpid/messaging/Connection.h index 36392da0b2..e2d1cc2ed4 100644 --- a/cpp/include/qpid/messaging/Connection.h +++ b/cpp/include/qpid/messaging/Connection.h @@ -58,29 +58,32 @@ class Connection : public qpid::client::Handle<ConnectionImpl> * sasl-mechanism * sasl-min-ssf * sasl-max-ssf + * protocol + * urls * - * (note also bounds, locale, max-channels and max-framesize, but not sure whether those should be docuemented here) + * (note also bounds, locale, max-channels and max-framesize, but + * not sure whether those should be documented here) * - * Retry behaviour can be controlled through the following options: - * - * reconnection-timeout - determines how long it will try to - * reconnect for -1 means forever, 0 - * means don't try to reconnect - * min-retry-interval - * max-retry-interval + * Reconnect behaviour can be controlled through the following options: * - * The retry-interval is the time that the client waits for - * after a failed attempt to reconnect before retrying. It + * reconnect: true/false (enables/disables reconnect entirely) + * reconnect-timeout: number of seconds (give up and report failure after specified time) + * reconnect-limit: n (give up and report failure after specified number of attempts) + * reconnect-interval-min: number of seconds (initial delay between failed reconnection attempts) + * reconnect-interval-max: number of seconds (maximum delay between failed reconnection attempts) + * reconnect-interval: shorthand for setting the same reconnect_interval_min/max + * + * The reconnect-interval is the time that the client waits + * for after a failed attempt to reconnect before retrying. It * starts at the value of the min-retry-interval and is * doubled every failure until the value of max-retry-interval * is reached. - * - * */ QPID_CLIENT_EXTERN Connection(const Variant::Map& options = Variant::Map()); QPID_CLIENT_EXTERN Connection(const std::string& options); QPID_CLIENT_EXTERN ~Connection(); QPID_CLIENT_EXTERN Connection& operator=(const Connection&); + QPID_CLIENT_EXTERN void setOption(const std::string& name, const Variant& value); QPID_CLIENT_EXTERN void open(const std::string& url); QPID_CLIENT_EXTERN void close(); QPID_CLIENT_EXTERN Session newSession(bool transactional, const std::string& name = std::string()); diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 7022f4d343..6323a43e08 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -667,6 +667,7 @@ set (qpidclient_SOURCES qpid/client/amqp0_10/CodecsInternal.h qpid/client/amqp0_10/ConnectionImpl.h qpid/client/amqp0_10/ConnectionImpl.cpp + qpid/client/amqp0_10/FailoverUpdates.cpp qpid/client/amqp0_10/IncomingMessages.h qpid/client/amqp0_10/IncomingMessages.cpp qpid/client/amqp0_10/MessageSink.h @@ -679,6 +680,8 @@ set (qpidclient_SOURCES qpid/client/amqp0_10/SessionImpl.cpp qpid/client/amqp0_10/SenderImpl.h qpid/client/amqp0_10/SenderImpl.cpp + qpid/client/amqp0_10/SimpleUrlParser.h + qpid/client/amqp0_10/SimpleUrlParser.cpp ) add_library (qpidclient SHARED ${qpidclient_SOURCES}) diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 281814a828..fc2be3d8d5 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -729,6 +729,7 @@ libqpidclient_la_SOURCES = \ qpid/client/amqp0_10/CodecsInternal.h \ qpid/client/amqp0_10/ConnectionImpl.h \ qpid/client/amqp0_10/ConnectionImpl.cpp \ + qpid/client/amqp0_10/FailoverUpdates.cpp \ qpid/client/amqp0_10/IncomingMessages.h \ qpid/client/amqp0_10/IncomingMessages.cpp \ qpid/client/amqp0_10/MessageSink.h \ @@ -740,7 +741,9 @@ libqpidclient_la_SOURCES = \ qpid/client/amqp0_10/SessionImpl.h \ qpid/client/amqp0_10/SessionImpl.cpp \ qpid/client/amqp0_10/SenderImpl.h \ - qpid/client/amqp0_10/SenderImpl.cpp + qpid/client/amqp0_10/SenderImpl.cpp \ + qpid/client/amqp0_10/SimpleUrlParser.h \ + qpid/client/amqp0_10/SimpleUrlParser.cpp # NOTE: only public header files (which should be in ../include) # should go in this list. Private headers should go in the SOURCES @@ -826,7 +829,8 @@ nobase_include_HEADERS += \ ../include/qpid/messaging/Session.h \ ../include/qpid/messaging/Uuid.h \ ../include/qpid/messaging/Variant.h \ - ../include/qpid/client/amqp0_10/Codecs.h + ../include/qpid/client/amqp0_10/Codecs.h \ + ../include/qpid/client/amqp0_10/FailoverUpdates.h # Force build of qpidd during dist phase so help2man will work. dist-hook: $(BUILT_SOURCES) diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 4242850192..9c1c4e0735 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -1,4 +1,4 @@ -/* + /* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -20,6 +20,7 @@ */ #include "ConnectionImpl.h" #include "SessionImpl.h" +#include "SimpleUrlParser.h" #include "qpid/messaging/Session.h" #include "qpid/client/PrivateImplRef.h" #include "qpid/framing/Uuid.h" @@ -33,13 +34,42 @@ namespace amqp0_10 { using qpid::messaging::Variant; using qpid::framing::Uuid; -using namespace qpid::sys; -template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value) +void convert(const Variant::List& from, std::vector<std::string>& to) +{ + for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) { + to.push_back(i->asString()); + } +} + +template <class T> bool setIfFound(const Variant::Map& map, const std::string& key, T& value) { Variant::Map::const_iterator i = map.find(key); if (i != map.end()) { value = (T) i->second; + QPID_LOG(debug, "option " << key << " specified as " << i->second); + return true; + } else { + QPID_LOG(debug, "option " << key << " not specified"); + return false; + } +} + +template <> +bool setIfFound< std::vector<std::string> >(const Variant::Map& map, + const std::string& key, + std::vector<std::string>& value) +{ + Variant::Map::const_iterator i = map.find(key); + if (i != map.end()) { + if (i->second.getType() == qpid::messaging::VAR_LIST) { + convert(i->second.asList(), value); + } else { + value.push_back(i->second.asString()); + } + return true; + } else { + return false; } } @@ -59,24 +89,47 @@ void convert(const Variant::Map& from, ConnectionSettings& to) setIfFound(from, "max-channels", to.maxChannels); setIfFound(from, "max-frame-size", to.maxFrameSize); setIfFound(from, "bounds", to.bounds); + + setIfFound(from, "protocol", to.protocol); } ConnectionImpl::ConnectionImpl(const Variant::Map& options) : - reconnectionEnabled(true), timeout(-1), - minRetryInterval(1), maxRetryInterval(30) + reconnect(true), timeout(-1), limit(-1), + minReconnectInterval(3), maxReconnectInterval(60), + retries(0) +{ + QPID_LOG(debug, "Created connection with " << options); + setOptions(options); +} + +void ConnectionImpl::setOptions(const Variant::Map& options) { - QPID_LOG(debug, "Opening connection to " << url << " with " << options); convert(options, settings); - setIfFound(options, "reconnection-enabled", reconnectionEnabled); - setIfFound(options, "reconnection-timeout", timeout); - setIfFound(options, "min-retry-interval", minRetryInterval); - setIfFound(options, "max-retry-interval", maxRetryInterval); + setIfFound(options, "reconnect", reconnect); + setIfFound(options, "reconnect-timeout", timeout); + setIfFound(options, "reconnect-limit", limit); + int64_t reconnectInterval; + if (setIfFound(options, "reconnect-interval", reconnectInterval)) { + minReconnectInterval = maxReconnectInterval = reconnectInterval; + } else { + setIfFound(options, "min-reconnect-interval", minReconnectInterval); + setIfFound(options, "max-reconnect-interval", maxReconnectInterval); + } + setIfFound(options, "urls", urls); +} + +void ConnectionImpl::setOption(const std::string& name, const Variant& value) +{ + Variant::Map options; + options[name] = value; + setOptions(options); + QPID_LOG(debug, "Set " << name << " to " << value); } void ConnectionImpl::open(const std::string& u) { - url = u; - connection.open(url, settings); + urls.push_back(u); + connect(); } void ConnectionImpl::close() @@ -134,64 +187,65 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st try { getImplPtr(impl)->setSession(connection.newSession(name)); } catch (const TransportFailure&) { - reconnect(); + connect(); } return impl; } -void ConnectionImpl::reconnect() +void ConnectionImpl::connect() { - AbsTime start = now(); - ScopedLock<Semaphore> l(semaphore); + qpid::sys::AbsTime start = qpid::sys::now(); + qpid::sys::ScopedLock<qpid::sys::Semaphore> l(semaphore); if (!connection.isOpen()) connect(start); } -bool expired(const AbsTime& start, int timeout) +bool expired(const qpid::sys::AbsTime& start, int64_t timeout) { if (timeout == 0) return true; if (timeout < 0) return false; - Duration used(start, now()); - Duration allowed = timeout * TIME_SEC; - return allowed > used; + qpid::sys::Duration used(start, qpid::sys::now()); + qpid::sys::Duration allowed = timeout * qpid::sys::TIME_SEC; + return allowed < used; } -void ConnectionImpl::connect(const AbsTime& started) +void ConnectionImpl::connect(const qpid::sys::AbsTime& started) { - for (int i = minRetryInterval; !tryConnect(); i = std::min(i * 2, maxRetryInterval)) { - if (expired(started, timeout)) throw TransportFailure(); + for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) { + if (!reconnect) throw TransportFailure("Failed to connect (reconnect disabled)"); + if (limit >= 0 && retries++ >= limit) throw TransportFailure("Failed to connect within reconnect limit"); + if (expired(started, timeout)) throw TransportFailure("Failed to connect within reconnect timeout"); else qpid::sys::sleep(i); } + retries = 0; } bool ConnectionImpl::tryConnect() { - if (tryConnect(url) || - (failoverListener.get() && tryConnect(failoverListener->getKnownBrokers()))) - { - return resetSessions(); - } else { - return false; - } + if (tryConnect(urls)) return resetSessions(); + else return false; } -bool ConnectionImpl::tryConnect(const Url& u) +bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls) { - try { - QPID_LOG(info, "Trying to connect to " << url << "..."); - connection.open(u, settings); - failoverListener.reset(new FailoverListener(connection)); - return true; - } catch (const Exception& e) { - //TODO: need to fix timeout on open so that it throws TransportFailure - QPID_LOG(info, "Failed to connect to " << u << ": " << e.what()); - } - return false; -} - -bool ConnectionImpl::tryConnect(const std::vector<Url>& urls) -{ - for (std::vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) { - if (tryConnect(*i)) return true; + for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) { + try { + QPID_LOG(info, "Trying to connect to " << *i << "..."); + //TODO: when url support is more complete can avoid this test here + if (i->find("amqp:") == 0) { + Url url(*i); + connection.open(url, settings); + } else { + SimpleUrlParser::parse(*i, settings); + connection.open(settings); + } + QPID_LOG(info, "Connected to " << *i); + return true; + } catch (const Exception& e) { + //TODO: need to fix timeout on + //qpid::client::Connection::open() so that it throws + //TransportFailure rather than a ConnectionException + QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what()); + } } return false; } diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index d9d0d1e065..37a78b2373 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -25,7 +25,6 @@ #include "qpid/messaging/Variant.h" #include "qpid/Url.h" #include "qpid/client/Connection.h" -#include "qpid/client/FailoverListener.h" #include "qpid/client/ConnectionSettings.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Semaphore.h" @@ -46,7 +45,8 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl qpid::messaging::Session newSession(bool transactional, const std::string& name); qpid::messaging::Session getSession(const std::string& name) const; void closed(SessionImpl&); - void reconnect(); + void connect(); + void setOption(const std::string& name, const qpid::messaging::Variant& value); private: typedef std::map<std::string, qpid::messaging::Session> Sessions; @@ -54,18 +54,19 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl qpid::sys::Semaphore semaphore;//used to coordinate reconnection Sessions sessions; qpid::client::Connection connection; - std::auto_ptr<FailoverListener> failoverListener; - qpid::Url url; + std::vector<std::string> urls; qpid::client::ConnectionSettings settings; - bool reconnectionEnabled; - int timeout; - int minRetryInterval; - int maxRetryInterval; + bool reconnect; + int64_t timeout; + int32_t limit; + int64_t minReconnectInterval; + int64_t maxReconnectInterval; + int32_t retries; + void setOptions(const qpid::messaging::Variant::Map& options); void connect(const qpid::sys::AbsTime& started); bool tryConnect(); - bool tryConnect(const std::vector<Url>& urls); - bool tryConnect(const Url&); + bool tryConnect(const std::vector<std::string>& urls); bool resetSessions(); }; }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 9823dba6e1..d9fd3a5da1 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -431,7 +431,7 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { - connection.reconnect(); + connection.connect(); } qpid::messaging::Connection SessionImpl::getConnection() const diff --git a/cpp/src/qpid/messaging/AddressParser.cpp b/cpp/src/qpid/messaging/AddressParser.cpp index 265b5fe195..4b29f126f2 100644 --- a/cpp/src/qpid/messaging/AddressParser.cpp +++ b/cpp/src/qpid/messaging/AddressParser.cpp @@ -198,6 +198,7 @@ bool AddressParser::readSimpleValue(Variant& value) std::string s; if (readWord(s)) { value = s; + try { value = value.asInt32(); return true; } catch (const InvalidConversion&) {} try { value = value.asInt64(); return true; } catch (const InvalidConversion&) {} try { value = value.asDouble(); return true; } catch (const InvalidConversion&) {} return true; diff --git a/cpp/src/qpid/messaging/Connection.cpp b/cpp/src/qpid/messaging/Connection.cpp index 230c9d5dbf..4873899787 100644 --- a/cpp/src/qpid/messaging/Connection.cpp +++ b/cpp/src/qpid/messaging/Connection.cpp @@ -67,40 +67,11 @@ Session Connection::newSession(bool transactional, const std::string& name) return impl->newSession(transactional, name); } Session Connection::getSession(const std::string& name) const { return impl->getSession(name); } - -InvalidOptionString::InvalidOptionString(const std::string& msg) : Exception(msg) {} - -void parseKeyValuePair(const std::string& in, Variant::Map& out) -{ - std::string::size_type i = in.find('='); - if (i == std::string::npos || i == in.size() || in.find('=', i+1) != std::string::npos) { - throw InvalidOptionString(QPID_MSG("Cannot parse name-value pair from " << in)); - } else { - out[in.substr(0, i)] = in.substr(i+1); - } -} - -void parseOptionString(const std::string& in, Variant::Map& out) -{ - std::string::size_type start = 0; - std::string::size_type i = in.find('&'); - while (i != std::string::npos) { - parseKeyValuePair(in.substr(start, i-start), out); - if (i < in.size()) { - start = i+1; - i = in.find('&', start); - } else { - i = std::string::npos; - } - } - parseKeyValuePair(in.substr(start), out); +void Connection::setOption(const std::string& name, const Variant& value) +{ + impl->setOption(name, value); } -Variant::Map parseOptionString(const std::string& in) -{ - Variant::Map map; - parseOptionString(in, map); - return map; -} +InvalidOptionString::InvalidOptionString(const std::string& msg) : Exception(msg) {} }} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/ConnectionImpl.h b/cpp/src/qpid/messaging/ConnectionImpl.h index 589c9fbe57..33ebcda950 100644 --- a/cpp/src/qpid/messaging/ConnectionImpl.h +++ b/cpp/src/qpid/messaging/ConnectionImpl.h @@ -31,6 +31,7 @@ namespace client { namespace messaging { class Session; +class Variant; class ConnectionImpl : public virtual qpid::RefCounted { @@ -40,6 +41,7 @@ class ConnectionImpl : public virtual qpid::RefCounted virtual void close() = 0; virtual Session newSession(bool transactional, const std::string& name) = 0; virtual Session getSession(const std::string& name) const = 0; + virtual void setOption(const std::string& name, const Variant& value) = 0; private: }; }} // namespace qpid::messaging diff --git a/cpp/src/tests/qpid_recv.cpp b/cpp/src/tests/qpid_recv.cpp index 9e4e202053..e4cc6a7ac8 100644 --- a/cpp/src/tests/qpid_recv.cpp +++ b/cpp/src/tests/qpid_recv.cpp @@ -27,12 +27,14 @@ #include <qpid/Options.h> #include <qpid/log/Logger.h> #include <qpid/log/Options.h> +#include <qpid/client/amqp0_10/FailoverUpdates.h> #include "TestOptions.h" #include <iostream> - +#include <memory> using namespace qpid::messaging; +using qpid::client::amqp0_10::FailoverUpdates; using namespace std; @@ -54,6 +56,7 @@ struct Options : public qpid::Options uint tx; uint rollbackFrequency; bool printHeaders; + bool failoverUpdates; qpid::log::Options log; Options(const std::string& argv0=std::string()) @@ -69,6 +72,7 @@ struct Options : public qpid::Options tx(0), rollbackFrequency(0), printHeaders(false), + failoverUpdates(false), log(argv0) { addOptions() @@ -84,6 +88,7 @@ struct Options : public qpid::Options ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") ("print-headers", qpid::optValue(printHeaders), "If specified print out all message headers as well as content") + ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -146,6 +151,7 @@ int main(int argc, char ** argv) try { Connection connection(opts.connectionOptions); connection.open(opts.url); + std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); Session session = connection.newSession(opts.tx > 0); Receiver receiver = session.createReceiver(opts.address); receiver.setCapacity(opts.capacity); diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp index 57c348ab9c..50e6c4371a 100644 --- a/cpp/src/tests/qpid_send.cpp +++ b/cpp/src/tests/qpid_send.cpp @@ -25,16 +25,15 @@ #include <qpid/messaging/Message.h> #include <qpid/messaging/Sender.h> #include <qpid/messaging/Session.h> +#include <qpid/client/amqp0_10/FailoverUpdates.h> #include "TestOptions.h" #include <fstream> #include <iostream> +#include <memory> using namespace qpid::messaging; -using qpid::framing::Uuid; -using qpid::sys::AbsTime; -using qpid::sys::now; -using qpid::sys::TIME_INFINITE; +using qpid::client::amqp0_10::FailoverUpdates; typedef std::vector<std::string> string_vector; @@ -49,7 +48,6 @@ struct Options : public qpid::Options std::string url; std::string connectionOptions; std::string address; - int64_t timeout; uint count; std::string id; std::string replyto; @@ -64,13 +62,13 @@ struct Options : public qpid::Options uint tx; uint rollbackFrequency; uint capacity; + bool failoverUpdates; qpid::log::Options log; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), help(false), url("amqp:tcp:127.0.0.1"), - timeout(TIME_INFINITE), count(1), sendEos(0), durable(false), @@ -78,13 +76,13 @@ struct Options : public qpid::Options tx(0), rollbackFrequency(0), capacity(0), + failoverUpdates(false), log(argv0) { addOptions() ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") ("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from") ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection") - ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "exit after the specified time") ("count,c", qpid::optValue(count, "COUNT"), "stop after count messages have been sent, zero disables") ("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one") ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address") @@ -99,6 +97,7 @@ struct Options : public qpid::Options ("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue") ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") + ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -185,6 +184,7 @@ int main(int argc, char ** argv) try { Connection connection(opts.connectionOptions); connection.open(opts.url); + std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); Session session = connection.newSession(opts.tx > 0); Sender sender = session.createSender(opts.address); if (opts.capacity) sender.setCapacity(opts.capacity); diff --git a/cpp/src/tests/qpid_stream.cpp b/cpp/src/tests/qpid_stream.cpp index ca21fa248b..ef0aea52e4 100644 --- a/cpp/src/tests/qpid_stream.cpp +++ b/cpp/src/tests/qpid_stream.cpp @@ -40,16 +40,33 @@ struct Args : public qpid::Options { std::string url; std::string address; + uint size; uint rate; bool durable; - - Args() : url("amqp:tcp:127.0.0.1:5672"), address("test-queue"), rate(1000), durable(false) + uint receiverCapacity; + uint senderCapacity; + uint ackFrequency; + + Args() : + url("amqp:tcp:127.0.0.1:5672"), + address("test-queue"), + size(512), + rate(1000), + durable(false), + receiverCapacity(0), + senderCapacity(0), + ackFrequency(1) { addOptions() ("url", qpid::optValue(url, "URL"), "Url to connect to.") ("address", qpid::optValue(address, "ADDRESS"), "Address to stream messages through.") + ("size", qpid::optValue(size, "bytes"), "Message size in bytes (content only, not headers).") ("rate", qpid::optValue(rate, "msgs/sec"), "Rate at which to stream messages.") - ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable."); + ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.") + ("sender-capacity", qpid::optValue(senderCapacity, "N"), "Credit window (0 implies infinite window)") + ("receiver-capacity", qpid::optValue(receiverCapacity, "N"), "Credit window (0 implies infinite window)") + ("ack-frequency", qpid::optValue(ackFrequency, "N"), + "Ack frequency (0 implies none of the messages will get accepted)"); } }; @@ -93,7 +110,8 @@ struct Publish : Client void doWork(Session& session) { Sender sender = session.createSender(opts.address); - Message msg; + if (opts.senderCapacity) sender.setCapacity(opts.senderCapacity); + Message msg(std::string(opts.size, 'X')); uint64_t interval = qpid::sys::TIME_SEC / opts.rate; uint64_t sent = 0, missedRate = 0; qpid::sys::AbsTime start = qpid::sys::now(); @@ -123,9 +141,12 @@ struct Consume : Client double maxLatency = 0; double totalLatency = 0; Receiver receiver = session.createReceiver(opts.address); + if (opts.receiverCapacity) receiver.setCapacity(opts.receiverCapacity); while (receiver.fetch(msg)) { - session.acknowledge();//TODO: add batching option ++received; + if (opts.ackFrequency && (received % opts.ackFrequency == 0)) { + session.acknowledge(); + } //calculate latency uint64_t receivedAt = timestamp(qpid::sys::now()); uint64_t sentAt = msg.getHeaders()[TIMESTAMP].asUint64(); |