summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/bindings/qmf/python/Makefile.am4
-rw-r--r--qpid/cpp/bindings/qmf/ruby/Makefile.am2
-rw-r--r--qpid/cpp/bindings/qmf2/python/Makefile.am4
-rw-r--r--qpid/cpp/bindings/qmf2/ruby/Makefile.am2
-rw-r--r--qpid/cpp/bindings/qpid/dotnet/src/Session.cpp25
-rw-r--r--qpid/cpp/bindings/qpid/dotnet/src/Session.h2
-rw-r--r--qpid/cpp/bindings/qpid/python/Makefile.am4
-rw-r--r--qpid/cpp/bindings/qpid/ruby/Makefile.am2
-rw-r--r--qpid/cpp/configure.ac4
-rw-r--r--qpid/cpp/examples/messaging/drain.cpp2
-rw-r--r--qpid/cpp/examples/messaging/spout.cpp2
-rw-r--r--qpid/cpp/include/qpid/framing/FieldTable.h2
-rw-r--r--qpid/cpp/include/qpid/messaging/Connection.h28
-rw-r--r--qpid/cpp/include/qpid/messaging/Message.h79
-rw-r--r--qpid/cpp/include/qpid/messaging/Session.h4
-rw-r--r--qpid/cpp/managementgen/Makefile.am11
-rw-r--r--qpid/cpp/src/CMakeLists.txt7
-rw-r--r--qpid/cpp/src/Makefile.am2
-rw-r--r--qpid/cpp/src/qmf/ConsoleSession.cpp2
-rw-r--r--qpid/cpp/src/qmf/engine/ResilientConnection.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h2
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionState.h8
-rw-r--r--qpid/cpp/src/qpid/broker/Daemon.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/ExpiryPolicy.h12
-rw-r--r--qpid/cpp/src/qpid/broker/HeadersExchange.cpp9
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp55
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h28
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp135
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h37
-rw-r--r--qpid/cpp/src/qpid/broker/QueueCleaner.cpp18
-rw-r--r--qpid/cpp/src/qpid/broker/QueueCleaner.h14
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.cpp13
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.h3
-rw-r--r--qpid/cpp/src/qpid/broker/RateTracker.cpp51
-rw-r--r--qpid/cpp/src/qpid/broker/RateTracker.h57
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp11
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionImpl.cpp10
-rw-r--r--qpid/cpp/src/qpid/client/Connector.h2
-rw-r--r--qpid/cpp/src/qpid/client/RdmaConnector.cpp6
-rw-r--r--qpid/cpp/src/qpid/client/SslConnector.cpp4
-rw-r--r--qpid/cpp/src/qpid/client/TCPConnector.cpp4
-rw-r--r--qpid/cpp/src/qpid/client/TCPConnector.h2
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp26
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h4
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp21
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp38
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h1
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp4
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h2
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp11
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h11
-rw-r--r--qpid/cpp/src/qpid/client/windows/SslConnector.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp66
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h14
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp1
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterSettings.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterTimer.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp19
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h5
-rw-r--r--qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp95
-rw-r--r--qpid/cpp/src/qpid/cluster/ExpiryPolicy.h42
-rw-r--r--qpid/cpp/src/qpid/cluster/FailoverExchange.cpp26
-rw-r--r--qpid/cpp/src/qpid/cluster/FailoverExchange.h10
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp91
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.h13
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateExchange.cpp27
-rw-r--r--qpid/cpp/src/qpid/console/SessionManager.cpp3
-rw-r--r--qpid/cpp/src/qpid/framing/AMQHeaderBody.h12
-rw-r--r--qpid/cpp/src/qpid/messaging/Session.cpp3
-rw-r--r--qpid/cpp/src/qpid/messaging/SessionImpl.h2
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIO.h4
-rw-r--r--qpid/cpp/src/qpid/sys/AtomicValue_gcc.h11
-rw-r--r--qpid/cpp/src/qpid/sys/ProtocolFactory.h3
-rw-r--r--qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp24
-rw-r--r--qpid/cpp/src/qpid/sys/Socket.h23
-rw-r--r--qpid/cpp/src/qpid/sys/SocketAddress.h2
-rw-r--r--qpid/cpp/src/qpid/sys/SslPlugin.cpp9
-rw-r--r--qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp29
-rw-r--r--qpid/cpp/src/qpid/sys/Timer.cpp6
-rw-r--r--qpid/cpp/src/qpid/sys/Timer.h8
-rw-r--r--qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp14
-rwxr-xr-xqpid/cpp/src/qpid/sys/posix/LockFile.cpp3
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Socket.cpp111
-rw-r--r--qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp32
-rw-r--r--qpid/cpp/src/qpid/sys/ssl/SslIo.cpp2
-rw-r--r--qpid/cpp/src/qpid/sys/ssl/SslIo.h2
-rw-r--r--qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp4
-rw-r--r--qpid/cpp/src/qpid/sys/ssl/SslSocket.h2
-rw-r--r--qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp14
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/Socket.cpp91
-rw-r--r--qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp2
-rw-r--r--qpid/cpp/src/tests/BrokerMgmtAgent.cpp3
-rw-r--r--qpid/cpp/src/tests/ClientSessionTest.cpp7
-rw-r--r--qpid/cpp/src/tests/ForkedBroker.cpp3
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp72
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp4
-rw-r--r--qpid/cpp/src/tests/SocketProxy.h4
-rw-r--r--qpid/cpp/src/tests/brokertest.py15
-rwxr-xr-xqpid/cpp/src/tests/cluster_test_logs.py2
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py185
-rwxr-xr-xqpid/cpp/src/tests/federation.py13
-rw-r--r--qpid/cpp/xml/cluster.xml40
110 files changed, 1132 insertions, 898 deletions
diff --git a/qpid/cpp/bindings/qmf/python/Makefile.am b/qpid/cpp/bindings/qmf/python/Makefile.am
index 421590f189..8abad32959 100644
--- a/qpid/cpp/bindings/qmf/python/Makefile.am
+++ b/qpid/cpp/bindings/qmf/python/Makefile.am
@@ -30,11 +30,13 @@ BUILT_SOURCES = $(generated_file_list)
SWIG_FLAGS = -w362,401
$(generated_file_list): $(srcdir)/python.i $(srcdir)/../qmfengine.i
- swig -c++ -python $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I$(top_srcdir)/src/qmf -I/usr/include -o qmfengine.cpp $(srcdir)/python.i
+ $(SWIG) -c++ -python $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I$(top_srcdir)/src/qmf -I/usr/include -o qmfengine.cpp $(srcdir)/python.i
pylibdir = $(PYTHON_LIB)
lib_LTLIBRARIES = _qmfengine.la
+qenginedir = $(pyexecdir)
+qengine_PYTHON = qmfengine.py qmf.py
#_qmfengine_la_LDFLAGS = -avoid-version -module -shrext "$(PYTHON_SO)"
#_qmfengine_la_LDFLAGS = -avoid-version -module -shrext ".so"
diff --git a/qpid/cpp/bindings/qmf/ruby/Makefile.am b/qpid/cpp/bindings/qmf/ruby/Makefile.am
index 395d64ff90..de8c4d10d5 100644
--- a/qpid/cpp/bindings/qmf/ruby/Makefile.am
+++ b/qpid/cpp/bindings/qmf/ruby/Makefile.am
@@ -35,7 +35,7 @@ qmfengine.cpp: $(srcdir)/ruby.i $(srcdir)/../qmfengine.i
rubylibarchdir = $(RUBY_LIB_ARCH)
rubylibarch_LTLIBRARIES = qmfengine.la
-qmfengine_la_LDFLAGS = -avoid-version -module -shrext ".$(RUBY_DLEXT)"
+qmfengine_la_LDFLAGS = -avoid-version -module -shared -shrext ".$(RUBY_DLEXT)"
qmfengine_la_LIBADD = $(RUBY_LIBS) -L$(top_builddir)/src/.libs -lqpidclient $(top_builddir)/src/libqmfengine.la
qmfengine_la_CXXFLAGS = $(INCLUDES) -I$(RUBY_INC) -I$(RUBY_INC_ARCH) -fno-strict-aliasing
nodist_qmfengine_la_SOURCES = qmfengine.cpp
diff --git a/qpid/cpp/bindings/qmf2/python/Makefile.am b/qpid/cpp/bindings/qmf2/python/Makefile.am
index 7adc62eddb..3dc04e832f 100644
--- a/qpid/cpp/bindings/qmf2/python/Makefile.am
+++ b/qpid/cpp/bindings/qmf2/python/Makefile.am
@@ -30,12 +30,12 @@ BUILT_SOURCES = $(generated_file_list)
SWIG_FLAGS = -w362,401
$(generated_file_list): $(srcdir)/python.i $(srcdir)/../qmf2.i $(srcdir)/../../swig_python_typemaps.i
- swig -c++ -python $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I/usr/include -o cqmf2.cpp $(srcdir)/python.i
+ $(SWIG) -c++ -python $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I/usr/include -o cqmf2.cpp $(srcdir)/python.i
pylibdir = $(PYTHON_LIB)
lib_LTLIBRARIES = _cqmf2.la
-cqpiddir = $(pythondir)
+cqpiddir = $(pyexecdir)
cqpid_PYTHON = qmf2.py cqmf2.py
_cqmf2_la_LDFLAGS = -avoid-version -module -shared
diff --git a/qpid/cpp/bindings/qmf2/ruby/Makefile.am b/qpid/cpp/bindings/qmf2/ruby/Makefile.am
index 8e11a204b2..97bbc6f385 100644
--- a/qpid/cpp/bindings/qmf2/ruby/Makefile.am
+++ b/qpid/cpp/bindings/qmf2/ruby/Makefile.am
@@ -34,7 +34,7 @@ rubylibarchdir = $(RUBY_LIB_ARCH)
rubylibarch_LTLIBRARIES = cqmf2.la
dist_rubylib_DATA = qmf2.rb
-cqmf2_la_LDFLAGS = -avoid-version -module -shrext ".$(RUBY_DLEXT)"
+cqmf2_la_LDFLAGS = -avoid-version -module -shared -shrext ".$(RUBY_DLEXT)"
cqmf2_la_LIBADD = $(RUBY_LIBS) -L$(top_builddir)/src/.libs -lqmf2 $(top_builddir)/src/libqmf2.la
cqmf2_la_CXXFLAGS = $(INCLUDES) -I$(RUBY_INC) -I$(RUBY_INC_ARCH) -fno-strict-aliasing
nodist_cqmf2_la_SOURCES = cqmf2.cpp
diff --git a/qpid/cpp/bindings/qpid/dotnet/src/Session.cpp b/qpid/cpp/bindings/qpid/dotnet/src/Session.cpp
index 4a6199f108..0e918769a3 100644
--- a/qpid/cpp/bindings/qpid/dotnet/src/Session.cpp
+++ b/qpid/cpp/bindings/qpid/dotnet/src/Session.cpp
@@ -248,6 +248,31 @@ namespace Messaging {
}
}
+ void Session::AcknowledgeUpTo(Message ^ message)
+ {
+ AcknowledgeUpTo(message, false);
+ }
+
+ void Session::AcknowledgeUpTo(Message ^ message, bool sync)
+ {
+ System::Exception ^ newException = nullptr;
+
+ try
+ {
+ sessionp->acknowledgeUpTo(*(message->NativeMessage), sync);
+ }
+ catch (const ::qpid::types::Exception & error)
+ {
+ String ^ errmsg = gcnew String(error.what());
+ newException = gcnew QpidException(errmsg);
+ }
+
+ if (newException != nullptr)
+ {
+ throw newException;
+ }
+ }
+
void Session::Reject(Message ^ message)
{
System::Exception ^ newException = nullptr;
diff --git a/qpid/cpp/bindings/qpid/dotnet/src/Session.h b/qpid/cpp/bindings/qpid/dotnet/src/Session.h
index 4d4cad75c4..4b98a37f18 100644
--- a/qpid/cpp/bindings/qpid/dotnet/src/Session.h
+++ b/qpid/cpp/bindings/qpid/dotnet/src/Session.h
@@ -104,6 +104,8 @@ namespace Messaging {
void Acknowledge(bool sync);
void Acknowledge(Message ^ message);
void Acknowledge(Message ^ message, bool sync);
+ void AcknowledgeUpTo(Message ^ message);
+ void AcknowledgeUpTo(Message ^ message, bool sync);
void Reject(Message ^);
void Release(Message ^);
void Sync();
diff --git a/qpid/cpp/bindings/qpid/python/Makefile.am b/qpid/cpp/bindings/qpid/python/Makefile.am
index 9aef179db7..dd25f34829 100644
--- a/qpid/cpp/bindings/qpid/python/Makefile.am
+++ b/qpid/cpp/bindings/qpid/python/Makefile.am
@@ -30,12 +30,12 @@ BUILT_SOURCES = $(generated_file_list)
SWIG_FLAGS = -w362,401
$(generated_file_list): $(srcdir)/python.i $(srcdir)/../qpid.i $(srcdir)/../../swig_python_typemaps.i
- swig -c++ -python $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I$(top_srcdir)/src/qmf -I/usr/include -o cqpid.cpp $(srcdir)/python.i
+ $(SWIG) -c++ -python $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I$(top_srcdir)/src/qmf -I/usr/include -o cqpid.cpp $(srcdir)/python.i
pylibdir = $(PYTHON_LIB)
lib_LTLIBRARIES = _cqpid.la
-cqpiddir = $(pythondir)
+cqpiddir = $(pyexecdir)
cqpid_PYTHON = cqpid.py
_cqpid_la_LDFLAGS = -avoid-version -module -shared
diff --git a/qpid/cpp/bindings/qpid/ruby/Makefile.am b/qpid/cpp/bindings/qpid/ruby/Makefile.am
index d92eb969de..a2a5dd76bd 100644
--- a/qpid/cpp/bindings/qpid/ruby/Makefile.am
+++ b/qpid/cpp/bindings/qpid/ruby/Makefile.am
@@ -33,7 +33,7 @@ cqpid.cpp: $(srcdir)/ruby.i $(srcdir)/../qpid.i $(srcdir)/../../swig_ruby_typema
rubylibarchdir = $(RUBY_LIB_ARCH)
rubylibarch_LTLIBRARIES = cqpid.la
-cqpid_la_LDFLAGS = -avoid-version -module -shrext ".$(RUBY_DLEXT)"
+cqpid_la_LDFLAGS = -avoid-version -module -shared -shrext ".$(RUBY_DLEXT)"
cqpid_la_LIBADD = $(RUBY_LIBS) -L$(top_builddir)/src/.libs -lqpidmessaging -lqpidtypes \
$(top_builddir)/src/libqpidmessaging.la $(top_builddir)/src/libqpidtypes.la
cqpid_la_CXXFLAGS = $(INCLUDES) -I$(RUBY_INC) -I$(RUBY_INC_ARCH) -fno-strict-aliasing
diff --git a/qpid/cpp/configure.ac b/qpid/cpp/configure.ac
index 43a32d3ad7..092694d56b 100644
--- a/qpid/cpp/configure.ac
+++ b/qpid/cpp/configure.ac
@@ -68,8 +68,10 @@ if test x$GXX = xyes; then
# The following warnings are deliberately omitted, they warn on valid code.
# -Wunreachable-code -Wpadded -Winline
# -Wshadow - warns about boost headers.
+ # Can't test for -Werror as whether it fails or not depends on what's in
+ # CFLAGS/CXXFLAGS. In any case it's been in gcc for a long time (since 2.95 at least)
if test "${enableval}" = yes; then
- gl_COMPILER_FLAGS(-Werror)
+ COMPILER_FLAGS="-Werror"
gl_COMPILER_FLAGS(-pedantic)
gl_COMPILER_FLAGS(-Wall)
gl_COMPILER_FLAGS(-Wextra)
diff --git a/qpid/cpp/examples/messaging/drain.cpp b/qpid/cpp/examples/messaging/drain.cpp
index 5c938e9742..7700244fa8 100644
--- a/qpid/cpp/examples/messaging/drain.cpp
+++ b/qpid/cpp/examples/messaging/drain.cpp
@@ -50,7 +50,7 @@ struct Options : OptionParser
add("broker,b", url, "url of broker to connect to");
add("timeout,t", timeout, "timeout in seconds to wait before exiting");
add("forever,f", forever, "ignore timeout and wait forever");
- add("connection-options", connectionOptions, "connection options string in the form {name1=value1, name2=value2}");
+ add("connection-options", connectionOptions, "connection options string in the form {name1:value1, name2:value2}");
add("count,c", count, "number of messages to read before exiting");
}
diff --git a/qpid/cpp/examples/messaging/spout.cpp b/qpid/cpp/examples/messaging/spout.cpp
index 57b955c1de..cd11a7ad81 100644
--- a/qpid/cpp/examples/messaging/spout.cpp
+++ b/qpid/cpp/examples/messaging/spout.cpp
@@ -65,7 +65,7 @@ struct Options : OptionParser
add("property,P", properties, "specify message property");
add("map,M", entries, "specify entry for map content");
add("content", content, "specify textual content");
- add("connection-options", connectionOptions, "connection options string in the form {name1=value1, name2=value2}");
+ add("connection-options", connectionOptions, "connection options string in the form {name1:value1, name2:value2}");
}
static bool nameval(const std::string& in, std::string& name, std::string& value)
diff --git a/qpid/cpp/include/qpid/framing/FieldTable.h b/qpid/cpp/include/qpid/framing/FieldTable.h
index fed431129a..e8ec524863 100644
--- a/qpid/cpp/include/qpid/framing/FieldTable.h
+++ b/qpid/cpp/include/qpid/framing/FieldTable.h
@@ -65,6 +65,8 @@ class FieldTable
QPID_COMMON_EXTERN void decode(Buffer& buffer);
QPID_COMMON_EXTERN int count() const;
+ QPID_COMMON_EXTERN size_t size() const { return values.size(); }
+ QPID_COMMON_EXTERN bool empty() { return size() == 0; }
QPID_COMMON_EXTERN void set(const std::string& name, const ValuePtr& value);
QPID_COMMON_EXTERN ValuePtr get(const std::string& name) const;
QPID_COMMON_INLINE_EXTERN bool isSet(const std::string& name) const { return get(name).get() != 0; }
diff --git a/qpid/cpp/include/qpid/messaging/Connection.h b/qpid/cpp/include/qpid/messaging/Connection.h
index e938f23189..165573e2ef 100644
--- a/qpid/cpp/include/qpid/messaging/Connection.h
+++ b/qpid/cpp/include/qpid/messaging/Connection.h
@@ -54,27 +54,27 @@ class QPID_MESSAGING_CLASS_EXTERN Connection : public qpid::messaging::Handle<Co
* username
* password
* heartbeat
- * tcp-nodelay
- * sasl-mechanism
- * sasl-service
- * sasl-min-ssf
- * sasl-max-ssf
+ * tcp_nodelay
+ * sasl_mechanisms
+ * sasl_service
+ * sasl_min_ssf
+ * sasl_max_ssf
* transport
*
* Reconnect behaviour can be controlled through the following options:
*
* reconnect: true/false (enables/disables reconnect entirely)
- * reconnect-timeout: number of seconds (give up and report failure after specified time)
- * reconnect-limit: n (give up and report failure after specified number of attempts)
- * reconnect-interval-min: number of seconds (initial delay between failed reconnection attempts)
- * reconnect-interval-max: number of seconds (maximum delay between failed reconnection attempts)
- * reconnect-interval: shorthand for setting the same reconnect_interval_min/max
- * reconnect-urls: list of alternate urls to try when connecting
+ * reconnect_timeout: number of seconds (give up and report failure after specified time)
+ * reconnect_limit: n (give up and report failure after specified number of attempts)
+ * reconnect_interval_min: number of seconds (initial delay between failed reconnection attempts)
+ * reconnect_interval_max: number of seconds (maximum delay between failed reconnection attempts)
+ * reconnect_interval: shorthand for setting the same reconnect_interval_min/max
+ * reconnect_urls: list of alternate urls to try when connecting
*
- * The reconnect-interval is the time that the client waits
+ * The reconnect_interval is the time that the client waits
* for after a failed attempt to reconnect before retrying. It
- * starts at the value of the min-retry-interval and is
- * doubled every failure until the value of max-retry-interval
+ * starts at the value of the min_retry_interval and is
+ * doubled every failure until the value of max_retry_interval
* is reached.
*/
QPID_MESSAGING_EXTERN Connection(const std::string& url, const qpid::types::Variant::Map& options = qpid::types::Variant::Map());
diff --git a/qpid/cpp/include/qpid/messaging/Message.h b/qpid/cpp/include/qpid/messaging/Message.h
index 5cd978f2a2..e89a6ce02f 100644
--- a/qpid/cpp/include/qpid/messaging/Message.h
+++ b/qpid/cpp/include/qpid/messaging/Message.h
@@ -55,23 +55,58 @@ class QPID_MESSAGING_CLASS_EXTERN Message
QPID_MESSAGING_EXTERN void setSubject(const std::string&);
QPID_MESSAGING_EXTERN const std::string& getSubject() const;
+ /**
+ * Set the content type (i.e. the MIME type) for the message. This
+ * should be set by the sending application and indicates to
+ * recipients of message how to interpret or decode the content.
+ */
QPID_MESSAGING_EXTERN void setContentType(const std::string&);
+ /**
+ * Returns the content type (i.e. the MIME type) for the
+ * message. This can be used to determine how to decode the
+ * message content.
+ */
QPID_MESSAGING_EXTERN const std::string& getContentType() const;
+ /**
+ * Set an application defined identifier for the message. At
+ * present this must be a stringfied UUID (support for less
+ * restrictive IDs is anticipated however).
+ */
QPID_MESSAGING_EXTERN void setMessageId(const std::string&);
QPID_MESSAGING_EXTERN const std::string& getMessageId() const;
+ /**
+ * Sets the user id of the message. This should in general be the
+ * user-id as which the sending connection authenticated itself as
+ * the messaging infrastructure will verify this. See
+ * Connection::getAuthenticatedUsername()
+ */
QPID_MESSAGING_EXTERN void setUserId(const std::string&);
QPID_MESSAGING_EXTERN const std::string& getUserId() const;
+ /**
+ * Can be used to set application specific correlation identifiers
+ * as part of a protocol for message exchange patterns. E.g. a
+ * request-reponse pattern might require the correlation-id of the
+ * request and response to match, or might use the message-id of
+ * the request as the correlation-id on the response etc.
+ */
QPID_MESSAGING_EXTERN void setCorrelationId(const std::string&);
QPID_MESSAGING_EXTERN const std::string& getCorrelationId() const;
+ /**
+ * Sets a priority level on the message. This may be used by the
+ * messaging infrastructure to prioritise delivery of higher
+ * priority messages.
+ */
QPID_MESSAGING_EXTERN void setPriority(uint8_t);
QPID_MESSAGING_EXTERN uint8_t getPriority() const;
/**
- * Set the time to live for this message in milliseconds.
+ * Set the time to live for this message in milliseconds. This can
+ * be used by the messaging infrastructure to discard messages
+ * that are no longer of relevance.
*/
QPID_MESSAGING_EXTERN void setTtl(Duration ttl);
/**
@@ -79,24 +114,62 @@ class QPID_MESSAGING_CLASS_EXTERN Message
*/
QPID_MESSAGING_EXTERN Duration getTtl() const;
+ /**
+ * Mark the message as durable. This is a hint to the messaging
+ * infrastructure that the message should be persisted or
+ * otherwise stored such that failoures or shutdown do not cause
+ * it to be lost.
+ */
QPID_MESSAGING_EXTERN void setDurable(bool durable);
QPID_MESSAGING_EXTERN bool getDurable() const;
+ /**
+ * The redelivered flag if set implies that the message *may* have
+ * been previously delivered and thus is a hint to the application
+ * or messaging infrastructure that if de-duplication is required
+ * this message should be examined to determine if it is a
+ * duplicate.
+ */
QPID_MESSAGING_EXTERN bool getRedelivered() const;
+ /**
+ * Can be used to provide a hint to the application or messaging
+ * infrastructure that if de-duplication is required this message
+ * should be examined to determine if it is a duplicate.
+ */
QPID_MESSAGING_EXTERN void setRedelivered(bool);
+ /**
+ * In addition to a payload (i.e. the content), messages can
+ * include annotations describing aspectf of the message. In
+ * addition to the standard annotations such as TTL and content
+ * type, application- or context- specific properties can also be
+ * defined. Each message has a map of name values for such custom
+ * properties. The value is specified as a Variant.
+ */
QPID_MESSAGING_EXTERN const qpid::types::Variant::Map& getProperties() const;
QPID_MESSAGING_EXTERN qpid::types::Variant::Map& getProperties();
+ /**
+ * Set the content to the data held in the string parameter. Note:
+ * this is treated as raw bytes and need not be text. Consider
+ * setting the content-type to indicate how the data should be
+ * interpreted by recipients.
+ */
QPID_MESSAGING_EXTERN void setContent(const std::string&);
/**
- * Note that chars are copied.
+ * Copy count bytes from the region pointed to by chars as the
+ * message content.
*/
QPID_MESSAGING_EXTERN void setContent(const char* chars, size_t count);
/** Get the content as a std::string */
QPID_MESSAGING_EXTERN std::string getContent() const;
- /** Get a const pointer to the start of the content data. */
+ /**
+ * Get a const pointer to the start of the content data. The
+ * memory pointed to is owned by the message. The getContentSize()
+ * method indicates how much data there is (i.e. the extent of the
+ * memory region pointed to by the return value of this method).
+ */
QPID_MESSAGING_EXTERN const char* getContentPtr() const;
/** Get the size of content in bytes. */
QPID_MESSAGING_EXTERN size_t getContentSize() const;
diff --git a/qpid/cpp/include/qpid/messaging/Session.h b/qpid/cpp/include/qpid/messaging/Session.h
index 52786eb5f4..428f8aa491 100644
--- a/qpid/cpp/include/qpid/messaging/Session.h
+++ b/qpid/cpp/include/qpid/messaging/Session.h
@@ -78,6 +78,10 @@ class QPID_MESSAGING_CLASS_EXTERN Session : public qpid::messaging::Handle<Sessi
*/
QPID_MESSAGING_EXTERN void acknowledge(Message&, bool sync=false);
/**
+ * Acknowledges all message up to the specified message.
+ */
+ QPID_MESSAGING_EXTERN void acknowledgeUpTo(Message&, bool sync=false);
+ /**
* Rejects the specified message. The broker does not redeliver a
* message that has been rejected. Once a message has been
* acknowledged, it can no longer be rejected.
diff --git a/qpid/cpp/managementgen/Makefile.am b/qpid/cpp/managementgen/Makefile.am
index 6c2024ccaa..e10dd63c87 100644
--- a/qpid/cpp/managementgen/Makefile.am
+++ b/qpid/cpp/managementgen/Makefile.am
@@ -19,10 +19,16 @@
qmfpythondir = $(pythondir)
dist_bin_SCRIPTS = \
qmf-gen
-nobase_qmfpython_DATA = \
+
+pkgpyexec_qmfgendir = $(pyexecdir)/qmfgen
+pkgpyexec_qmfgen_PYTHON = \
qmfgen/__init__.py \
qmfgen/generate.py \
qmfgen/schema.py \
+ qmfgen/management-types.xml
+
+pkgpyexec_qmfgentmpldir = $(pyexecdir)/qmfgen/templates
+pkgpyexec_qmfgentmpl_PYTHON = \
qmfgen/templates/Args.h \
qmfgen/templates/Class.cpp \
qmfgen/templates/Class.h \
@@ -32,7 +38,6 @@ nobase_qmfpython_DATA = \
qmfgen/templates/Package.cpp \
qmfgen/templates/Package.h \
qmfgen/templates/V2Package.cpp \
- qmfgen/templates/V2Package.h \
- qmfgen/management-types.xml
+ qmfgen/templates/V2Package.h
EXTRA_DIST = $(nobase_qmfpython_DATA) CMakeLists.txt
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 0fe2d7e4d0..11554b1034 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -96,7 +96,7 @@ MACRO (add_msvc_version_full verProject verProjectType verProjectFileExt verFN1
inherit_value ("winver_${verProject}_InternalName" "${verProject}")
inherit_value ("winver_${verProject}_OriginalFilename" "${verProject}.${verProjectFileExt}")
inherit_value ("winver_${verProject}_ProductName" "${winver_DESCRIPTION_SUMMARY}")
-
+
# Create strings to be substituted into the template file
set ("winverFileVersionBinary" "${winver_${verProject}_FileVersionBinary}")
set ("winverProductVersionBinary" "${winver_${verProject}_ProductVersionBinary}")
@@ -126,7 +126,7 @@ ENDMACRO (add_msvc_version_full)
#
MACRO (add_msvc_version verProject verProjectType verProjectFileExt)
if (MSVC)
- add_msvc_version_full (${verProject}
+ add_msvc_version_full (${verProject}
${verProjectType}
${verProjectFileExt}
${winver_FILE_VERSION_N1}
@@ -656,7 +656,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows)
set (qpidd_platform_SOURCES
windows/QpiddBroker.cpp
)
-
+
set (qpidmessaging_platform_SOURCES
qpid/messaging/HandleInstantiator.cpp
)
@@ -997,7 +997,6 @@ set (qpidbroker_SOURCES
qpid/broker/QueuePolicy.cpp
qpid/broker/QueueRegistry.cpp
qpid/broker/QueueFlowLimit.cpp
- qpid/broker/RateTracker.cpp
qpid/broker/RecoveryManagerImpl.cpp
qpid/broker/RecoveredEnqueue.cpp
qpid/broker/RecoveredDequeue.cpp
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 15021cc68b..608afb0660 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -616,8 +616,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/QueueFlowLimit.h \
qpid/broker/QueueFlowLimit.cpp \
qpid/broker/RateFlowcontrol.h \
- qpid/broker/RateTracker.cpp \
- qpid/broker/RateTracker.h \
qpid/broker/RecoverableConfig.h \
qpid/broker/RecoverableExchange.h \
qpid/broker/RecoverableMessage.h \
diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp
index 7b839930e1..5df0d83f12 100644
--- a/qpid/cpp/src/qmf/ConsoleSession.cpp
+++ b/qpid/cpp/src/qmf/ConsoleSession.cpp
@@ -66,7 +66,7 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s
//========================================================================================
ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
- connection(c), domain("default"), maxAgentAgeMinutes(5),
+ connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false),
opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
{
diff --git a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
index ab65b8d768..41dd9ff00c 100644
--- a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
+++ b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
@@ -334,8 +334,7 @@ void ResilientConnectionImpl::notify()
{
if (notifyFd != -1)
{
- int unused_ret; //Suppress warnings about ignoring return value.
- unused_ret = ::write(notifyFd, ".", 1);
+ (void) ::write(notifyFd, ".", 1);
}
}
@@ -432,8 +431,7 @@ void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind k
if (notifyFd != -1)
{
- int unused_ret; //Suppress warnings about ignoring return value.
- unused_ret = ::write(notifyFd, ".", 1);
+ (void) ::write(notifyFd, ".", 1);
}
}
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 240766c443..f80e0f1e61 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -188,7 +188,7 @@ Broker::Broker(const Broker::Options& conf) :
conf.replayFlushLimit*1024, // convert kb to bytes.
conf.replayHardLimit*1024),
*this),
- queueCleaner(queues, timer),
+ queueCleaner(queues, &timer),
queueEvents(poller,!conf.asyncQueueEvents),
recovery(true),
inCluster(false),
@@ -701,7 +701,7 @@ void Broker::accept() {
}
void Broker::connect(
- const std::string& host, uint16_t port, const std::string& transport,
+ const std::string& host, const std::string& port, const std::string& transport,
boost::function2<void, int, std::string> failed,
sys::ConnectionCodec::Factory* f)
{
@@ -717,7 +717,7 @@ void Broker::connect(
{
url.throwIfEmpty();
const Address& addr=url[0];
- connect(addr.host, addr.port, addr.protocol, failed, f);
+ connect(addr.host, boost::lexical_cast<std::string>(addr.port), addr.protocol, failed, f);
}
uint32_t Broker::queueMoveMessages(
@@ -750,6 +750,7 @@ bool Broker::deferDeliveryImpl(const std::string& ,
void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
clusterTimer = t;
+ queueCleaner.setTimer(clusterTimer.get());
}
const std::string Broker::TCP_TRANSPORT("tcp");
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 6d585bf614..40f7b6273f 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -244,7 +244,7 @@ public:
QPID_BROKER_EXTERN void accept();
/** Create a connection to another broker. */
- void connect(const std::string& host, uint16_t port,
+ void connect(const std::string& host, const std::string& port,
const std::string& transport,
boost::function2<void, int, std::string> failed,
sys::ConnectionCodec::Factory* =0);
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index c07e63e68c..8362a9782c 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -288,11 +288,11 @@ void Connection::raiseConnectEvent() {
}
}
-void Connection::setFederationLink(bool b)
+void Connection::setUserProxyAuth(bool b)
{
- ConnectionState::setFederationLink(b);
+ ConnectionState::setUserProxyAuth(b);
if (mgmtObject != 0)
- mgmtObject->set_federationLink(b);
+ mgmtObject->set_userProxyAuth(b);
}
void Connection::close(connection::CloseCode code, const string& text)
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 8f1aa701ef..3522d70b35 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -125,7 +125,7 @@ class Connection : public sys::ConnectionInputHandler,
const std::string& getUserId() const { return ConnectionState::getUserId(); }
const std::string& getMgmtId() const { return mgmtId; }
management::ManagementAgent* getAgent() const { return agent; }
- void setFederationLink(bool b);
+ void setUserProxyAuth(bool b);
/** Connection does not delete the listener. 0 resets. */
void setErrorListener(ErrorListener* l) { errorListener=l; }
ErrorListener* getErrorListener() { return errorListener; }
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index 3f97e5b9de..270711705e 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -137,7 +137,9 @@ void ConnectionHandler::Handler::startOk(const framing::FieldTable& clientProper
throw;
}
connection.setFederationLink(clientProperties.get(QPID_FED_LINK));
- connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG));
+ if (clientProperties.isSet(QPID_FED_TAG)) {
+ connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG));
+ }
if (connection.isFederationLink()) {
if (acl && !acl->authorise(connection.getUserId(),acl::ACT_CREATE,acl::OBJ_LINK,"")){
proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link");
@@ -256,7 +258,6 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties,
false ); // disallow interaction
}
std::string supportedMechanismsList;
- bool requestedMechanismIsSupported = false;
Array::const_iterator i;
/*
@@ -269,11 +270,9 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties,
if (i != supportedMechanisms.begin())
supportedMechanismsList += SPACE;
supportedMechanismsList += (*i)->get<std::string>();
- requestedMechanismIsSupported = true;
}
}
else {
- requestedMechanismIsSupported = false;
/*
The caller has requested a mechanism. If it's available,
make sure it ends up at the head of the list.
@@ -282,7 +281,6 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties,
string currentMechanism = (*i)->get<std::string>();
if ( requestedMechanism == currentMechanism ) {
- requestedMechanismIsSupported = true;
supportedMechanismsList = currentMechanism + SPACE + supportedMechanismsList;
} else {
if (i != supportedMechanisms.begin())
@@ -292,7 +290,9 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties,
}
}
- connection.setFederationPeerTag(serverProperties.getAsString(QPID_FED_TAG));
+ if (serverProperties.isSet(QPID_FED_TAG)) {
+ connection.setFederationPeerTag(serverProperties.getAsString(QPID_FED_TAG));
+ }
FieldTable ft;
ft.setInt(QPID_FED_LINK,1);
diff --git a/qpid/cpp/src/qpid/broker/ConnectionState.h b/qpid/cpp/src/qpid/broker/ConnectionState.h
index 9c31a931d8..fdd3c4ddc0 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionState.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionState.h
@@ -46,6 +46,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
framemax(65535),
heartbeat(0),
heartbeatmax(120),
+ userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links & clustering)
federationLink(true),
clientSupportsThrottling(false),
clusterOrderOut(0)
@@ -67,8 +68,10 @@ class ConnectionState : public ConnectionToken, public management::Manageable
void setUrl(const std::string& _url) { url = _url; }
const std::string& getUrl() const { return url; }
- void setFederationLink(bool b) { federationLink = b; }
- bool isFederationLink() const { return federationLink; }
+ void setUserProxyAuth(const bool b) { userProxyAuth = b; }
+ bool isUserProxyAuth() const { return userProxyAuth || federationPeerTag.size() > 0; } // links can proxy msgs with non-matching auth ids
+ void setFederationLink(bool b) { federationLink = b; } // deprecated - use setFederationPeerTag() instead
+ bool isFederationLink() const { return federationPeerTag.size() > 0; }
void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag); }
const std::string& getFederationPeerTag() const { return federationPeerTag; }
std::vector<Url>& getKnownHosts() { return knownHosts; }
@@ -105,6 +108,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
uint16_t heartbeatmax;
std::string userId;
std::string url;
+ bool userProxyAuth;
bool federationLink;
std::string federationPeerTag;
std::vector<Url> knownHosts;
diff --git a/qpid/cpp/src/qpid/broker/Daemon.cpp b/qpid/cpp/src/qpid/broker/Daemon.cpp
index b30e5f18cb..c36538beb7 100644
--- a/qpid/cpp/src/qpid/broker/Daemon.cpp
+++ b/qpid/cpp/src/qpid/broker/Daemon.cpp
@@ -93,11 +93,10 @@ void Daemon::fork()
catch (const exception& e) {
QPID_LOG(critical, "Unexpected error: " << e.what());
uint16_t port = 0;
- int unused_ret; //Supress warning about ignoring return value.
- unused_ret = write(pipeFds[1], &port, sizeof(uint16_t));
+ (void) write(pipeFds[1], &port, sizeof(uint16_t));
std::string pipeFailureMessage = e.what();
- unused_ret = write ( pipeFds[1],
+ (void) write ( pipeFds[1],
pipeFailureMessage.c_str(),
strlen(pipeFailureMessage.c_str())
);
diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
index 64a12d918a..62cb3fc116 100644
--- a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
+++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,12 +27,12 @@ namespace broker {
ExpiryPolicy::~ExpiryPolicy() {}
-void ExpiryPolicy::willExpire(Message&) {}
-
bool ExpiryPolicy::hasExpired(Message& m) {
return m.getExpiration() < sys::AbsTime::now();
}
-void ExpiryPolicy::forget(Message&) {}
+sys::AbsTime ExpiryPolicy::getCurrentTime() {
+ return sys::AbsTime::now();
+}
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
index a723eb0aa8..2caf00ce00 100644
--- a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
+++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,6 +26,11 @@
#include "qpid/broker/BrokerImportExport.h"
namespace qpid {
+
+namespace sys {
+class AbsTime;
+}
+
namespace broker {
class Message;
@@ -37,9 +42,8 @@ class QPID_BROKER_CLASS_EXTERN ExpiryPolicy : public RefCounted
{
public:
QPID_BROKER_EXTERN virtual ~ExpiryPolicy();
- QPID_BROKER_EXTERN virtual void willExpire(Message&);
QPID_BROKER_EXTERN virtual bool hasExpired(Message&);
- QPID_BROKER_EXTERN virtual void forget(Message&);
+ QPID_BROKER_EXTERN virtual qpid::sys::AbsTime getCurrentTime();
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
index abcaa5f69d..4bda70d313 100644
--- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -112,9 +112,14 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co
{
Mutex::ScopedLock l(lock);
- Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args));
+ //NOTE: do not include the fed op/tags/origin in the
+ //arguments as when x-match is 'all' these would prevent
+ //matching (they are internally added properties
+ //controlling binding propagation but not relevant to
+ //actual routing)
+ Binding::shared_ptr binding (new Binding (bindingKey, queue, this, extra_args));
BoundKey bk(binding);
- if (bindings.add_unless(bk, MatchArgs(queue, args))) {
+ if (bindings.add_unless(bk, MatchArgs(queue, &extra_args))) {
binding->startManagement();
propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
if (mgmtExchange != 0) {
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 91861ade3f..9ab4379a69 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -117,7 +117,7 @@ void Link::startConnectionLH ()
// Set the state before calling connect. It is possible that connect
// will fail synchronously and call Link::closed before returning.
setStateLH(STATE_CONNECTING);
- broker->connect (host, port, transport,
+ broker->connect (host, boost::lexical_cast<std::string>(port), transport,
boost::bind (&Link::closed, this, _1, _2));
QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port);
} catch(std::exception& e) {
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 4f64ae2db9..1d52984831 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -49,25 +49,21 @@ TransferAdapter Message::TRANSFER;
Message::Message(const framing::SequenceNumber& id) :
frames(id), persistenceId(0), redelivered(false), loaded(false),
- staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
+ staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
expiration(FAR_FUTURE), dequeueCallback(0),
inCallback(false), requiredCredit(0), isManagementMessage(false)
{}
Message::Message(const Message& original) :
PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false),
- staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
+ staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
expiration(original.expiration), dequeueCallback(0),
inCallback(false), requiredCredit(0)
{
setExpiryPolicy(original.expiryPolicy);
}
-Message::~Message()
-{
- if (expiryPolicy)
- expiryPolicy->forget(*this);
-}
+Message::~Message() {}
void Message::forcePersistent()
{
@@ -87,7 +83,7 @@ std::string Message::getRoutingKey() const
return getAdapter().getRoutingKey(frames);
}
-std::string Message::getExchangeName() const
+std::string Message::getExchangeName() const
{
return getAdapter().getExchange(frames);
}
@@ -96,7 +92,7 @@ const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registr
{
if (!exchange) {
exchange = registry.get(getExchangeName());
- }
+ }
return exchange;
}
@@ -196,7 +192,7 @@ void Message::decodeContent(framing::Buffer& buffer)
} else {
//adjust header flags
MarkLastSegment f;
- frames.map_if(f, TypeFilter<HEADER_BODY>());
+ frames.map_if(f, TypeFilter<HEADER_BODY>());
}
//mark content loaded
loaded = true;
@@ -248,7 +244,7 @@ void Message::destroy()
bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const
{
intrusive_ptr<const PersistableMessage> pmsg(this);
-
+
bool done = false;
string& data = frame.castBody<AMQContentBody>()->getData();
store->loadContent(queue, pmsg, data, offset, maxContentSize);
@@ -273,7 +269,7 @@ void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
- {
+ {
AMQFrame frame((AMQContentBody()));
morecontent = getContentFrame(queue, frame, maxContentSize, offset);
out.handle(frame);
@@ -291,7 +287,7 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/)
{
sys::Mutex::ScopedLock l(lock);
Relay f(out);
- frames.map_if(f, TypeFilter<HEADER_BODY>());
+ frames.map_if(f, TypeFilter<HEADER_BODY>());
}
// TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over
@@ -321,7 +317,7 @@ bool Message::isContentLoaded() const
}
-namespace
+namespace
{
const std::string X_QPID_TRACE("x-qpid.trace");
}
@@ -358,13 +354,13 @@ void Message::addTraceId(const std::string& id)
trace += ",";
trace += id;
headers.setString(X_QPID_TRACE, trace);
- }
+ }
}
}
-void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
+void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
{
- DeliveryProperties* props = getProperties<DeliveryProperties>();
+ DeliveryProperties* props = getProperties<DeliveryProperties>();
if (props->getTtl()) {
// AMQP requires setting the expiration property to be posix
// time_t in seconds. TTL is in milliseconds
@@ -373,10 +369,14 @@ void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
time_t now = ::time(0);
props->setExpiration(now + (props->getTtl()/1000));
}
- // Use higher resolution time for the internal expiry calculation.
- Duration ttl(std::min(props->getTtl() * TIME_MSEC, (uint64_t) std::numeric_limits<int64_t>::max()));//Prevent overflow
- expiration = AbsTime(AbsTime::now(), ttl);
- setExpiryPolicy(e);
+ if (e) {
+ // Use higher resolution time for the internal expiry calculation.
+ // Prevent overflow as a signed int64_t
+ Duration ttl(std::min(props->getTtl() * TIME_MSEC,
+ (uint64_t) std::numeric_limits<int64_t>::max()));
+ expiration = AbsTime(e->getCurrentTime(), ttl);
+ setExpiryPolicy(e);
+ }
}
}
@@ -386,16 +386,17 @@ void Message::adjustTtl()
if (props->getTtl()) {
sys::Mutex::ScopedLock l(lock);
if (expiration < FAR_FUTURE) {
- sys::Duration d(sys::AbsTime::now(), getExpiration());
- props->setTtl(int64_t(d) > 0 ? int64_t(d)/1000000 : 1); // convert from ns to ms; set to 1 if expired
+ sys::AbsTime current(
+ expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
+ sys::Duration ttl(current, getExpiration());
+ // convert from ns to ms; set to 1 if expired
+ props->setTtl(int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1);
}
}
}
void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
expiryPolicy = e;
- if (expiryPolicy)
- expiryPolicy->willExpire(*this);
}
bool Message::hasExpired()
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index 8c5d42bcde..38e4f831fd 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,12 +34,12 @@
#include <vector>
namespace qpid {
-
+
namespace framing {
class FieldTable;
class SequenceNumber;
}
-
+
namespace broker {
class ConnectionToken;
class Exchange;
@@ -51,11 +51,11 @@ class ExpiryPolicy;
class Message : public PersistableMessage {
public:
typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback;
-
+
QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = framing::SequenceNumber());
QPID_BROKER_EXTERN Message(const Message&);
QPID_BROKER_EXTERN ~Message();
-
+
uint64_t getPersistenceId() const { return persistenceId; }
void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
@@ -83,10 +83,11 @@ public:
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
bool hasExpired();
sys::AbsTime getExpiration() const { return expiration; }
+ void setExpiration(sys::AbsTime exp) { expiration = exp; }
void adjustTtl();
- framing::FrameSet& getFrames() { return frames; }
- const framing::FrameSet& getFrames() const { return frames; }
+ framing::FrameSet& getFrames() { return frames; }
+ const framing::FrameSet& getFrames() const { return frames; }
template <class T> T* getProperties() {
qpid::framing::AMQHeaderBody* p = frames.getHeaders();
@@ -103,6 +104,11 @@ public:
return p->get<T>();
}
+ template <class T> void eraseProperties() {
+ qpid::framing::AMQHeaderBody* p = frames.getHeaders();
+ p->erase<T>();
+ }
+
template <class T> const T* getMethod() const {
return frames.as<T>();
}
@@ -135,7 +141,7 @@ public:
QPID_BROKER_EXTERN void decodeHeader(framing::Buffer& buffer);
QPID_BROKER_EXTERN void decodeContent(framing::Buffer& buffer);
-
+
void QPID_BROKER_EXTERN tryReleaseContent();
void releaseContent();
void releaseContent(MessageStore* s);//deprecated, use 'setStore(store); releaseContent();' instead
@@ -149,10 +155,10 @@ public:
bool isExcluded(const std::vector<std::string>& excludes) const;
void addTraceId(const std::string& id);
-
+
void forcePersistent();
bool isForcedPersistent();
-
+
/** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */
void setDequeueCompleteCallback(MessageCallback& cb);
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index cf538aaaa7..260a45d7e0 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -65,7 +65,7 @@ using std::mem_fun;
namespace _qmf = qmf::org::apache::qpid::broker;
-namespace
+namespace
{
const std::string qpidMaxSize("qpid.max_size");
const std::string qpidMaxCount("qpid.max_count");
@@ -87,16 +87,16 @@ const int ENQUEUE_ONLY=1;
const int ENQUEUE_AND_DEQUEUE=2;
}
-Queue::Queue(const string& _name, bool _autodelete,
+Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
const OwnershipToken* const _owner,
Manageable* parent,
Broker* b) :
- name(_name),
+ name(_name),
autodelete(_autodelete),
store(_store),
- owner(_owner),
+ owner(_owner),
consumerCount(0),
exclusive(0),
noLocal(false),
@@ -179,9 +179,9 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
if (policy.get()) policy->recoverEnqueued(msg);
push(msg, true);
- if (store){
+ if (store){
// setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
- msg->addToSyncList(shared_from_this(), store);
+ msg->addToSyncList(shared_from_this(), store);
}
if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
@@ -206,13 +206,13 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){
void Queue::requeue(const QueuedMessage& msg){
assertClusterSafe();
QueueListeners::NotificationSet copy;
- {
+ {
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
messages->reinsert(msg);
listeners.populate(copy);
- // for persistLastNode - don't force a message twice to disk, but force it if no force before
+ // for persistLastNode - don't force a message twice to disk, but force it if no force before
if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
msg.payload->forcePersistent();
if (msg.payload->isForcedPersistent() ){
@@ -224,7 +224,7 @@ void Queue::requeue(const QueuedMessage& msg){
copy.notify();
}
-bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
+bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
@@ -268,7 +268,7 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
case NO_MESSAGES:
default:
return false;
- }
+ }
} else {
return browseNextMessage(m, c);
}
@@ -278,7 +278,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
{
while (true) {
Mutex::ScopedLock locker(messageLock);
- if (messages->empty()) {
+ if (messages->empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
return NO_MESSAGES;
@@ -291,7 +291,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
}
if (c->filter(msg.payload)) {
- if (c->accept(msg.payload)) {
+ if (c->accept(msg.payload)) {
m = msg;
pop();
return CONSUMED;
@@ -304,7 +304,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
//consumer will never want this message
QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
return CANT_CONSUME;
- }
+ }
}
}
}
@@ -358,7 +358,7 @@ bool Queue::dispatch(Consumer::shared_ptr c)
}
}
-// Find the next message
+// Find the next message
bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
Mutex::ScopedLock locker(messageLock);
if (messages->next(c->position, msg)) {
@@ -426,19 +426,25 @@ bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& messa
}
}
-void Queue::purgeExpired()
+/**
+ *@param lapse: time since the last purgeExpired
+ */
+void Queue::purgeExpired(qpid::sys::Duration lapse)
{
//As expired messages are discarded during dequeue also, only
//bother explicitly expiring if the rate of dequeues since last
- //attempt is less than one per second.
-
- if (dequeueTracker.sampleRatePerSecond() < 1) {
+ //attempt is less than one per second.
+ int count = dequeueSincePurge.get();
+ dequeueSincePurge -= count;
+ int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
+ if (seconds == 0 || count / seconds < 1) {
std::deque<QueuedMessage> expired;
{
Mutex::ScopedLock locker(messageLock);
- messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
+ messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
}
- for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+ for_each(expired.begin(), expired.end(),
+ boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
}
@@ -457,7 +463,7 @@ void Queue::purgeExpired()
uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest)
{
Mutex::ScopedLock locker(messageLock);
- uint32_t purge_count = purge_request; // only comes into play if >0
+ uint32_t purge_count = purge_request; // only comes into play if >0
std::deque<DeliverableMessage> rerouteQueue;
uint32_t count = 0;
@@ -490,7 +496,7 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange>
uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
Mutex::ScopedLock locker(messageLock);
- uint32_t move_count = qty; // only comes into play if qty >0
+ uint32_t move_count = qty; // only comes into play if qty >0
uint32_t count = 0; // count how many were moved for returning
while((!qty || move_count--) && !messages->empty()) {
@@ -508,7 +514,7 @@ void Queue::pop()
{
assertClusterSafe();
messages->pop();
- ++dequeueTracker;
+ ++dequeueSincePurge;
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
@@ -517,10 +523,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
QueuedMessage removed;
bool dequeueRequired = false;
{
- Mutex::ScopedLock locker(messageLock);
+ Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
-
+
dequeueRequired = messages->push(qm, removed);
listeners.populate(copy);
enqueued(qm);
@@ -599,7 +605,7 @@ void Queue::setLastNodeFailure()
}
-// return true if store exists,
+// return true if store exists,
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck)
{
ScopedUse u(barrier);
@@ -619,7 +625,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
if (inLastNodeFailure && persistLastNode){
msg->forcePersistent();
}
-
+
if (traceId.size()) {
//copy on write: take deep copy of message before modifying it
//as the frames may already be available for delivery on other
@@ -649,9 +655,10 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
{
Mutex::ScopedLock locker(messageLock);
- if (policy.get()) policy->enqueueAborted(msg);
+ if (policy.get()) policy->enqueueAborted(msg);
}
+
/**
* Returns a null pointer if the dequeue completed, otherwise the dequeue will complete
* asynchronously, and a pointer to a DequeueCompletion object is returned.
@@ -666,7 +673,7 @@ Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return empty;
- if (!ctxt) {
+ if (!ctxt) {
dequeued(msg);
}
}
@@ -693,7 +700,7 @@ Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
void Queue::dequeueCommitted(const QueuedMessage& msg)
{
Mutex::ScopedLock locker(messageLock);
- dequeued(msg);
+ dequeued(msg);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
@@ -748,8 +755,8 @@ int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::stri
return v->get<int>();
} else if (v->convertsTo<std::string>()){
std::string s = v->get<std::string>();
- try {
- return boost::lexical_cast<int>(s);
+ try {
+ return boost::lexical_cast<int>(s);
} catch(const boost::bad_lexical_cast&) {
QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
return 0;
@@ -773,7 +780,7 @@ void Queue::configureImpl(const FieldTable& _settings)
broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY);
}
- if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
+ if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
(!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) {
if ( NullMessageStore::isNullStore(store)) {
QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
@@ -811,7 +818,7 @@ void Queue::configureImpl(const FieldTable& _settings)
QPID_LOG(debug, "Configured queue " << getName() << " as priority queue.");
}
}
-
+
persistLastNode= _settings.get(qpidPersistLastNode);
if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
@@ -820,15 +827,15 @@ void Queue::configureImpl(const FieldTable& _settings)
if (excludeList.size()) {
split(traceExclude, excludeList, ", ");
}
- QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
+ QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
<< "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
- if (autoDeleteTimeout)
- QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
+ if (autoDeleteTimeout)
+ QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
if (mgmtObject != 0) {
mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
@@ -892,9 +899,9 @@ const QueuePolicy* Queue::getPolicy()
return policy.get();
}
-uint64_t Queue::getPersistenceId() const
-{
- return persistenceId;
+uint64_t Queue::getPersistenceId() const
+{
+ return persistenceId;
}
void Queue::setPersistenceId(uint64_t _persistenceId) const
@@ -908,11 +915,11 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const
persistenceId = _persistenceId;
}
-void Queue::encode(Buffer& buffer) const
+void Queue::encode(Buffer& buffer) const
{
buffer.putShortString(name);
buffer.put(settings);
- if (policy.get()) {
+ if (policy.get()) {
buffer.put(*policy);
}
buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string(""));
@@ -965,7 +972,7 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange()
void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
{
- if (broker.getQueues().destroyIf(queue->getName(),
+ if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
QPID_LOG(debug, "Auto-deleting " << queue->getName());
queue->destroyed();
@@ -977,7 +984,7 @@ struct AutoDeleteTask : qpid::sys::TimerTask
Broker& broker;
Queue::shared_ptr queue;
- AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
+ AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
: qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
void fire()
@@ -995,27 +1002,27 @@ void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
- broker.getClusterTimer().add(queue->autoDeleteTask);
+ broker.getClusterTimer().add(queue->autoDeleteTask);
QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
} else {
tryAutoDeleteImpl(broker, queue);
}
}
-bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
-{
+bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
+{
Mutex::ScopedLock locker(ownershipLock);
- return o == owner;
+ return o == owner;
}
-void Queue::releaseExclusiveOwnership()
-{
+void Queue::releaseExclusiveOwnership()
+{
Mutex::ScopedLock locker(ownershipLock);
- owner = 0;
+ owner = 0;
}
-bool Queue::setExclusiveOwner(const OwnershipToken* const o)
-{
+bool Queue::setExclusiveOwner(const OwnershipToken* const o)
+{
//reset auto deletion timer if necessary
if (autoDeleteTimeout && autoDeleteTask) {
autoDeleteTask->cancel();
@@ -1024,20 +1031,20 @@ bool Queue::setExclusiveOwner(const OwnershipToken* const o)
if (owner) {
return false;
} else {
- owner = o;
+ owner = o;
return true;
}
}
-bool Queue::hasExclusiveOwner() const
-{
+bool Queue::hasExclusiveOwner() const
+{
Mutex::ScopedLock locker(ownershipLock);
- return owner != 0;
+ return owner != 0;
}
-bool Queue::hasExclusiveConsumer() const
-{
- return exclusive;
+bool Queue::hasExclusiveConsumer() const
+{
+ return exclusive;
}
void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
@@ -1206,6 +1213,10 @@ const Broker* Queue::getBroker()
return broker;
}
+void Queue::setDequeueSincePurge(uint32_t value) {
+ dequeueSincePurge = value;
+}
+
/** invoked from the store thread when the asynchronous dequeueing of the
* message has completed. */
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 86c184676f..63f2397b77 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -32,9 +32,9 @@
#include "qpid/broker/QueueBindings.h"
#include "qpid/broker/QueueListeners.h"
#include "qpid/broker/QueueObserver.h"
-#include "qpid/broker/RateTracker.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
@@ -75,13 +75,13 @@ class Queue : public boost::enable_shared_from_this<Queue>,
{
Queue& parent;
uint count;
-
+
UsageBarrier(Queue&);
bool acquire();
void release();
void destroy();
};
-
+
struct ScopedUse
{
UsageBarrier& barrier;
@@ -89,7 +89,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {}
~ScopedUse() { if (acquired) barrier.release(); }
};
-
+
typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
@@ -120,7 +120,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
boost::shared_ptr<Exchange> alternateExchange;
framing::SequenceNumber sequence;
qmf::org::apache::qpid::broker::Queue* mgmtObject;
- RateTracker dequeueTracker;
+ sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge.
int eventMode;
Observers observers;
bool insertSeqNo;
@@ -147,7 +147,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void dequeued(const QueuedMessage& msg);
void pop();
void popAndDequeue();
- QueuedMessage getFront();
+
void forcePersistent(QueuedMessage& msg);
int getEventMode();
void configureImpl(const qpid::framing::FieldTable& settings);
@@ -185,8 +185,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
typedef std::vector<shared_ptr> vector;
QPID_BROKER_EXTERN Queue(const std::string& name,
- bool autodelete = false,
- MessageStore* const store = 0,
+ bool autodelete = false,
+ MessageStore* const store = 0,
const OwnershipToken* const owner = 0,
management::Manageable* parent = 0,
Broker* broker = 0);
@@ -246,11 +246,11 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool exclusive = false);
QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
- uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages
- QPID_BROKER_EXTERN void purgeExpired();
+ uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages
+ QPID_BROKER_EXTERN void purgeExpired(sys::Duration);
//move qty # of messages to destination Queue destq
- uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
+ uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
QPID_BROKER_EXTERN uint32_t getMessageCount() const;
QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
@@ -313,8 +313,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* Inform queue of messages that were enqueued, have since
* been acquired but not yet accepted or released (and
* thus are still logically on the queue) - used in
- * clustered broker.
- */
+ * clustered broker.
+ */
void updateEnqueued(const QueuedMessage& msg);
/**
@@ -325,9 +325,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* accepted it).
*/
bool isEnqueued(const QueuedMessage& msg);
-
+
/**
- * Gets the next available message
+ * Gets the next available message
*/
QPID_BROKER_EXTERN QueuedMessage get();
@@ -408,6 +408,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
const Broker* getBroker();
+ uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
+ void setDequeueSincePurge(uint32_t value);
+
private:
std::map<PersistableMessage *, boost::intrusive_ptr<DequeueCompletion> > pendingDequeueCompletions;
};
diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
index 3499ea8a4d..838bc28be8 100644
--- a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,7 +27,7 @@
namespace qpid {
namespace broker {
-QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer& t) : queues(q), timer(t) {}
+QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer* t) : queues(q), timer(t) {}
QueueCleaner::~QueueCleaner()
{
@@ -36,10 +36,16 @@ QueueCleaner::~QueueCleaner()
void QueueCleaner::start(qpid::sys::Duration p)
{
+ period = p;
task = new Task(*this, p);
- timer.add(task);
+ timer->add(task);
}
+void QueueCleaner::setTimer(qpid::sys::Timer* timer) {
+ this->timer = timer;
+}
+
+
QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d,"QueueCleaner"), parent(p) {}
void QueueCleaner::Task::fire()
@@ -65,9 +71,9 @@ void QueueCleaner::fired()
std::vector<Queue::shared_ptr> copy;
CollectQueues collect(&copy);
queues.eachQueue(collect);
- std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1));
+ std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1, period));
task->setupNextFire();
- timer.add(task);
+ timer->add(task);
}
diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.h b/qpid/cpp/src/qpid/broker/QueueCleaner.h
index 11c2d180ac..ffebfe3e1b 100644
--- a/qpid/cpp/src/qpid/broker/QueueCleaner.h
+++ b/qpid/cpp/src/qpid/broker/QueueCleaner.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,14 +35,15 @@ class QueueRegistry;
class QueueCleaner
{
public:
- QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer& timer);
+ QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer* timer);
QPID_BROKER_EXTERN ~QueueCleaner();
- QPID_BROKER_EXTERN void start(qpid::sys::Duration period);
+ QPID_BROKER_EXTERN void start(sys::Duration period);
+ QPID_BROKER_EXTERN void setTimer(sys::Timer* timer);
private:
class Task : public sys::TimerTask
{
public:
- Task(QueueCleaner& parent, qpid::sys::Duration duration);
+ Task(QueueCleaner& parent, sys::Duration duration);
void fire();
private:
QueueCleaner& parent;
@@ -50,7 +51,8 @@ class QueueCleaner
boost::intrusive_ptr<sys::TimerTask> task;
QueueRegistry& queues;
- sys::Timer& timer;
+ sys::Timer* timer;
+ sys::Duration period;
void fired();
};
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index b2e2e54bdf..fcf8d089f9 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -167,7 +167,8 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg)
msg.payload->getIngressCompletion().startCompleter(); // don't complete until flow resumes
bool unique;
unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(msg.position, msg.payload)).second;
- assert(unique);
+ // Like this to avoid tripping up unused variable warning when NDEBUG set
+ if (!unique) assert(unique);
}
}
@@ -379,7 +380,8 @@ void QueueFlowLimit::setState(const qpid::framing::FieldTable& state)
QueuedMessage msg(queue->find(seq)); // fyi: msg.payload may be null if msg is delivered & unacked
bool unique;
unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second;
- assert(unique);
+ // Like this to avoid tripping up unused variable warning when NDEBUG set
+ if (!unique) assert(unique);
}
}
}
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
index a93a6332fd..6ae0d53b1a 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -117,19 +117,20 @@ void QueuePolicy::update(FieldTable& settings)
settings.setString(typeKey, type);
}
-uint32_t QueuePolicy::getCapacity(const FieldTable& settings, const std::string& key, uint32_t defaultValue)
+template <typename T>
+T getCapacity(const FieldTable& settings, const std::string& key, T defaultValue)
{
FieldTable::ValuePtr v = settings.get(key);
- int32_t result = 0;
+ T result = 0;
if (!v) return defaultValue;
if (v->getType() == 0x23) {
QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>());
} else if (v->getType() == 0x33) {
QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>());
- } else if (v->convertsTo<int>()) {
- result = v->get<int>();
+ } else if (v->convertsTo<T>()) {
+ result = v->get<T>();
QPID_LOG(debug, "Got integer value for " << key << ": " << result);
if (result >= 0) return result;
} else if (v->convertsTo<string>()) {
@@ -319,8 +320,8 @@ std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::F
std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings)
{
- uint32_t maxCount = getCapacity(settings, maxCountKey, 0);
- uint32_t maxSize = getCapacity(settings, maxSizeKey, defaultMaxSize);
+ uint32_t maxCount = getCapacity<int32_t>(settings, maxCountKey, 0);
+ uint64_t maxSize = getCapacity<int64_t>(settings, maxSizeKey, defaultMaxSize);
if (maxCount || maxSize) {
return createQueuePolicy(name, maxCount, maxSize, getType(settings));
} else {
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.h b/qpid/cpp/src/qpid/broker/QueuePolicy.h
index 3cdd63784d..ec7f846704 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.h
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.h
@@ -43,8 +43,7 @@ class QueuePolicy
uint32_t count;
uint64_t size;
bool policyExceeded;
-
- static uint32_t getCapacity(const qpid::framing::FieldTable& settings, const std::string& key, uint32_t defaultValue);
+
protected:
uint64_t getCurrentQueueSize() const { return size; }
diff --git a/qpid/cpp/src/qpid/broker/RateTracker.cpp b/qpid/cpp/src/qpid/broker/RateTracker.cpp
deleted file mode 100644
index 048349b658..0000000000
--- a/qpid/cpp/src/qpid/broker/RateTracker.cpp
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/RateTracker.h"
-
-using qpid::sys::AbsTime;
-using qpid::sys::Duration;
-using qpid::sys::TIME_SEC;
-
-namespace qpid {
-namespace broker {
-
-RateTracker::RateTracker() : currentCount(0), lastCount(0), lastTime(AbsTime::now()) {}
-
-RateTracker& RateTracker::operator++()
-{
- ++currentCount;
- return *this;
-}
-
-double RateTracker::sampleRatePerSecond()
-{
- int32_t increment = currentCount - lastCount;
- AbsTime now = AbsTime::now();
- Duration interval(lastTime, now);
- lastCount = currentCount;
- lastTime = now;
- //if sampling at higher frequency than supported, will just return the number of increments
- if (interval < TIME_SEC) return increment;
- else if (increment == 0) return 0;
- else return increment / (interval / TIME_SEC);
-}
-
-}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/RateTracker.h b/qpid/cpp/src/qpid/broker/RateTracker.h
deleted file mode 100644
index 0c20b37312..0000000000
--- a/qpid/cpp/src/qpid/broker/RateTracker.h
+++ /dev/null
@@ -1,57 +0,0 @@
-#ifndef QPID_BROKER_RATETRACKER_H
-#define QPID_BROKER_RATETRACKER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/Time.h"
-
-namespace qpid {
-namespace broker {
-
-/**
- * Simple rate tracker: represents some value that can be incremented,
- * then can periodcially sample the rate of increments.
- */
-class RateTracker
-{
- public:
- RateTracker();
- /**
- * Increments the count being tracked. Can be called concurrently
- * with other calls to this operator as well as with calls to
- * sampleRatePerSecond().
- */
- RateTracker& operator++();
- /**
- * Returns the rate of increments per second since last
- * called. Calls to this method should be serialised, but can be
- * called concurrently with the increment operator
- */
- double sampleRatePerSecond();
- private:
- volatile int32_t currentCount;
- int32_t lastCount;
- qpid::sys::AbsTime lastTime;
-};
-}} // namespace qpid::broker
-
-#endif /*!QPID_BROKER_RATETRACKER_H*/
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 2383978276..9b8c9b8d58 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -70,7 +70,7 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
deliveryAdapter(da),
tagGenerator("sgen"),
dtxSelected(false),
- authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()),
+ authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
userID(getSession().getConnection().getUserId()),
userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))),
isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())),
@@ -469,7 +469,6 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
/* verify the userid if specified: */
std::string id =
msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring;
-
if (authMsg && !id.empty() && !(id == userID || (isDefaultRealm && id == userName)))
{
QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id);
diff --git a/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp b/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
index fd0e537192..676074a590 100644
--- a/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
+++ b/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
@@ -81,12 +81,11 @@ class SslProtocolFactory : public qpid::sys::ProtocolFactory {
SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay);
~SslProtocolFactory();
void accept(sys::Poller::shared_ptr, sys::ConnectionCodec::Factory*);
- void connect(sys::Poller::shared_ptr, const std::string& host, int16_t port,
+ void connect(sys::Poller::shared_ptr, const std::string& host, const std::string& port,
sys::ConnectionCodec::Factory*,
ConnectFailedCallback failed);
uint16_t getPort() const;
- std::string getHost() const;
bool supports(const std::string& capability);
private:
@@ -130,7 +129,7 @@ SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options,
int backlog,
bool nodelay)
: tcpNoDelay(nodelay),
- listeningPort(listener.listen(options.port, backlog)),
+ listeningPort(listener.listen("", boost::lexical_cast<std::string>(options.port), backlog)),
clientAuthSelected(options.clientAuth) {
SecInvalidateHandle(&credHandle);
@@ -237,10 +236,6 @@ uint16_t SslProtocolFactory::getPort() const {
return listeningPort; // Immutable no need for lock.
}
-std::string SslProtocolFactory::getHost() const {
- return listener.getSockname();
-}
-
void SslProtocolFactory::accept(sys::Poller::shared_ptr poller,
sys::ConnectionCodec::Factory* fact) {
acceptor.reset(
@@ -251,7 +246,7 @@ void SslProtocolFactory::accept(sys::Poller::shared_ptr poller,
void SslProtocolFactory::connect(sys::Poller::shared_ptr poller,
const std::string& host,
- int16_t port,
+ const std::string& port,
sys::ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
{
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
index 40c004f166..4b7aa07065 100644
--- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -36,6 +36,7 @@
#include <boost/bind.hpp>
#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
#include <boost/shared_ptr.hpp>
#include <limits>
@@ -258,16 +259,16 @@ void ConnectionImpl::open()
connector->setInputHandler(&handler);
connector->setShutdownHandler(this);
try {
- connector->connect(host, port);
-
+ std::string p = boost::lexical_cast<std::string>(port);
+ connector->connect(host, p);
+
} catch (const std::exception& e) {
QPID_LOG(debug, "Failed to connect to " << protocol << ":" << host << ":" << port << " " << e.what());
connector.reset();
throw;
}
connector->init();
- QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port);
-
+
// Enable heartbeat if requested
uint16_t heartbeat = static_cast<ConnectionSettings&>(handler).heartbeat;
if (heartbeat) {
@@ -281,6 +282,7 @@ void ConnectionImpl::open()
// - in that case in connector.reset() above;
// - or when we are deleted
handler.waitForOpen();
+ QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port);
// If the SASL layer has provided an "operational" userId for the connection,
// put it in the negotiated settings.
diff --git a/qpid/cpp/src/qpid/client/Connector.h b/qpid/cpp/src/qpid/client/Connector.h
index 586012f9d6..bc611ffe0d 100644
--- a/qpid/cpp/src/qpid/client/Connector.h
+++ b/qpid/cpp/src/qpid/client/Connector.h
@@ -61,7 +61,7 @@ class Connector : public framing::OutputHandler
static void registerFactory(const std::string& proto, Factory* connectorFactory);
virtual ~Connector() {};
- virtual void connect(const std::string& host, int port) = 0;
+ virtual void connect(const std::string& host, const std::string& port) = 0;
virtual void init() {};
virtual void close() = 0;
virtual void send(framing::AMQFrame& frame) = 0;
diff --git a/qpid/cpp/src/qpid/client/RdmaConnector.cpp b/qpid/cpp/src/qpid/client/RdmaConnector.cpp
index 6af607198c..664640f5e7 100644
--- a/qpid/cpp/src/qpid/client/RdmaConnector.cpp
+++ b/qpid/cpp/src/qpid/client/RdmaConnector.cpp
@@ -95,7 +95,7 @@ class RdmaConnector : public Connector, public sys::Codec
std::string identifier;
- void connect(const std::string& host, int port);
+ void connect(const std::string& host, const std::string& port);
void close();
void send(framing::AMQFrame& frame);
void abort() {} // TODO: need to fix this for heartbeat timeouts to work
@@ -173,7 +173,7 @@ RdmaConnector::~RdmaConnector() {
}
}
-void RdmaConnector::connect(const std::string& host, int port){
+void RdmaConnector::connect(const std::string& host, const std::string& port){
Mutex::ScopedLock l(dataConnectedLock);
assert(!dataConnected);
@@ -184,7 +184,7 @@ void RdmaConnector::connect(const std::string& host, int port){
boost::bind(&RdmaConnector::disconnected, this),
boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));
- SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+ SocketAddress sa(host, port);
acon->start(poller, sa);
}
diff --git a/qpid/cpp/src/qpid/client/SslConnector.cpp b/qpid/cpp/src/qpid/client/SslConnector.cpp
index 35c7e6bdf6..f121cfb1ab 100644
--- a/qpid/cpp/src/qpid/client/SslConnector.cpp
+++ b/qpid/cpp/src/qpid/client/SslConnector.cpp
@@ -114,7 +114,7 @@ class SslConnector : public Connector
std::string identifier;
- void connect(const std::string& host, int port);
+ void connect(const std::string& host, const std::string& port);
void init();
void close();
void send(framing::AMQFrame& frame);
@@ -190,7 +190,7 @@ SslConnector::~SslConnector() {
close();
}
-void SslConnector::connect(const std::string& host, int port){
+void SslConnector::connect(const std::string& host, const std::string& port){
Mutex::ScopedLock l(closedLock);
assert(closed);
try {
diff --git a/qpid/cpp/src/qpid/client/TCPConnector.cpp b/qpid/cpp/src/qpid/client/TCPConnector.cpp
index d90781b365..0070b24ec0 100644
--- a/qpid/cpp/src/qpid/client/TCPConnector.cpp
+++ b/qpid/cpp/src/qpid/client/TCPConnector.cpp
@@ -88,7 +88,7 @@ TCPConnector::~TCPConnector() {
close();
}
-void TCPConnector::connect(const std::string& host, int port) {
+void TCPConnector::connect(const std::string& host, const std::string& port) {
Mutex::ScopedLock l(lock);
assert(closed);
connector = AsynchConnector::create(
@@ -121,7 +121,7 @@ void TCPConnector::start(sys::AsynchIO* aio_) {
aio->queueReadBuffer(new Buff(maxFrameSize));
}
- identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
+ identifier = str(format("[%1%]") % socket.getFullAddress());
}
void TCPConnector::initAmqp() {
diff --git a/qpid/cpp/src/qpid/client/TCPConnector.h b/qpid/cpp/src/qpid/client/TCPConnector.h
index c756469182..eb3f696013 100644
--- a/qpid/cpp/src/qpid/client/TCPConnector.h
+++ b/qpid/cpp/src/qpid/client/TCPConnector.h
@@ -98,7 +98,7 @@ class TCPConnector : public Connector, public sys::Codec
protected:
virtual ~TCPConnector();
- void connect(const std::string& host, int port);
+ void connect(const std::string& host, const std::string& port);
void start(sys::AsynchIO* aio_);
void initAmqp();
virtual void connectFailed(const std::string& msg);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
index bfb20118b5..e8d250de0f 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
@@ -30,12 +30,23 @@ void AcceptTracker::State::accept()
unaccepted.clear();
}
-void AcceptTracker::State::accept(qpid::framing::SequenceNumber id)
+SequenceSet AcceptTracker::State::accept(qpid::framing::SequenceNumber id, bool cumulative)
{
- if (unaccepted.contains(id)) {
- unaccepted.remove(id);
- unconfirmed.add(id);
+ SequenceSet accepting;
+ if (cumulative) {
+ for (SequenceSet::iterator i = unaccepted.begin(); i != unaccepted.end() && *i <= id; ++i) {
+ accepting.add(*i);
+ }
+ unconfirmed.add(accepting);
+ unaccepted.remove(accepting);
+ } else {
+ if (unaccepted.contains(id)) {
+ unaccepted.remove(id);
+ unconfirmed.add(id);
+ accepting.add(id);
+ }
}
+ return accepting;
}
void AcceptTracker::State::release()
@@ -71,16 +82,15 @@ void AcceptTracker::accept(qpid::client::AsyncSession& session)
aggregateState.accept();
}
-void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session)
+void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session, bool cumulative)
{
for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
- i->second.accept(id);
+ i->second.accept(id, cumulative);
}
Record record;
- record.accepted.add(id);
+ record.accepted = aggregateState.accept(id, cumulative);
record.status = session.messageAccept(record.accepted);
pending.push_back(record);
- aggregateState.accept(id);
}
void AcceptTracker::release(qpid::client::AsyncSession& session)
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
index 87890e41cc..9e801e8147 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
@@ -42,7 +42,7 @@ class AcceptTracker
public:
void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id);
void accept(qpid::client::AsyncSession&);
- void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&);
+ void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&, bool cumulative);
void release(qpid::client::AsyncSession&);
uint32_t acceptsPending();
uint32_t acceptsPending(const std::string& destination);
@@ -62,7 +62,7 @@ class AcceptTracker
qpid::framing::SequenceSet unconfirmed;
void accept();
- void accept(qpid::framing::SequenceNumber);
+ qpid::framing::SequenceSet accept(qpid::framing::SequenceNumber, bool cumulative);
void release();
uint32_t acceptsPending();
void completed(qpid::framing::SequenceSet&);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index f1295a3b66..9cf5f31290 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -233,6 +233,8 @@ class Subscription : public Exchange, public MessageSource
const bool reliable;
const bool durable;
const std::string actualType;
+ const bool exclusiveQueue;
+ const bool exclusiveSubscription;
FieldTable queueOptions;
FieldTable subscriptionOptions;
Bindings bindings;
@@ -307,6 +309,7 @@ struct Opt
Opt& operator/(const std::string& name);
operator bool() const;
std::string str() const;
+ bool asBool(bool defaultValue) const;
const Variant::List& asList() const;
void collect(qpid::framing::FieldTable& args) const;
@@ -338,6 +341,12 @@ Opt::operator bool() const
return value && !value->isVoid() && value->asBool();
}
+bool Opt::asBool(bool defaultValue) const
+{
+ if (value) return value->asBool();
+ else return defaultValue;
+}
+
std::string Opt::str() const
{
if (value) return value->asString();
@@ -490,7 +499,9 @@ Subscription::Subscription(const Address& address, const std::string& type)
queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
reliable(AddressResolution::is_reliable(address)),
durable(Opt(address)/LINK/DURABLE),
- actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type)
+ actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type),
+ exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
+ exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue))
{
(Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
(Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
@@ -550,7 +561,7 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str
checkAssert(session, FOR_RECEIVER);
//create subscription queue:
- session.queueDeclare(arg::queue=queue, arg::exclusive=true,
+ session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue,
arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
//'default' binding:
bindings.bind(session);
@@ -559,15 +570,15 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str
linkBindings.bind(session);
//subscribe to subscription queue:
AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
- session.messageSubscribe(arg::queue=queue, arg::destination=destination,
- arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
+ session.messageSubscribe(arg::queue=queue, arg::destination=destination,
+ arg::exclusive=exclusiveSubscription, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
}
void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
{
linkBindings.unbind(session);
session.messageCancel(destination);
- session.queueDelete(arg::queue=queue);
+ if (reliable) session.queueDelete(arg::queue=queue, arg::ifUnused=true);
checkDelete(session, FOR_RECEIVER);
}
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index a87a8dea67..473f5ecd1c 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -40,11 +40,15 @@ using qpid::types::VAR_LIST;
using qpid::framing::Uuid;
namespace {
-void convert(const Variant::List& from, std::vector<std::string>& to)
+void merge(const std::string& value, std::vector<std::string>& list) {
+ if (std::find(list.begin(), list.end(), value) == list.end())
+ list.push_back(value);
+}
+
+void merge(const Variant::List& from, std::vector<std::string>& to)
{
- for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) {
- to.push_back(i->asString());
- }
+ for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i)
+ merge(i->asString(), to);
}
std::string asString(const std::vector<std::string>& v) {
@@ -93,9 +97,9 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value)
maxReconnectInterval = value;
} else if (name == "reconnect-urls" || name == "reconnect_urls") {
if (value.getType() == VAR_LIST) {
- convert(value.asList(), urls);
+ merge(value.asList(), urls);
} else {
- urls.push_back(value.asString());
+ merge(value.asString(), urls);
}
} else if (name == "username") {
settings.username = value.asString();
@@ -198,7 +202,7 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st
sessions[name] = impl;
break;
} catch (const qpid::TransportFailure&) {
- open();
+ reopen();
} catch (const qpid::SessionException& e) {
throw qpid::messaging::SessionError(e.what());
} catch (const std::exception& e) {
@@ -219,6 +223,15 @@ void ConnectionImpl::open()
catch (const qpid::Exception& e) { throw messaging::ConnectionError(e.what()); }
}
+void ConnectionImpl::reopen()
+{
+ if (!reconnect) {
+ throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
+ }
+ open();
+}
+
+
bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
{
if (timeout == 0) return true;
@@ -246,14 +259,9 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
}
void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) {
- if (more.size()) {
- for (size_t i = 0; i < more.size(); ++i) {
- if (std::find(urls.begin(), urls.end(), more[i].str()) == urls.end()) {
- urls.push_back(more[i].str());
- }
- }
- QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
- }
+ for (std::vector<Url>::const_iterator i = more.begin(); i != more.end(); ++i)
+ merge(i->str(), urls);
+ QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
}
bool ConnectionImpl::tryConnect()
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
index 09f2038312..9e31238bc1 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -43,6 +43,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
public:
ConnectionImpl(const std::string& url, const qpid::types::Variant::Map& options);
void open();
+ void reopen();
bool isOpen() const;
void close();
qpid::messaging::Session newSession(bool transactional, const std::string& name);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 71e89bdba1..5cf20c92eb 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -144,10 +144,10 @@ void IncomingMessages::accept()
acceptTracker.accept(session);
}
-void IncomingMessages::accept(qpid::framing::SequenceNumber id)
+void IncomingMessages::accept(qpid::framing::SequenceNumber id, bool cumulative)
{
sys::Mutex::ScopedLock l(lock);
- acceptTracker.accept(id, session);
+ acceptTracker.accept(id, session, cumulative);
}
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
index f6a291bc68..9053b70312 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
@@ -72,7 +72,7 @@ class IncomingMessages
bool get(Handler& handler, qpid::sys::Duration timeout);
bool getNextDestination(std::string& destination, qpid::sys::Duration timeout);
void accept();
- void accept(qpid::framing::SequenceNumber id);
+ void accept(qpid::framing::SequenceNumber id, bool cumulative);
void releaseAll();
void releasePending(const std::string& destination);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 75a71997fd..787be7de2a 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -112,13 +112,14 @@ void SessionImpl::release(qpid::messaging::Message& m)
execute1<Release>(m);
}
-void SessionImpl::acknowledge(qpid::messaging::Message& m)
+void SessionImpl::acknowledge(qpid::messaging::Message& m, bool cumulative)
{
//Should probably throw an exception on failure here, or indicate
//it through a return type at least. Failure means that the
//message may be redelivered; i.e. the application cannot delete
//any state necessary for preventing reprocessing of the message
- execute1<Acknowledge1>(m);
+ Acknowledge2 ack(*this, m, cumulative);
+ execute(ack);
}
void SessionImpl::close()
@@ -467,10 +468,10 @@ void SessionImpl::acknowledgeImpl()
if (!transactional) incoming.accept();
}
-void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m)
+void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m, bool cumulative)
{
ScopedLock l(lock);
- if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId());
+ if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId(), cumulative);
}
void SessionImpl::rejectImpl(qpid::messaging::Message& m)
@@ -509,7 +510,7 @@ void SessionImpl::senderCancelled(const std::string& name)
void SessionImpl::reconnect()
{
- connection->open();
+ connection->reopen();
}
bool SessionImpl::backoff()
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index 2a2aa47df6..c7dea77d18 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -63,7 +63,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
void acknowledge(bool sync);
void reject(qpid::messaging::Message&);
void release(qpid::messaging::Message&);
- void acknowledge(qpid::messaging::Message& msg);
+ void acknowledge(qpid::messaging::Message& msg, bool cumulative);
void close();
void sync(bool block);
qpid::messaging::Sender createSender(const qpid::messaging::Address& address);
@@ -139,7 +139,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
void commitImpl();
void rollbackImpl();
void acknowledgeImpl();
- void acknowledgeImpl(qpid::messaging::Message&);
+ void acknowledgeImpl(qpid::messaging::Message&, bool cumulative);
void rejectImpl(qpid::messaging::Message&);
void releaseImpl(qpid::messaging::Message&);
void closeImpl();
@@ -204,12 +204,13 @@ class SessionImpl : public qpid::messaging::SessionImpl
void operator()() { impl.releaseImpl(message); }
};
- struct Acknowledge1 : Command
+ struct Acknowledge2 : Command
{
qpid::messaging::Message& message;
+ bool cumulative;
- Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {}
- void operator()() { impl.acknowledgeImpl(message); }
+ Acknowledge2(SessionImpl& i, qpid::messaging::Message& m, bool c) : Command(i), message(m), cumulative(c) {}
+ void operator()() { impl.acknowledgeImpl(message, cumulative); }
};
struct CreateSender;
diff --git a/qpid/cpp/src/qpid/client/windows/SslConnector.cpp b/qpid/cpp/src/qpid/client/windows/SslConnector.cpp
index a33713e1a8..785c817928 100644
--- a/qpid/cpp/src/qpid/client/windows/SslConnector.cpp
+++ b/qpid/cpp/src/qpid/client/windows/SslConnector.cpp
@@ -77,7 +77,7 @@ public:
framing::ProtocolVersion pVersion,
const ConnectionSettings&,
ConnectionImpl*);
- virtual void connect(const std::string& host, int port);
+ virtual void connect(const std::string& host, const std::string& port);
virtual void connected(const Socket&);
unsigned int getSSF();
};
@@ -153,7 +153,7 @@ SslConnector::~SslConnector()
// Will this get reach via virtual method via boost::bind????
-void SslConnector::connect(const std::string& host, int port) {
+void SslConnector::connect(const std::string& host, const std::string& port) {
brokerHost = host;
TCPConnector::connect(host, port);
}
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 0daf0c7f5a..82ed8bf8c9 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -146,6 +146,7 @@
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterConfigChangeBody.h"
+#include "qpid/framing/ClusterClockBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionAbortBody.h"
#include "qpid/framing/ClusterRetractOfferBody.h"
@@ -198,7 +199,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1097431;
+const uint32_t Cluster::CLUSTER_VERSION = 1128070;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -230,7 +231,6 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
cluster.updateOffer(member, updatee, l);
}
void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
- void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) {
cluster.errorCheck(member, type, frameSeq, l);
}
@@ -240,6 +240,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void deliverToQueue(const std::string& queue, const std::string& message) {
cluster.deliverToQueue(queue, message, l);
}
+ void clock(uint64_t time) { cluster.clock(time, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
@@ -253,7 +254,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
self(cpg.self()),
clusterId(true),
mAgent(0),
- expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
+ expiryPolicy(new ExpiryPolicy(*this)),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
@@ -365,7 +366,8 @@ void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
assert(discarding);
pair<ConnectionMap::iterator, bool> ib
= connections.insert(ConnectionMap::value_type(c->getId(), c));
- assert(ib.second);
+ // Like this to avoid tripping up unused variable warning when NDEBUG set
+ if (!ib.second) assert(ib.second);
}
void Cluster::erase(const ConnectionId& id) {
@@ -667,6 +669,8 @@ void Cluster::initMapCompleted(Lock& l) {
else { // I can go ready.
discarding = false;
setReady(l);
+ // Must be called *before* memberUpdate so first update will be generated.
+ failoverExchange->setReady();
memberUpdate(l);
updateMgmtMembership(l);
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
@@ -719,6 +723,20 @@ void Cluster::configChange(const MemberId&,
updateMgmtMembership(l); // Update on every config change for consistency
}
+struct ClusterClockTask : public sys::TimerTask {
+ Cluster& cluster;
+ sys::Timer& timer;
+
+ ClusterClockTask(Cluster& cluster, sys::Timer& timer, uint16_t clockInterval)
+ : TimerTask(Duration(clockInterval * TIME_MSEC),"ClusterClock"), cluster(cluster), timer(timer) {}
+
+ void fire() {
+ cluster.sendClockUpdate();
+ setupNextFire();
+ timer.add(this);
+ }
+};
+
void Cluster::becomeElder(Lock&) {
if (elder) return; // We were already the elder.
// We are the oldest, reactive links if necessary
@@ -726,6 +744,8 @@ void Cluster::becomeElder(Lock&) {
elder = true;
broker.getLinks().setPassive(false);
timer->becomeElder();
+
+ clockTimer.add(new ClusterClockTask(*this, clockTimer, settings.clockInterval));
}
void Cluster::makeOffer(const MemberId& id, Lock& ) {
@@ -846,7 +866,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l)
if (updatee != self && url) {
QPID_LOG(debug, debugSnapshot());
if (mAgent) mAgent->clusterUpdate();
- // Updatee will call clusterUpdate when update completes
+ // Updatee will call clusterUpdate() via checkUpdateIn() when update completes
}
}
@@ -927,10 +947,11 @@ void Cluster::checkUpdateIn(Lock& l) {
if (!updateClosed) return; // Wait till update connection closes.
if (updatedMap) { // We're up to date
map = *updatedMap;
- failoverExchange->setUrls(getUrls(l));
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
memberUpdate(l);
+ // Must be called *after* memberUpdate() to avoid sending an extra update.
+ failoverExchange->setReady();
// NB: don't updateMgmtMembership() here as we are not in the deliver
// thread. It will be updated on delivery of the "ready" we just mcast.
broker.setClusterUpdatee(false);
@@ -1120,10 +1141,6 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) {
QPID_LOG(notice, *this << " cluster-uuid = " << clusterId);
}
-void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
- expiryPolicy->deliverExpire(id);
-}
-
void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) {
// If we see an errorCheck here (rather than in the ErrorCheck
// class) then we have processed succesfully past the point of the
@@ -1161,6 +1178,35 @@ void Cluster::deliverToQueue(const std::string& queue, const std::string& messag
q->deliver(msg);
}
+sys::AbsTime Cluster::getClusterTime() {
+ Mutex::ScopedLock l(lock);
+ return clusterTime;
+}
+
+// This method is called during update on the updatee to set the initial cluster time.
+void Cluster::clock(const uint64_t time) {
+ Mutex::ScopedLock l(lock);
+ clock(time, l);
+}
+
+// called when broadcast message received
+void Cluster::clock(const uint64_t time, Lock&) {
+ clusterTime = AbsTime(EPOCH, time);
+ AbsTime now = AbsTime::now();
+
+ if (!elder) {
+ clusterTimeOffset = Duration(now, clusterTime);
+ }
+}
+
+// called by elder timer to send clock broadcast
+void Cluster::sendClockUpdate() {
+ Mutex::ScopedLock l(lock);
+ int64_t nanosecondsSinceEpoch = Duration(EPOCH, now());
+ nanosecondsSinceEpoch += clusterTimeOffset;
+ mcast.mcastControl(ClusterClockBody(ProtocolVersion(), nanosecondsSinceEpoch), self);
+}
+
bool Cluster::deferDeliveryImpl(const std::string& queue,
const boost::intrusive_ptr<broker::Message>& msg)
{
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 78d325cdf9..adb06b2783 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -63,6 +63,12 @@ class AMQBody;
struct Uuid;
}
+namespace sys {
+class Timer;
+class AbsTime;
+class Duration;
+}
+
namespace cluster {
class Connection;
@@ -135,6 +141,9 @@ class Cluster : private Cpg::Handler, public management::Manageable {
bool deferDeliveryImpl(const std::string& queue,
const boost::intrusive_ptr<broker::Message>& msg);
+ sys::AbsTime getClusterTime();
+ void sendClockUpdate();
+ void clock(const uint64_t time);
private:
typedef sys::Monitor::ScopedLock Lock;
@@ -180,12 +189,12 @@ class Cluster : private Cpg::Handler, public management::Manageable {
const std::string& left,
const std::string& joined,
Lock& l);
- void messageExpired(const MemberId&, uint64_t, Lock& l);
void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
void timerWakeup(const MemberId&, const std::string& name, Lock&);
void timerDrop(const MemberId&, const std::string& name, Lock&);
void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&);
void deliverToQueue(const std::string& queue, const std::string& message, Lock&);
+ void clock(const uint64_t time, Lock&);
// Helper functions
ConnectionPtr getConnection(const EventFrame&, Lock&);
@@ -296,6 +305,9 @@ class Cluster : private Cpg::Handler, public management::Manageable {
ErrorCheck error;
UpdateReceiver updateReceiver;
ClusterTimer* timer;
+ sys::Timer clockTimer;
+ sys::AbsTime clusterTime;
+ sys::Duration clusterTimeOffset;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend struct ClusterDispatcher;
diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 2962daaa07..69ba095f16 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -72,6 +72,7 @@ struct ClusterOptions : public Options {
("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
#endif
("cluster-size", optValue(settings.size, "N"), "Wait for N cluster members before allowing clients to connect.")
+ ("cluster-clock-interval", optValue(settings.clockInterval,"N"), "How often to broadcast the current time to the cluster nodes, in milliseconds. A value between 5 and 1000 is recommended.")
("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.")
;
}
diff --git a/qpid/cpp/src/qpid/cluster/ClusterSettings.h b/qpid/cpp/src/qpid/cluster/ClusterSettings.h
index 8e708aa139..2f7b5be20a 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterSettings.h
+++ b/qpid/cpp/src/qpid/cluster/ClusterSettings.h
@@ -35,8 +35,9 @@ struct ClusterSettings {
size_t readMax;
std::string username, password, mechanism;
size_t size;
+ uint16_t clockInterval;
- ClusterSettings() : quorum(false), readMax(10), size(1)
+ ClusterSettings() : quorum(false), readMax(10), size(1), clockInterval(10)
{}
Url getUrl(uint16_t port) const {
diff --git a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
index f6e1c7a849..b4f7d00f38 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
@@ -70,6 +70,7 @@ void ClusterTimer::add(intrusive_ptr<TimerTask> task)
if (i != map.end())
throw Exception(QPID_MSG("Task already exists with name " << task->getName()));
map[task->getName()] = task;
+
// Only the elder actually activates the task with the Timer base class.
if (cluster.isElder()) {
QPID_LOG(trace, "Elder activating cluster timer task " << task->getName());
@@ -112,6 +113,9 @@ void ClusterTimer::deliverWakeup(const std::string& name) {
else {
intrusive_ptr<TimerTask> t = i->second;
map.erase(i);
+ // Move the nextFireTime so readyToFire() is true. This is to ensure we
+ // don't get an error if the fired task calls setupNextFire()
+ t->setFired();
Timer::fire(t);
}
}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index b9895290e9..030d6e34c1 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -322,10 +322,10 @@ size_t Connection::decode(const char* data, size_t size) {
while (localDecoder.decode(buf))
received(localDecoder.getFrame());
if (!wasOpen && connection->isOpen()) {
- // Connections marked as federation links are allowed to proxy
+ // Connections marked with setUserProxyAuth are allowed to proxy
// messages with user-ID that doesn't match the connection's
// authenticated ID. This is important for updates.
- connection->setFederationLink(isCatchUp());
+ connection->setUserProxyAuth(isCatchUp());
}
}
else { // Multicast local connections.
@@ -601,10 +601,6 @@ void Connection::queueObserverState(const std::string& qname, const std::string&
QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies.");
}
-void Connection::expiryId(uint64_t id) {
- cluster.getExpiryPolicy().setId(id);
-}
-
std::ostream& operator<<(std::ostream& o, const Connection& c) {
const char* type="unknown";
if (c.isLocal()) type = "local";
@@ -724,5 +720,16 @@ void Connection::doCatchupIoCallbacks() {
if (catchUp) getBrokerConnection()->doIoCallbacks();
}
+
+void Connection::clock(uint64_t time) {
+ QPID_LOG(debug, "Cluster connection received time update");
+ cluster.clock(time);
+}
+
+void Connection::queueDequeueSincePurgeState(const std::string& qname, uint32_t dequeueSincePurge) {
+ boost::shared_ptr<broker::Queue> queue(findQueue(qname));
+ queue->setDequeueSincePurge(dequeueSincePurge);
+}
+
}} // Namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index a0da9efbb8..a9740f97f8 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -155,7 +155,6 @@ class Connection :
void queuePosition(const std::string&, const framing::SequenceNumber&);
void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count);
void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&);
- void expiryId(uint64_t);
void txStart();
void txAccept(const framing::SequenceSet&);
@@ -192,6 +191,10 @@ class Connection :
void doCatchupIoCallbacks();
+ void clock(uint64_t time);
+
+ void queueDequeueSincePurgeState(const std::string&, uint32_t);
+
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
index d9a7b0122a..0ef5c2a35d 100644
--- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
+++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
@@ -21,106 +21,21 @@
#include "qpid/broker/Message.h"
#include "qpid/cluster/ExpiryPolicy.h"
-#include "qpid/cluster/Multicaster.h"
-#include "qpid/framing/ClusterMessageExpiredBody.h"
+#include "qpid/cluster/Cluster.h"
#include "qpid/sys/Time.h"
-#include "qpid/sys/Timer.h"
#include "qpid/log/Statement.h"
namespace qpid {
namespace cluster {
-ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t)
- : expiryId(1), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
+ExpiryPolicy::ExpiryPolicy(Cluster& cluster) : cluster(cluster) {}
-struct ExpiryTask : public sys::TimerTask {
- ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
- : TimerTask(when,"ExpiryPolicy"), expiryPolicy(policy), expiryId(id) {}
- void fire() { expiryPolicy->sendExpire(expiryId); }
- boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
- const uint64_t expiryId;
-};
-
-// Called while receiving an update
-void ExpiryPolicy::setId(uint64_t id) {
- sys::Mutex::ScopedLock l(lock);
- expiryId = id;
-}
-
-// Called while giving an update
-uint64_t ExpiryPolicy::getId() const {
- sys::Mutex::ScopedLock l(lock);
- return expiryId;
-}
-
-// Called in enqueuing connection thread
-void ExpiryPolicy::willExpire(broker::Message& m) {
- uint64_t id;
- {
- // When messages are fanned out to multiple queues, update sends
- // them as independenty messages so we can have multiple messages
- // with the same expiry ID.
- //
- sys::Mutex::ScopedLock l(lock);
- id = expiryId++;
- if (!id) { // This is an update of an already-expired message.
- m.setExpiryPolicy(expiredPolicy);
- }
- else {
- assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
- // If this is an update, the id may already exist
- unexpiredById.insert(IdMessageMap::value_type(id, &m));
- unexpiredByMessage[&m] = id;
- }
- }
- timer.add(new ExpiryTask(this, id, m.getExpiration()));
-}
-
-// Called in dequeueing connection thread
-void ExpiryPolicy::forget(broker::Message& m) {
- sys::Mutex::ScopedLock l(lock);
- MessageIdMap::iterator i = unexpiredByMessage.find(&m);
- assert(i != unexpiredByMessage.end());
- unexpiredById.erase(i->second);
- unexpiredByMessage.erase(i);
-}
-
-// Called in dequeueing connection or cleanup thread.
bool ExpiryPolicy::hasExpired(broker::Message& m) {
- sys::Mutex::ScopedLock l(lock);
- return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
-}
-
-// Called in timer thread
-void ExpiryPolicy::sendExpire(uint64_t id) {
- {
- sys::Mutex::ScopedLock l(lock);
- // Don't multicast an expiry notice if message is already forgotten.
- if (unexpiredById.find(id) == unexpiredById.end()) return;
- }
- mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
+ return m.getExpiration() < cluster.getClusterTime();
}
-// Called in CPG deliver thread.
-void ExpiryPolicy::deliverExpire(uint64_t id) {
- sys::Mutex::ScopedLock l(lock);
- std::pair<IdMessageMap::iterator, IdMessageMap::iterator> expired = unexpiredById.equal_range(id);
- IdMessageMap::iterator i = expired.first;
- while (i != expired.second) {
- i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true;
- unexpiredByMessage.erase(i->second);
- unexpiredById.erase(i++);
- }
+sys::AbsTime ExpiryPolicy::getCurrentTime() {
+ return cluster.getClusterTime();
}
-// Called in update thread on the updater.
-boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) {
- sys::Mutex::ScopedLock l(lock);
- MessageIdMap::iterator i = unexpiredByMessage.find(&m);
- return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second;
-}
-
-bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; }
-void ExpiryPolicy::Expired::willExpire(broker::Message&) { }
-
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
index 77a656aa68..d8ddbca8b3 100644
--- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
+++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
@@ -36,12 +36,8 @@ namespace broker {
class Message;
}
-namespace sys {
-class Timer;
-}
-
namespace cluster {
-class Multicaster;
+class Cluster;
/**
* Cluster expiry policy
@@ -49,43 +45,13 @@ class Multicaster;
class ExpiryPolicy : public broker::ExpiryPolicy
{
public:
- ExpiryPolicy(Multicaster&, const MemberId&, sys::Timer&);
+ ExpiryPolicy(Cluster& cluster);
- void willExpire(broker::Message&);
bool hasExpired(broker::Message&);
- void forget(broker::Message&);
-
- // Send expiration notice to cluster.
- void sendExpire(uint64_t);
+ qpid::sys::AbsTime getCurrentTime();
- // Cluster delivers expiry notice.
- void deliverExpire(uint64_t);
-
- void setId(uint64_t id);
- uint64_t getId() const;
-
- boost::optional<uint64_t> getId(broker::Message&);
-
private:
- typedef std::map<broker::Message*, uint64_t> MessageIdMap;
- // When messages are fanned out to multiple queues, update sends
- // them as independenty messages so we can have multiple messages
- // with the same expiry ID.
- typedef std::multimap<uint64_t, broker::Message*> IdMessageMap;
-
- struct Expired : public broker::ExpiryPolicy {
- bool hasExpired(broker::Message&);
- void willExpire(broker::Message&);
- };
-
- mutable sys::Mutex lock;
- MessageIdMap unexpiredByMessage;
- IdMessageMap unexpiredById;
- uint64_t expiryId;
- boost::intrusive_ptr<Expired> expiredPolicy;
- Multicaster& mcast;
- MemberId memberId;
- sys::Timer& timer;
+ Cluster& cluster;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp b/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
index 84232dac1b..cfbe34a460 100644
--- a/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
+++ b/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,8 +39,10 @@ using namespace broker;
using namespace framing;
const string FailoverExchange::typeName("amq.failover");
-
-FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b) : Exchange(typeName, parent, b ) {
+
+FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b)
+ : Exchange(typeName, parent, b ), ready(false)
+{
if (mgmtExchange != 0)
mgmtExchange->set_type(typeName);
}
@@ -53,16 +55,17 @@ void FailoverExchange::setUrls(const vector<Url>& u) {
void FailoverExchange::updateUrls(const vector<Url>& u) {
Lock l(lock);
urls=u;
- if (urls.empty()) return;
- std::for_each(queues.begin(), queues.end(),
- boost::bind(&FailoverExchange::sendUpdate, this, _1));
+ if (ready && !urls.empty()) {
+ std::for_each(queues.begin(), queues.end(),
+ boost::bind(&FailoverExchange::sendUpdate, this, _1));
+ }
}
string FailoverExchange::getType() const { return typeName; }
bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) {
Lock l(lock);
- sendUpdate(queue);
+ if (ready) sendUpdate(queue);
return queues.insert(queue).second;
}
@@ -84,7 +87,7 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) {
// Called with lock held.
if (urls.empty()) return;
framing::Array array(0x95);
- for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i)
+ for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i)
array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
const ProtocolVersion v;
boost::intrusive_ptr<Message> msg(new Message);
@@ -96,9 +99,12 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) {
header.get<MessageProperties>(true)->getApplicationHeaders().setArray(typeName, array);
AMQFrame headerFrame(header);
headerFrame.setFirstSegment(false);
- msg->getFrames().append(headerFrame);
+ msg->getFrames().append(headerFrame);
DeliverableMessage(msg).deliverTo(queue);
}
+void FailoverExchange::setReady() {
+ ready = true;
+}
}} // namespace cluster
diff --git a/qpid/cpp/src/qpid/cluster/FailoverExchange.h b/qpid/cpp/src/qpid/cluster/FailoverExchange.h
index 2e1edfc0ae..c3e50c6929 100644
--- a/qpid/cpp/src/qpid/cluster/FailoverExchange.h
+++ b/qpid/cpp/src/qpid/cluster/FailoverExchange.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -46,6 +46,8 @@ class FailoverExchange : public broker::Exchange
void setUrls(const std::vector<Url>&);
/** Set the URLs and send an update.*/
void updateUrls(const std::vector<Url>&);
+ /** Flag the failover exchange as ready to generate updates (caught up) */
+ void setReady();
// Exchange overrides
std::string getType() const;
@@ -56,7 +58,7 @@ class FailoverExchange : public broker::Exchange
private:
void sendUpdate(const boost::shared_ptr<broker::Queue>&);
-
+
typedef sys::Mutex::ScopedLock Lock;
typedef std::vector<Url> Urls;
typedef std::set<boost::shared_ptr<broker::Queue> > Queues;
@@ -64,7 +66,7 @@ class FailoverExchange : public broker::Exchange
sys::Mutex lock;
Urls urls;
Queues queues;
-
+ bool ready;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index a15c14ff48..77448789db 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,9 +26,9 @@
#include "qpid/cluster/Decoder.h"
#include "qpid/cluster/ExpiryPolicy.h"
#include "qpid/cluster/UpdateDataExchange.h"
-#include "qpid/client/SessionBase_0_10Access.h"
-#include "qpid/client/ConnectionAccess.h"
-#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/client/ConnectionAccess.h"
+#include "qpid/client/SessionImpl.h"
#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/Future.h"
#include "qpid/broker/Broker.h"
@@ -83,11 +83,20 @@ using namespace framing;
namespace arg=client::arg;
using client::SessionBase_0_10Access;
+// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
+const std::string UpdateClient::UPDATE("x-qpid.cluster-update");
+// Name for header used to carry expiration information.
+const std::string UpdateClient::X_QPID_EXPIRATION = "x-qpid.expiration";
+// Headers used to flag headers/properties added by the UpdateClient so they can be
+// removed on the other side.
+const std::string UpdateClient::X_QPID_NO_MESSAGE_PROPS = "x-qpid.no-message-props";
+const std::string UpdateClient::X_QPID_NO_HEADERS = "x-qpid.no-headers";
+
std::ostream& operator<<(std::ostream& o, const UpdateClient& c) {
return o << "cluster(" << c.updaterId << " UPDATER)";
}
-struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler
+struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler
{
boost::shared_ptr<qpid::client::ConnectionImpl> connection;
@@ -121,7 +130,7 @@ void send(client::AsyncSession& s, const AMQBody& body) {
// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
- broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_,
+ broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_,
const Cluster::ConnectionVector& cons, Decoder& decoder_,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail,
@@ -135,9 +144,6 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con
UpdateClient::~UpdateClient() {}
-// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
-const std::string UpdateClient::UPDATE("qpid.cluster-update");
-
void UpdateClient::run() {
try {
connection.open(updateeUrl, connectionSettings);
@@ -155,6 +161,13 @@ void UpdateClient::update() {
<< " at " << updateeUrl);
Broker& b = updaterBroker;
+ if(b.getExpiryPolicy()) {
+ QPID_LOG(debug, *this << "Updating updatee with cluster time");
+ qpid::sys::AbsTime clusterTime = b.getExpiryPolicy()->getCurrentTime();
+ int64_t time = qpid::sys::Duration(qpid::sys::EPOCH, clusterTime);
+ ClusterConnectionProxy(session).clock(time);
+ }
+
updateManagementSetupState();
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
@@ -174,7 +187,6 @@ void UpdateClient::update() {
// Update queue listeners: must come after sessions so consumerNumbering is populated
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
- ClusterConnectionProxy(session).expiryId(expiry.getId());
updateLinks();
updateManagementAgent();
@@ -188,7 +200,7 @@ void UpdateClient::update() {
// NOTE: connection will be closed from the other end, don't close
// it here as that causes a race.
-
+
// TODO aconway 2010-03-15: This sleep avoids the race condition
// described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831.
// It allows the connection to fully close before destroying the
@@ -280,7 +292,7 @@ class MessageUpdater {
framing::SequenceNumber lastPos;
client::AsyncSession session;
ExpiryPolicy& expiry;
-
+
public:
MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) {
@@ -297,7 +309,6 @@ class MessageUpdater {
}
}
-
void updateQueuedMessage(const broker::QueuedMessage& message) {
// Send the queue position if necessary.
if (!haveLastPos || message.position - lastPos != 1) {
@@ -306,10 +317,23 @@ class MessageUpdater {
}
lastPos = message.position;
- // Send the expiry ID if necessary.
- if (message.payload->getProperties<DeliveryProperties>()->getTtl()) {
- boost::optional<uint64_t> expiryId = expiry.getId(*message.payload);
- ClusterConnectionProxy(session).expiryId(expiryId?*expiryId:0);
+ // if the ttl > 0, we need to send the calculated expiration time to the updatee
+ if (message.payload->getProperties<DeliveryProperties>()->getTtl() > 0) {
+ bool hadMessageProps =
+ message.payload->hasProperties<framing::MessageProperties>();
+ framing::MessageProperties* mprops =
+ message.payload->getProperties<framing::MessageProperties>();
+ bool hadApplicationHeaders = mprops->hasApplicationHeaders();
+ FieldTable& applicationHeaders = mprops->getApplicationHeaders();
+ applicationHeaders.setInt64(
+ UpdateClient::X_QPID_EXPIRATION,
+ sys::Duration(sys::EPOCH, message.payload->getExpiration()));
+ // If message properties or application headers didn't exist
+ // prior to us adding data, we want to remove them on the other side.
+ if (!hadMessageProps)
+ applicationHeaders.setInt(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0);
+ else if (!hadApplicationHeaders)
+ applicationHeaders.setInt(UpdateClient::X_QPID_NO_HEADERS, 0);
}
// We can't send a broker::Message via the normal client API,
@@ -322,7 +346,7 @@ class MessageUpdater {
framing::MessageTransferBody transfer(
*message.payload->getFrames().as<framing::MessageTransferBody>());
transfer.setDestination(UpdateClient::UPDATE);
-
+
sb.get()->send(transfer, message.payload->getFrames(),
!message.payload->isContentReleased());
if (message.payload->isContentReleased()){
@@ -330,12 +354,21 @@ class MessageUpdater {
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
- {
+ {
AMQFrame frame((AMQContentBody()));
morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset);
sb.get()->sendRawFrame(frame);
}
}
+ // If the ttl > 0, we need to send the calculated expiration time to the updatee
+ // Careful not to alter the message as a side effect e.g. by adding
+ // an empty DeliveryProperties or setting TTL when it wasn't set before.
+ uint64_t ttl = 0;
+ if (message.payload->hasProperties<DeliveryProperties>()) {
+ DeliveryProperties* dprops =
+ message.payload->getProperties<DeliveryProperties>();
+ if (dprops->hasTtl()) ttl = dprops->getTtl();
+ };
}
void updateMessage(const boost::intrusive_ptr<broker::Message>& message) {
@@ -361,6 +394,8 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<
if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) {
ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count);
}
+
+ ClusterConnectionProxy(s).queueDequeueSincePurgeState(q->getName(), q->getDequeueSincePurge());
}
void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
@@ -393,7 +428,7 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
QPID_LOG(debug, *this << " updating connection " << *updateConnection);
assert(updateConnection->getBrokerConnection());
broker::Connection& bc = *updateConnection->getBrokerConnection();
-
+
// Send the management ID first on the main connection.
std::string mgmtId = updateConnection->getBrokerConnection()->getMgmtId();
ClusterConnectionProxy(session).shadowPrepare(mgmtId);
@@ -430,7 +465,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
QPID_LOG(debug, *this << " updating session " << ss->getId());
- // Create a client session to update session state.
+ // Create a client session to update session state.
boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel());
simpl->disableAutoDetach();
@@ -456,12 +491,12 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
// Adjust command counter for message in progress, will be sent after state update.
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
SequenceNumber received = ss->receiverGetReceived().command;
- if (inProgress)
+ if (inProgress)
--received;
// Sync the session to ensure all responses from broker have been processed.
shadowSession.sync();
-
+
// Reset command-sequence state.
proxy.sessionState(
ss->senderGetReplayPoint().command,
@@ -511,7 +546,7 @@ void UpdateClient::updateConsumer(
QPID_LOG(debug, *this << " updated consumer " << ci->getName()
<< " on " << shadowSession.getId());
}
-
+
void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
// If the message is acquired then it is no longer on the
@@ -543,7 +578,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
void operator()(const broker::DtxAck& ) {
throw InternalErrorException("DTX transactions not currently supported by cluster.");
}
-
+
void operator()(const broker::RecoveredDequeue& rdeq) {
updateMessage(rdeq.getMessage());
proxy.txEnqueue(rdeq.getQueue()->getName());
@@ -563,7 +598,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
typedef std::list<Queue::shared_ptr> QueueList;
const QueueList& qlist = txPub.getQueues();
Array qarray(TYPE_CODE_STR8);
- for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i)
+ for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i)
qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
proxy.txPublish(qarray, txPub.delivered);
}
@@ -573,7 +608,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
client::AsyncSession session;
ClusterConnectionProxy proxy;
};
-
+
void UpdateClient::updateTxState(broker::SemanticState& s) {
QPID_LOG(debug, *this << " updating TX transaction state.");
ClusterConnectionProxy proxy(shadowSession);
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h
index b72d090d73..21bf6024e0 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -69,14 +69,19 @@ class ExpiryPolicy;
class UpdateClient : public sys::Runnable {
public:
static const std::string UPDATE; // Name for special update queue and exchange.
+ static const std::string X_QPID_EXPIRATION; // Update message expiration
+ // Flag to remove props/headers that were added by the UpdateClient
+ static const std::string X_QPID_NO_MESSAGE_PROPS;
+ static const std::string X_QPID_NO_HEADERS;
+
static client::Connection catchUpConnection();
-
+
UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
broker::Broker& donor, const ClusterMap& map, ExpiryPolicy& expiry,
const std::vector<boost::intrusive_ptr<Connection> >&, Decoder&,
const boost::function<void()>& done,
const boost::function<void(const std::exception&)>& fail,
- const client::ConnectionSettings&
+ const client::ConnectionSettings&
);
~UpdateClient();
diff --git a/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp b/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp
index 11937f296f..e830459aba 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,6 +19,7 @@
*
*/
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
#include "UpdateExchange.h"
@@ -27,6 +28,8 @@ namespace cluster {
using framing::MessageTransferBody;
using framing::DeliveryProperties;
+using framing::MessageProperties;
+using framing::FieldTable;
UpdateExchange::UpdateExchange(management::Manageable* parent)
: broker::Exchange(UpdateClient::UPDATE, parent),
@@ -34,6 +37,7 @@ UpdateExchange::UpdateExchange(management::Manageable* parent)
void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& msg) {
+ // Copy exchange name to destination property.
MessageTransferBody* transfer = msg->getMethod<MessageTransferBody>();
assert(transfer);
const DeliveryProperties* props = msg->getProperties<DeliveryProperties>();
@@ -42,6 +46,23 @@ void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>&
transfer->setDestination(props->getExchange());
else
transfer->clearDestinationFlag();
-}
+ // Copy expiration from x-property if present.
+ if (msg->hasProperties<MessageProperties>()) {
+ MessageProperties* mprops = msg->getProperties<MessageProperties>();
+ if (mprops->hasApplicationHeaders()) {
+ FieldTable& headers = mprops->getApplicationHeaders();
+ if (headers.isSet(UpdateClient::X_QPID_EXPIRATION)) {
+ msg->setExpiration(
+ sys::AbsTime(sys::EPOCH, headers.getAsInt64(UpdateClient::X_QPID_EXPIRATION)));
+ headers.erase(UpdateClient::X_QPID_EXPIRATION);
+ // Erase props/headers that were added by the UpdateClient
+ if (headers.isSet(UpdateClient::X_QPID_NO_MESSAGE_PROPS))
+ msg->eraseProperties<MessageProperties>();
+ else if (headers.isSet(UpdateClient::X_QPID_NO_HEADERS))
+ mprops->clearApplicationHeadersFlag();
+ }
+ }
+ }
+}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/console/SessionManager.cpp b/qpid/cpp/src/qpid/console/SessionManager.cpp
index 80c5959417..910ae22be8 100644
--- a/qpid/cpp/src/qpid/console/SessionManager.cpp
+++ b/qpid/cpp/src/qpid/console/SessionManager.cpp
@@ -362,12 +362,11 @@ void SessionManager::handleCommandComplete(Broker* broker, Buffer& inBuffer, uin
void SessionManager::handleClassInd(Broker* broker, Buffer& inBuffer, uint32_t)
{
- uint8_t kind;
string packageName;
string className;
uint8_t hash[16];
- kind = inBuffer.getOctet();
+ /*kind*/ (void) inBuffer.getOctet();
inBuffer.getShortString(packageName);
inBuffer.getShortString(className);
inBuffer.getBin128(hash);
diff --git a/qpid/cpp/src/qpid/framing/AMQHeaderBody.h b/qpid/cpp/src/qpid/framing/AMQHeaderBody.h
index a8c326969a..452154eb5c 100644
--- a/qpid/cpp/src/qpid/framing/AMQHeaderBody.h
+++ b/qpid/cpp/src/qpid/framing/AMQHeaderBody.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -58,7 +58,7 @@ class QPID_COMMON_CLASS_EXTERN AMQHeaderBody : public AMQBody
}
else
return Base::decode(buffer, size, type);
- }
+ }
void print(std::ostream& out) const {
const boost::optional<T>& p=this->OptProps<T>::props;
if (p) out << *p;
@@ -77,7 +77,7 @@ class QPID_COMMON_CLASS_EXTERN AMQHeaderBody : public AMQBody
typedef PropSet<PropSet<Empty, DeliveryProperties>, MessageProperties> Properties;
Properties properties;
-
+
public:
inline uint8_t type() const { return HEADER_BODY; }
@@ -99,6 +99,10 @@ public:
return properties.OptProps<T>::props.get_ptr();
}
+ template <class T> void erase() {
+ properties.OptProps<T>::props.reset();
+ }
+
boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); }
};
diff --git a/qpid/cpp/src/qpid/messaging/Session.cpp b/qpid/cpp/src/qpid/messaging/Session.cpp
index 496953a8e5..cccfd9a873 100644
--- a/qpid/cpp/src/qpid/messaging/Session.cpp
+++ b/qpid/cpp/src/qpid/messaging/Session.cpp
@@ -39,7 +39,8 @@ Session& Session::operator=(const Session& s) { return PI::assign(*this, s); }
void Session::commit() { impl->commit(); }
void Session::rollback() { impl->rollback(); }
void Session::acknowledge(bool sync) { impl->acknowledge(sync); }
-void Session::acknowledge(Message& m, bool s) { impl->acknowledge(m); sync(s); }
+void Session::acknowledge(Message& m, bool s) { impl->acknowledge(m, false); sync(s); }
+void Session::acknowledgeUpTo(Message& m, bool s) { impl->acknowledge(m, true); sync(s); }
void Session::reject(Message& m) { impl->reject(m); }
void Session::release(Message& m) { impl->release(m); }
void Session::close() { impl->close(); }
diff --git a/qpid/cpp/src/qpid/messaging/SessionImpl.h b/qpid/cpp/src/qpid/messaging/SessionImpl.h
index 02a254e4f2..60ae615253 100644
--- a/qpid/cpp/src/qpid/messaging/SessionImpl.h
+++ b/qpid/cpp/src/qpid/messaging/SessionImpl.h
@@ -41,7 +41,7 @@ class SessionImpl : public virtual qpid::RefCounted
virtual void commit() = 0;
virtual void rollback() = 0;
virtual void acknowledge(bool sync) = 0;
- virtual void acknowledge(Message&) = 0;
+ virtual void acknowledge(Message&, bool cumulative) = 0;
virtual void reject(Message&) = 0;
virtual void release(Message&) = 0;
virtual void close() = 0;
diff --git a/qpid/cpp/src/qpid/sys/AsynchIO.h b/qpid/cpp/src/qpid/sys/AsynchIO.h
index 50da8fa4fc..41f74f7ed0 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIO.h
+++ b/qpid/cpp/src/qpid/sys/AsynchIO.h
@@ -64,8 +64,8 @@ public:
// deletes. To correctly manage heaps when needed, the allocate and
// delete should both be done from the same class/library.
QPID_COMMON_EXTERN static AsynchConnector* create(const Socket& s,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb);
virtual void start(boost::shared_ptr<Poller> poller) = 0;
diff --git a/qpid/cpp/src/qpid/sys/AtomicValue_gcc.h b/qpid/cpp/src/qpid/sys/AtomicValue_gcc.h
index d022b07c1d..724bae422e 100644
--- a/qpid/cpp/src/qpid/sys/AtomicValue_gcc.h
+++ b/qpid/cpp/src/qpid/sys/AtomicValue_gcc.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,6 +39,9 @@ class AtomicValue
public:
AtomicValue(T init=0) : value(init) {}
+ // Not atomic. Don't call concurrently with atomic ops.
+ AtomicValue<T>& operator=(T newValue) { value = newValue; return *this; }
+
// Update and return new value.
inline T operator+=(T n) { return __sync_add_and_fetch(&value, n); }
inline T operator-=(T n) { return __sync_sub_and_fetch(&value, n); }
@@ -54,11 +57,11 @@ class AtomicValue
/** If current value == testval then set to newval. Returns the old value. */
T valueCompareAndSwap(T testval, T newval) { return __sync_val_compare_and_swap(&value, testval, newval); }
- /** If current value == testval then set to newval. Returns true if the swap was performed. */
+ /** If current value == testval then set to newval. Returns true if the swap was performed. */
bool boolCompareAndSwap(T testval, T newval) { return __sync_bool_compare_and_swap(&value, testval, newval); }
T get() const { return const_cast<AtomicValue<T>*>(this)->fetchAndAdd(static_cast<T>(0)); }
-
+
private:
T value;
};
diff --git a/qpid/cpp/src/qpid/sys/ProtocolFactory.h b/qpid/cpp/src/qpid/sys/ProtocolFactory.h
index b233b2da1a..4d198a92da 100644
--- a/qpid/cpp/src/qpid/sys/ProtocolFactory.h
+++ b/qpid/cpp/src/qpid/sys/ProtocolFactory.h
@@ -39,11 +39,10 @@ class ProtocolFactory : public qpid::SharedObject<ProtocolFactory>
virtual ~ProtocolFactory() = 0;
virtual uint16_t getPort() const = 0;
- virtual std::string getHost() const = 0;
virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
virtual void connect(
boost::shared_ptr<Poller>,
- const std::string& host, int16_t port,
+ const std::string& host, const std::string& port,
ConnectionCodec::Factory* codec,
ConnectFailedCallback failed) = 0;
virtual bool supports(const std::string& /*capability*/) { return false; }
diff --git a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
index d53db20598..6769e5383c 100644
--- a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
+++ b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
@@ -31,7 +31,6 @@
#include "qpid/sys/SecuritySettings.h"
#include <boost/bind.hpp>
-#include <boost/lexical_cast.hpp>
#include <memory>
#include <netdb.h>
@@ -212,10 +211,9 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
if (readError) {
return;
}
- size_t decoded = 0;
try {
if (codec) {
- decoded = codec->decode(buff->bytes(), buff->dataCount());
+ (void) codec->decode(buff->bytes(), buff->dataCount());
}else{
// Need to start protocol processing
initProtocolIn(buff);
@@ -230,9 +228,7 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) {
framing::Buffer in(buff->bytes(), buff->dataCount());
framing::ProtocolInitiation protocolInit;
- size_t decoded = 0;
if (protocolInit.decode(in)) {
- decoded = in.getPosition();
QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")");
codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings());
@@ -254,10 +250,9 @@ class RdmaIOProtocolFactory : public ProtocolFactory {
public:
RdmaIOProtocolFactory(int16_t port, int backlog);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const string& host, int16_t port, ConnectionCodec::Factory*, ConnectFailedCallback);
+ void connect(Poller::shared_ptr, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback);
uint16_t getPort() const;
- string getHost() const;
private:
bool request(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectionCodec::Factory*);
@@ -347,18 +342,7 @@ uint16_t RdmaIOProtocolFactory::getPort() const {
return listeningPort; // Immutable no need for lock.
}
-string RdmaIOProtocolFactory::getHost() const {
- //return listener.getSockname();
- return "";
-}
-
void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
- ::sockaddr_in sin;
-
- sin.sin_family = AF_INET;
- sin.sin_port = htons(listeningPort);
- sin.sin_addr.s_addr = INADDR_ANY;
-
listener.reset(
new Rdma::Listener(
Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES),
@@ -387,7 +371,7 @@ void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connectio
void RdmaIOProtocolFactory::connect(
Poller::shared_ptr poller,
- const std::string& host, int16_t port,
+ const std::string& host, const std::string& port,
ConnectionCodec::Factory* f,
ConnectFailedCallback failed)
{
@@ -399,7 +383,7 @@ void RdmaIOProtocolFactory::connect(
boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1),
boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2, failed));
- SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+ SocketAddress sa(host, port);
c->start(poller, sa);
}
diff --git a/qpid/cpp/src/qpid/sys/Socket.h b/qpid/cpp/src/qpid/sys/Socket.h
index b1cded1aa1..9f62f3be1c 100644
--- a/qpid/cpp/src/qpid/sys/Socket.h
+++ b/qpid/cpp/src/qpid/sys/Socket.h
@@ -39,15 +39,12 @@ public:
/** Create a socket wrapper for descriptor. */
QPID_COMMON_EXTERN Socket();
- /** Set timeout for read and write */
- void setTimeout(const Duration& interval) const;
-
/** Set socket non blocking */
void setNonblocking() const;
QPID_COMMON_EXTERN void setTcpNoDelay() const;
- QPID_COMMON_EXTERN void connect(const std::string& host, uint16_t port) const;
+ QPID_COMMON_EXTERN void connect(const std::string& host, const std::string& port) const;
QPID_COMMON_EXTERN void connect(const SocketAddress&) const;
QPID_COMMON_EXTERN void close() const;
@@ -57,19 +54,9 @@ public:
*@param backlog maximum number of pending connections.
*@return The bound port.
*/
- QPID_COMMON_EXTERN int listen(uint16_t port = 0, int backlog = 10) const;
+ QPID_COMMON_EXTERN int listen(const std::string& host = "", const std::string& port = "0", int backlog = 10) const;
QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const;
- /** Returns the "socket name" ie the address bound to
- * the near end of the socket
- */
- QPID_COMMON_EXTERN std::string getSockname() const;
-
- /** Returns the "peer name" ie the address bound to
- * the remote end of the socket
- */
- std::string getPeername() const;
-
/**
* Returns an address (host and port) for the remote end of the
* socket
@@ -86,9 +73,6 @@ public:
*/
QPID_COMMON_INLINE_EXTERN std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); }
- QPID_COMMON_EXTERN uint16_t getLocalPort() const;
- uint16_t getRemotePort() const;
-
/**
* Returns the error code stored in the socket. This may be used
* to determine the result of a non-blocking connect.
@@ -109,7 +93,8 @@ private:
void createSocket(const SocketAddress&) const;
Socket(IOHandlePrivate*);
- mutable std::string connectname;
+ mutable std::string localname;
+ mutable std::string peername;
mutable bool nonblocking;
mutable bool nodelay;
};
diff --git a/qpid/cpp/src/qpid/sys/SocketAddress.h b/qpid/cpp/src/qpid/sys/SocketAddress.h
index 27b9642f2c..c2120338cf 100644
--- a/qpid/cpp/src/qpid/sys/SocketAddress.h
+++ b/qpid/cpp/src/qpid/sys/SocketAddress.h
@@ -41,7 +41,7 @@ public:
QPID_COMMON_EXTERN SocketAddress& operator=(const SocketAddress&);
QPID_COMMON_EXTERN ~SocketAddress();
- std::string asString() const;
+ std::string asString(bool numeric=true) const;
private:
std::string host;
diff --git a/qpid/cpp/src/qpid/sys/SslPlugin.cpp b/qpid/cpp/src/qpid/sys/SslPlugin.cpp
index 0ec051caab..471a0cef60 100644
--- a/qpid/cpp/src/qpid/sys/SslPlugin.cpp
+++ b/qpid/cpp/src/qpid/sys/SslPlugin.cpp
@@ -66,12 +66,11 @@ class SslProtocolFactory : public ProtocolFactory {
public:
SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, int16_t port,
+ void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
boost::function2<void, int, std::string> failed);
uint16_t getPort() const;
- std::string getHost() const;
bool supports(const std::string& capability);
private:
@@ -146,10 +145,6 @@ uint16_t SslProtocolFactory::getPort() const {
return listeningPort; // Immutable no need for lock.
}
-std::string SslProtocolFactory::getHost() const {
- return listener.getSockname();
-}
-
void SslProtocolFactory::accept(Poller::shared_ptr poller,
ConnectionCodec::Factory* fact) {
acceptor.reset(
@@ -160,7 +155,7 @@ void SslProtocolFactory::accept(Poller::shared_ptr poller,
void SslProtocolFactory::connect(
Poller::shared_ptr poller,
- const std::string& host, int16_t port,
+ const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
{
diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
index a6528f9ad9..34338ce434 100644
--- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -42,14 +42,13 @@ class AsynchIOProtocolFactory : public ProtocolFactory {
std::auto_ptr<AsynchAcceptor> acceptor;
public:
- AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay);
+ AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, int16_t port,
+ void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
ConnectFailedCallback);
uint16_t getPort() const;
- std::string getHost() const;
private:
void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
@@ -61,22 +60,25 @@ class AsynchIOProtocolFactory : public ProtocolFactory {
static class TCPIOPlugin : public Plugin {
void earlyInitialize(Target&) {
}
-
+
void initialize(Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
// Only provide to a Broker
if (broker) {
const broker::Broker::Options& opts = broker->getOptions();
- ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog,
- opts.tcpNoDelay));
- QPID_LOG(notice, "Listening on TCP port " << protocol->getPort());
- broker->registerProtocolFactory("tcp", protocol);
+ ProtocolFactory::shared_ptr protocolt(
+ new AsynchIOProtocolFactory(
+ "", boost::lexical_cast<std::string>(opts.port),
+ opts.connectionBacklog,
+ opts.tcpNoDelay));
+ QPID_LOG(notice, "Listening on TCP port " << protocolt->getPort());
+ broker->registerProtocolFactory("tcp", protocolt);
}
}
} tcpPlugin;
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay) :
- tcpNoDelay(nodelay), listeningPort(listener.listen(port, backlog))
+AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay) :
+ tcpNoDelay(nodelay), listeningPort(listener.listen(host, port, backlog))
{}
void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
@@ -107,10 +109,6 @@ uint16_t AsynchIOProtocolFactory::getPort() const {
return listeningPort; // Immutable no need for lock.
}
-std::string AsynchIOProtocolFactory::getHost() const {
- return listener.getSockname();
-}
-
void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
ConnectionCodec::Factory* fact) {
acceptor.reset(
@@ -130,7 +128,7 @@ void AsynchIOProtocolFactory::connectFailed(
void AsynchIOProtocolFactory::connect(
Poller::shared_ptr poller,
- const std::string& host, int16_t port,
+ const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
{
@@ -139,7 +137,6 @@ void AsynchIOProtocolFactory::connect(
// upon connection failure or by the AsynchIO upon connection
// shutdown. The allocated AsynchConnector frees itself when it
// is no longer needed.
-
Socket* socket = new Socket();
AsynchConnector* c = AsynchConnector::create(
*socket,
diff --git a/qpid/cpp/src/qpid/sys/Timer.cpp b/qpid/cpp/src/qpid/sys/Timer.cpp
index fdb2e8c6bb..47752e4584 100644
--- a/qpid/cpp/src/qpid/sys/Timer.cpp
+++ b/qpid/cpp/src/qpid/sys/Timer.cpp
@@ -75,6 +75,12 @@ void TimerTask::cancel() {
cancelled = true;
}
+void TimerTask::setFired() {
+ // Set nextFireTime to just before now, making readyToFire() true.
+ nextFireTime = AbsTime(sys::now(), Duration(-1));
+}
+
+
Timer::Timer() :
active(false),
late(50 * TIME_MSEC),
diff --git a/qpid/cpp/src/qpid/sys/Timer.h b/qpid/cpp/src/qpid/sys/Timer.h
index 98ba39ce38..fccb17dbc2 100644
--- a/qpid/cpp/src/qpid/sys/Timer.h
+++ b/qpid/cpp/src/qpid/sys/Timer.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -64,6 +64,10 @@ class TimerTask : public RefCounted {
std::string getName() const { return name; }
+ // Move the nextFireTime so readyToFire is true.
+ // Used by the cluster, where tasks are fired on cluster events, not on local time.
+ QPID_COMMON_EXTERN void setFired();
+
protected:
// Must be overridden with callback
virtual void fire() = 0;
diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 119a6aa8a4..b5a0b0bf32 100644
--- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -152,8 +152,8 @@ private:
public:
AsynchConnector(const Socket& socket,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb);
void start(Poller::shared_ptr poller);
@@ -161,8 +161,8 @@ public:
};
AsynchConnector::AsynchConnector(const Socket& s,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb) :
DispatchHandle(s,
@@ -174,7 +174,7 @@ AsynchConnector::AsynchConnector(const Socket& s,
socket(s)
{
socket.setNonblocking();
- SocketAddress sa(hostname, boost::lexical_cast<std::string>(port));
+ SocketAddress sa(hostname, port);
// Note, not catching any exceptions here, also has effect of destructing
socket.connect(sa);
}
@@ -589,8 +589,8 @@ AsynchAcceptor* AsynchAcceptor::create(const Socket& s,
}
AsynchConnector* AsynchConnector::create(const Socket& s,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb)
{
diff --git a/qpid/cpp/src/qpid/sys/posix/LockFile.cpp b/qpid/cpp/src/qpid/sys/posix/LockFile.cpp
index 1862ff6ac9..f5a6c292cb 100755
--- a/qpid/cpp/src/qpid/sys/posix/LockFile.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/LockFile.cpp
@@ -58,8 +58,7 @@ LockFile::~LockFile() {
if (impl) {
int f = impl->fd;
if (f >= 0) {
- int unused_ret;
- unused_ret = ::lockf(f, F_ULOCK, 0); // Suppress warnings about ignoring return value.
+ (void) ::lockf(f, F_ULOCK, 0); // Suppress warnings about ignoring return value.
::close(f);
impl->fd = -1;
}
diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
index 3449a753e3..aa25f8062d 100644
--- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
@@ -37,13 +37,12 @@
#include <iostream>
#include <boost/format.hpp>
-#include <boost/lexical_cast.hpp>
namespace qpid {
namespace sys {
namespace {
-std::string getName(int fd, bool local, bool includeService = false)
+std::string getName(int fd, bool local)
{
::sockaddr_storage name; // big enough for any socket address
::socklen_t namelen = sizeof(name);
@@ -54,45 +53,15 @@ std::string getName(int fd, bool local, bool includeService = false)
} else {
result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
}
-
QPID_POSIX_CHECK(result);
char servName[NI_MAXSERV];
char dispName[NI_MAXHOST];
- if (includeService) {
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName),
- servName, sizeof(servName),
- NI_NUMERICHOST | NI_NUMERICSERV) != 0)
- throw QPID_POSIX_ERROR(rc);
- return std::string(dispName) + ":" + std::string(servName);
-
- } else {
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 0, 0, NI_NUMERICHOST) != 0)
- throw QPID_POSIX_ERROR(rc);
- return dispName;
- }
-}
-
-std::string getService(int fd, bool local)
-{
- ::sockaddr_storage name; // big enough for any socket address
- ::socklen_t namelen = sizeof(name);
-
- int result = -1;
- if (local) {
- result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
- } else {
- result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
- }
-
- QPID_POSIX_CHECK(result);
-
- char servName[NI_MAXSERV];
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0,
- servName, sizeof(servName),
- NI_NUMERICHOST | NI_NUMERICSERV) != 0)
+ if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName),
+ servName, sizeof(servName),
+ NI_NUMERICHOST | NI_NUMERICSERV) != 0)
throw QPID_POSIX_ERROR(rc);
- return servName;
+ return std::string(dispName) + ":" + std::string(servName);
}
}
@@ -126,15 +95,6 @@ void Socket::createSocket(const SocketAddress& sa) const
}
}
-void Socket::setTimeout(const Duration& interval) const
-{
- const int& socket = impl->fd;
- struct timeval tv;
- toTimeval(tv, interval);
- setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
- setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
-}
-
void Socket::setNonblocking() const {
int& socket = impl->fd;
nonblocking = true;
@@ -154,15 +114,22 @@ void Socket::setTcpNoDelay() const
}
}
-void Socket::connect(const std::string& host, uint16_t port) const
+void Socket::connect(const std::string& host, const std::string& port) const
{
- SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+ SocketAddress sa(host, port);
connect(sa);
}
void Socket::connect(const SocketAddress& addr) const
{
- connectname = addr.asString();
+ // The display name for an outbound connection needs to be the name that was specified
+ // for the address rather than a resolved IP address as we don't know which of
+ // the IP addresses is actually the one that will be connected to.
+ peername = addr.asString(false);
+
+ // However the string we compare with the local port must be numeric or it might not
+ // match when it should as getLocalAddress() will always be numeric
+ std::string connectname = addr.asString();
createSocket(addr);
@@ -170,7 +137,7 @@ void Socket::connect(const SocketAddress& addr) const
// TODO the correct thing to do here is loop on failure until you've used all the returned addresses
if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) &&
(errno != EINPROGRESS)) {
- throw Exception(QPID_MSG(strError(errno) << ": " << connectname));
+ throw Exception(QPID_MSG(strError(errno) << ": " << peername));
}
// When connecting to a port on the same host which no longer has
// a process associated with it, the OS occasionally chooses the
@@ -185,9 +152,9 @@ void Socket::connect(const SocketAddress& addr) const
// Raise an error if we see such a connection, since we know there is
// no listener on the peer address.
//
- if (getLocalAddress() == getPeerAddress()) {
+ if (getLocalAddress() == connectname) {
close();
- throw Exception(QPID_MSG("Connection refused: " << connectname));
+ throw Exception(QPID_MSG("Connection refused: " << peername));
}
}
@@ -200,9 +167,9 @@ Socket::close() const
socket = -1;
}
-int Socket::listen(uint16_t port, int backlog) const
+int Socket::listen(const std::string& host, const std::string& port, int backlog) const
{
- SocketAddress sa("", boost::lexical_cast<std::string>(port));
+ SocketAddress sa(host, port);
return listen(sa, backlog);
}
@@ -230,8 +197,11 @@ int Socket::listen(const SocketAddress& sa, int backlog) const
Socket* Socket::accept() const
{
int afd = ::accept(impl->fd, 0, 0);
- if ( afd >= 0)
- return new Socket(new IOHandlePrivate(afd));
+ if ( afd >= 0) {
+ Socket* s = new Socket(new IOHandlePrivate(afd));
+ s->localname = localname;
+ return s;
+ }
else if (errno == EAGAIN)
return 0;
else throw QPID_POSIX_ERROR(errno);
@@ -247,37 +217,20 @@ int Socket::write(const void *buf, size_t count) const
return ::write(impl->fd, buf, count);
}
-std::string Socket::getSockname() const
-{
- return getName(impl->fd, true);
-}
-
-std::string Socket::getPeername() const
-{
- return getName(impl->fd, false);
-}
-
std::string Socket::getPeerAddress() const
{
- if (connectname.empty()) {
- connectname = getName(impl->fd, false, true);
+ if (peername.empty()) {
+ peername = getName(impl->fd, false);
}
- return connectname;
+ return peername;
}
std::string Socket::getLocalAddress() const
{
- return getName(impl->fd, true, true);
-}
-
-uint16_t Socket::getLocalPort() const
-{
- return std::atoi(getService(impl->fd, true).c_str());
-}
-
-uint16_t Socket::getRemotePort() const
-{
- return std::atoi(getService(impl->fd, true).c_str());
+ if (localname.empty()) {
+ localname = getName(impl->fd, true);
+ }
+ return localname;
}
int Socket::getError() const
diff --git a/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp b/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp
index 8f5f29d793..10f1c8a563 100644
--- a/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp
@@ -27,6 +27,8 @@
#include <string.h>
#include <netdb.h>
+#include <algorithm>
+
namespace qpid {
namespace sys {
@@ -46,15 +48,9 @@ SocketAddress::SocketAddress(const SocketAddress& sa) :
SocketAddress& SocketAddress::operator=(const SocketAddress& sa)
{
- if (&sa != this) {
- host = sa.host;
- port = sa.port;
+ SocketAddress temp(sa);
- if (addrInfo) {
- ::freeaddrinfo(addrInfo);
- addrInfo = 0;
- }
- }
+ std::swap(temp, *this);
return *this;
}
@@ -65,9 +61,23 @@ SocketAddress::~SocketAddress()
}
}
-std::string SocketAddress::asString() const
+std::string SocketAddress::asString(bool numeric) const
{
- return host + ":" + port;
+ if (!numeric)
+ return host + ":" + port;
+ // Canonicalise into numeric id
+ const ::addrinfo& ai = getAddrInfo(*this);
+ char servName[NI_MAXSERV];
+ char dispName[NI_MAXHOST];
+ if (int rc=::getnameinfo(ai.ai_addr, ai.ai_addrlen,
+ dispName, sizeof(dispName),
+ servName, sizeof(servName),
+ NI_NUMERICHOST | NI_NUMERICSERV) != 0)
+ throw QPID_POSIX_ERROR(rc);
+ std::string s(dispName);
+ s += ":";
+ s += servName;
+ return s;
}
const ::addrinfo& getAddrInfo(const SocketAddress& sa)
@@ -88,7 +98,7 @@ const ::addrinfo& getAddrInfo(const SocketAddress& sa)
int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo);
if (n != 0)
- throw Exception(QPID_MSG("Cannot resolve " << sa.host << ": " << ::gai_strerror(n)));
+ throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n)));
}
return *sa.addrInfo;
diff --git a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp
index a58a137473..734ebb483a 100644
--- a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp
+++ b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp
@@ -117,7 +117,7 @@ void SslAcceptor::readable(DispatchHandle& h) {
SslConnector::SslConnector(const SslSocket& s,
Poller::shared_ptr poller,
std::string hostname,
- uint16_t port,
+ std::string port,
ConnectedCallback connCb,
FailedCallback failCb) :
DispatchHandle(s,
diff --git a/qpid/cpp/src/qpid/sys/ssl/SslIo.h b/qpid/cpp/src/qpid/sys/ssl/SslIo.h
index 53ac69d8d6..8785852c24 100644
--- a/qpid/cpp/src/qpid/sys/ssl/SslIo.h
+++ b/qpid/cpp/src/qpid/sys/ssl/SslIo.h
@@ -73,7 +73,7 @@ public:
SslConnector(const SslSocket& socket,
Poller::shared_ptr poller,
std::string hostname,
- uint16_t port,
+ std::string port,
ConnectedCallback connCb,
FailedCallback failCb = 0);
diff --git a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp b/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp
index 01e2658877..f7483a220c 100644
--- a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp
+++ b/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp
@@ -158,7 +158,7 @@ void SslSocket::setNonblocking() const
PR_SetSocketOption(socket, &option);
}
-void SslSocket::connect(const std::string& host, uint16_t port) const
+void SslSocket::connect(const std::string& host, const std::string& port) const
{
std::stringstream namestream;
namestream << host << ":" << port;
@@ -180,7 +180,7 @@ void SslSocket::connect(const std::string& host, uint16_t port) const
PRHostEnt hostEntry;
PR_CHECK(PR_GetHostByName(host.data(), hostBuffer, PR_NETDB_BUF_SIZE, &hostEntry));
PRNetAddr address;
- int value = PR_EnumerateHostEnt(0, &hostEntry, port, &address);
+ int value = PR_EnumerateHostEnt(0, &hostEntry, boost::lexical_cast<PRUint16>(port), &address);
if (value < 0) {
throw Exception(QPID_MSG("Error getting address for host: " << ErrorString()));
} else if (value == 0) {
diff --git a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h b/qpid/cpp/src/qpid/sys/ssl/SslSocket.h
index 25712c98d5..993859495b 100644
--- a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h
+++ b/qpid/cpp/src/qpid/sys/ssl/SslSocket.h
@@ -53,7 +53,7 @@ public:
* NSSInit().*/
void setCertName(const std::string& certName);
- void connect(const std::string& host, uint16_t port) const;
+ void connect(const std::string& host, const std::string& port) const;
void close() const;
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
index 71138757a5..8d84fdb7b2 100644
--- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -175,20 +175,20 @@ private:
FailedCallback failCallback;
const Socket& socket;
const std::string hostname;
- const uint16_t port;
+ const std::string port;
public:
AsynchConnector(const Socket& socket,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb = 0);
void start(Poller::shared_ptr poller);
};
AsynchConnector::AsynchConnector(const Socket& sock,
- std::string hname,
- uint16_t p,
+ const std::string& hname,
+ const std::string& p,
ConnectedCallback connCb,
FailedCallback failCb) :
connCallback(connCb), failCallback(failCb), socket(sock),
@@ -218,8 +218,8 @@ AsynchAcceptor* AsynchAcceptor::create(const Socket& s,
}
AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb)
{
diff --git a/qpid/cpp/src/qpid/sys/windows/Socket.cpp b/qpid/cpp/src/qpid/sys/windows/Socket.cpp
index 2ce274acc9..baa80f04e0 100755
--- a/qpid/cpp/src/qpid/sys/windows/Socket.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/Socket.cpp
@@ -89,7 +89,7 @@ namespace sys {
namespace {
-std::string getName(SOCKET fd, bool local, bool includeService = false)
+std::string getName(SOCKET fd, bool local)
{
sockaddr_in name; // big enough for any socket address
socklen_t namelen = sizeof(name);
@@ -101,41 +101,12 @@ std::string getName(SOCKET fd, bool local, bool includeService = false)
char servName[NI_MAXSERV];
char dispName[NI_MAXHOST];
- if (includeService) {
- if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
- dispName, sizeof(dispName),
- servName, sizeof(servName),
- NI_NUMERICHOST | NI_NUMERICSERV) != 0)
- throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
- return std::string(dispName) + ":" + std::string(servName);
- } else {
- if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
- dispName, sizeof(dispName),
- 0, 0,
- NI_NUMERICHOST) != 0)
- throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
- return dispName;
- }
-}
-
-std::string getService(SOCKET fd, bool local)
-{
- sockaddr_in name; // big enough for any socket address
- socklen_t namelen = sizeof(name);
-
- if (local) {
- QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen));
- } else {
- QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen));
- }
-
- char servName[NI_MAXSERV];
if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
- 0, 0,
+ dispName, sizeof(dispName),
servName, sizeof(servName),
NI_NUMERICHOST | NI_NUMERICSERV) != 0)
throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
- return servName;
+ return std::string(dispName) + ":" + std::string(servName);
}
} // namespace
@@ -179,34 +150,22 @@ Socket::createSocket(const SocketAddress& sa) const
}
}
-void Socket::setTimeout(const Duration& interval) const
-{
- const SOCKET& socket = impl->fd;
- int64_t nanosecs = interval;
- nanosecs /= (1000 * 1000); // nsecs -> usec -> msec
- int msec = 0;
- if (nanosecs > std::numeric_limits<int>::max())
- msec = std::numeric_limits<int>::max();
- else
- msec = static_cast<int>(nanosecs);
- setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&msec, sizeof(msec));
- setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&msec, sizeof(msec));
-}
-
void Socket::setNonblocking() const {
u_long nonblock = 1;
QPID_WINSOCK_CHECK(ioctlsocket(impl->fd, FIONBIO, &nonblock));
}
-void Socket::connect(const std::string& host, uint16_t port) const
+void Socket::connect(const std::string& host, const std::string& port) const
{
- SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+ SocketAddress sa(host, port);
connect(sa);
}
void
Socket::connect(const SocketAddress& addr) const
{
+ peername = addr.asString(false);
+
const SOCKET& socket = impl->fd;
const addrinfo *addrs = &(getAddrInfo(addr));
int error = 0;
@@ -221,7 +180,7 @@ Socket::connect(const SocketAddress& addr) const
addrs = addrs->ai_next;
}
if (error)
- throw qpid::Exception(QPID_MSG(strError(error) << ": " << connectname));
+ throw qpid::Exception(QPID_MSG(strError(error) << ": " << peername));
}
void
@@ -252,7 +211,7 @@ int Socket::read(void *buf, size_t count) const
return received;
}
-int Socket::listen(uint16_t port, int backlog) const
+int Socket::listen(const std::string&, const std::string& port, int backlog) const
{
const SOCKET& socket = impl->fd;
BOOL yes=1;
@@ -260,7 +219,7 @@ int Socket::listen(uint16_t port, int backlog) const
struct sockaddr_in name;
memset(&name, 0, sizeof(name));
name.sin_family = AF_INET;
- name.sin_port = htons(port);
+ name.sin_port = htons(boost::lexical_cast<uint16_t>(port));
name.sin_addr.s_addr = 0;
if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) == SOCKET_ERROR)
throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(WSAGetLastError())));
@@ -282,36 +241,18 @@ Socket* Socket::accept() const
else throw QPID_WINDOWS_ERROR(WSAGetLastError());
}
-std::string Socket::getSockname() const
-{
- return getName(impl->fd, true);
-}
-
-std::string Socket::getPeername() const
-{
- return getName(impl->fd, false);
-}
-
std::string Socket::getPeerAddress() const
{
- if (!connectname.empty())
- return std::string (connectname);
- return getName(impl->fd, false, true);
+ if (peername.empty())
+ peername = getName(impl->fd, false);
+ return peername;
}
std::string Socket::getLocalAddress() const
{
- return getName(impl->fd, true, true);
-}
-
-uint16_t Socket::getLocalPort() const
-{
- return atoi(getService(impl->fd, true).c_str());
-}
-
-uint16_t Socket::getRemotePort() const
-{
- return atoi(getService(impl->fd, true).c_str());
+ if (localname.empty())
+ localname = getName(impl->fd, true);
+ return localname;
}
int Socket::getError() const
diff --git a/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp
index 5efdad0183..ac43cd2d23 100644
--- a/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp
@@ -63,7 +63,7 @@ SocketAddress::~SocketAddress()
::freeaddrinfo(addrInfo);
}
-std::string SocketAddress::asString() const
+std::string SocketAddress::asString(bool) const
{
return host + ":" + port;
}
diff --git a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
index d0c6668b72..1d5289dc90 100644
--- a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
+++ b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
@@ -599,13 +599,12 @@ namespace qpid {
// populate the agent with multiple test objects
const size_t objCount = 50;
std::vector<TestManageable *> tmv;
- uint32_t objLen;
for (size_t i = 0; i < objCount; i++) {
std::stringstream key;
key << "testobj-" << i;
TestManageable *tm = new TestManageable(agent, key.str());
- objLen = tm->GetManagementObject()->writePropertiesSize();
+ (void) tm->GetManagementObject()->writePropertiesSize();
agent->addObject(tm->GetManagementObject(), key.str());
tmv.push_back(tm);
}
diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp
index 939f8f2b88..3c0cff7350 100644
--- a/qpid/cpp/src/tests/ClientSessionTest.cpp
+++ b/qpid/cpp/src/tests/ClientSessionTest.cpp
@@ -271,8 +271,12 @@ QPID_AUTO_TEST_CASE(testOpenFailure) {
QPID_AUTO_TEST_CASE(testPeriodicExpiration) {
Broker::Options opts;
opts.queueCleanInterval = 1;
+ opts.queueFlowStopRatio = 0;
+ opts.queueFlowResumeRatio = 0;
ClientSessionFixture fix(opts);
- fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true);
+ FieldTable args;
+ args.setInt("qpid.max_count",10);
+ fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
for (uint i = 0; i < 10; i++) {
Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue");
@@ -283,6 +287,7 @@ QPID_AUTO_TEST_CASE(testPeriodicExpiration) {
BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 10u);
qpid::sys::sleep(2);
BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 5u);
+ fix.session.messageTransfer(arg::content=Message("Message_11", "my-queue"));//ensure policy is also updated
}
QPID_AUTO_TEST_CASE(testExpirationOnPop) {
diff --git a/qpid/cpp/src/tests/ForkedBroker.cpp b/qpid/cpp/src/tests/ForkedBroker.cpp
index 53eaa7e1ce..10674b5175 100644
--- a/qpid/cpp/src/tests/ForkedBroker.cpp
+++ b/qpid/cpp/src/tests/ForkedBroker.cpp
@@ -68,8 +68,7 @@ ForkedBroker::~ForkedBroker() {
}
if (!dataDir.empty())
{
- int unused_ret; // Suppress warnings about ignoring return value.
- unused_ret = ::system(("rm -rf "+dataDir).c_str());
+ (void) ::system(("rm -rf "+dataDir).c_str());
}
}
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index 6aa4c63ed7..fae45a94d0 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -992,6 +992,78 @@ QPID_AUTO_TEST_CASE(testTtlForever)
BOOST_CHECK(in.getTtl() == Duration::FOREVER);
}
+QPID_AUTO_TEST_CASE(testExclusiveTopicSubscriber)
+{
+ TopicFixture fix;
+ std::string address = (boost::format("%1%; { link: { name: 'my-subscription', x-declare: { auto-delete: true, exclusive: true }}}") % fix.topic).str();
+ Sender sender = fix.session.createSender(fix.topic);
+ Receiver receiver1 = fix.session.createReceiver(address);
+ {
+ ScopedSuppressLogging sl;
+ try {
+ fix.session.createReceiver(address);
+ fix.session.sync();
+ BOOST_FAIL("Expected exception.");
+ } catch (const MessagingException& /*e*/) {}
+ }
+}
+
+QPID_AUTO_TEST_CASE(testNonExclusiveSubscriber)
+{
+ TopicFixture fix;
+ std::string address = (boost::format("%1%; {node:{type:topic}, link:{name:'my-subscription', x-declare:{auto-delete:true, exclusive:false}}}") % fix.topic).str();
+ Receiver receiver1 = fix.session.createReceiver(address);
+ Receiver receiver2 = fix.session.createReceiver(address);
+ Sender sender = fix.session.createSender(fix.topic);
+ sender.send(Message("one"), true);
+ Message in = receiver1.fetch(Duration::IMMEDIATE);
+ BOOST_CHECK_EQUAL(in.getContent(), std::string("one"));
+ sender.send(Message("two"), true);
+ in = receiver2.fetch(Duration::IMMEDIATE);
+ BOOST_CHECK_EQUAL(in.getContent(), std::string("two"));
+ fix.session.acknowledge();
+}
+
+QPID_AUTO_TEST_CASE(testAcknowledgeUpTo)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ const uint count(20);
+ for (uint i = 0; i < count; ++i) {
+ sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
+ }
+
+ Session other = fix.connection.createSession();
+ Receiver receiver = other.createReceiver(fix.queue);
+ std::vector<Message> messages;
+ for (uint i = 0; i < count; ++i) {
+ Message msg = receiver.fetch();
+ BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str());
+ messages.push_back(msg);
+ }
+ const uint batch = 10;
+ other.acknowledgeUpTo(messages[batch-1]);//acknowledge first 10 messages only
+
+ messages.clear();
+ other.sync();
+ other.close();
+
+ other = fix.connection.createSession();
+ receiver = other.createReceiver(fix.queue);
+ Message msg;
+ for (uint i = 0; i < (count-batch); ++i) {
+ msg = receiver.fetch();
+ BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str());
+ }
+ other.acknowledgeUpTo(msg);
+ other.sync();
+ other.close();
+
+ Message m;
+ //check queue is empty
+ BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE));
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index a752e3afec..6372aa93c3 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -681,7 +681,7 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) {
addMessagesToQueue(10, queue);
BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u);
::usleep(300*1000);
- queue.purgeExpired();
+ queue.purgeExpired(0);
BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u);
}
@@ -692,7 +692,7 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
addMessagesToQueue(10, *queue, 200, 400);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
- QueueCleaner cleaner(queues, timer);
+ QueueCleaner cleaner(queues, &timer);
cleaner.start(100 * qpid::sys::TIME_MSEC);
::usleep(300*1000);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);
diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h
index 0c6f39d62e..d195f11aa9 100644
--- a/qpid/cpp/src/tests/SocketProxy.h
+++ b/qpid/cpp/src/tests/SocketProxy.h
@@ -35,6 +35,8 @@
#include "qpid/sys/Mutex.h"
#include "qpid/log/Statement.h"
+#include <boost/lexical_cast.hpp>
+
namespace qpid {
namespace tests {
@@ -62,7 +64,7 @@ class SocketProxy : private qpid::sys::Runnable
: closed(false), joined(true),
port(listener.listen()), dropClient(), dropServer()
{
- client.connect(host, connectPort);
+ client.connect(host, boost::lexical_cast<std::string>(connectPort));
joined = false;
thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this));
}
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index a19dd305e5..0415a667a2 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -157,8 +157,13 @@ class Popen(subprocess.Popen):
try: self.kill() # Just make sure its dead
except: pass
elif self.expect == EXPECT_RUNNING:
- try: self.kill()
- except: self.unexpected("expected running, exit code %d" % self.wait())
+ if self.poll() != None:
+ self.unexpected("expected running, exit code %d" % self.returncode)
+ else:
+ try:
+ self.kill()
+ except Exception,e:
+ self.unexpected("exception from kill: %s" % str(e))
else:
retry(lambda: self.poll() is not None)
if self.returncode is None: # Still haven't stopped
@@ -544,6 +549,7 @@ class NumberedSender(Thread):
"--broker", "localhost:%s"%broker.port(),
"--address", "%s;{create:always}"%queue,
"--failover-updates",
+ "--connection-options", "{reconnect:true}",
"--content-stdin"
],
expect=EXPECT_RUNNING,
@@ -562,6 +568,7 @@ class NumberedSender(Thread):
try:
self.sent = 0
while not self.stopped:
+ self.sender.assert_running()
if self.max:
self.condition.acquire()
while not self.stopped and self.sent - self.received > self.max:
@@ -604,6 +611,7 @@ class NumberedReceiver(Thread):
"--broker", "localhost:%s"%broker.port(),
"--address", "%s;{create:always}"%queue,
"--failover-updates",
+ "--connection-options", "{reconnect:true}",
"--forever"
],
expect=EXPECT_RUNNING,
@@ -611,15 +619,16 @@ class NumberedReceiver(Thread):
self.lock = Lock()
self.error = None
self.sender = sender
+ self.received = 0
def read_message(self):
return int(self.receiver.stdout.readline())
def run(self):
try:
- self.received = 0
m = self.read_message()
while m != -1:
+ self.receiver.assert_running()
assert(m <= self.received) # Check for missing messages
if (m == self.received): # Ignore duplicates
self.received += 1
diff --git a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py
index 9f7d1e2f6c..a0ce8fb9c3 100755
--- a/qpid/cpp/src/tests/cluster_test_logs.py
+++ b/qpid/cpp/src/tests/cluster_test_logs.py
@@ -54,7 +54,7 @@ def filter_log(log):
'caught up',
'active for links|Passivating links|Activating links',
'info Connection.* connected to', # UpdateClient connection
- 'warning Connection [\d+ [0-9.:]+] closed', # UpdateClient connection
+ 'warning Connection \\[[-0-9.: ]+\\] closed', # UpdateClient connection
'warning Broker closed connection: 200, OK',
'task late',
'task overran',
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index caa41fa001..c1d9103f08 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -18,11 +18,12 @@
# under the License.
#
-import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging
+import cluster_test_logs
from qpid import datatypes, messaging
from brokertest import *
from qpid.harness import Skipped
-from qpid.messaging import Message, Empty, Disposition, REJECTED
+from qpid.messaging import Message, Empty, Disposition, REJECTED, util
from threading import Thread, Lock, Condition
from logging import getLogger
from itertools import chain
@@ -96,9 +97,15 @@ class ShortTests(BrokerTest):
destination="amq.direct",
message=qpid.datatypes.Message(props, "content"))
+ # Try message with TTL and differnet headers/properties
+ cluster[0].send_message("q", Message(durable=True, ttl=100000))
+ cluster[0].send_message("q", Message(durable=True, properties={}, ttl=100000))
+ cluster[0].send_message("q", Message(durable=True, properties={"x":10}, ttl=100000))
+
# Now update a new member and compare their dumps.
cluster.start(args=["--test-store-dump", "updatee.dump"])
assert readfile("direct.dump") == readfile("updatee.dump")
+
os.remove("direct.dump")
os.remove("updatee.dump")
@@ -253,6 +260,7 @@ acl allow all all
client was attached.
"""
args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
+ # First broker will be killed.
cluster0 = self.cluster(1, args=args)
cluster1 = self.cluster(1, args=args)
assert 0 == subprocess.call(
@@ -287,6 +295,7 @@ acl allow all all
# Force a change of elder
cluster0.start()
+ cluster0[0].expect=EXPECT_EXIT_FAIL # About to die.
cluster0[0].kill()
time.sleep(2) # Allow a management interval to pass.
# Verify logs are consistent
@@ -525,7 +534,7 @@ acl allow all all
receiver.wait()
q_obj.update()
assert not q_obj.flowStopped
- assert q_obj.msgDepth == 0
+ self.assertEqual(q_obj.msgDepth, 0)
# verify that the sender has become unblocked
sender.join(timeout=5)
@@ -685,6 +694,25 @@ acl allow all all
self.assert_browse(s1, "q", ["foo"])
+ def test_ttl_consistent(self):
+ """Ensure we don't get inconsistent errors with message that have TTL very close together"""
+ messages = [ Message(str(i), ttl=i/1000.0) for i in xrange(0,1000)]
+ messages.append(Message("x"))
+ cluster = self.cluster(2)
+ sender = cluster[0].connect().session().sender("q;{create:always}")
+
+ def fetch(b):
+ receiver = b.connect().session().receiver("q;{create:always}")
+ while receiver.fetch().content != "x": pass
+
+ for m in messages: sender.send(m, sync=False)
+ for m in messages: sender.send(m, sync=False)
+ fetch(cluster[0])
+ fetch(cluster[1])
+ for m in messages: sender.send(m, sync=False)
+ cluster.start()
+ fetch(cluster[2])
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
@@ -697,22 +725,28 @@ class LongTests(BrokerTest):
# Original cluster will all be killed so expect exit with failure
cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
+ for b in cluster: b.ready() # Wait for brokers to be ready
for b in cluster: ErrorGenerator(b)
# Start sender and receiver threads
cluster[0].declare_queue("test-queue")
- sender = NumberedSender(cluster[1], 1000) # Max queue depth
- receiver = NumberedReceiver(cluster[2], sender)
+ sender = NumberedSender(cluster[0], 1000) # Max queue depth
+ receiver = NumberedReceiver(cluster[0], sender)
receiver.start()
sender.start()
+ # Wait for sender & receiver to get up and running
+ retry(lambda: receiver.received > 0)
# Kill original brokers, start new ones for the duration.
endtime = time.time() + self.duration()
i = 0
while time.time() < endtime:
+ sender.sender.assert_running()
+ receiver.receiver.assert_running()
cluster[i].kill()
i += 1
b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ for b in cluster[i:]: b.ready()
ErrorGenerator(b)
time.sleep(5)
sender.stop()
@@ -782,7 +816,7 @@ class LongTests(BrokerTest):
args += ["--log-enable=trace+:management"]
# Use store if present.
if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
- cluster = self.cluster(3, args)
+ cluster = self.cluster(3, args, expect=EXPECT_EXIT_FAIL) # brokers will be killed
clients = [] # Per-broker list of clients that only connect to one broker.
mclients = [] # Management clients that connect to every broker in the cluster.
@@ -806,7 +840,7 @@ class LongTests(BrokerTest):
endtime = time.time() + self.duration()
# For long duration, first run is a quarter of the duration.
- runtime = max(5, self.duration() / 4.0)
+ runtime = min(5.0, self.duration() / 3.0)
alive = 0 # First live cluster member
for i in range(len(cluster)): start_clients(cluster[i])
start_mclients(cluster[alive])
@@ -817,7 +851,7 @@ class LongTests(BrokerTest):
for b in cluster[alive:]: b.ready() # Check if a broker crashed.
# Kill the first broker, expect the clients to fail.
b = cluster[alive]
- b.expect = EXPECT_EXIT_FAIL
+ b.ready()
b.kill()
# Stop the brokers clients and all the mclients.
for c in clients[alive] + mclients:
@@ -827,11 +861,15 @@ class LongTests(BrokerTest):
mclients = []
# Start another broker and clients
alive += 1
- cluster.start()
+ cluster.start(expect=EXPECT_EXIT_FAIL)
+ cluster[-1].ready() # Wait till its ready
start_clients(cluster[-1])
start_mclients(cluster[alive])
for c in chain(mclients, *clients):
c.stop()
+ for b in cluster[alive:]:
+ b.ready() # Verify still alive
+ b.kill()
# Verify that logs are consistent
cluster_test_logs.verify_logs()
@@ -844,7 +882,7 @@ class LongTests(BrokerTest):
end = time.time() + self.duration()
while (time.time() < end): # Get a management interval
for i in xrange(1000): cluster[0].connect().close()
- cluster_test_logs.verify_logs()
+ cluster_test_logs.verify_logs()
def test_flowlimit_failover(self):
"""Test fail-over during continuous send-receive with flow control
@@ -853,34 +891,149 @@ class LongTests(BrokerTest):
# Original cluster will all be killed so expect exit with failure
cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
- #for b in cluster: ErrorGenerator(b)
+ for b in cluster: b.ready() # Wait for brokers to be ready
# create a queue with rather draconian flow control settings
ssn0 = cluster[0].connect().session()
s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}")
- receiver = NumberedReceiver(cluster[2])
+ receiver = NumberedReceiver(cluster[0])
receiver.start()
- senders = [NumberedSender(cluster[i]) for i in range(1,3)]
+ senders = [NumberedSender(cluster[0]) for i in range(1,3)]
for s in senders:
s.start()
+ # Wait for senders & receiver to get up and running
+ retry(lambda: receiver.received > 2*senders)
# Kill original brokers, start new ones for the duration.
endtime = time.time() + self.duration();
i = 0
while time.time() < endtime:
+ for s in senders: s.sender.assert_running()
+ receiver.receiver.assert_running()
+ for b in cluster[i:]: b.ready() # Check if any broker crashed.
cluster[i].kill()
i += 1
b = cluster.start(expect=EXPECT_EXIT_FAIL)
- #ErrorGenerator(b)
time.sleep(5)
- #b = cluster[0]
- #b.startQmf()
for s in senders:
s.stop()
receiver.stop()
for i in range(i, len(cluster)): cluster[i].kill()
+ def test_ttl_failover(self):
+ """Test that messages with TTL don't cause problems in a cluster with failover"""
+
+ class Client(StoppableThread):
+
+ def __init__(self, broker):
+ StoppableThread.__init__(self)
+ self.connection = broker.connect(reconnect=True)
+ self.auto_fetch_reconnect_urls(self.connection)
+ self.session = self.connection.session()
+
+ def auto_fetch_reconnect_urls(self, conn):
+ """Replacment for qpid.messaging.util version which is noisy"""
+ ssn = conn.session("auto-fetch-reconnect-urls")
+ rcv = ssn.receiver("amq.failover")
+ rcv.capacity = 10
+
+ def main():
+ while True:
+ try:
+ msg = rcv.fetch()
+ qpid.messaging.util.set_reconnect_urls(conn, msg)
+ ssn.acknowledge(msg, sync=False)
+ except messaging.exceptions.LinkClosed: return
+ except messaging.exceptions.ConnectionError: return
+
+ thread = Thread(name="auto-fetch-reconnect-urls", target=main)
+ thread.setDaemon(True)
+ thread.start()
+
+ def stop(self):
+ StoppableThread.stop(self)
+ self.connection.detach()
+
+ class Sender(Client):
+ def __init__(self, broker, address):
+ Client.__init__(self, broker)
+ self.sent = 0 # Number of messages _reliably_ sent.
+ self.sender = self.session.sender(address, capacity=1000)
+
+ def send_counted(self, ttl):
+ self.sender.send(Message(str(self.sent), ttl=ttl))
+ self.sent += 1
+
+ def run(self):
+ while not self.stopped:
+ choice = random.randint(0,4)
+ if choice == 0: self.send_counted(None) # No ttl
+ elif choice == 1: self.send_counted(100000) # Large ttl
+ else: # Small ttl, might expire
+ self.sender.send(Message("", ttl=random.random()/10))
+ self.sender.send(Message("z"), sync=True) # Chaser.
+
+ class Receiver(Client):
+
+ def __init__(self, broker, address):
+ Client.__init__(self, broker)
+ self.received = 0 # Number of non-empty (reliable) messages received.
+ self.receiver = self.session.receiver(address, capacity=1000)
+ def run(self):
+ try:
+ while True:
+ m = self.receiver.fetch(1)
+ if m.content == "z": break
+ if m.content: # Ignore unreliable messages
+ # Ignore duplicates
+ if int(m.content) == self.received: self.received += 1
+ except Exception,e: self.error = e
+
+ # def test_ttl_failover
+
+ # Original cluster will all be killed so expect exit with failure
+ # Set small purge interval.
+ cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["--queue-purge-interval=1"])
+ for b in cluster: b.ready() # Wait for brokers to be ready
+
+ # Python client failover produces noisy WARN logs, disable temporarily
+ logger = logging.getLogger()
+ log_level = logger.getEffectiveLevel()
+ logger.setLevel(logging.ERROR)
+ try:
+ # Start sender and receiver threads
+ receiver = Receiver(cluster[0], "q;{create:always}")
+ receiver.start()
+ sender = Sender(cluster[0], "q;{create:always}")
+ sender.start()
+ # Wait for sender & receiver to get up and running
+ retry(lambda: receiver.received > 0)
+
+ # Kill brokers in a cycle.
+ endtime = time.time() + self.duration()
+ runtime = min(5.0, self.duration() / 4.0)
+ i = 0
+ while time.time() < endtime:
+ for b in cluster[i:]: b.ready() # Check if any broker crashed.
+ cluster[i].kill()
+ i += 1
+ b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ b.ready()
+ time.sleep(runtime)
+ sender.stop()
+ receiver.stop()
+ for b in cluster[i:]:
+ b.ready() # Check it didn't crash
+ b.kill()
+ self.assertEqual(sender.sent, receiver.received)
+ cluster_test_logs.verify_logs()
+
+ finally:
+ # Detach to avoid slow reconnect attempts during shut-down if test fails.
+ sender.connection.detach()
+ receiver.connection.detach()
+ logger.setLevel(log_level)
class StoreTests(BrokerTest):
"""
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index 201b06a4a2..49bdecdd95 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -649,10 +649,17 @@ class FederationTests(TestBase010):
self.verify_cleanup()
- def test_dynamic_headers(self):
+ def test_dynamic_headers_any(self):
+ self.do_test_dynamic_headers('any')
+
+ def test_dynamic_headers_all(self):
+ self.do_test_dynamic_headers('all')
+
+
+ def do_test_dynamic_headers(self, match_mode):
session = self.session
r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
- r_session = r_conn.session("test_dynamic_headers")
+ r_session = r_conn.session("test_dynamic_headers_%s" % match_mode)
session.exchange_declare(exchange="fed.headers", type="headers")
r_session.exchange_declare(exchange="fed.headers", type="headers")
@@ -671,7 +678,7 @@ class FederationTests(TestBase010):
sleep(5)
session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
- session.exchange_bind(queue="fed1", exchange="fed.headers", binding_key="key1", arguments={'x-match':'any', 'class':'first'})
+ session.exchange_bind(queue="fed1", exchange="fed.headers", binding_key="key1", arguments={'x-match':match_mode, 'class':'first'})
self.subscribe(queue="fed1", destination="f1")
queue = session.incoming("f1")
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 4d83c5b5de..c33f2e4852 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -8,9 +8,9 @@
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
--
+-
- http://www.apache.org/licenses/LICENSE-2.0
--
+-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -78,10 +78,6 @@
<field name="left" type="vbin16"/> <!-- packed member-id array -->
</control>
- <control name="message-expired" code="0x12">
- <field name="id" type="uint64"/>
- </control>
-
<domain name="error-type" type="uint8" label="Types of error">
<enum>
<choice name="none" value="0"/>
@@ -89,7 +85,7 @@
<choice name="connection" value="2"/>
</enum>
</domain>
-
+
<!-- Check for error consistency across the cluster -->
<control name="error-check" code="0x14">
<field name="type" type="error-type"/>
@@ -116,6 +112,11 @@
<field name="message" type="vbin32"/>
</control>
+ <!-- Update the cluster time -->
+ <control name="clock" code="0x22">
+ <field name="time" type="uint64"/>
+ </control>
+
</class>
<!-- Controls associated with a specific connection. -->
@@ -149,7 +150,7 @@
<!-- Abort a connection that is sending invalid data. -->
<control name="abort" code="0x4"/>
-
+
<!-- Update controls. Sent to a new broker in joining mode.
A connection is updated as followed:
- send the shadow's management ID in shadow-perpare on the update connection
@@ -183,7 +184,7 @@
<field name="position" type="sequence-no"/>
<field name="tag" type="str8"/>
<field name="id" type="sequence-no"/>
- <field name="acquired" type="bit"/> <!--If not set, message follows. -->
+ <field name="acquired" type="bit"/> <!--If not set, message is on update queue. -->
<field name="accepted" type="bit"/>
<field name="cancelled" type="bit"/>
<field name="completed" type="bit"/>
@@ -192,9 +193,9 @@
<field name="enqueued" type="bit"/>
<field name="credit" type="uint32"/>
</control>
-
+
<!-- Tx transaction state. -->
- <control name="tx-start" code="0x12"/>
+ <control name="tx-start" code="0x12"/>
<control name="tx-accept" code="0x13"> <field name="commands" type="sequence-set"/> </control>
<control name="tx-dequeue" code="0x14"> <field name="queue" type="str8"/> </control>
<control name="tx-enqueue" code="0x15"> <field name="queue" type="str8"/> </control>
@@ -204,7 +205,7 @@
</control>
<control name="tx-end" code="0x17"/>
<control name="accumulated-ack" code="0x18"> <field name="commands" type="sequence-set"/> </control>
-
+
<!-- Consumers in the connection's output task -->
<control name="output-task" code="0x19">
<field name="channel" type="uint16"/>
@@ -253,9 +254,6 @@
<!-- Replicate encoded exchanges/queues. -->
<control name="exchange" code="0x31"><field name="encoded" type="str32"/></control>
- <!-- Set expiry-id for subsequent messages. -->
- <control name="expiry-id" code="0x33"><field name="expiry-id" type="uint64"/></control>
-
<!-- Add a listener to a queue -->
<control name="add-queue-listener" code="0x34">
<field name="queue" type="str8"/>
@@ -289,6 +287,18 @@
<field name="state" type="map"/> <!-- "name"=value -->
</control>
+ <!-- Update the cluster time -->
+ <control name="clock" code="0x40">
+ <field name="time" type="uint64"/>
+ </control>
+
+ <!-- Update a queue's dequeue rate -->
+ <control name="queue-dequeue-since-purge-state" code="0x41">
+ <field name="queue" type="str8"/>
+ <field name="dequeueSincePurge" type="uint32"/>
+ </control>
+
+
</class>
</amqp>