summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/include/qpid/messaging/Address.h17
-rw-r--r--cpp/include/qpid/messaging/Connection.h27
-rw-r--r--cpp/src/CMakeLists.txt3
-rw-r--r--cpp/src/Makefile.am8
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp148
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.h21
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp2
-rw-r--r--cpp/src/qpid/messaging/AddressParser.cpp1
-rw-r--r--cpp/src/qpid/messaging/Connection.cpp37
-rw-r--r--cpp/src/qpid/messaging/ConnectionImpl.h2
-rw-r--r--cpp/src/tests/qpid_recv.cpp8
-rw-r--r--cpp/src/tests/qpid_send.cpp14
-rw-r--r--cpp/src/tests/qpid_stream.cpp31
13 files changed, 194 insertions, 125 deletions
diff --git a/cpp/include/qpid/messaging/Address.h b/cpp/include/qpid/messaging/Address.h
index 745949792d..f3ca30bcd4 100644
--- a/cpp/include/qpid/messaging/Address.h
+++ b/cpp/include/qpid/messaging/Address.h
@@ -79,6 +79,11 @@ class AddressImpl;
* nide when a sender or receiver is cancelled. Can be one of <i>always</i>,
* <i>never</i>, <i>sender</i> or <i>receiver</i>.</td></tr>
*
+ * <tr valign=top><td>reliability</td><td>indicates the level of
+ * reliability expected. Can be one of unreliable, at-most-once,
+ * at-least-once or exactly-once (the latter is not yet correctly
+ * supported).</td></tr>
+ *
* <tr valign=top><td>node-properties</td><td>A nested map of properties of the addressed
* entity or 'node'. These can be used when automatically creating it,
* or to assert certain properties.
@@ -109,16 +114,14 @@ class AddressImpl;
* receiver does not want to receiver messages published to the topic
* that originate from a sender on the same connection</td></tr>
*
- * <tr valign=top><td>browse</td><td>(only relevant for queues) specifies that the receiver
- * does not wish to consume the messages, but merely browse them</td></tr>
+ * <tr valign=top><td>mode</td><td>(only relevant for queues)
+ * indicates whether the subscribe should consume (the default) or
+ * merely browse the messages. Valid values are 'consume' and
+ * 'browse'</td></tr>
*
* <tr valign=top><td>durable</td><td>(only relevant for topics at present) specifies that a
* durable subscription is required</td></tr>
*
- * <tr valign=top><td>reliability</td><td>indicates the level of reliability that the receiver
- * expects. Can be one of unreliable, at-most-once, at-least-once or
- * exactly-once (the latter is not yet correctly supported).</td></tr>
- *
* <tr valign=top><td>filter</td><td>(only relevant for topics at present) allows bindings to
* be created for the queue that match the given criteria (or list of
* criteria).</td></tr>
@@ -133,7 +136,7 @@ class AddressImpl;
* <li>exclusive, which requests an exclusive subscription and
* is only relevant for queues</li>
*
- * <li>x-queue-arguments, which ais only relevant for topics and
+ * <li>x-queue-arguments, which is only relevant for topics and
* allows arguments to the queue-declare for the subscription
* queue to be specified</li>
* </ul>
diff --git a/cpp/include/qpid/messaging/Connection.h b/cpp/include/qpid/messaging/Connection.h
index 36392da0b2..e2d1cc2ed4 100644
--- a/cpp/include/qpid/messaging/Connection.h
+++ b/cpp/include/qpid/messaging/Connection.h
@@ -58,29 +58,32 @@ class Connection : public qpid::client::Handle<ConnectionImpl>
* sasl-mechanism
* sasl-min-ssf
* sasl-max-ssf
+ * protocol
+ * urls
*
- * (note also bounds, locale, max-channels and max-framesize, but not sure whether those should be docuemented here)
+ * (note also bounds, locale, max-channels and max-framesize, but
+ * not sure whether those should be documented here)
*
- * Retry behaviour can be controlled through the following options:
- *
- * reconnection-timeout - determines how long it will try to
- * reconnect for -1 means forever, 0
- * means don't try to reconnect
- * min-retry-interval
- * max-retry-interval
+ * Reconnect behaviour can be controlled through the following options:
*
- * The retry-interval is the time that the client waits for
- * after a failed attempt to reconnect before retrying. It
+ * reconnect: true/false (enables/disables reconnect entirely)
+ * reconnect-timeout: number of seconds (give up and report failure after specified time)
+ * reconnect-limit: n (give up and report failure after specified number of attempts)
+ * reconnect-interval-min: number of seconds (initial delay between failed reconnection attempts)
+ * reconnect-interval-max: number of seconds (maximum delay between failed reconnection attempts)
+ * reconnect-interval: shorthand for setting the same reconnect_interval_min/max
+ *
+ * The reconnect-interval is the time that the client waits
+ * for after a failed attempt to reconnect before retrying. It
* starts at the value of the min-retry-interval and is
* doubled every failure until the value of max-retry-interval
* is reached.
- *
- *
*/
QPID_CLIENT_EXTERN Connection(const Variant::Map& options = Variant::Map());
QPID_CLIENT_EXTERN Connection(const std::string& options);
QPID_CLIENT_EXTERN ~Connection();
QPID_CLIENT_EXTERN Connection& operator=(const Connection&);
+ QPID_CLIENT_EXTERN void setOption(const std::string& name, const Variant& value);
QPID_CLIENT_EXTERN void open(const std::string& url);
QPID_CLIENT_EXTERN void close();
QPID_CLIENT_EXTERN Session newSession(bool transactional, const std::string& name = std::string());
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index 7022f4d343..6323a43e08 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -667,6 +667,7 @@ set (qpidclient_SOURCES
qpid/client/amqp0_10/CodecsInternal.h
qpid/client/amqp0_10/ConnectionImpl.h
qpid/client/amqp0_10/ConnectionImpl.cpp
+ qpid/client/amqp0_10/FailoverUpdates.cpp
qpid/client/amqp0_10/IncomingMessages.h
qpid/client/amqp0_10/IncomingMessages.cpp
qpid/client/amqp0_10/MessageSink.h
@@ -679,6 +680,8 @@ set (qpidclient_SOURCES
qpid/client/amqp0_10/SessionImpl.cpp
qpid/client/amqp0_10/SenderImpl.h
qpid/client/amqp0_10/SenderImpl.cpp
+ qpid/client/amqp0_10/SimpleUrlParser.h
+ qpid/client/amqp0_10/SimpleUrlParser.cpp
)
add_library (qpidclient SHARED ${qpidclient_SOURCES})
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 281814a828..fc2be3d8d5 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -729,6 +729,7 @@ libqpidclient_la_SOURCES = \
qpid/client/amqp0_10/CodecsInternal.h \
qpid/client/amqp0_10/ConnectionImpl.h \
qpid/client/amqp0_10/ConnectionImpl.cpp \
+ qpid/client/amqp0_10/FailoverUpdates.cpp \
qpid/client/amqp0_10/IncomingMessages.h \
qpid/client/amqp0_10/IncomingMessages.cpp \
qpid/client/amqp0_10/MessageSink.h \
@@ -740,7 +741,9 @@ libqpidclient_la_SOURCES = \
qpid/client/amqp0_10/SessionImpl.h \
qpid/client/amqp0_10/SessionImpl.cpp \
qpid/client/amqp0_10/SenderImpl.h \
- qpid/client/amqp0_10/SenderImpl.cpp
+ qpid/client/amqp0_10/SenderImpl.cpp \
+ qpid/client/amqp0_10/SimpleUrlParser.h \
+ qpid/client/amqp0_10/SimpleUrlParser.cpp
# NOTE: only public header files (which should be in ../include)
# should go in this list. Private headers should go in the SOURCES
@@ -826,7 +829,8 @@ nobase_include_HEADERS += \
../include/qpid/messaging/Session.h \
../include/qpid/messaging/Uuid.h \
../include/qpid/messaging/Variant.h \
- ../include/qpid/client/amqp0_10/Codecs.h
+ ../include/qpid/client/amqp0_10/Codecs.h \
+ ../include/qpid/client/amqp0_10/FailoverUpdates.h
# Force build of qpidd during dist phase so help2man will work.
dist-hook: $(BUILT_SOURCES)
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 4242850192..9c1c4e0735 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -1,4 +1,4 @@
-/*
+ /*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -20,6 +20,7 @@
*/
#include "ConnectionImpl.h"
#include "SessionImpl.h"
+#include "SimpleUrlParser.h"
#include "qpid/messaging/Session.h"
#include "qpid/client/PrivateImplRef.h"
#include "qpid/framing/Uuid.h"
@@ -33,13 +34,42 @@ namespace amqp0_10 {
using qpid::messaging::Variant;
using qpid::framing::Uuid;
-using namespace qpid::sys;
-template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value)
+void convert(const Variant::List& from, std::vector<std::string>& to)
+{
+ for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) {
+ to.push_back(i->asString());
+ }
+}
+
+template <class T> bool setIfFound(const Variant::Map& map, const std::string& key, T& value)
{
Variant::Map::const_iterator i = map.find(key);
if (i != map.end()) {
value = (T) i->second;
+ QPID_LOG(debug, "option " << key << " specified as " << i->second);
+ return true;
+ } else {
+ QPID_LOG(debug, "option " << key << " not specified");
+ return false;
+ }
+}
+
+template <>
+bool setIfFound< std::vector<std::string> >(const Variant::Map& map,
+ const std::string& key,
+ std::vector<std::string>& value)
+{
+ Variant::Map::const_iterator i = map.find(key);
+ if (i != map.end()) {
+ if (i->second.getType() == qpid::messaging::VAR_LIST) {
+ convert(i->second.asList(), value);
+ } else {
+ value.push_back(i->second.asString());
+ }
+ return true;
+ } else {
+ return false;
}
}
@@ -59,24 +89,47 @@ void convert(const Variant::Map& from, ConnectionSettings& to)
setIfFound(from, "max-channels", to.maxChannels);
setIfFound(from, "max-frame-size", to.maxFrameSize);
setIfFound(from, "bounds", to.bounds);
+
+ setIfFound(from, "protocol", to.protocol);
}
ConnectionImpl::ConnectionImpl(const Variant::Map& options) :
- reconnectionEnabled(true), timeout(-1),
- minRetryInterval(1), maxRetryInterval(30)
+ reconnect(true), timeout(-1), limit(-1),
+ minReconnectInterval(3), maxReconnectInterval(60),
+ retries(0)
+{
+ QPID_LOG(debug, "Created connection with " << options);
+ setOptions(options);
+}
+
+void ConnectionImpl::setOptions(const Variant::Map& options)
{
- QPID_LOG(debug, "Opening connection to " << url << " with " << options);
convert(options, settings);
- setIfFound(options, "reconnection-enabled", reconnectionEnabled);
- setIfFound(options, "reconnection-timeout", timeout);
- setIfFound(options, "min-retry-interval", minRetryInterval);
- setIfFound(options, "max-retry-interval", maxRetryInterval);
+ setIfFound(options, "reconnect", reconnect);
+ setIfFound(options, "reconnect-timeout", timeout);
+ setIfFound(options, "reconnect-limit", limit);
+ int64_t reconnectInterval;
+ if (setIfFound(options, "reconnect-interval", reconnectInterval)) {
+ minReconnectInterval = maxReconnectInterval = reconnectInterval;
+ } else {
+ setIfFound(options, "min-reconnect-interval", minReconnectInterval);
+ setIfFound(options, "max-reconnect-interval", maxReconnectInterval);
+ }
+ setIfFound(options, "urls", urls);
+}
+
+void ConnectionImpl::setOption(const std::string& name, const Variant& value)
+{
+ Variant::Map options;
+ options[name] = value;
+ setOptions(options);
+ QPID_LOG(debug, "Set " << name << " to " << value);
}
void ConnectionImpl::open(const std::string& u)
{
- url = u;
- connection.open(url, settings);
+ urls.push_back(u);
+ connect();
}
void ConnectionImpl::close()
@@ -134,64 +187,65 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st
try {
getImplPtr(impl)->setSession(connection.newSession(name));
} catch (const TransportFailure&) {
- reconnect();
+ connect();
}
return impl;
}
-void ConnectionImpl::reconnect()
+void ConnectionImpl::connect()
{
- AbsTime start = now();
- ScopedLock<Semaphore> l(semaphore);
+ qpid::sys::AbsTime start = qpid::sys::now();
+ qpid::sys::ScopedLock<qpid::sys::Semaphore> l(semaphore);
if (!connection.isOpen()) connect(start);
}
-bool expired(const AbsTime& start, int timeout)
+bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
{
if (timeout == 0) return true;
if (timeout < 0) return false;
- Duration used(start, now());
- Duration allowed = timeout * TIME_SEC;
- return allowed > used;
+ qpid::sys::Duration used(start, qpid::sys::now());
+ qpid::sys::Duration allowed = timeout * qpid::sys::TIME_SEC;
+ return allowed < used;
}
-void ConnectionImpl::connect(const AbsTime& started)
+void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
{
- for (int i = minRetryInterval; !tryConnect(); i = std::min(i * 2, maxRetryInterval)) {
- if (expired(started, timeout)) throw TransportFailure();
+ for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) {
+ if (!reconnect) throw TransportFailure("Failed to connect (reconnect disabled)");
+ if (limit >= 0 && retries++ >= limit) throw TransportFailure("Failed to connect within reconnect limit");
+ if (expired(started, timeout)) throw TransportFailure("Failed to connect within reconnect timeout");
else qpid::sys::sleep(i);
}
+ retries = 0;
}
bool ConnectionImpl::tryConnect()
{
- if (tryConnect(url) ||
- (failoverListener.get() && tryConnect(failoverListener->getKnownBrokers())))
- {
- return resetSessions();
- } else {
- return false;
- }
+ if (tryConnect(urls)) return resetSessions();
+ else return false;
}
-bool ConnectionImpl::tryConnect(const Url& u)
+bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls)
{
- try {
- QPID_LOG(info, "Trying to connect to " << url << "...");
- connection.open(u, settings);
- failoverListener.reset(new FailoverListener(connection));
- return true;
- } catch (const Exception& e) {
- //TODO: need to fix timeout on open so that it throws TransportFailure
- QPID_LOG(info, "Failed to connect to " << u << ": " << e.what());
- }
- return false;
-}
-
-bool ConnectionImpl::tryConnect(const std::vector<Url>& urls)
-{
- for (std::vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
- if (tryConnect(*i)) return true;
+ for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
+ try {
+ QPID_LOG(info, "Trying to connect to " << *i << "...");
+ //TODO: when url support is more complete can avoid this test here
+ if (i->find("amqp:") == 0) {
+ Url url(*i);
+ connection.open(url, settings);
+ } else {
+ SimpleUrlParser::parse(*i, settings);
+ connection.open(settings);
+ }
+ QPID_LOG(info, "Connected to " << *i);
+ return true;
+ } catch (const Exception& e) {
+ //TODO: need to fix timeout on
+ //qpid::client::Connection::open() so that it throws
+ //TransportFailure rather than a ConnectionException
+ QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
+ }
}
return false;
}
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
index d9d0d1e065..37a78b2373 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -25,7 +25,6 @@
#include "qpid/messaging/Variant.h"
#include "qpid/Url.h"
#include "qpid/client/Connection.h"
-#include "qpid/client/FailoverListener.h"
#include "qpid/client/ConnectionSettings.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Semaphore.h"
@@ -46,7 +45,8 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
qpid::messaging::Session newSession(bool transactional, const std::string& name);
qpid::messaging::Session getSession(const std::string& name) const;
void closed(SessionImpl&);
- void reconnect();
+ void connect();
+ void setOption(const std::string& name, const qpid::messaging::Variant& value);
private:
typedef std::map<std::string, qpid::messaging::Session> Sessions;
@@ -54,18 +54,19 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
qpid::sys::Semaphore semaphore;//used to coordinate reconnection
Sessions sessions;
qpid::client::Connection connection;
- std::auto_ptr<FailoverListener> failoverListener;
- qpid::Url url;
+ std::vector<std::string> urls;
qpid::client::ConnectionSettings settings;
- bool reconnectionEnabled;
- int timeout;
- int minRetryInterval;
- int maxRetryInterval;
+ bool reconnect;
+ int64_t timeout;
+ int32_t limit;
+ int64_t minReconnectInterval;
+ int64_t maxReconnectInterval;
+ int32_t retries;
+ void setOptions(const qpid::messaging::Variant::Map& options);
void connect(const qpid::sys::AbsTime& started);
bool tryConnect();
- bool tryConnect(const std::vector<Url>& urls);
- bool tryConnect(const Url&);
+ bool tryConnect(const std::vector<std::string>& urls);
bool resetSessions();
};
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 9823dba6e1..d9fd3a5da1 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -431,7 +431,7 @@ void SessionImpl::senderCancelled(const std::string& name)
void SessionImpl::reconnect()
{
- connection.reconnect();
+ connection.connect();
}
qpid::messaging::Connection SessionImpl::getConnection() const
diff --git a/cpp/src/qpid/messaging/AddressParser.cpp b/cpp/src/qpid/messaging/AddressParser.cpp
index 265b5fe195..4b29f126f2 100644
--- a/cpp/src/qpid/messaging/AddressParser.cpp
+++ b/cpp/src/qpid/messaging/AddressParser.cpp
@@ -198,6 +198,7 @@ bool AddressParser::readSimpleValue(Variant& value)
std::string s;
if (readWord(s)) {
value = s;
+ try { value = value.asInt32(); return true; } catch (const InvalidConversion&) {}
try { value = value.asInt64(); return true; } catch (const InvalidConversion&) {}
try { value = value.asDouble(); return true; } catch (const InvalidConversion&) {}
return true;
diff --git a/cpp/src/qpid/messaging/Connection.cpp b/cpp/src/qpid/messaging/Connection.cpp
index 230c9d5dbf..4873899787 100644
--- a/cpp/src/qpid/messaging/Connection.cpp
+++ b/cpp/src/qpid/messaging/Connection.cpp
@@ -67,40 +67,11 @@ Session Connection::newSession(bool transactional, const std::string& name)
return impl->newSession(transactional, name);
}
Session Connection::getSession(const std::string& name) const { return impl->getSession(name); }
-
-InvalidOptionString::InvalidOptionString(const std::string& msg) : Exception(msg) {}
-
-void parseKeyValuePair(const std::string& in, Variant::Map& out)
-{
- std::string::size_type i = in.find('=');
- if (i == std::string::npos || i == in.size() || in.find('=', i+1) != std::string::npos) {
- throw InvalidOptionString(QPID_MSG("Cannot parse name-value pair from " << in));
- } else {
- out[in.substr(0, i)] = in.substr(i+1);
- }
-}
-
-void parseOptionString(const std::string& in, Variant::Map& out)
-{
- std::string::size_type start = 0;
- std::string::size_type i = in.find('&');
- while (i != std::string::npos) {
- parseKeyValuePair(in.substr(start, i-start), out);
- if (i < in.size()) {
- start = i+1;
- i = in.find('&', start);
- } else {
- i = std::string::npos;
- }
- }
- parseKeyValuePair(in.substr(start), out);
+void Connection::setOption(const std::string& name, const Variant& value)
+{
+ impl->setOption(name, value);
}
-Variant::Map parseOptionString(const std::string& in)
-{
- Variant::Map map;
- parseOptionString(in, map);
- return map;
-}
+InvalidOptionString::InvalidOptionString(const std::string& msg) : Exception(msg) {}
}} // namespace qpid::messaging
diff --git a/cpp/src/qpid/messaging/ConnectionImpl.h b/cpp/src/qpid/messaging/ConnectionImpl.h
index 589c9fbe57..33ebcda950 100644
--- a/cpp/src/qpid/messaging/ConnectionImpl.h
+++ b/cpp/src/qpid/messaging/ConnectionImpl.h
@@ -31,6 +31,7 @@ namespace client {
namespace messaging {
class Session;
+class Variant;
class ConnectionImpl : public virtual qpid::RefCounted
{
@@ -40,6 +41,7 @@ class ConnectionImpl : public virtual qpid::RefCounted
virtual void close() = 0;
virtual Session newSession(bool transactional, const std::string& name) = 0;
virtual Session getSession(const std::string& name) const = 0;
+ virtual void setOption(const std::string& name, const Variant& value) = 0;
private:
};
}} // namespace qpid::messaging
diff --git a/cpp/src/tests/qpid_recv.cpp b/cpp/src/tests/qpid_recv.cpp
index 9e4e202053..e4cc6a7ac8 100644
--- a/cpp/src/tests/qpid_recv.cpp
+++ b/cpp/src/tests/qpid_recv.cpp
@@ -27,12 +27,14 @@
#include <qpid/Options.h>
#include <qpid/log/Logger.h>
#include <qpid/log/Options.h>
+#include <qpid/client/amqp0_10/FailoverUpdates.h>
#include "TestOptions.h"
#include <iostream>
-
+#include <memory>
using namespace qpid::messaging;
+using qpid::client::amqp0_10::FailoverUpdates;
using namespace std;
@@ -54,6 +56,7 @@ struct Options : public qpid::Options
uint tx;
uint rollbackFrequency;
bool printHeaders;
+ bool failoverUpdates;
qpid::log::Options log;
Options(const std::string& argv0=std::string())
@@ -69,6 +72,7 @@ struct Options : public qpid::Options
tx(0),
rollbackFrequency(0),
printHeaders(false),
+ failoverUpdates(false),
log(argv0)
{
addOptions()
@@ -84,6 +88,7 @@ struct Options : public qpid::Options
("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
("print-headers", qpid::optValue(printHeaders), "If specified print out all message headers as well as content")
+ ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -146,6 +151,7 @@ int main(int argc, char ** argv)
try {
Connection connection(opts.connectionOptions);
connection.open(opts.url);
+ std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
Session session = connection.newSession(opts.tx > 0);
Receiver receiver = session.createReceiver(opts.address);
receiver.setCapacity(opts.capacity);
diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp
index 57c348ab9c..50e6c4371a 100644
--- a/cpp/src/tests/qpid_send.cpp
+++ b/cpp/src/tests/qpid_send.cpp
@@ -25,16 +25,15 @@
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
+#include <qpid/client/amqp0_10/FailoverUpdates.h>
#include "TestOptions.h"
#include <fstream>
#include <iostream>
+#include <memory>
using namespace qpid::messaging;
-using qpid::framing::Uuid;
-using qpid::sys::AbsTime;
-using qpid::sys::now;
-using qpid::sys::TIME_INFINITE;
+using qpid::client::amqp0_10::FailoverUpdates;
typedef std::vector<std::string> string_vector;
@@ -49,7 +48,6 @@ struct Options : public qpid::Options
std::string url;
std::string connectionOptions;
std::string address;
- int64_t timeout;
uint count;
std::string id;
std::string replyto;
@@ -64,13 +62,13 @@ struct Options : public qpid::Options
uint tx;
uint rollbackFrequency;
uint capacity;
+ bool failoverUpdates;
qpid::log::Options log;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
help(false),
url("amqp:tcp:127.0.0.1"),
- timeout(TIME_INFINITE),
count(1),
sendEos(0),
durable(false),
@@ -78,13 +76,13 @@ struct Options : public qpid::Options
tx(0),
rollbackFrequency(0),
capacity(0),
+ failoverUpdates(false),
log(argv0)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from")
("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
- ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "exit after the specified time")
("count,c", qpid::optValue(count, "COUNT"), "stop after count messages have been sent, zero disables")
("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one")
("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address")
@@ -99,6 +97,7 @@ struct Options : public qpid::Options
("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue")
("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
+ ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -185,6 +184,7 @@ int main(int argc, char ** argv)
try {
Connection connection(opts.connectionOptions);
connection.open(opts.url);
+ std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
Session session = connection.newSession(opts.tx > 0);
Sender sender = session.createSender(opts.address);
if (opts.capacity) sender.setCapacity(opts.capacity);
diff --git a/cpp/src/tests/qpid_stream.cpp b/cpp/src/tests/qpid_stream.cpp
index ca21fa248b..ef0aea52e4 100644
--- a/cpp/src/tests/qpid_stream.cpp
+++ b/cpp/src/tests/qpid_stream.cpp
@@ -40,16 +40,33 @@ struct Args : public qpid::Options
{
std::string url;
std::string address;
+ uint size;
uint rate;
bool durable;
-
- Args() : url("amqp:tcp:127.0.0.1:5672"), address("test-queue"), rate(1000), durable(false)
+ uint receiverCapacity;
+ uint senderCapacity;
+ uint ackFrequency;
+
+ Args() :
+ url("amqp:tcp:127.0.0.1:5672"),
+ address("test-queue"),
+ size(512),
+ rate(1000),
+ durable(false),
+ receiverCapacity(0),
+ senderCapacity(0),
+ ackFrequency(1)
{
addOptions()
("url", qpid::optValue(url, "URL"), "Url to connect to.")
("address", qpid::optValue(address, "ADDRESS"), "Address to stream messages through.")
+ ("size", qpid::optValue(size, "bytes"), "Message size in bytes (content only, not headers).")
("rate", qpid::optValue(rate, "msgs/sec"), "Rate at which to stream messages.")
- ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.");
+ ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.")
+ ("sender-capacity", qpid::optValue(senderCapacity, "N"), "Credit window (0 implies infinite window)")
+ ("receiver-capacity", qpid::optValue(receiverCapacity, "N"), "Credit window (0 implies infinite window)")
+ ("ack-frequency", qpid::optValue(ackFrequency, "N"),
+ "Ack frequency (0 implies none of the messages will get accepted)");
}
};
@@ -93,7 +110,8 @@ struct Publish : Client
void doWork(Session& session)
{
Sender sender = session.createSender(opts.address);
- Message msg;
+ if (opts.senderCapacity) sender.setCapacity(opts.senderCapacity);
+ Message msg(std::string(opts.size, 'X'));
uint64_t interval = qpid::sys::TIME_SEC / opts.rate;
uint64_t sent = 0, missedRate = 0;
qpid::sys::AbsTime start = qpid::sys::now();
@@ -123,9 +141,12 @@ struct Consume : Client
double maxLatency = 0;
double totalLatency = 0;
Receiver receiver = session.createReceiver(opts.address);
+ if (opts.receiverCapacity) receiver.setCapacity(opts.receiverCapacity);
while (receiver.fetch(msg)) {
- session.acknowledge();//TODO: add batching option
++received;
+ if (opts.ackFrequency && (received % opts.ackFrequency == 0)) {
+ session.acknowledge();
+ }
//calculate latency
uint64_t receivedAt = timestamp(qpid::sys::now());
uint64_t sentAt = msg.getHeaders()[TIMESTAMP].asUint64();