diff options
Diffstat (limited to 'qpid/cpp')
110 files changed, 1132 insertions, 898 deletions
diff --git a/qpid/cpp/bindings/qmf/python/Makefile.am b/qpid/cpp/bindings/qmf/python/Makefile.am index 421590f189..8abad32959 100644 --- a/qpid/cpp/bindings/qmf/python/Makefile.am +++ b/qpid/cpp/bindings/qmf/python/Makefile.am @@ -30,11 +30,13 @@ BUILT_SOURCES = $(generated_file_list) SWIG_FLAGS = -w362,401 $(generated_file_list): $(srcdir)/python.i $(srcdir)/../qmfengine.i - swig -c++ -python $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I$(top_srcdir)/src/qmf -I/usr/include -o qmfengine.cpp $(srcdir)/python.i + $(SWIG) -c++ -python $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I$(top_srcdir)/src/qmf -I/usr/include -o qmfengine.cpp $(srcdir)/python.i pylibdir = $(PYTHON_LIB) lib_LTLIBRARIES = _qmfengine.la +qenginedir = $(pyexecdir) +qengine_PYTHON = qmfengine.py qmf.py #_qmfengine_la_LDFLAGS = -avoid-version -module -shrext "$(PYTHON_SO)" #_qmfengine_la_LDFLAGS = -avoid-version -module -shrext ".so" diff --git a/qpid/cpp/bindings/qmf/ruby/Makefile.am b/qpid/cpp/bindings/qmf/ruby/Makefile.am index 395d64ff90..de8c4d10d5 100644 --- a/qpid/cpp/bindings/qmf/ruby/Makefile.am +++ b/qpid/cpp/bindings/qmf/ruby/Makefile.am @@ -35,7 +35,7 @@ qmfengine.cpp: $(srcdir)/ruby.i $(srcdir)/../qmfengine.i rubylibarchdir = $(RUBY_LIB_ARCH) rubylibarch_LTLIBRARIES = qmfengine.la -qmfengine_la_LDFLAGS = -avoid-version -module -shrext ".$(RUBY_DLEXT)" +qmfengine_la_LDFLAGS = -avoid-version -module -shared -shrext ".$(RUBY_DLEXT)" qmfengine_la_LIBADD = $(RUBY_LIBS) -L$(top_builddir)/src/.libs -lqpidclient $(top_builddir)/src/libqmfengine.la qmfengine_la_CXXFLAGS = $(INCLUDES) -I$(RUBY_INC) -I$(RUBY_INC_ARCH) -fno-strict-aliasing nodist_qmfengine_la_SOURCES = qmfengine.cpp diff --git a/qpid/cpp/bindings/qmf2/python/Makefile.am b/qpid/cpp/bindings/qmf2/python/Makefile.am index 7adc62eddb..3dc04e832f 100644 --- a/qpid/cpp/bindings/qmf2/python/Makefile.am +++ b/qpid/cpp/bindings/qmf2/python/Makefile.am @@ -30,12 +30,12 @@ BUILT_SOURCES = $(generated_file_list) SWIG_FLAGS = -w362,401 $(generated_file_list): $(srcdir)/python.i $(srcdir)/../qmf2.i $(srcdir)/../../swig_python_typemaps.i - swig -c++ -python $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I/usr/include -o cqmf2.cpp $(srcdir)/python.i + $(SWIG) -c++ -python $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I/usr/include -o cqmf2.cpp $(srcdir)/python.i pylibdir = $(PYTHON_LIB) lib_LTLIBRARIES = _cqmf2.la -cqpiddir = $(pythondir) +cqpiddir = $(pyexecdir) cqpid_PYTHON = qmf2.py cqmf2.py _cqmf2_la_LDFLAGS = -avoid-version -module -shared diff --git a/qpid/cpp/bindings/qmf2/ruby/Makefile.am b/qpid/cpp/bindings/qmf2/ruby/Makefile.am index 8e11a204b2..97bbc6f385 100644 --- a/qpid/cpp/bindings/qmf2/ruby/Makefile.am +++ b/qpid/cpp/bindings/qmf2/ruby/Makefile.am @@ -34,7 +34,7 @@ rubylibarchdir = $(RUBY_LIB_ARCH) rubylibarch_LTLIBRARIES = cqmf2.la dist_rubylib_DATA = qmf2.rb -cqmf2_la_LDFLAGS = -avoid-version -module -shrext ".$(RUBY_DLEXT)" +cqmf2_la_LDFLAGS = -avoid-version -module -shared -shrext ".$(RUBY_DLEXT)" cqmf2_la_LIBADD = $(RUBY_LIBS) -L$(top_builddir)/src/.libs -lqmf2 $(top_builddir)/src/libqmf2.la cqmf2_la_CXXFLAGS = $(INCLUDES) -I$(RUBY_INC) -I$(RUBY_INC_ARCH) -fno-strict-aliasing nodist_cqmf2_la_SOURCES = cqmf2.cpp diff --git a/qpid/cpp/bindings/qpid/dotnet/src/Session.cpp b/qpid/cpp/bindings/qpid/dotnet/src/Session.cpp index 4a6199f108..0e918769a3 100644 --- a/qpid/cpp/bindings/qpid/dotnet/src/Session.cpp +++ b/qpid/cpp/bindings/qpid/dotnet/src/Session.cpp @@ -248,6 +248,31 @@ namespace Messaging { }
}
+ void Session::AcknowledgeUpTo(Message ^ message)
+ {
+ AcknowledgeUpTo(message, false);
+ }
+
+ void Session::AcknowledgeUpTo(Message ^ message, bool sync)
+ {
+ System::Exception ^ newException = nullptr;
+
+ try
+ {
+ sessionp->acknowledgeUpTo(*(message->NativeMessage), sync);
+ }
+ catch (const ::qpid::types::Exception & error)
+ {
+ String ^ errmsg = gcnew String(error.what());
+ newException = gcnew QpidException(errmsg);
+ }
+
+ if (newException != nullptr)
+ {
+ throw newException;
+ }
+ }
+
void Session::Reject(Message ^ message)
{
System::Exception ^ newException = nullptr;
diff --git a/qpid/cpp/bindings/qpid/dotnet/src/Session.h b/qpid/cpp/bindings/qpid/dotnet/src/Session.h index 4d4cad75c4..4b98a37f18 100644 --- a/qpid/cpp/bindings/qpid/dotnet/src/Session.h +++ b/qpid/cpp/bindings/qpid/dotnet/src/Session.h @@ -104,6 +104,8 @@ namespace Messaging { void Acknowledge(bool sync);
void Acknowledge(Message ^ message);
void Acknowledge(Message ^ message, bool sync);
+ void AcknowledgeUpTo(Message ^ message);
+ void AcknowledgeUpTo(Message ^ message, bool sync);
void Reject(Message ^);
void Release(Message ^);
void Sync();
diff --git a/qpid/cpp/bindings/qpid/python/Makefile.am b/qpid/cpp/bindings/qpid/python/Makefile.am index 9aef179db7..dd25f34829 100644 --- a/qpid/cpp/bindings/qpid/python/Makefile.am +++ b/qpid/cpp/bindings/qpid/python/Makefile.am @@ -30,12 +30,12 @@ BUILT_SOURCES = $(generated_file_list) SWIG_FLAGS = -w362,401 $(generated_file_list): $(srcdir)/python.i $(srcdir)/../qpid.i $(srcdir)/../../swig_python_typemaps.i - swig -c++ -python $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I$(top_srcdir)/src/qmf -I/usr/include -o cqpid.cpp $(srcdir)/python.i + $(SWIG) -c++ -python $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I$(top_srcdir)/src/qmf -I/usr/include -o cqpid.cpp $(srcdir)/python.i pylibdir = $(PYTHON_LIB) lib_LTLIBRARIES = _cqpid.la -cqpiddir = $(pythondir) +cqpiddir = $(pyexecdir) cqpid_PYTHON = cqpid.py _cqpid_la_LDFLAGS = -avoid-version -module -shared diff --git a/qpid/cpp/bindings/qpid/ruby/Makefile.am b/qpid/cpp/bindings/qpid/ruby/Makefile.am index d92eb969de..a2a5dd76bd 100644 --- a/qpid/cpp/bindings/qpid/ruby/Makefile.am +++ b/qpid/cpp/bindings/qpid/ruby/Makefile.am @@ -33,7 +33,7 @@ cqpid.cpp: $(srcdir)/ruby.i $(srcdir)/../qpid.i $(srcdir)/../../swig_ruby_typema rubylibarchdir = $(RUBY_LIB_ARCH) rubylibarch_LTLIBRARIES = cqpid.la -cqpid_la_LDFLAGS = -avoid-version -module -shrext ".$(RUBY_DLEXT)" +cqpid_la_LDFLAGS = -avoid-version -module -shared -shrext ".$(RUBY_DLEXT)" cqpid_la_LIBADD = $(RUBY_LIBS) -L$(top_builddir)/src/.libs -lqpidmessaging -lqpidtypes \ $(top_builddir)/src/libqpidmessaging.la $(top_builddir)/src/libqpidtypes.la cqpid_la_CXXFLAGS = $(INCLUDES) -I$(RUBY_INC) -I$(RUBY_INC_ARCH) -fno-strict-aliasing diff --git a/qpid/cpp/configure.ac b/qpid/cpp/configure.ac index 43a32d3ad7..092694d56b 100644 --- a/qpid/cpp/configure.ac +++ b/qpid/cpp/configure.ac @@ -68,8 +68,10 @@ if test x$GXX = xyes; then # The following warnings are deliberately omitted, they warn on valid code. # -Wunreachable-code -Wpadded -Winline # -Wshadow - warns about boost headers. + # Can't test for -Werror as whether it fails or not depends on what's in + # CFLAGS/CXXFLAGS. In any case it's been in gcc for a long time (since 2.95 at least) if test "${enableval}" = yes; then - gl_COMPILER_FLAGS(-Werror) + COMPILER_FLAGS="-Werror" gl_COMPILER_FLAGS(-pedantic) gl_COMPILER_FLAGS(-Wall) gl_COMPILER_FLAGS(-Wextra) diff --git a/qpid/cpp/examples/messaging/drain.cpp b/qpid/cpp/examples/messaging/drain.cpp index 5c938e9742..7700244fa8 100644 --- a/qpid/cpp/examples/messaging/drain.cpp +++ b/qpid/cpp/examples/messaging/drain.cpp @@ -50,7 +50,7 @@ struct Options : OptionParser add("broker,b", url, "url of broker to connect to"); add("timeout,t", timeout, "timeout in seconds to wait before exiting"); add("forever,f", forever, "ignore timeout and wait forever"); - add("connection-options", connectionOptions, "connection options string in the form {name1=value1, name2=value2}"); + add("connection-options", connectionOptions, "connection options string in the form {name1:value1, name2:value2}"); add("count,c", count, "number of messages to read before exiting"); } diff --git a/qpid/cpp/examples/messaging/spout.cpp b/qpid/cpp/examples/messaging/spout.cpp index 57b955c1de..cd11a7ad81 100644 --- a/qpid/cpp/examples/messaging/spout.cpp +++ b/qpid/cpp/examples/messaging/spout.cpp @@ -65,7 +65,7 @@ struct Options : OptionParser add("property,P", properties, "specify message property"); add("map,M", entries, "specify entry for map content"); add("content", content, "specify textual content"); - add("connection-options", connectionOptions, "connection options string in the form {name1=value1, name2=value2}"); + add("connection-options", connectionOptions, "connection options string in the form {name1:value1, name2:value2}"); } static bool nameval(const std::string& in, std::string& name, std::string& value) diff --git a/qpid/cpp/include/qpid/framing/FieldTable.h b/qpid/cpp/include/qpid/framing/FieldTable.h index fed431129a..e8ec524863 100644 --- a/qpid/cpp/include/qpid/framing/FieldTable.h +++ b/qpid/cpp/include/qpid/framing/FieldTable.h @@ -65,6 +65,8 @@ class FieldTable QPID_COMMON_EXTERN void decode(Buffer& buffer); QPID_COMMON_EXTERN int count() const; + QPID_COMMON_EXTERN size_t size() const { return values.size(); } + QPID_COMMON_EXTERN bool empty() { return size() == 0; } QPID_COMMON_EXTERN void set(const std::string& name, const ValuePtr& value); QPID_COMMON_EXTERN ValuePtr get(const std::string& name) const; QPID_COMMON_INLINE_EXTERN bool isSet(const std::string& name) const { return get(name).get() != 0; } diff --git a/qpid/cpp/include/qpid/messaging/Connection.h b/qpid/cpp/include/qpid/messaging/Connection.h index e938f23189..165573e2ef 100644 --- a/qpid/cpp/include/qpid/messaging/Connection.h +++ b/qpid/cpp/include/qpid/messaging/Connection.h @@ -54,27 +54,27 @@ class QPID_MESSAGING_CLASS_EXTERN Connection : public qpid::messaging::Handle<Co * username * password * heartbeat - * tcp-nodelay - * sasl-mechanism - * sasl-service - * sasl-min-ssf - * sasl-max-ssf + * tcp_nodelay + * sasl_mechanisms + * sasl_service + * sasl_min_ssf + * sasl_max_ssf * transport * * Reconnect behaviour can be controlled through the following options: * * reconnect: true/false (enables/disables reconnect entirely) - * reconnect-timeout: number of seconds (give up and report failure after specified time) - * reconnect-limit: n (give up and report failure after specified number of attempts) - * reconnect-interval-min: number of seconds (initial delay between failed reconnection attempts) - * reconnect-interval-max: number of seconds (maximum delay between failed reconnection attempts) - * reconnect-interval: shorthand for setting the same reconnect_interval_min/max - * reconnect-urls: list of alternate urls to try when connecting + * reconnect_timeout: number of seconds (give up and report failure after specified time) + * reconnect_limit: n (give up and report failure after specified number of attempts) + * reconnect_interval_min: number of seconds (initial delay between failed reconnection attempts) + * reconnect_interval_max: number of seconds (maximum delay between failed reconnection attempts) + * reconnect_interval: shorthand for setting the same reconnect_interval_min/max + * reconnect_urls: list of alternate urls to try when connecting * - * The reconnect-interval is the time that the client waits + * The reconnect_interval is the time that the client waits * for after a failed attempt to reconnect before retrying. It - * starts at the value of the min-retry-interval and is - * doubled every failure until the value of max-retry-interval + * starts at the value of the min_retry_interval and is + * doubled every failure until the value of max_retry_interval * is reached. */ QPID_MESSAGING_EXTERN Connection(const std::string& url, const qpid::types::Variant::Map& options = qpid::types::Variant::Map()); diff --git a/qpid/cpp/include/qpid/messaging/Message.h b/qpid/cpp/include/qpid/messaging/Message.h index 5cd978f2a2..e89a6ce02f 100644 --- a/qpid/cpp/include/qpid/messaging/Message.h +++ b/qpid/cpp/include/qpid/messaging/Message.h @@ -55,23 +55,58 @@ class QPID_MESSAGING_CLASS_EXTERN Message QPID_MESSAGING_EXTERN void setSubject(const std::string&); QPID_MESSAGING_EXTERN const std::string& getSubject() const; + /** + * Set the content type (i.e. the MIME type) for the message. This + * should be set by the sending application and indicates to + * recipients of message how to interpret or decode the content. + */ QPID_MESSAGING_EXTERN void setContentType(const std::string&); + /** + * Returns the content type (i.e. the MIME type) for the + * message. This can be used to determine how to decode the + * message content. + */ QPID_MESSAGING_EXTERN const std::string& getContentType() const; + /** + * Set an application defined identifier for the message. At + * present this must be a stringfied UUID (support for less + * restrictive IDs is anticipated however). + */ QPID_MESSAGING_EXTERN void setMessageId(const std::string&); QPID_MESSAGING_EXTERN const std::string& getMessageId() const; + /** + * Sets the user id of the message. This should in general be the + * user-id as which the sending connection authenticated itself as + * the messaging infrastructure will verify this. See + * Connection::getAuthenticatedUsername() + */ QPID_MESSAGING_EXTERN void setUserId(const std::string&); QPID_MESSAGING_EXTERN const std::string& getUserId() const; + /** + * Can be used to set application specific correlation identifiers + * as part of a protocol for message exchange patterns. E.g. a + * request-reponse pattern might require the correlation-id of the + * request and response to match, or might use the message-id of + * the request as the correlation-id on the response etc. + */ QPID_MESSAGING_EXTERN void setCorrelationId(const std::string&); QPID_MESSAGING_EXTERN const std::string& getCorrelationId() const; + /** + * Sets a priority level on the message. This may be used by the + * messaging infrastructure to prioritise delivery of higher + * priority messages. + */ QPID_MESSAGING_EXTERN void setPriority(uint8_t); QPID_MESSAGING_EXTERN uint8_t getPriority() const; /** - * Set the time to live for this message in milliseconds. + * Set the time to live for this message in milliseconds. This can + * be used by the messaging infrastructure to discard messages + * that are no longer of relevance. */ QPID_MESSAGING_EXTERN void setTtl(Duration ttl); /** @@ -79,24 +114,62 @@ class QPID_MESSAGING_CLASS_EXTERN Message */ QPID_MESSAGING_EXTERN Duration getTtl() const; + /** + * Mark the message as durable. This is a hint to the messaging + * infrastructure that the message should be persisted or + * otherwise stored such that failoures or shutdown do not cause + * it to be lost. + */ QPID_MESSAGING_EXTERN void setDurable(bool durable); QPID_MESSAGING_EXTERN bool getDurable() const; + /** + * The redelivered flag if set implies that the message *may* have + * been previously delivered and thus is a hint to the application + * or messaging infrastructure that if de-duplication is required + * this message should be examined to determine if it is a + * duplicate. + */ QPID_MESSAGING_EXTERN bool getRedelivered() const; + /** + * Can be used to provide a hint to the application or messaging + * infrastructure that if de-duplication is required this message + * should be examined to determine if it is a duplicate. + */ QPID_MESSAGING_EXTERN void setRedelivered(bool); + /** + * In addition to a payload (i.e. the content), messages can + * include annotations describing aspectf of the message. In + * addition to the standard annotations such as TTL and content + * type, application- or context- specific properties can also be + * defined. Each message has a map of name values for such custom + * properties. The value is specified as a Variant. + */ QPID_MESSAGING_EXTERN const qpid::types::Variant::Map& getProperties() const; QPID_MESSAGING_EXTERN qpid::types::Variant::Map& getProperties(); + /** + * Set the content to the data held in the string parameter. Note: + * this is treated as raw bytes and need not be text. Consider + * setting the content-type to indicate how the data should be + * interpreted by recipients. + */ QPID_MESSAGING_EXTERN void setContent(const std::string&); /** - * Note that chars are copied. + * Copy count bytes from the region pointed to by chars as the + * message content. */ QPID_MESSAGING_EXTERN void setContent(const char* chars, size_t count); /** Get the content as a std::string */ QPID_MESSAGING_EXTERN std::string getContent() const; - /** Get a const pointer to the start of the content data. */ + /** + * Get a const pointer to the start of the content data. The + * memory pointed to is owned by the message. The getContentSize() + * method indicates how much data there is (i.e. the extent of the + * memory region pointed to by the return value of this method). + */ QPID_MESSAGING_EXTERN const char* getContentPtr() const; /** Get the size of content in bytes. */ QPID_MESSAGING_EXTERN size_t getContentSize() const; diff --git a/qpid/cpp/include/qpid/messaging/Session.h b/qpid/cpp/include/qpid/messaging/Session.h index 52786eb5f4..428f8aa491 100644 --- a/qpid/cpp/include/qpid/messaging/Session.h +++ b/qpid/cpp/include/qpid/messaging/Session.h @@ -78,6 +78,10 @@ class QPID_MESSAGING_CLASS_EXTERN Session : public qpid::messaging::Handle<Sessi */ QPID_MESSAGING_EXTERN void acknowledge(Message&, bool sync=false); /** + * Acknowledges all message up to the specified message. + */ + QPID_MESSAGING_EXTERN void acknowledgeUpTo(Message&, bool sync=false); + /** * Rejects the specified message. The broker does not redeliver a * message that has been rejected. Once a message has been * acknowledged, it can no longer be rejected. diff --git a/qpid/cpp/managementgen/Makefile.am b/qpid/cpp/managementgen/Makefile.am index 6c2024ccaa..e10dd63c87 100644 --- a/qpid/cpp/managementgen/Makefile.am +++ b/qpid/cpp/managementgen/Makefile.am @@ -19,10 +19,16 @@ qmfpythondir = $(pythondir) dist_bin_SCRIPTS = \ qmf-gen -nobase_qmfpython_DATA = \ + +pkgpyexec_qmfgendir = $(pyexecdir)/qmfgen +pkgpyexec_qmfgen_PYTHON = \ qmfgen/__init__.py \ qmfgen/generate.py \ qmfgen/schema.py \ + qmfgen/management-types.xml + +pkgpyexec_qmfgentmpldir = $(pyexecdir)/qmfgen/templates +pkgpyexec_qmfgentmpl_PYTHON = \ qmfgen/templates/Args.h \ qmfgen/templates/Class.cpp \ qmfgen/templates/Class.h \ @@ -32,7 +38,6 @@ nobase_qmfpython_DATA = \ qmfgen/templates/Package.cpp \ qmfgen/templates/Package.h \ qmfgen/templates/V2Package.cpp \ - qmfgen/templates/V2Package.h \ - qmfgen/management-types.xml + qmfgen/templates/V2Package.h EXTRA_DIST = $(nobase_qmfpython_DATA) CMakeLists.txt diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 0fe2d7e4d0..11554b1034 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -96,7 +96,7 @@ MACRO (add_msvc_version_full verProject verProjectType verProjectFileExt verFN1 inherit_value ("winver_${verProject}_InternalName" "${verProject}") inherit_value ("winver_${verProject}_OriginalFilename" "${verProject}.${verProjectFileExt}") inherit_value ("winver_${verProject}_ProductName" "${winver_DESCRIPTION_SUMMARY}") - + # Create strings to be substituted into the template file set ("winverFileVersionBinary" "${winver_${verProject}_FileVersionBinary}") set ("winverProductVersionBinary" "${winver_${verProject}_ProductVersionBinary}") @@ -126,7 +126,7 @@ ENDMACRO (add_msvc_version_full) # MACRO (add_msvc_version verProject verProjectType verProjectFileExt) if (MSVC) - add_msvc_version_full (${verProject} + add_msvc_version_full (${verProject} ${verProjectType} ${verProjectFileExt} ${winver_FILE_VERSION_N1} @@ -656,7 +656,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows) set (qpidd_platform_SOURCES windows/QpiddBroker.cpp ) - + set (qpidmessaging_platform_SOURCES qpid/messaging/HandleInstantiator.cpp ) @@ -997,7 +997,6 @@ set (qpidbroker_SOURCES qpid/broker/QueuePolicy.cpp qpid/broker/QueueRegistry.cpp qpid/broker/QueueFlowLimit.cpp - qpid/broker/RateTracker.cpp qpid/broker/RecoveryManagerImpl.cpp qpid/broker/RecoveredEnqueue.cpp qpid/broker/RecoveredDequeue.cpp diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 15021cc68b..608afb0660 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -616,8 +616,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/QueueFlowLimit.h \ qpid/broker/QueueFlowLimit.cpp \ qpid/broker/RateFlowcontrol.h \ - qpid/broker/RateTracker.cpp \ - qpid/broker/RateTracker.h \ qpid/broker/RecoverableConfig.h \ qpid/broker/RecoverableExchange.h \ qpid/broker/RecoverableMessage.h \ diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp index 7b839930e1..5df0d83f12 100644 --- a/qpid/cpp/src/qmf/ConsoleSession.cpp +++ b/qpid/cpp/src/qmf/ConsoleSession.cpp @@ -66,7 +66,7 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s //======================================================================================== ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : - connection(c), domain("default"), maxAgentAgeMinutes(5), + connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0), connectedBrokerInAgentList(false), schemaCache(new SchemaCache()) { diff --git a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp index ab65b8d768..41dd9ff00c 100644 --- a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp +++ b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp @@ -334,8 +334,7 @@ void ResilientConnectionImpl::notify() { if (notifyFd != -1) { - int unused_ret; //Suppress warnings about ignoring return value. - unused_ret = ::write(notifyFd, ".", 1); + (void) ::write(notifyFd, ".", 1); } } @@ -432,8 +431,7 @@ void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind k if (notifyFd != -1) { - int unused_ret; //Suppress warnings about ignoring return value. - unused_ret = ::write(notifyFd, ".", 1); + (void) ::write(notifyFd, ".", 1); } } diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 240766c443..f80e0f1e61 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -188,7 +188,7 @@ Broker::Broker(const Broker::Options& conf) : conf.replayFlushLimit*1024, // convert kb to bytes. conf.replayHardLimit*1024), *this), - queueCleaner(queues, timer), + queueCleaner(queues, &timer), queueEvents(poller,!conf.asyncQueueEvents), recovery(true), inCluster(false), @@ -701,7 +701,7 @@ void Broker::accept() { } void Broker::connect( - const std::string& host, uint16_t port, const std::string& transport, + const std::string& host, const std::string& port, const std::string& transport, boost::function2<void, int, std::string> failed, sys::ConnectionCodec::Factory* f) { @@ -717,7 +717,7 @@ void Broker::connect( { url.throwIfEmpty(); const Address& addr=url[0]; - connect(addr.host, addr.port, addr.protocol, failed, f); + connect(addr.host, boost::lexical_cast<std::string>(addr.port), addr.protocol, failed, f); } uint32_t Broker::queueMoveMessages( @@ -750,6 +750,7 @@ bool Broker::deferDeliveryImpl(const std::string& , void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) { clusterTimer = t; + queueCleaner.setTimer(clusterTimer.get()); } const std::string Broker::TCP_TRANSPORT("tcp"); diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 6d585bf614..40f7b6273f 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -244,7 +244,7 @@ public: QPID_BROKER_EXTERN void accept(); /** Create a connection to another broker. */ - void connect(const std::string& host, uint16_t port, + void connect(const std::string& host, const std::string& port, const std::string& transport, boost::function2<void, int, std::string> failed, sys::ConnectionCodec::Factory* =0); diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index c07e63e68c..8362a9782c 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -288,11 +288,11 @@ void Connection::raiseConnectEvent() { } } -void Connection::setFederationLink(bool b) +void Connection::setUserProxyAuth(bool b) { - ConnectionState::setFederationLink(b); + ConnectionState::setUserProxyAuth(b); if (mgmtObject != 0) - mgmtObject->set_federationLink(b); + mgmtObject->set_userProxyAuth(b); } void Connection::close(connection::CloseCode code, const string& text) diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index 8f1aa701ef..3522d70b35 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -125,7 +125,7 @@ class Connection : public sys::ConnectionInputHandler, const std::string& getUserId() const { return ConnectionState::getUserId(); } const std::string& getMgmtId() const { return mgmtId; } management::ManagementAgent* getAgent() const { return agent; } - void setFederationLink(bool b); + void setUserProxyAuth(bool b); /** Connection does not delete the listener. 0 resets. */ void setErrorListener(ErrorListener* l) { errorListener=l; } ErrorListener* getErrorListener() { return errorListener; } diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index 3f97e5b9de..270711705e 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -137,7 +137,9 @@ void ConnectionHandler::Handler::startOk(const framing::FieldTable& clientProper throw; } connection.setFederationLink(clientProperties.get(QPID_FED_LINK)); - connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG)); + if (clientProperties.isSet(QPID_FED_TAG)) { + connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG)); + } if (connection.isFederationLink()) { if (acl && !acl->authorise(connection.getUserId(),acl::ACT_CREATE,acl::OBJ_LINK,"")){ proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link"); @@ -256,7 +258,6 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties, false ); // disallow interaction } std::string supportedMechanismsList; - bool requestedMechanismIsSupported = false; Array::const_iterator i; /* @@ -269,11 +270,9 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties, if (i != supportedMechanisms.begin()) supportedMechanismsList += SPACE; supportedMechanismsList += (*i)->get<std::string>(); - requestedMechanismIsSupported = true; } } else { - requestedMechanismIsSupported = false; /* The caller has requested a mechanism. If it's available, make sure it ends up at the head of the list. @@ -282,7 +281,6 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties, string currentMechanism = (*i)->get<std::string>(); if ( requestedMechanism == currentMechanism ) { - requestedMechanismIsSupported = true; supportedMechanismsList = currentMechanism + SPACE + supportedMechanismsList; } else { if (i != supportedMechanisms.begin()) @@ -292,7 +290,9 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties, } } - connection.setFederationPeerTag(serverProperties.getAsString(QPID_FED_TAG)); + if (serverProperties.isSet(QPID_FED_TAG)) { + connection.setFederationPeerTag(serverProperties.getAsString(QPID_FED_TAG)); + } FieldTable ft; ft.setInt(QPID_FED_LINK,1); diff --git a/qpid/cpp/src/qpid/broker/ConnectionState.h b/qpid/cpp/src/qpid/broker/ConnectionState.h index 9c31a931d8..fdd3c4ddc0 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionState.h +++ b/qpid/cpp/src/qpid/broker/ConnectionState.h @@ -46,6 +46,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable framemax(65535), heartbeat(0), heartbeatmax(120), + userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links & clustering) federationLink(true), clientSupportsThrottling(false), clusterOrderOut(0) @@ -67,8 +68,10 @@ class ConnectionState : public ConnectionToken, public management::Manageable void setUrl(const std::string& _url) { url = _url; } const std::string& getUrl() const { return url; } - void setFederationLink(bool b) { federationLink = b; } - bool isFederationLink() const { return federationLink; } + void setUserProxyAuth(const bool b) { userProxyAuth = b; } + bool isUserProxyAuth() const { return userProxyAuth || federationPeerTag.size() > 0; } // links can proxy msgs with non-matching auth ids + void setFederationLink(bool b) { federationLink = b; } // deprecated - use setFederationPeerTag() instead + bool isFederationLink() const { return federationPeerTag.size() > 0; } void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag); } const std::string& getFederationPeerTag() const { return federationPeerTag; } std::vector<Url>& getKnownHosts() { return knownHosts; } @@ -105,6 +108,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable uint16_t heartbeatmax; std::string userId; std::string url; + bool userProxyAuth; bool federationLink; std::string federationPeerTag; std::vector<Url> knownHosts; diff --git a/qpid/cpp/src/qpid/broker/Daemon.cpp b/qpid/cpp/src/qpid/broker/Daemon.cpp index b30e5f18cb..c36538beb7 100644 --- a/qpid/cpp/src/qpid/broker/Daemon.cpp +++ b/qpid/cpp/src/qpid/broker/Daemon.cpp @@ -93,11 +93,10 @@ void Daemon::fork() catch (const exception& e) { QPID_LOG(critical, "Unexpected error: " << e.what()); uint16_t port = 0; - int unused_ret; //Supress warning about ignoring return value. - unused_ret = write(pipeFds[1], &port, sizeof(uint16_t)); + (void) write(pipeFds[1], &port, sizeof(uint16_t)); std::string pipeFailureMessage = e.what(); - unused_ret = write ( pipeFds[1], + (void) write ( pipeFds[1], pipeFailureMessage.c_str(), strlen(pipeFailureMessage.c_str()) ); diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp index 64a12d918a..62cb3fc116 100644 --- a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp +++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,12 +27,12 @@ namespace broker { ExpiryPolicy::~ExpiryPolicy() {} -void ExpiryPolicy::willExpire(Message&) {} - bool ExpiryPolicy::hasExpired(Message& m) { return m.getExpiration() < sys::AbsTime::now(); } -void ExpiryPolicy::forget(Message&) {} +sys::AbsTime ExpiryPolicy::getCurrentTime() { + return sys::AbsTime::now(); +} }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h index a723eb0aa8..2caf00ce00 100644 --- a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h +++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,6 +26,11 @@ #include "qpid/broker/BrokerImportExport.h" namespace qpid { + +namespace sys { +class AbsTime; +} + namespace broker { class Message; @@ -37,9 +42,8 @@ class QPID_BROKER_CLASS_EXTERN ExpiryPolicy : public RefCounted { public: QPID_BROKER_EXTERN virtual ~ExpiryPolicy(); - QPID_BROKER_EXTERN virtual void willExpire(Message&); QPID_BROKER_EXTERN virtual bool hasExpired(Message&); - QPID_BROKER_EXTERN virtual void forget(Message&); + QPID_BROKER_EXTERN virtual qpid::sys::AbsTime getCurrentTime(); }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp index abcaa5f69d..4bda70d313 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp @@ -112,9 +112,14 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co { Mutex::ScopedLock l(lock); - Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args)); + //NOTE: do not include the fed op/tags/origin in the + //arguments as when x-match is 'all' these would prevent + //matching (they are internally added properties + //controlling binding propagation but not relevant to + //actual routing) + Binding::shared_ptr binding (new Binding (bindingKey, queue, this, extra_args)); BoundKey bk(binding); - if (bindings.add_unless(bk, MatchArgs(queue, args))) { + if (bindings.add_unless(bk, MatchArgs(queue, &extra_args))) { binding->startManagement(); propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin); if (mgmtExchange != 0) { diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 91861ade3f..9ab4379a69 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -117,7 +117,7 @@ void Link::startConnectionLH () // Set the state before calling connect. It is possible that connect // will fail synchronously and call Link::closed before returning. setStateLH(STATE_CONNECTING); - broker->connect (host, port, transport, + broker->connect (host, boost::lexical_cast<std::string>(port), transport, boost::bind (&Link::closed, this, _1, _2)); QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port); } catch(std::exception& e) { diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 4f64ae2db9..1d52984831 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -49,25 +49,21 @@ TransferAdapter Message::TRANSFER; Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), - staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), + staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), expiration(FAR_FUTURE), dequeueCallback(0), inCallback(false), requiredCredit(0), isManagementMessage(false) {} Message::Message(const Message& original) : PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false), - staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), + staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), expiration(original.expiration), dequeueCallback(0), inCallback(false), requiredCredit(0) { setExpiryPolicy(original.expiryPolicy); } -Message::~Message() -{ - if (expiryPolicy) - expiryPolicy->forget(*this); -} +Message::~Message() {} void Message::forcePersistent() { @@ -87,7 +83,7 @@ std::string Message::getRoutingKey() const return getAdapter().getRoutingKey(frames); } -std::string Message::getExchangeName() const +std::string Message::getExchangeName() const { return getAdapter().getExchange(frames); } @@ -96,7 +92,7 @@ const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registr { if (!exchange) { exchange = registry.get(getExchangeName()); - } + } return exchange; } @@ -196,7 +192,7 @@ void Message::decodeContent(framing::Buffer& buffer) } else { //adjust header flags MarkLastSegment f; - frames.map_if(f, TypeFilter<HEADER_BODY>()); + frames.map_if(f, TypeFilter<HEADER_BODY>()); } //mark content loaded loaded = true; @@ -248,7 +244,7 @@ void Message::destroy() bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const { intrusive_ptr<const PersistableMessage> pmsg(this); - + bool done = false; string& data = frame.castBody<AMQContentBody>()->getData(); store->loadContent(queue, pmsg, data, offset, maxContentSize); @@ -273,7 +269,7 @@ void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16 uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) - { + { AMQFrame frame((AMQContentBody())); morecontent = getContentFrame(queue, frame, maxContentSize, offset); out.handle(frame); @@ -291,7 +287,7 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) { sys::Mutex::ScopedLock l(lock); Relay f(out); - frames.map_if(f, TypeFilter<HEADER_BODY>()); + frames.map_if(f, TypeFilter<HEADER_BODY>()); } // TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over @@ -321,7 +317,7 @@ bool Message::isContentLoaded() const } -namespace +namespace { const std::string X_QPID_TRACE("x-qpid.trace"); } @@ -358,13 +354,13 @@ void Message::addTraceId(const std::string& id) trace += ","; trace += id; headers.setString(X_QPID_TRACE, trace); - } + } } } -void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) +void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) { - DeliveryProperties* props = getProperties<DeliveryProperties>(); + DeliveryProperties* props = getProperties<DeliveryProperties>(); if (props->getTtl()) { // AMQP requires setting the expiration property to be posix // time_t in seconds. TTL is in milliseconds @@ -373,10 +369,14 @@ void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) time_t now = ::time(0); props->setExpiration(now + (props->getTtl()/1000)); } - // Use higher resolution time for the internal expiry calculation. - Duration ttl(std::min(props->getTtl() * TIME_MSEC, (uint64_t) std::numeric_limits<int64_t>::max()));//Prevent overflow - expiration = AbsTime(AbsTime::now(), ttl); - setExpiryPolicy(e); + if (e) { + // Use higher resolution time for the internal expiry calculation. + // Prevent overflow as a signed int64_t + Duration ttl(std::min(props->getTtl() * TIME_MSEC, + (uint64_t) std::numeric_limits<int64_t>::max())); + expiration = AbsTime(e->getCurrentTime(), ttl); + setExpiryPolicy(e); + } } } @@ -386,16 +386,17 @@ void Message::adjustTtl() if (props->getTtl()) { sys::Mutex::ScopedLock l(lock); if (expiration < FAR_FUTURE) { - sys::Duration d(sys::AbsTime::now(), getExpiration()); - props->setTtl(int64_t(d) > 0 ? int64_t(d)/1000000 : 1); // convert from ns to ms; set to 1 if expired + sys::AbsTime current( + expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now()); + sys::Duration ttl(current, getExpiration()); + // convert from ns to ms; set to 1 if expired + props->setTtl(int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1); } } } void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; - if (expiryPolicy) - expiryPolicy->willExpire(*this); } bool Message::hasExpired() diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index 8c5d42bcde..38e4f831fd 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,12 +34,12 @@ #include <vector> namespace qpid { - + namespace framing { class FieldTable; class SequenceNumber; } - + namespace broker { class ConnectionToken; class Exchange; @@ -51,11 +51,11 @@ class ExpiryPolicy; class Message : public PersistableMessage { public: typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback; - + QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = framing::SequenceNumber()); QPID_BROKER_EXTERN Message(const Message&); QPID_BROKER_EXTERN ~Message(); - + uint64_t getPersistenceId() const { return persistenceId; } void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; } @@ -83,10 +83,11 @@ public: void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e); bool hasExpired(); sys::AbsTime getExpiration() const { return expiration; } + void setExpiration(sys::AbsTime exp) { expiration = exp; } void adjustTtl(); - framing::FrameSet& getFrames() { return frames; } - const framing::FrameSet& getFrames() const { return frames; } + framing::FrameSet& getFrames() { return frames; } + const framing::FrameSet& getFrames() const { return frames; } template <class T> T* getProperties() { qpid::framing::AMQHeaderBody* p = frames.getHeaders(); @@ -103,6 +104,11 @@ public: return p->get<T>(); } + template <class T> void eraseProperties() { + qpid::framing::AMQHeaderBody* p = frames.getHeaders(); + p->erase<T>(); + } + template <class T> const T* getMethod() const { return frames.as<T>(); } @@ -135,7 +141,7 @@ public: QPID_BROKER_EXTERN void decodeHeader(framing::Buffer& buffer); QPID_BROKER_EXTERN void decodeContent(framing::Buffer& buffer); - + void QPID_BROKER_EXTERN tryReleaseContent(); void releaseContent(); void releaseContent(MessageStore* s);//deprecated, use 'setStore(store); releaseContent();' instead @@ -149,10 +155,10 @@ public: bool isExcluded(const std::vector<std::string>& excludes) const; void addTraceId(const std::string& id); - + void forcePersistent(); bool isForcedPersistent(); - + /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */ void setDequeueCompleteCallback(MessageCallback& cb); diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index cf538aaaa7..260a45d7e0 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -65,7 +65,7 @@ using std::mem_fun; namespace _qmf = qmf::org::apache::qpid::broker; -namespace +namespace { const std::string qpidMaxSize("qpid.max_size"); const std::string qpidMaxCount("qpid.max_count"); @@ -87,16 +87,16 @@ const int ENQUEUE_ONLY=1; const int ENQUEUE_AND_DEQUEUE=2; } -Queue::Queue(const string& _name, bool _autodelete, +Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, const OwnershipToken* const _owner, Manageable* parent, Broker* b) : - name(_name), + name(_name), autodelete(_autodelete), store(_store), - owner(_owner), + owner(_owner), consumerCount(0), exclusive(0), noLocal(false), @@ -179,9 +179,9 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ if (policy.get()) policy->recoverEnqueued(msg); push(msg, true); - if (store){ + if (store){ // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure - msg->addToSyncList(shared_from_this(), store); + msg->addToSyncList(shared_from_this(), store); } if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) { @@ -206,13 +206,13 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ void Queue::requeue(const QueuedMessage& msg){ assertClusterSafe(); QueueListeners::NotificationSet copy; - { + { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return; messages->reinsert(msg); listeners.populate(copy); - // for persistLastNode - don't force a message twice to disk, but force it if no force before + // for persistLastNode - don't force a message twice to disk, but force it if no force before if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) { msg.payload->forcePersistent(); if (msg.payload->isForcedPersistent() ){ @@ -224,7 +224,7 @@ void Queue::requeue(const QueuedMessage& msg){ copy.notify(); } -bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) +bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { Mutex::ScopedLock locker(messageLock); assertClusterSafe(); @@ -268,7 +268,7 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) case NO_MESSAGES: default: return false; - } + } } else { return browseNextMessage(m, c); } @@ -278,7 +278,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ { while (true) { Mutex::ScopedLock locker(messageLock); - if (messages->empty()) { + if (messages->empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); return NO_MESSAGES; @@ -291,7 +291,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ } if (c->filter(msg.payload)) { - if (c->accept(msg.payload)) { + if (c->accept(msg.payload)) { m = msg; pop(); return CONSUMED; @@ -304,7 +304,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ //consumer will never want this message QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); return CANT_CONSUME; - } + } } } } @@ -358,7 +358,7 @@ bool Queue::dispatch(Consumer::shared_ptr c) } } -// Find the next message +// Find the next message bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { Mutex::ScopedLock locker(messageLock); if (messages->next(c->position, msg)) { @@ -426,19 +426,25 @@ bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& messa } } -void Queue::purgeExpired() +/** + *@param lapse: time since the last purgeExpired + */ +void Queue::purgeExpired(qpid::sys::Duration lapse) { //As expired messages are discarded during dequeue also, only //bother explicitly expiring if the rate of dequeues since last - //attempt is less than one per second. - - if (dequeueTracker.sampleRatePerSecond() < 1) { + //attempt is less than one per second. + int count = dequeueSincePurge.get(); + dequeueSincePurge -= count; + int seconds = int64_t(lapse)/qpid::sys::TIME_SEC; + if (seconds == 0 || count / seconds < 1) { std::deque<QueuedMessage> expired; { Mutex::ScopedLock locker(messageLock); - messages->removeIf(boost::bind(&collect_if_expired, expired, _1)); + messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1)); } - for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + for_each(expired.begin(), expired.end(), + boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } } @@ -457,7 +463,7 @@ void Queue::purgeExpired() uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest) { Mutex::ScopedLock locker(messageLock); - uint32_t purge_count = purge_request; // only comes into play if >0 + uint32_t purge_count = purge_request; // only comes into play if >0 std::deque<DeliverableMessage> rerouteQueue; uint32_t count = 0; @@ -490,7 +496,7 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { Mutex::ScopedLock locker(messageLock); - uint32_t move_count = qty; // only comes into play if qty >0 + uint32_t move_count = qty; // only comes into play if qty >0 uint32_t count = 0; // count how many were moved for returning while((!qty || move_count--) && !messages->empty()) { @@ -508,7 +514,7 @@ void Queue::pop() { assertClusterSafe(); messages->pop(); - ++dequeueTracker; + ++dequeueSincePurge; } void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ @@ -517,10 +523,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ QueuedMessage removed; bool dequeueRequired = false; { - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); - + dequeueRequired = messages->push(qm, removed); listeners.populate(copy); enqueued(qm); @@ -599,7 +605,7 @@ void Queue::setLastNodeFailure() } -// return true if store exists, +// return true if store exists, bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck) { ScopedUse u(barrier); @@ -619,7 +625,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg if (inLastNodeFailure && persistLastNode){ msg->forcePersistent(); } - + if (traceId.size()) { //copy on write: take deep copy of message before modifying it //as the frames may already be available for delivery on other @@ -649,9 +655,10 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) { Mutex::ScopedLock locker(messageLock); - if (policy.get()) policy->enqueueAborted(msg); + if (policy.get()) policy->enqueueAborted(msg); } + /** * Returns a null pointer if the dequeue completed, otherwise the dequeue will complete * asynchronously, and a pointer to a DequeueCompletion object is returned. @@ -666,7 +673,7 @@ Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return empty; - if (!ctxt) { + if (!ctxt) { dequeued(msg); } } @@ -693,7 +700,7 @@ Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::dequeueCommitted(const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); - dequeued(msg); + dequeued(msg); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); @@ -748,8 +755,8 @@ int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::stri return v->get<int>(); } else if (v->convertsTo<std::string>()){ std::string s = v->get<std::string>(); - try { - return boost::lexical_cast<int>(s); + try { + return boost::lexical_cast<int>(s); } catch(const boost::bad_lexical_cast&) { QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s); return 0; @@ -773,7 +780,7 @@ void Queue::configureImpl(const FieldTable& _settings) broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY); } - if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && + if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) { if ( NullMessageStore::isNullStore(store)) { QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName()); @@ -811,7 +818,7 @@ void Queue::configureImpl(const FieldTable& _settings) QPID_LOG(debug, "Configured queue " << getName() << " as priority queue."); } } - + persistLastNode= _settings.get(qpidPersistLastNode); if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName()); @@ -820,15 +827,15 @@ void Queue::configureImpl(const FieldTable& _settings) if (excludeList.size()) { split(traceExclude, excludeList, ", "); } - QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId + QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements"); FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers); if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>()); autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout); - if (autoDeleteTimeout) - QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); + if (autoDeleteTimeout) + QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); if (mgmtObject != 0) { mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); @@ -892,9 +899,9 @@ const QueuePolicy* Queue::getPolicy() return policy.get(); } -uint64_t Queue::getPersistenceId() const -{ - return persistenceId; +uint64_t Queue::getPersistenceId() const +{ + return persistenceId; } void Queue::setPersistenceId(uint64_t _persistenceId) const @@ -908,11 +915,11 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const persistenceId = _persistenceId; } -void Queue::encode(Buffer& buffer) const +void Queue::encode(Buffer& buffer) const { buffer.putShortString(name); buffer.put(settings); - if (policy.get()) { + if (policy.get()) { buffer.put(*policy); } buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string("")); @@ -965,7 +972,7 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange() void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue) { - if (broker.getQueues().destroyIf(queue->getName(), + if (broker.getQueues().destroyIf(queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { QPID_LOG(debug, "Auto-deleting " << queue->getName()); queue->destroyed(); @@ -977,7 +984,7 @@ struct AutoDeleteTask : qpid::sys::TimerTask Broker& broker; Queue::shared_ptr queue; - AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) + AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {} void fire() @@ -995,27 +1002,27 @@ void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) if (queue->autoDeleteTimeout && queue->canAutoDelete()) { AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC)); queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time)); - broker.getClusterTimer().add(queue->autoDeleteTask); + broker.getClusterTimer().add(queue->autoDeleteTask); QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated"); } else { tryAutoDeleteImpl(broker, queue); } } -bool Queue::isExclusiveOwner(const OwnershipToken* const o) const -{ +bool Queue::isExclusiveOwner(const OwnershipToken* const o) const +{ Mutex::ScopedLock locker(ownershipLock); - return o == owner; + return o == owner; } -void Queue::releaseExclusiveOwnership() -{ +void Queue::releaseExclusiveOwnership() +{ Mutex::ScopedLock locker(ownershipLock); - owner = 0; + owner = 0; } -bool Queue::setExclusiveOwner(const OwnershipToken* const o) -{ +bool Queue::setExclusiveOwner(const OwnershipToken* const o) +{ //reset auto deletion timer if necessary if (autoDeleteTimeout && autoDeleteTask) { autoDeleteTask->cancel(); @@ -1024,20 +1031,20 @@ bool Queue::setExclusiveOwner(const OwnershipToken* const o) if (owner) { return false; } else { - owner = o; + owner = o; return true; } } -bool Queue::hasExclusiveOwner() const -{ +bool Queue::hasExclusiveOwner() const +{ Mutex::ScopedLock locker(ownershipLock); - return owner != 0; + return owner != 0; } -bool Queue::hasExclusiveConsumer() const -{ - return exclusive; +bool Queue::hasExclusiveConsumer() const +{ + return exclusive; } void Queue::setExternalQueueStore(ExternalQueueStore* inst) { @@ -1206,6 +1213,10 @@ const Broker* Queue::getBroker() return broker; } +void Queue::setDequeueSincePurge(uint32_t value) { + dequeueSincePurge = value; +} + /** invoked from the store thread when the asynchronous dequeueing of the * message has completed. */ diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 86c184676f..63f2397b77 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -32,9 +32,9 @@ #include "qpid/broker/QueueBindings.h" #include "qpid/broker/QueueListeners.h" #include "qpid/broker/QueueObserver.h" -#include "qpid/broker/RateTracker.h" #include "qpid/framing/FieldTable.h" +#include "qpid/sys/AtomicValue.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" @@ -75,13 +75,13 @@ class Queue : public boost::enable_shared_from_this<Queue>, { Queue& parent; uint count; - + UsageBarrier(Queue&); bool acquire(); void release(); void destroy(); }; - + struct ScopedUse { UsageBarrier& barrier; @@ -89,7 +89,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {} ~ScopedUse() { if (acquired) barrier.release(); } }; - + typedef std::set< boost::shared_ptr<QueueObserver> > Observers; enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; @@ -120,7 +120,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, boost::shared_ptr<Exchange> alternateExchange; framing::SequenceNumber sequence; qmf::org::apache::qpid::broker::Queue* mgmtObject; - RateTracker dequeueTracker; + sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge. int eventMode; Observers observers; bool insertSeqNo; @@ -147,7 +147,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, void dequeued(const QueuedMessage& msg); void pop(); void popAndDequeue(); - QueuedMessage getFront(); + void forcePersistent(QueuedMessage& msg); int getEventMode(); void configureImpl(const qpid::framing::FieldTable& settings); @@ -185,8 +185,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, typedef std::vector<shared_ptr> vector; QPID_BROKER_EXTERN Queue(const std::string& name, - bool autodelete = false, - MessageStore* const store = 0, + bool autodelete = false, + MessageStore* const store = 0, const OwnershipToken* const owner = 0, management::Manageable* parent = 0, Broker* broker = 0); @@ -246,11 +246,11 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool exclusive = false); QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c); - uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages - QPID_BROKER_EXTERN void purgeExpired(); + uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages + QPID_BROKER_EXTERN void purgeExpired(sys::Duration); //move qty # of messages to destination Queue destq - uint32_t move(const Queue::shared_ptr destq, uint32_t qty); + uint32_t move(const Queue::shared_ptr destq, uint32_t qty); QPID_BROKER_EXTERN uint32_t getMessageCount() const; QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const; @@ -313,8 +313,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, * Inform queue of messages that were enqueued, have since * been acquired but not yet accepted or released (and * thus are still logically on the queue) - used in - * clustered broker. - */ + * clustered broker. + */ void updateEnqueued(const QueuedMessage& msg); /** @@ -325,9 +325,9 @@ class Queue : public boost::enable_shared_from_this<Queue>, * accepted it). */ bool isEnqueued(const QueuedMessage& msg); - + /** - * Gets the next available message + * Gets the next available message */ QPID_BROKER_EXTERN QueuedMessage get(); @@ -408,6 +408,9 @@ class Queue : public boost::enable_shared_from_this<Queue>, const Broker* getBroker(); + uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); } + void setDequeueSincePurge(uint32_t value); + private: std::map<PersistableMessage *, boost::intrusive_ptr<DequeueCompletion> > pendingDequeueCompletions; }; diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp index 3499ea8a4d..838bc28be8 100644 --- a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp +++ b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,7 +27,7 @@ namespace qpid { namespace broker { -QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer& t) : queues(q), timer(t) {} +QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer* t) : queues(q), timer(t) {} QueueCleaner::~QueueCleaner() { @@ -36,10 +36,16 @@ QueueCleaner::~QueueCleaner() void QueueCleaner::start(qpid::sys::Duration p) { + period = p; task = new Task(*this, p); - timer.add(task); + timer->add(task); } +void QueueCleaner::setTimer(qpid::sys::Timer* timer) { + this->timer = timer; +} + + QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d,"QueueCleaner"), parent(p) {} void QueueCleaner::Task::fire() @@ -65,9 +71,9 @@ void QueueCleaner::fired() std::vector<Queue::shared_ptr> copy; CollectQueues collect(©); queues.eachQueue(collect); - std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1)); + std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1, period)); task->setupNextFire(); - timer.add(task); + timer->add(task); } diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.h b/qpid/cpp/src/qpid/broker/QueueCleaner.h index 11c2d180ac..ffebfe3e1b 100644 --- a/qpid/cpp/src/qpid/broker/QueueCleaner.h +++ b/qpid/cpp/src/qpid/broker/QueueCleaner.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -35,14 +35,15 @@ class QueueRegistry; class QueueCleaner { public: - QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer& timer); + QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer* timer); QPID_BROKER_EXTERN ~QueueCleaner(); - QPID_BROKER_EXTERN void start(qpid::sys::Duration period); + QPID_BROKER_EXTERN void start(sys::Duration period); + QPID_BROKER_EXTERN void setTimer(sys::Timer* timer); private: class Task : public sys::TimerTask { public: - Task(QueueCleaner& parent, qpid::sys::Duration duration); + Task(QueueCleaner& parent, sys::Duration duration); void fire(); private: QueueCleaner& parent; @@ -50,7 +51,8 @@ class QueueCleaner boost::intrusive_ptr<sys::TimerTask> task; QueueRegistry& queues; - sys::Timer& timer; + sys::Timer* timer; + sys::Duration period; void fired(); }; diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index b2e2e54bdf..fcf8d089f9 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -167,7 +167,8 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg) msg.payload->getIngressCompletion().startCompleter(); // don't complete until flow resumes bool unique; unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(msg.position, msg.payload)).second; - assert(unique); + // Like this to avoid tripping up unused variable warning when NDEBUG set + if (!unique) assert(unique); } } @@ -379,7 +380,8 @@ void QueueFlowLimit::setState(const qpid::framing::FieldTable& state) QueuedMessage msg(queue->find(seq)); // fyi: msg.payload may be null if msg is delivered & unacked bool unique; unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second; - assert(unique); + // Like this to avoid tripping up unused variable warning when NDEBUG set + if (!unique) assert(unique); } } } diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp index a93a6332fd..6ae0d53b1a 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp @@ -117,19 +117,20 @@ void QueuePolicy::update(FieldTable& settings) settings.setString(typeKey, type); } -uint32_t QueuePolicy::getCapacity(const FieldTable& settings, const std::string& key, uint32_t defaultValue) +template <typename T> +T getCapacity(const FieldTable& settings, const std::string& key, T defaultValue) { FieldTable::ValuePtr v = settings.get(key); - int32_t result = 0; + T result = 0; if (!v) return defaultValue; if (v->getType() == 0x23) { QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>()); } else if (v->getType() == 0x33) { QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>()); - } else if (v->convertsTo<int>()) { - result = v->get<int>(); + } else if (v->convertsTo<T>()) { + result = v->get<T>(); QPID_LOG(debug, "Got integer value for " << key << ": " << result); if (result >= 0) return result; } else if (v->convertsTo<string>()) { @@ -319,8 +320,8 @@ std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::F std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings) { - uint32_t maxCount = getCapacity(settings, maxCountKey, 0); - uint32_t maxSize = getCapacity(settings, maxSizeKey, defaultMaxSize); + uint32_t maxCount = getCapacity<int32_t>(settings, maxCountKey, 0); + uint64_t maxSize = getCapacity<int64_t>(settings, maxSizeKey, defaultMaxSize); if (maxCount || maxSize) { return createQueuePolicy(name, maxCount, maxSize, getType(settings)); } else { diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.h b/qpid/cpp/src/qpid/broker/QueuePolicy.h index 3cdd63784d..ec7f846704 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.h +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.h @@ -43,8 +43,7 @@ class QueuePolicy uint32_t count; uint64_t size; bool policyExceeded; - - static uint32_t getCapacity(const qpid::framing::FieldTable& settings, const std::string& key, uint32_t defaultValue); + protected: uint64_t getCurrentQueueSize() const { return size; } diff --git a/qpid/cpp/src/qpid/broker/RateTracker.cpp b/qpid/cpp/src/qpid/broker/RateTracker.cpp deleted file mode 100644 index 048349b658..0000000000 --- a/qpid/cpp/src/qpid/broker/RateTracker.cpp +++ /dev/null @@ -1,51 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/broker/RateTracker.h" - -using qpid::sys::AbsTime; -using qpid::sys::Duration; -using qpid::sys::TIME_SEC; - -namespace qpid { -namespace broker { - -RateTracker::RateTracker() : currentCount(0), lastCount(0), lastTime(AbsTime::now()) {} - -RateTracker& RateTracker::operator++() -{ - ++currentCount; - return *this; -} - -double RateTracker::sampleRatePerSecond() -{ - int32_t increment = currentCount - lastCount; - AbsTime now = AbsTime::now(); - Duration interval(lastTime, now); - lastCount = currentCount; - lastTime = now; - //if sampling at higher frequency than supported, will just return the number of increments - if (interval < TIME_SEC) return increment; - else if (increment == 0) return 0; - else return increment / (interval / TIME_SEC); -} - -}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/RateTracker.h b/qpid/cpp/src/qpid/broker/RateTracker.h deleted file mode 100644 index 0c20b37312..0000000000 --- a/qpid/cpp/src/qpid/broker/RateTracker.h +++ /dev/null @@ -1,57 +0,0 @@ -#ifndef QPID_BROKER_RATETRACKER_H -#define QPID_BROKER_RATETRACKER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/Time.h" - -namespace qpid { -namespace broker { - -/** - * Simple rate tracker: represents some value that can be incremented, - * then can periodcially sample the rate of increments. - */ -class RateTracker -{ - public: - RateTracker(); - /** - * Increments the count being tracked. Can be called concurrently - * with other calls to this operator as well as with calls to - * sampleRatePerSecond(). - */ - RateTracker& operator++(); - /** - * Returns the rate of increments per second since last - * called. Calls to this method should be serialised, but can be - * called concurrently with the increment operator - */ - double sampleRatePerSecond(); - private: - volatile int32_t currentCount; - int32_t lastCount; - qpid::sys::AbsTime lastTime; -}; -}} // namespace qpid::broker - -#endif /*!QPID_BROKER_RATETRACKER_H*/ diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 2383978276..9b8c9b8d58 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -70,7 +70,7 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) deliveryAdapter(da), tagGenerator("sgen"), dtxSelected(false), - authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()), + authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()), userID(getSession().getConnection().getUserId()), userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))), isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())), @@ -469,7 +469,6 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { /* verify the userid if specified: */ std::string id = msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring; - if (authMsg && !id.empty() && !(id == userID || (isDefaultRealm && id == userName))) { QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id); diff --git a/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp b/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp index fd0e537192..676074a590 100644 --- a/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp +++ b/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp @@ -81,12 +81,11 @@ class SslProtocolFactory : public qpid::sys::ProtocolFactory { SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay); ~SslProtocolFactory(); void accept(sys::Poller::shared_ptr, sys::ConnectionCodec::Factory*); - void connect(sys::Poller::shared_ptr, const std::string& host, int16_t port, + void connect(sys::Poller::shared_ptr, const std::string& host, const std::string& port, sys::ConnectionCodec::Factory*, ConnectFailedCallback failed); uint16_t getPort() const; - std::string getHost() const; bool supports(const std::string& capability); private: @@ -130,7 +129,7 @@ SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, int backlog, bool nodelay) : tcpNoDelay(nodelay), - listeningPort(listener.listen(options.port, backlog)), + listeningPort(listener.listen("", boost::lexical_cast<std::string>(options.port), backlog)), clientAuthSelected(options.clientAuth) { SecInvalidateHandle(&credHandle); @@ -237,10 +236,6 @@ uint16_t SslProtocolFactory::getPort() const { return listeningPort; // Immutable no need for lock. } -std::string SslProtocolFactory::getHost() const { - return listener.getSockname(); -} - void SslProtocolFactory::accept(sys::Poller::shared_ptr poller, sys::ConnectionCodec::Factory* fact) { acceptor.reset( @@ -251,7 +246,7 @@ void SslProtocolFactory::accept(sys::Poller::shared_ptr poller, void SslProtocolFactory::connect(sys::Poller::shared_ptr poller, const std::string& host, - int16_t port, + const std::string& port, sys::ConnectionCodec::Factory* fact, ConnectFailedCallback failed) { diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp index 40c004f166..4b7aa07065 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp @@ -36,6 +36,7 @@ #include <boost/bind.hpp> #include <boost/format.hpp> +#include <boost/lexical_cast.hpp> #include <boost/shared_ptr.hpp> #include <limits> @@ -258,16 +259,16 @@ void ConnectionImpl::open() connector->setInputHandler(&handler); connector->setShutdownHandler(this); try { - connector->connect(host, port); - + std::string p = boost::lexical_cast<std::string>(port); + connector->connect(host, p); + } catch (const std::exception& e) { QPID_LOG(debug, "Failed to connect to " << protocol << ":" << host << ":" << port << " " << e.what()); connector.reset(); throw; } connector->init(); - QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port); - + // Enable heartbeat if requested uint16_t heartbeat = static_cast<ConnectionSettings&>(handler).heartbeat; if (heartbeat) { @@ -281,6 +282,7 @@ void ConnectionImpl::open() // - in that case in connector.reset() above; // - or when we are deleted handler.waitForOpen(); + QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port); // If the SASL layer has provided an "operational" userId for the connection, // put it in the negotiated settings. diff --git a/qpid/cpp/src/qpid/client/Connector.h b/qpid/cpp/src/qpid/client/Connector.h index 586012f9d6..bc611ffe0d 100644 --- a/qpid/cpp/src/qpid/client/Connector.h +++ b/qpid/cpp/src/qpid/client/Connector.h @@ -61,7 +61,7 @@ class Connector : public framing::OutputHandler static void registerFactory(const std::string& proto, Factory* connectorFactory); virtual ~Connector() {}; - virtual void connect(const std::string& host, int port) = 0; + virtual void connect(const std::string& host, const std::string& port) = 0; virtual void init() {}; virtual void close() = 0; virtual void send(framing::AMQFrame& frame) = 0; diff --git a/qpid/cpp/src/qpid/client/RdmaConnector.cpp b/qpid/cpp/src/qpid/client/RdmaConnector.cpp index 6af607198c..664640f5e7 100644 --- a/qpid/cpp/src/qpid/client/RdmaConnector.cpp +++ b/qpid/cpp/src/qpid/client/RdmaConnector.cpp @@ -95,7 +95,7 @@ class RdmaConnector : public Connector, public sys::Codec std::string identifier; - void connect(const std::string& host, int port); + void connect(const std::string& host, const std::string& port); void close(); void send(framing::AMQFrame& frame); void abort() {} // TODO: need to fix this for heartbeat timeouts to work @@ -173,7 +173,7 @@ RdmaConnector::~RdmaConnector() { } } -void RdmaConnector::connect(const std::string& host, int port){ +void RdmaConnector::connect(const std::string& host, const std::string& port){ Mutex::ScopedLock l(dataConnectedLock); assert(!dataConnected); @@ -184,7 +184,7 @@ void RdmaConnector::connect(const std::string& host, int port){ boost::bind(&RdmaConnector::disconnected, this), boost::bind(&RdmaConnector::rejected, this, poller, _1, _2)); - SocketAddress sa(host, boost::lexical_cast<std::string>(port)); + SocketAddress sa(host, port); acon->start(poller, sa); } diff --git a/qpid/cpp/src/qpid/client/SslConnector.cpp b/qpid/cpp/src/qpid/client/SslConnector.cpp index 35c7e6bdf6..f121cfb1ab 100644 --- a/qpid/cpp/src/qpid/client/SslConnector.cpp +++ b/qpid/cpp/src/qpid/client/SslConnector.cpp @@ -114,7 +114,7 @@ class SslConnector : public Connector std::string identifier; - void connect(const std::string& host, int port); + void connect(const std::string& host, const std::string& port); void init(); void close(); void send(framing::AMQFrame& frame); @@ -190,7 +190,7 @@ SslConnector::~SslConnector() { close(); } -void SslConnector::connect(const std::string& host, int port){ +void SslConnector::connect(const std::string& host, const std::string& port){ Mutex::ScopedLock l(closedLock); assert(closed); try { diff --git a/qpid/cpp/src/qpid/client/TCPConnector.cpp b/qpid/cpp/src/qpid/client/TCPConnector.cpp index d90781b365..0070b24ec0 100644 --- a/qpid/cpp/src/qpid/client/TCPConnector.cpp +++ b/qpid/cpp/src/qpid/client/TCPConnector.cpp @@ -88,7 +88,7 @@ TCPConnector::~TCPConnector() { close(); } -void TCPConnector::connect(const std::string& host, int port) { +void TCPConnector::connect(const std::string& host, const std::string& port) { Mutex::ScopedLock l(lock); assert(closed); connector = AsynchConnector::create( @@ -121,7 +121,7 @@ void TCPConnector::start(sys::AsynchIO* aio_) { aio->queueReadBuffer(new Buff(maxFrameSize)); } - identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); + identifier = str(format("[%1%]") % socket.getFullAddress()); } void TCPConnector::initAmqp() { diff --git a/qpid/cpp/src/qpid/client/TCPConnector.h b/qpid/cpp/src/qpid/client/TCPConnector.h index c756469182..eb3f696013 100644 --- a/qpid/cpp/src/qpid/client/TCPConnector.h +++ b/qpid/cpp/src/qpid/client/TCPConnector.h @@ -98,7 +98,7 @@ class TCPConnector : public Connector, public sys::Codec protected: virtual ~TCPConnector(); - void connect(const std::string& host, int port); + void connect(const std::string& host, const std::string& port); void start(sys::AsynchIO* aio_); void initAmqp(); virtual void connectFailed(const std::string& msg); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp index bfb20118b5..e8d250de0f 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp @@ -30,12 +30,23 @@ void AcceptTracker::State::accept() unaccepted.clear(); } -void AcceptTracker::State::accept(qpid::framing::SequenceNumber id) +SequenceSet AcceptTracker::State::accept(qpid::framing::SequenceNumber id, bool cumulative) { - if (unaccepted.contains(id)) { - unaccepted.remove(id); - unconfirmed.add(id); + SequenceSet accepting; + if (cumulative) { + for (SequenceSet::iterator i = unaccepted.begin(); i != unaccepted.end() && *i <= id; ++i) { + accepting.add(*i); + } + unconfirmed.add(accepting); + unaccepted.remove(accepting); + } else { + if (unaccepted.contains(id)) { + unaccepted.remove(id); + unconfirmed.add(id); + accepting.add(id); + } } + return accepting; } void AcceptTracker::State::release() @@ -71,16 +82,15 @@ void AcceptTracker::accept(qpid::client::AsyncSession& session) aggregateState.accept(); } -void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session) +void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session, bool cumulative) { for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { - i->second.accept(id); + i->second.accept(id, cumulative); } Record record; - record.accepted.add(id); + record.accepted = aggregateState.accept(id, cumulative); record.status = session.messageAccept(record.accepted); pending.push_back(record); - aggregateState.accept(id); } void AcceptTracker::release(qpid::client::AsyncSession& session) diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h index 87890e41cc..9e801e8147 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h @@ -42,7 +42,7 @@ class AcceptTracker public: void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id); void accept(qpid::client::AsyncSession&); - void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&); + void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&, bool cumulative); void release(qpid::client::AsyncSession&); uint32_t acceptsPending(); uint32_t acceptsPending(const std::string& destination); @@ -62,7 +62,7 @@ class AcceptTracker qpid::framing::SequenceSet unconfirmed; void accept(); - void accept(qpid::framing::SequenceNumber); + qpid::framing::SequenceSet accept(qpid::framing::SequenceNumber, bool cumulative); void release(); uint32_t acceptsPending(); void completed(qpid::framing::SequenceSet&); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index f1295a3b66..9cf5f31290 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -233,6 +233,8 @@ class Subscription : public Exchange, public MessageSource const bool reliable; const bool durable; const std::string actualType; + const bool exclusiveQueue; + const bool exclusiveSubscription; FieldTable queueOptions; FieldTable subscriptionOptions; Bindings bindings; @@ -307,6 +309,7 @@ struct Opt Opt& operator/(const std::string& name); operator bool() const; std::string str() const; + bool asBool(bool defaultValue) const; const Variant::List& asList() const; void collect(qpid::framing::FieldTable& args) const; @@ -338,6 +341,12 @@ Opt::operator bool() const return value && !value->isVoid() && value->asBool(); } +bool Opt::asBool(bool defaultValue) const +{ + if (value) return value->asBool(); + else return defaultValue; +} + std::string Opt::str() const { if (value) return value->asString(); @@ -490,7 +499,9 @@ Subscription::Subscription(const Address& address, const std::string& type) queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())), reliable(AddressResolution::is_reliable(address)), durable(Opt(address)/LINK/DURABLE), - actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type) + actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type), + exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)), + exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)) { (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions); (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions); @@ -550,7 +561,7 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str checkAssert(session, FOR_RECEIVER); //create subscription queue: - session.queueDeclare(arg::queue=queue, arg::exclusive=true, + session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue, arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions); //'default' binding: bindings.bind(session); @@ -559,15 +570,15 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str linkBindings.bind(session); //subscribe to subscription queue: AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE; - session.messageSubscribe(arg::queue=queue, arg::destination=destination, - arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions); + session.messageSubscribe(arg::queue=queue, arg::destination=destination, + arg::exclusive=exclusiveSubscription, arg::acceptMode=accept, arg::arguments=subscriptionOptions); } void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination) { linkBindings.unbind(session); session.messageCancel(destination); - session.queueDelete(arg::queue=queue); + if (reliable) session.queueDelete(arg::queue=queue, arg::ifUnused=true); checkDelete(session, FOR_RECEIVER); } diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index a87a8dea67..473f5ecd1c 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -40,11 +40,15 @@ using qpid::types::VAR_LIST; using qpid::framing::Uuid; namespace { -void convert(const Variant::List& from, std::vector<std::string>& to) +void merge(const std::string& value, std::vector<std::string>& list) { + if (std::find(list.begin(), list.end(), value) == list.end()) + list.push_back(value); +} + +void merge(const Variant::List& from, std::vector<std::string>& to) { - for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) { - to.push_back(i->asString()); - } + for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) + merge(i->asString(), to); } std::string asString(const std::vector<std::string>& v) { @@ -93,9 +97,9 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value) maxReconnectInterval = value; } else if (name == "reconnect-urls" || name == "reconnect_urls") { if (value.getType() == VAR_LIST) { - convert(value.asList(), urls); + merge(value.asList(), urls); } else { - urls.push_back(value.asString()); + merge(value.asString(), urls); } } else if (name == "username") { settings.username = value.asString(); @@ -198,7 +202,7 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st sessions[name] = impl; break; } catch (const qpid::TransportFailure&) { - open(); + reopen(); } catch (const qpid::SessionException& e) { throw qpid::messaging::SessionError(e.what()); } catch (const std::exception& e) { @@ -219,6 +223,15 @@ void ConnectionImpl::open() catch (const qpid::Exception& e) { throw messaging::ConnectionError(e.what()); } } +void ConnectionImpl::reopen() +{ + if (!reconnect) { + throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); + } + open(); +} + + bool expired(const qpid::sys::AbsTime& start, int64_t timeout) { if (timeout == 0) return true; @@ -246,14 +259,9 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started) } void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) { - if (more.size()) { - for (size_t i = 0; i < more.size(); ++i) { - if (std::find(urls.begin(), urls.end(), more[i].str()) == urls.end()) { - urls.push_back(more[i].str()); - } - } - QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls)); - } + for (std::vector<Url>::const_iterator i = more.begin(); i != more.end(); ++i) + merge(i->str(), urls); + QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls)); } bool ConnectionImpl::tryConnect() diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index 09f2038312..9e31238bc1 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -43,6 +43,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl public: ConnectionImpl(const std::string& url, const qpid::types::Variant::Map& options); void open(); + void reopen(); bool isOpen() const; void close(); qpid::messaging::Session newSession(bool transactional, const std::string& name); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 71e89bdba1..5cf20c92eb 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -144,10 +144,10 @@ void IncomingMessages::accept() acceptTracker.accept(session); } -void IncomingMessages::accept(qpid::framing::SequenceNumber id) +void IncomingMessages::accept(qpid::framing::SequenceNumber id, bool cumulative) { sys::Mutex::ScopedLock l(lock); - acceptTracker.accept(id, session); + acceptTracker.accept(id, session, cumulative); } diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h index f6a291bc68..9053b70312 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -72,7 +72,7 @@ class IncomingMessages bool get(Handler& handler, qpid::sys::Duration timeout); bool getNextDestination(std::string& destination, qpid::sys::Duration timeout); void accept(); - void accept(qpid::framing::SequenceNumber id); + void accept(qpid::framing::SequenceNumber id, bool cumulative); void releaseAll(); void releasePending(const std::string& destination); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 75a71997fd..787be7de2a 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -112,13 +112,14 @@ void SessionImpl::release(qpid::messaging::Message& m) execute1<Release>(m); } -void SessionImpl::acknowledge(qpid::messaging::Message& m) +void SessionImpl::acknowledge(qpid::messaging::Message& m, bool cumulative) { //Should probably throw an exception on failure here, or indicate //it through a return type at least. Failure means that the //message may be redelivered; i.e. the application cannot delete //any state necessary for preventing reprocessing of the message - execute1<Acknowledge1>(m); + Acknowledge2 ack(*this, m, cumulative); + execute(ack); } void SessionImpl::close() @@ -467,10 +468,10 @@ void SessionImpl::acknowledgeImpl() if (!transactional) incoming.accept(); } -void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m) +void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m, bool cumulative) { ScopedLock l(lock); - if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId()); + if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId(), cumulative); } void SessionImpl::rejectImpl(qpid::messaging::Message& m) @@ -509,7 +510,7 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { - connection->open(); + connection->reopen(); } bool SessionImpl::backoff() diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h index 2a2aa47df6..c7dea77d18 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -63,7 +63,7 @@ class SessionImpl : public qpid::messaging::SessionImpl void acknowledge(bool sync); void reject(qpid::messaging::Message&); void release(qpid::messaging::Message&); - void acknowledge(qpid::messaging::Message& msg); + void acknowledge(qpid::messaging::Message& msg, bool cumulative); void close(); void sync(bool block); qpid::messaging::Sender createSender(const qpid::messaging::Address& address); @@ -139,7 +139,7 @@ class SessionImpl : public qpid::messaging::SessionImpl void commitImpl(); void rollbackImpl(); void acknowledgeImpl(); - void acknowledgeImpl(qpid::messaging::Message&); + void acknowledgeImpl(qpid::messaging::Message&, bool cumulative); void rejectImpl(qpid::messaging::Message&); void releaseImpl(qpid::messaging::Message&); void closeImpl(); @@ -204,12 +204,13 @@ class SessionImpl : public qpid::messaging::SessionImpl void operator()() { impl.releaseImpl(message); } }; - struct Acknowledge1 : Command + struct Acknowledge2 : Command { qpid::messaging::Message& message; + bool cumulative; - Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {} - void operator()() { impl.acknowledgeImpl(message); } + Acknowledge2(SessionImpl& i, qpid::messaging::Message& m, bool c) : Command(i), message(m), cumulative(c) {} + void operator()() { impl.acknowledgeImpl(message, cumulative); } }; struct CreateSender; diff --git a/qpid/cpp/src/qpid/client/windows/SslConnector.cpp b/qpid/cpp/src/qpid/client/windows/SslConnector.cpp index a33713e1a8..785c817928 100644 --- a/qpid/cpp/src/qpid/client/windows/SslConnector.cpp +++ b/qpid/cpp/src/qpid/client/windows/SslConnector.cpp @@ -77,7 +77,7 @@ public: framing::ProtocolVersion pVersion, const ConnectionSettings&, ConnectionImpl*); - virtual void connect(const std::string& host, int port); + virtual void connect(const std::string& host, const std::string& port); virtual void connected(const Socket&); unsigned int getSSF(); }; @@ -153,7 +153,7 @@ SslConnector::~SslConnector() // Will this get reach via virtual method via boost::bind???? -void SslConnector::connect(const std::string& host, int port) { +void SslConnector::connect(const std::string& host, const std::string& port) { brokerHost = host; TCPConnector::connect(host, port); } diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 0daf0c7f5a..82ed8bf8c9 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -146,6 +146,7 @@ #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterConfigChangeBody.h" +#include "qpid/framing/ClusterClockBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionAbortBody.h" #include "qpid/framing/ClusterRetractOfferBody.h" @@ -198,7 +199,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 1097431; +const uint32_t Cluster::CLUSTER_VERSION = 1128070; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -230,7 +231,6 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { cluster.updateOffer(member, updatee, l); } void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); } - void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } @@ -240,6 +240,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void deliverToQueue(const std::string& queue, const std::string& message) { cluster.deliverToQueue(queue, message, l); } + void clock(uint64_t time) { cluster.clock(time, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; @@ -253,7 +254,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : self(cpg.self()), clusterId(true), mAgent(0), - expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())), + expiryPolicy(new ExpiryPolicy(*this)), mcast(cpg, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1), @@ -365,7 +366,8 @@ void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { assert(discarding); pair<ConnectionMap::iterator, bool> ib = connections.insert(ConnectionMap::value_type(c->getId(), c)); - assert(ib.second); + // Like this to avoid tripping up unused variable warning when NDEBUG set + if (!ib.second) assert(ib.second); } void Cluster::erase(const ConnectionId& id) { @@ -667,6 +669,8 @@ void Cluster::initMapCompleted(Lock& l) { else { // I can go ready. discarding = false; setReady(l); + // Must be called *before* memberUpdate so first update will be generated. + failoverExchange->setReady(); memberUpdate(l); updateMgmtMembership(l); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); @@ -719,6 +723,20 @@ void Cluster::configChange(const MemberId&, updateMgmtMembership(l); // Update on every config change for consistency } +struct ClusterClockTask : public sys::TimerTask { + Cluster& cluster; + sys::Timer& timer; + + ClusterClockTask(Cluster& cluster, sys::Timer& timer, uint16_t clockInterval) + : TimerTask(Duration(clockInterval * TIME_MSEC),"ClusterClock"), cluster(cluster), timer(timer) {} + + void fire() { + cluster.sendClockUpdate(); + setupNextFire(); + timer.add(this); + } +}; + void Cluster::becomeElder(Lock&) { if (elder) return; // We were already the elder. // We are the oldest, reactive links if necessary @@ -726,6 +744,8 @@ void Cluster::becomeElder(Lock&) { elder = true; broker.getLinks().setPassive(false); timer->becomeElder(); + + clockTimer.add(new ClusterClockTask(*this, clockTimer, settings.clockInterval)); } void Cluster::makeOffer(const MemberId& id, Lock& ) { @@ -846,7 +866,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) if (updatee != self && url) { QPID_LOG(debug, debugSnapshot()); if (mAgent) mAgent->clusterUpdate(); - // Updatee will call clusterUpdate when update completes + // Updatee will call clusterUpdate() via checkUpdateIn() when update completes } } @@ -927,10 +947,11 @@ void Cluster::checkUpdateIn(Lock& l) { if (!updateClosed) return; // Wait till update connection closes. if (updatedMap) { // We're up to date map = *updatedMap; - failoverExchange->setUrls(getUrls(l)); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; memberUpdate(l); + // Must be called *after* memberUpdate() to avoid sending an extra update. + failoverExchange->setReady(); // NB: don't updateMgmtMembership() here as we are not in the deliver // thread. It will be updated on delivery of the "ready" we just mcast. broker.setClusterUpdatee(false); @@ -1120,10 +1141,6 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) { QPID_LOG(notice, *this << " cluster-uuid = " << clusterId); } -void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { - expiryPolicy->deliverExpire(id); -} - void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) { // If we see an errorCheck here (rather than in the ErrorCheck // class) then we have processed succesfully past the point of the @@ -1161,6 +1178,35 @@ void Cluster::deliverToQueue(const std::string& queue, const std::string& messag q->deliver(msg); } +sys::AbsTime Cluster::getClusterTime() { + Mutex::ScopedLock l(lock); + return clusterTime; +} + +// This method is called during update on the updatee to set the initial cluster time. +void Cluster::clock(const uint64_t time) { + Mutex::ScopedLock l(lock); + clock(time, l); +} + +// called when broadcast message received +void Cluster::clock(const uint64_t time, Lock&) { + clusterTime = AbsTime(EPOCH, time); + AbsTime now = AbsTime::now(); + + if (!elder) { + clusterTimeOffset = Duration(now, clusterTime); + } +} + +// called by elder timer to send clock broadcast +void Cluster::sendClockUpdate() { + Mutex::ScopedLock l(lock); + int64_t nanosecondsSinceEpoch = Duration(EPOCH, now()); + nanosecondsSinceEpoch += clusterTimeOffset; + mcast.mcastControl(ClusterClockBody(ProtocolVersion(), nanosecondsSinceEpoch), self); +} + bool Cluster::deferDeliveryImpl(const std::string& queue, const boost::intrusive_ptr<broker::Message>& msg) { diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 78d325cdf9..adb06b2783 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -63,6 +63,12 @@ class AMQBody; struct Uuid; } +namespace sys { +class Timer; +class AbsTime; +class Duration; +} + namespace cluster { class Connection; @@ -135,6 +141,9 @@ class Cluster : private Cpg::Handler, public management::Manageable { bool deferDeliveryImpl(const std::string& queue, const boost::intrusive_ptr<broker::Message>& msg); + sys::AbsTime getClusterTime(); + void sendClockUpdate(); + void clock(const uint64_t time); private: typedef sys::Monitor::ScopedLock Lock; @@ -180,12 +189,12 @@ class Cluster : private Cpg::Handler, public management::Manageable { const std::string& left, const std::string& joined, Lock& l); - void messageExpired(const MemberId&, uint64_t, Lock& l); void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&); void timerWakeup(const MemberId&, const std::string& name, Lock&); void timerDrop(const MemberId&, const std::string& name, Lock&); void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&); void deliverToQueue(const std::string& queue, const std::string& message, Lock&); + void clock(const uint64_t time, Lock&); // Helper functions ConnectionPtr getConnection(const EventFrame&, Lock&); @@ -296,6 +305,9 @@ class Cluster : private Cpg::Handler, public management::Manageable { ErrorCheck error; UpdateReceiver updateReceiver; ClusterTimer* timer; + sys::Timer clockTimer; + sys::AbsTime clusterTime; + sys::Duration clusterTimeOffset; friend std::ostream& operator<<(std::ostream&, const Cluster&); friend struct ClusterDispatcher; diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp index 2962daaa07..69ba095f16 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -72,6 +72,7 @@ struct ClusterOptions : public Options { ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.") #endif ("cluster-size", optValue(settings.size, "N"), "Wait for N cluster members before allowing clients to connect.") + ("cluster-clock-interval", optValue(settings.clockInterval,"N"), "How often to broadcast the current time to the cluster nodes, in milliseconds. A value between 5 and 1000 is recommended.") ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.") ; } diff --git a/qpid/cpp/src/qpid/cluster/ClusterSettings.h b/qpid/cpp/src/qpid/cluster/ClusterSettings.h index 8e708aa139..2f7b5be20a 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterSettings.h +++ b/qpid/cpp/src/qpid/cluster/ClusterSettings.h @@ -35,8 +35,9 @@ struct ClusterSettings { size_t readMax; std::string username, password, mechanism; size_t size; + uint16_t clockInterval; - ClusterSettings() : quorum(false), readMax(10), size(1) + ClusterSettings() : quorum(false), readMax(10), size(1), clockInterval(10) {} Url getUrl(uint16_t port) const { diff --git a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp index f6e1c7a849..b4f7d00f38 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp @@ -70,6 +70,7 @@ void ClusterTimer::add(intrusive_ptr<TimerTask> task) if (i != map.end()) throw Exception(QPID_MSG("Task already exists with name " << task->getName())); map[task->getName()] = task; + // Only the elder actually activates the task with the Timer base class. if (cluster.isElder()) { QPID_LOG(trace, "Elder activating cluster timer task " << task->getName()); @@ -112,6 +113,9 @@ void ClusterTimer::deliverWakeup(const std::string& name) { else { intrusive_ptr<TimerTask> t = i->second; map.erase(i); + // Move the nextFireTime so readyToFire() is true. This is to ensure we + // don't get an error if the fired task calls setupNextFire() + t->setFired(); Timer::fire(t); } } diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index b9895290e9..030d6e34c1 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -322,10 +322,10 @@ size_t Connection::decode(const char* data, size_t size) { while (localDecoder.decode(buf)) received(localDecoder.getFrame()); if (!wasOpen && connection->isOpen()) { - // Connections marked as federation links are allowed to proxy + // Connections marked with setUserProxyAuth are allowed to proxy // messages with user-ID that doesn't match the connection's // authenticated ID. This is important for updates. - connection->setFederationLink(isCatchUp()); + connection->setUserProxyAuth(isCatchUp()); } } else { // Multicast local connections. @@ -601,10 +601,6 @@ void Connection::queueObserverState(const std::string& qname, const std::string& QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies."); } -void Connection::expiryId(uint64_t id) { - cluster.getExpiryPolicy().setId(id); -} - std::ostream& operator<<(std::ostream& o, const Connection& c) { const char* type="unknown"; if (c.isLocal()) type = "local"; @@ -724,5 +720,16 @@ void Connection::doCatchupIoCallbacks() { if (catchUp) getBrokerConnection()->doIoCallbacks(); } + +void Connection::clock(uint64_t time) { + QPID_LOG(debug, "Cluster connection received time update"); + cluster.clock(time); +} + +void Connection::queueDequeueSincePurgeState(const std::string& qname, uint32_t dequeueSincePurge) { + boost::shared_ptr<broker::Queue> queue(findQueue(qname)); + queue->setDequeueSincePurge(dequeueSincePurge); +} + }} // Namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index a0da9efbb8..a9740f97f8 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -155,7 +155,6 @@ class Connection : void queuePosition(const std::string&, const framing::SequenceNumber&); void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count); void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&); - void expiryId(uint64_t); void txStart(); void txAccept(const framing::SequenceSet&); @@ -192,6 +191,10 @@ class Connection : void doCatchupIoCallbacks(); + void clock(uint64_t time); + + void queueDequeueSincePurgeState(const std::string&, uint32_t); + private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp index d9a7b0122a..0ef5c2a35d 100644 --- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp +++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp @@ -21,106 +21,21 @@ #include "qpid/broker/Message.h" #include "qpid/cluster/ExpiryPolicy.h" -#include "qpid/cluster/Multicaster.h" -#include "qpid/framing/ClusterMessageExpiredBody.h" +#include "qpid/cluster/Cluster.h" #include "qpid/sys/Time.h" -#include "qpid/sys/Timer.h" #include "qpid/log/Statement.h" namespace qpid { namespace cluster { -ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t) - : expiryId(1), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {} +ExpiryPolicy::ExpiryPolicy(Cluster& cluster) : cluster(cluster) {} -struct ExpiryTask : public sys::TimerTask { - ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when) - : TimerTask(when,"ExpiryPolicy"), expiryPolicy(policy), expiryId(id) {} - void fire() { expiryPolicy->sendExpire(expiryId); } - boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; - const uint64_t expiryId; -}; - -// Called while receiving an update -void ExpiryPolicy::setId(uint64_t id) { - sys::Mutex::ScopedLock l(lock); - expiryId = id; -} - -// Called while giving an update -uint64_t ExpiryPolicy::getId() const { - sys::Mutex::ScopedLock l(lock); - return expiryId; -} - -// Called in enqueuing connection thread -void ExpiryPolicy::willExpire(broker::Message& m) { - uint64_t id; - { - // When messages are fanned out to multiple queues, update sends - // them as independenty messages so we can have multiple messages - // with the same expiry ID. - // - sys::Mutex::ScopedLock l(lock); - id = expiryId++; - if (!id) { // This is an update of an already-expired message. - m.setExpiryPolicy(expiredPolicy); - } - else { - assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end()); - // If this is an update, the id may already exist - unexpiredById.insert(IdMessageMap::value_type(id, &m)); - unexpiredByMessage[&m] = id; - } - } - timer.add(new ExpiryTask(this, id, m.getExpiration())); -} - -// Called in dequeueing connection thread -void ExpiryPolicy::forget(broker::Message& m) { - sys::Mutex::ScopedLock l(lock); - MessageIdMap::iterator i = unexpiredByMessage.find(&m); - assert(i != unexpiredByMessage.end()); - unexpiredById.erase(i->second); - unexpiredByMessage.erase(i); -} - -// Called in dequeueing connection or cleanup thread. bool ExpiryPolicy::hasExpired(broker::Message& m) { - sys::Mutex::ScopedLock l(lock); - return unexpiredByMessage.find(&m) == unexpiredByMessage.end(); -} - -// Called in timer thread -void ExpiryPolicy::sendExpire(uint64_t id) { - { - sys::Mutex::ScopedLock l(lock); - // Don't multicast an expiry notice if message is already forgotten. - if (unexpiredById.find(id) == unexpiredById.end()) return; - } - mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId); + return m.getExpiration() < cluster.getClusterTime(); } -// Called in CPG deliver thread. -void ExpiryPolicy::deliverExpire(uint64_t id) { - sys::Mutex::ScopedLock l(lock); - std::pair<IdMessageMap::iterator, IdMessageMap::iterator> expired = unexpiredById.equal_range(id); - IdMessageMap::iterator i = expired.first; - while (i != expired.second) { - i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true; - unexpiredByMessage.erase(i->second); - unexpiredById.erase(i++); - } +sys::AbsTime ExpiryPolicy::getCurrentTime() { + return cluster.getClusterTime(); } -// Called in update thread on the updater. -boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) { - sys::Mutex::ScopedLock l(lock); - MessageIdMap::iterator i = unexpiredByMessage.find(&m); - return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second; -} - -bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; } -void ExpiryPolicy::Expired::willExpire(broker::Message&) { } - }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h index 77a656aa68..d8ddbca8b3 100644 --- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h +++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h @@ -36,12 +36,8 @@ namespace broker { class Message; } -namespace sys { -class Timer; -} - namespace cluster { -class Multicaster; +class Cluster; /** * Cluster expiry policy @@ -49,43 +45,13 @@ class Multicaster; class ExpiryPolicy : public broker::ExpiryPolicy { public: - ExpiryPolicy(Multicaster&, const MemberId&, sys::Timer&); + ExpiryPolicy(Cluster& cluster); - void willExpire(broker::Message&); bool hasExpired(broker::Message&); - void forget(broker::Message&); - - // Send expiration notice to cluster. - void sendExpire(uint64_t); + qpid::sys::AbsTime getCurrentTime(); - // Cluster delivers expiry notice. - void deliverExpire(uint64_t); - - void setId(uint64_t id); - uint64_t getId() const; - - boost::optional<uint64_t> getId(broker::Message&); - private: - typedef std::map<broker::Message*, uint64_t> MessageIdMap; - // When messages are fanned out to multiple queues, update sends - // them as independenty messages so we can have multiple messages - // with the same expiry ID. - typedef std::multimap<uint64_t, broker::Message*> IdMessageMap; - - struct Expired : public broker::ExpiryPolicy { - bool hasExpired(broker::Message&); - void willExpire(broker::Message&); - }; - - mutable sys::Mutex lock; - MessageIdMap unexpiredByMessage; - IdMessageMap unexpiredById; - uint64_t expiryId; - boost::intrusive_ptr<Expired> expiredPolicy; - Multicaster& mcast; - MemberId memberId; - sys::Timer& timer; + Cluster& cluster; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp b/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp index 84232dac1b..cfbe34a460 100644 --- a/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp +++ b/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -39,8 +39,10 @@ using namespace broker; using namespace framing; const string FailoverExchange::typeName("amq.failover"); - -FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b) : Exchange(typeName, parent, b ) { + +FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b) + : Exchange(typeName, parent, b ), ready(false) +{ if (mgmtExchange != 0) mgmtExchange->set_type(typeName); } @@ -53,16 +55,17 @@ void FailoverExchange::setUrls(const vector<Url>& u) { void FailoverExchange::updateUrls(const vector<Url>& u) { Lock l(lock); urls=u; - if (urls.empty()) return; - std::for_each(queues.begin(), queues.end(), - boost::bind(&FailoverExchange::sendUpdate, this, _1)); + if (ready && !urls.empty()) { + std::for_each(queues.begin(), queues.end(), + boost::bind(&FailoverExchange::sendUpdate, this, _1)); + } } string FailoverExchange::getType() const { return typeName; } bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) { Lock l(lock); - sendUpdate(queue); + if (ready) sendUpdate(queue); return queues.insert(queue).second; } @@ -84,7 +87,7 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) { // Called with lock held. if (urls.empty()) return; framing::Array array(0x95); - for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i) + for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i) array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str()))); const ProtocolVersion v; boost::intrusive_ptr<Message> msg(new Message); @@ -96,9 +99,12 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) { header.get<MessageProperties>(true)->getApplicationHeaders().setArray(typeName, array); AMQFrame headerFrame(header); headerFrame.setFirstSegment(false); - msg->getFrames().append(headerFrame); + msg->getFrames().append(headerFrame); DeliverableMessage(msg).deliverTo(queue); } +void FailoverExchange::setReady() { + ready = true; +} }} // namespace cluster diff --git a/qpid/cpp/src/qpid/cluster/FailoverExchange.h b/qpid/cpp/src/qpid/cluster/FailoverExchange.h index 2e1edfc0ae..c3e50c6929 100644 --- a/qpid/cpp/src/qpid/cluster/FailoverExchange.h +++ b/qpid/cpp/src/qpid/cluster/FailoverExchange.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -46,6 +46,8 @@ class FailoverExchange : public broker::Exchange void setUrls(const std::vector<Url>&); /** Set the URLs and send an update.*/ void updateUrls(const std::vector<Url>&); + /** Flag the failover exchange as ready to generate updates (caught up) */ + void setReady(); // Exchange overrides std::string getType() const; @@ -56,7 +58,7 @@ class FailoverExchange : public broker::Exchange private: void sendUpdate(const boost::shared_ptr<broker::Queue>&); - + typedef sys::Mutex::ScopedLock Lock; typedef std::vector<Url> Urls; typedef std::set<boost::shared_ptr<broker::Queue> > Queues; @@ -64,7 +66,7 @@ class FailoverExchange : public broker::Exchange sys::Mutex lock; Urls urls; Queues queues; - + bool ready; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index a15c14ff48..77448789db 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,9 +26,9 @@ #include "qpid/cluster/Decoder.h" #include "qpid/cluster/ExpiryPolicy.h" #include "qpid/cluster/UpdateDataExchange.h" -#include "qpid/client/SessionBase_0_10Access.h" -#include "qpid/client/ConnectionAccess.h" -#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/client/ConnectionAccess.h" +#include "qpid/client/SessionImpl.h" #include "qpid/client/ConnectionImpl.h" #include "qpid/client/Future.h" #include "qpid/broker/Broker.h" @@ -83,11 +83,20 @@ using namespace framing; namespace arg=client::arg; using client::SessionBase_0_10Access; +// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. +const std::string UpdateClient::UPDATE("x-qpid.cluster-update"); +// Name for header used to carry expiration information. +const std::string UpdateClient::X_QPID_EXPIRATION = "x-qpid.expiration"; +// Headers used to flag headers/properties added by the UpdateClient so they can be +// removed on the other side. +const std::string UpdateClient::X_QPID_NO_MESSAGE_PROPS = "x-qpid.no-message-props"; +const std::string UpdateClient::X_QPID_NO_HEADERS = "x-qpid.no-headers"; + std::ostream& operator<<(std::ostream& o, const UpdateClient& c) { return o << "cluster(" << c.updaterId << " UPDATER)"; } -struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler +struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler { boost::shared_ptr<qpid::client::ConnectionImpl> connection; @@ -121,7 +130,7 @@ void send(client::AsyncSession& s, const AMQBody& body) { // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, - broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_, + broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_, const Cluster::ConnectionVector& cons, Decoder& decoder_, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail, @@ -135,9 +144,6 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con UpdateClient::~UpdateClient() {} -// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. -const std::string UpdateClient::UPDATE("qpid.cluster-update"); - void UpdateClient::run() { try { connection.open(updateeUrl, connectionSettings); @@ -155,6 +161,13 @@ void UpdateClient::update() { << " at " << updateeUrl); Broker& b = updaterBroker; + if(b.getExpiryPolicy()) { + QPID_LOG(debug, *this << "Updating updatee with cluster time"); + qpid::sys::AbsTime clusterTime = b.getExpiryPolicy()->getCurrentTime(); + int64_t time = qpid::sys::Duration(qpid::sys::EPOCH, clusterTime); + ClusterConnectionProxy(session).clock(time); + } + updateManagementSetupState(); b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); @@ -174,7 +187,6 @@ void UpdateClient::update() { // Update queue listeners: must come after sessions so consumerNumbering is populated b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); - ClusterConnectionProxy(session).expiryId(expiry.getId()); updateLinks(); updateManagementAgent(); @@ -188,7 +200,7 @@ void UpdateClient::update() { // NOTE: connection will be closed from the other end, don't close // it here as that causes a race. - + // TODO aconway 2010-03-15: This sleep avoids the race condition // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831. // It allows the connection to fully close before destroying the @@ -280,7 +292,7 @@ class MessageUpdater { framing::SequenceNumber lastPos; client::AsyncSession session; ExpiryPolicy& expiry; - + public: MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) { @@ -297,7 +309,6 @@ class MessageUpdater { } } - void updateQueuedMessage(const broker::QueuedMessage& message) { // Send the queue position if necessary. if (!haveLastPos || message.position - lastPos != 1) { @@ -306,10 +317,23 @@ class MessageUpdater { } lastPos = message.position; - // Send the expiry ID if necessary. - if (message.payload->getProperties<DeliveryProperties>()->getTtl()) { - boost::optional<uint64_t> expiryId = expiry.getId(*message.payload); - ClusterConnectionProxy(session).expiryId(expiryId?*expiryId:0); + // if the ttl > 0, we need to send the calculated expiration time to the updatee + if (message.payload->getProperties<DeliveryProperties>()->getTtl() > 0) { + bool hadMessageProps = + message.payload->hasProperties<framing::MessageProperties>(); + framing::MessageProperties* mprops = + message.payload->getProperties<framing::MessageProperties>(); + bool hadApplicationHeaders = mprops->hasApplicationHeaders(); + FieldTable& applicationHeaders = mprops->getApplicationHeaders(); + applicationHeaders.setInt64( + UpdateClient::X_QPID_EXPIRATION, + sys::Duration(sys::EPOCH, message.payload->getExpiration())); + // If message properties or application headers didn't exist + // prior to us adding data, we want to remove them on the other side. + if (!hadMessageProps) + applicationHeaders.setInt(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0); + else if (!hadApplicationHeaders) + applicationHeaders.setInt(UpdateClient::X_QPID_NO_HEADERS, 0); } // We can't send a broker::Message via the normal client API, @@ -322,7 +346,7 @@ class MessageUpdater { framing::MessageTransferBody transfer( *message.payload->getFrames().as<framing::MessageTransferBody>()); transfer.setDestination(UpdateClient::UPDATE); - + sb.get()->send(transfer, message.payload->getFrames(), !message.payload->isContentReleased()); if (message.payload->isContentReleased()){ @@ -330,12 +354,21 @@ class MessageUpdater { uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) - { + { AMQFrame frame((AMQContentBody())); morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset); sb.get()->sendRawFrame(frame); } } + // If the ttl > 0, we need to send the calculated expiration time to the updatee + // Careful not to alter the message as a side effect e.g. by adding + // an empty DeliveryProperties or setting TTL when it wasn't set before. + uint64_t ttl = 0; + if (message.payload->hasProperties<DeliveryProperties>()) { + DeliveryProperties* dprops = + message.payload->getProperties<DeliveryProperties>(); + if (dprops->hasTtl()) ttl = dprops->getTtl(); + }; } void updateMessage(const boost::intrusive_ptr<broker::Message>& message) { @@ -361,6 +394,8 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr< if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) { ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count); } + + ClusterConnectionProxy(s).queueDequeueSincePurgeState(q->getName(), q->getDequeueSincePurge()); } void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) { @@ -393,7 +428,7 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda QPID_LOG(debug, *this << " updating connection " << *updateConnection); assert(updateConnection->getBrokerConnection()); broker::Connection& bc = *updateConnection->getBrokerConnection(); - + // Send the management ID first on the main connection. std::string mgmtId = updateConnection->getBrokerConnection()->getMgmtId(); ClusterConnectionProxy(session).shadowPrepare(mgmtId); @@ -430,7 +465,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { QPID_LOG(debug, *this << " updating session " << ss->getId()); - // Create a client session to update session state. + // Create a client session to update session state. boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel()); simpl->disableAutoDetach(); @@ -456,12 +491,12 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { // Adjust command counter for message in progress, will be sent after state update. boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress(); SequenceNumber received = ss->receiverGetReceived().command; - if (inProgress) + if (inProgress) --received; // Sync the session to ensure all responses from broker have been processed. shadowSession.sync(); - + // Reset command-sequence state. proxy.sessionState( ss->senderGetReplayPoint().command, @@ -511,7 +546,7 @@ void UpdateClient::updateConsumer( QPID_LOG(debug, *this << " updated consumer " << ci->getName() << " on " << shadowSession.getId()); } - + void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) { // If the message is acquired then it is no longer on the @@ -543,7 +578,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { void operator()(const broker::DtxAck& ) { throw InternalErrorException("DTX transactions not currently supported by cluster."); } - + void operator()(const broker::RecoveredDequeue& rdeq) { updateMessage(rdeq.getMessage()); proxy.txEnqueue(rdeq.getQueue()->getName()); @@ -563,7 +598,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { typedef std::list<Queue::shared_ptr> QueueList; const QueueList& qlist = txPub.getQueues(); Array qarray(TYPE_CODE_STR8); - for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i) + for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i) qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); proxy.txPublish(qarray, txPub.delivered); } @@ -573,7 +608,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { client::AsyncSession session; ClusterConnectionProxy proxy; }; - + void UpdateClient::updateTxState(broker::SemanticState& s) { QPID_LOG(debug, *this << " updating TX transaction state."); ClusterConnectionProxy proxy(shadowSession); diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h index b72d090d73..21bf6024e0 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.h +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -69,14 +69,19 @@ class ExpiryPolicy; class UpdateClient : public sys::Runnable { public: static const std::string UPDATE; // Name for special update queue and exchange. + static const std::string X_QPID_EXPIRATION; // Update message expiration + // Flag to remove props/headers that were added by the UpdateClient + static const std::string X_QPID_NO_MESSAGE_PROPS; + static const std::string X_QPID_NO_HEADERS; + static client::Connection catchUpConnection(); - + UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&, broker::Broker& donor, const ClusterMap& map, ExpiryPolicy& expiry, const std::vector<boost::intrusive_ptr<Connection> >&, Decoder&, const boost::function<void()>& done, const boost::function<void(const std::exception&)>& fail, - const client::ConnectionSettings& + const client::ConnectionSettings& ); ~UpdateClient(); diff --git a/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp b/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp index 11937f296f..e830459aba 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,6 +19,7 @@ * */ #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/FieldTable.h" #include "qpid/broker/Message.h" #include "UpdateExchange.h" @@ -27,6 +28,8 @@ namespace cluster { using framing::MessageTransferBody; using framing::DeliveryProperties; +using framing::MessageProperties; +using framing::FieldTable; UpdateExchange::UpdateExchange(management::Manageable* parent) : broker::Exchange(UpdateClient::UPDATE, parent), @@ -34,6 +37,7 @@ UpdateExchange::UpdateExchange(management::Manageable* parent) void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& msg) { + // Copy exchange name to destination property. MessageTransferBody* transfer = msg->getMethod<MessageTransferBody>(); assert(transfer); const DeliveryProperties* props = msg->getProperties<DeliveryProperties>(); @@ -42,6 +46,23 @@ void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& transfer->setDestination(props->getExchange()); else transfer->clearDestinationFlag(); -} + // Copy expiration from x-property if present. + if (msg->hasProperties<MessageProperties>()) { + MessageProperties* mprops = msg->getProperties<MessageProperties>(); + if (mprops->hasApplicationHeaders()) { + FieldTable& headers = mprops->getApplicationHeaders(); + if (headers.isSet(UpdateClient::X_QPID_EXPIRATION)) { + msg->setExpiration( + sys::AbsTime(sys::EPOCH, headers.getAsInt64(UpdateClient::X_QPID_EXPIRATION))); + headers.erase(UpdateClient::X_QPID_EXPIRATION); + // Erase props/headers that were added by the UpdateClient + if (headers.isSet(UpdateClient::X_QPID_NO_MESSAGE_PROPS)) + msg->eraseProperties<MessageProperties>(); + else if (headers.isSet(UpdateClient::X_QPID_NO_HEADERS)) + mprops->clearApplicationHeadersFlag(); + } + } + } +} }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/console/SessionManager.cpp b/qpid/cpp/src/qpid/console/SessionManager.cpp index 80c5959417..910ae22be8 100644 --- a/qpid/cpp/src/qpid/console/SessionManager.cpp +++ b/qpid/cpp/src/qpid/console/SessionManager.cpp @@ -362,12 +362,11 @@ void SessionManager::handleCommandComplete(Broker* broker, Buffer& inBuffer, uin void SessionManager::handleClassInd(Broker* broker, Buffer& inBuffer, uint32_t) { - uint8_t kind; string packageName; string className; uint8_t hash[16]; - kind = inBuffer.getOctet(); + /*kind*/ (void) inBuffer.getOctet(); inBuffer.getShortString(packageName); inBuffer.getShortString(className); inBuffer.getBin128(hash); diff --git a/qpid/cpp/src/qpid/framing/AMQHeaderBody.h b/qpid/cpp/src/qpid/framing/AMQHeaderBody.h index a8c326969a..452154eb5c 100644 --- a/qpid/cpp/src/qpid/framing/AMQHeaderBody.h +++ b/qpid/cpp/src/qpid/framing/AMQHeaderBody.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -58,7 +58,7 @@ class QPID_COMMON_CLASS_EXTERN AMQHeaderBody : public AMQBody } else return Base::decode(buffer, size, type); - } + } void print(std::ostream& out) const { const boost::optional<T>& p=this->OptProps<T>::props; if (p) out << *p; @@ -77,7 +77,7 @@ class QPID_COMMON_CLASS_EXTERN AMQHeaderBody : public AMQBody typedef PropSet<PropSet<Empty, DeliveryProperties>, MessageProperties> Properties; Properties properties; - + public: inline uint8_t type() const { return HEADER_BODY; } @@ -99,6 +99,10 @@ public: return properties.OptProps<T>::props.get_ptr(); } + template <class T> void erase() { + properties.OptProps<T>::props.reset(); + } + boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); } }; diff --git a/qpid/cpp/src/qpid/messaging/Session.cpp b/qpid/cpp/src/qpid/messaging/Session.cpp index 496953a8e5..cccfd9a873 100644 --- a/qpid/cpp/src/qpid/messaging/Session.cpp +++ b/qpid/cpp/src/qpid/messaging/Session.cpp @@ -39,7 +39,8 @@ Session& Session::operator=(const Session& s) { return PI::assign(*this, s); } void Session::commit() { impl->commit(); } void Session::rollback() { impl->rollback(); } void Session::acknowledge(bool sync) { impl->acknowledge(sync); } -void Session::acknowledge(Message& m, bool s) { impl->acknowledge(m); sync(s); } +void Session::acknowledge(Message& m, bool s) { impl->acknowledge(m, false); sync(s); } +void Session::acknowledgeUpTo(Message& m, bool s) { impl->acknowledge(m, true); sync(s); } void Session::reject(Message& m) { impl->reject(m); } void Session::release(Message& m) { impl->release(m); } void Session::close() { impl->close(); } diff --git a/qpid/cpp/src/qpid/messaging/SessionImpl.h b/qpid/cpp/src/qpid/messaging/SessionImpl.h index 02a254e4f2..60ae615253 100644 --- a/qpid/cpp/src/qpid/messaging/SessionImpl.h +++ b/qpid/cpp/src/qpid/messaging/SessionImpl.h @@ -41,7 +41,7 @@ class SessionImpl : public virtual qpid::RefCounted virtual void commit() = 0; virtual void rollback() = 0; virtual void acknowledge(bool sync) = 0; - virtual void acknowledge(Message&) = 0; + virtual void acknowledge(Message&, bool cumulative) = 0; virtual void reject(Message&) = 0; virtual void release(Message&) = 0; virtual void close() = 0; diff --git a/qpid/cpp/src/qpid/sys/AsynchIO.h b/qpid/cpp/src/qpid/sys/AsynchIO.h index 50da8fa4fc..41f74f7ed0 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIO.h +++ b/qpid/cpp/src/qpid/sys/AsynchIO.h @@ -64,8 +64,8 @@ public: // deletes. To correctly manage heaps when needed, the allocate and // delete should both be done from the same class/library. QPID_COMMON_EXTERN static AsynchConnector* create(const Socket& s, - std::string hostname, - uint16_t port, + const std::string& hostname, + const std::string& port, ConnectedCallback connCb, FailedCallback failCb); virtual void start(boost::shared_ptr<Poller> poller) = 0; diff --git a/qpid/cpp/src/qpid/sys/AtomicValue_gcc.h b/qpid/cpp/src/qpid/sys/AtomicValue_gcc.h index d022b07c1d..724bae422e 100644 --- a/qpid/cpp/src/qpid/sys/AtomicValue_gcc.h +++ b/qpid/cpp/src/qpid/sys/AtomicValue_gcc.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -39,6 +39,9 @@ class AtomicValue public: AtomicValue(T init=0) : value(init) {} + // Not atomic. Don't call concurrently with atomic ops. + AtomicValue<T>& operator=(T newValue) { value = newValue; return *this; } + // Update and return new value. inline T operator+=(T n) { return __sync_add_and_fetch(&value, n); } inline T operator-=(T n) { return __sync_sub_and_fetch(&value, n); } @@ -54,11 +57,11 @@ class AtomicValue /** If current value == testval then set to newval. Returns the old value. */ T valueCompareAndSwap(T testval, T newval) { return __sync_val_compare_and_swap(&value, testval, newval); } - /** If current value == testval then set to newval. Returns true if the swap was performed. */ + /** If current value == testval then set to newval. Returns true if the swap was performed. */ bool boolCompareAndSwap(T testval, T newval) { return __sync_bool_compare_and_swap(&value, testval, newval); } T get() const { return const_cast<AtomicValue<T>*>(this)->fetchAndAdd(static_cast<T>(0)); } - + private: T value; }; diff --git a/qpid/cpp/src/qpid/sys/ProtocolFactory.h b/qpid/cpp/src/qpid/sys/ProtocolFactory.h index b233b2da1a..4d198a92da 100644 --- a/qpid/cpp/src/qpid/sys/ProtocolFactory.h +++ b/qpid/cpp/src/qpid/sys/ProtocolFactory.h @@ -39,11 +39,10 @@ class ProtocolFactory : public qpid::SharedObject<ProtocolFactory> virtual ~ProtocolFactory() = 0; virtual uint16_t getPort() const = 0; - virtual std::string getHost() const = 0; virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0; virtual void connect( boost::shared_ptr<Poller>, - const std::string& host, int16_t port, + const std::string& host, const std::string& port, ConnectionCodec::Factory* codec, ConnectFailedCallback failed) = 0; virtual bool supports(const std::string& /*capability*/) { return false; } diff --git a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp index d53db20598..6769e5383c 100644 --- a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -31,7 +31,6 @@ #include "qpid/sys/SecuritySettings.h" #include <boost/bind.hpp> -#include <boost/lexical_cast.hpp> #include <memory> #include <netdb.h> @@ -212,10 +211,9 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { if (readError) { return; } - size_t decoded = 0; try { if (codec) { - decoded = codec->decode(buff->bytes(), buff->dataCount()); + (void) codec->decode(buff->bytes(), buff->dataCount()); }else{ // Need to start protocol processing initProtocolIn(buff); @@ -230,9 +228,7 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) { framing::Buffer in(buff->bytes(), buff->dataCount()); framing::ProtocolInitiation protocolInit; - size_t decoded = 0; if (protocolInit.decode(in)) { - decoded = in.getPosition(); QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")"); codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings()); @@ -254,10 +250,9 @@ class RdmaIOProtocolFactory : public ProtocolFactory { public: RdmaIOProtocolFactory(int16_t port, int backlog); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const string& host, int16_t port, ConnectionCodec::Factory*, ConnectFailedCallback); + void connect(Poller::shared_ptr, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback); uint16_t getPort() const; - string getHost() const; private: bool request(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectionCodec::Factory*); @@ -347,18 +342,7 @@ uint16_t RdmaIOProtocolFactory::getPort() const { return listeningPort; // Immutable no need for lock. } -string RdmaIOProtocolFactory::getHost() const { - //return listener.getSockname(); - return ""; -} - void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { - ::sockaddr_in sin; - - sin.sin_family = AF_INET; - sin.sin_port = htons(listeningPort); - sin.sin_addr.s_addr = INADDR_ANY; - listener.reset( new Rdma::Listener( Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES), @@ -387,7 +371,7 @@ void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connectio void RdmaIOProtocolFactory::connect( Poller::shared_ptr poller, - const std::string& host, int16_t port, + const std::string& host, const std::string& port, ConnectionCodec::Factory* f, ConnectFailedCallback failed) { @@ -399,7 +383,7 @@ void RdmaIOProtocolFactory::connect( boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1), boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2, failed)); - SocketAddress sa(host, boost::lexical_cast<std::string>(port)); + SocketAddress sa(host, port); c->start(poller, sa); } diff --git a/qpid/cpp/src/qpid/sys/Socket.h b/qpid/cpp/src/qpid/sys/Socket.h index b1cded1aa1..9f62f3be1c 100644 --- a/qpid/cpp/src/qpid/sys/Socket.h +++ b/qpid/cpp/src/qpid/sys/Socket.h @@ -39,15 +39,12 @@ public: /** Create a socket wrapper for descriptor. */ QPID_COMMON_EXTERN Socket(); - /** Set timeout for read and write */ - void setTimeout(const Duration& interval) const; - /** Set socket non blocking */ void setNonblocking() const; QPID_COMMON_EXTERN void setTcpNoDelay() const; - QPID_COMMON_EXTERN void connect(const std::string& host, uint16_t port) const; + QPID_COMMON_EXTERN void connect(const std::string& host, const std::string& port) const; QPID_COMMON_EXTERN void connect(const SocketAddress&) const; QPID_COMMON_EXTERN void close() const; @@ -57,19 +54,9 @@ public: *@param backlog maximum number of pending connections. *@return The bound port. */ - QPID_COMMON_EXTERN int listen(uint16_t port = 0, int backlog = 10) const; + QPID_COMMON_EXTERN int listen(const std::string& host = "", const std::string& port = "0", int backlog = 10) const; QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const; - /** Returns the "socket name" ie the address bound to - * the near end of the socket - */ - QPID_COMMON_EXTERN std::string getSockname() const; - - /** Returns the "peer name" ie the address bound to - * the remote end of the socket - */ - std::string getPeername() const; - /** * Returns an address (host and port) for the remote end of the * socket @@ -86,9 +73,6 @@ public: */ QPID_COMMON_INLINE_EXTERN std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); } - QPID_COMMON_EXTERN uint16_t getLocalPort() const; - uint16_t getRemotePort() const; - /** * Returns the error code stored in the socket. This may be used * to determine the result of a non-blocking connect. @@ -109,7 +93,8 @@ private: void createSocket(const SocketAddress&) const; Socket(IOHandlePrivate*); - mutable std::string connectname; + mutable std::string localname; + mutable std::string peername; mutable bool nonblocking; mutable bool nodelay; }; diff --git a/qpid/cpp/src/qpid/sys/SocketAddress.h b/qpid/cpp/src/qpid/sys/SocketAddress.h index 27b9642f2c..c2120338cf 100644 --- a/qpid/cpp/src/qpid/sys/SocketAddress.h +++ b/qpid/cpp/src/qpid/sys/SocketAddress.h @@ -41,7 +41,7 @@ public: QPID_COMMON_EXTERN SocketAddress& operator=(const SocketAddress&); QPID_COMMON_EXTERN ~SocketAddress(); - std::string asString() const; + std::string asString(bool numeric=true) const; private: std::string host; diff --git a/qpid/cpp/src/qpid/sys/SslPlugin.cpp b/qpid/cpp/src/qpid/sys/SslPlugin.cpp index 0ec051caab..471a0cef60 100644 --- a/qpid/cpp/src/qpid/sys/SslPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/SslPlugin.cpp @@ -66,12 +66,11 @@ class SslProtocolFactory : public ProtocolFactory { public: SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const std::string& host, int16_t port, + void connect(Poller::shared_ptr, const std::string& host, const std::string& port, ConnectionCodec::Factory*, boost::function2<void, int, std::string> failed); uint16_t getPort() const; - std::string getHost() const; bool supports(const std::string& capability); private: @@ -146,10 +145,6 @@ uint16_t SslProtocolFactory::getPort() const { return listeningPort; // Immutable no need for lock. } -std::string SslProtocolFactory::getHost() const { - return listener.getSockname(); -} - void SslProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { acceptor.reset( @@ -160,7 +155,7 @@ void SslProtocolFactory::accept(Poller::shared_ptr poller, void SslProtocolFactory::connect( Poller::shared_ptr poller, - const std::string& host, int16_t port, + const std::string& host, const std::string& port, ConnectionCodec::Factory* fact, ConnectFailedCallback failed) { diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp index a6528f9ad9..34338ce434 100644 --- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -42,14 +42,13 @@ class AsynchIOProtocolFactory : public ProtocolFactory { std::auto_ptr<AsynchAcceptor> acceptor; public: - AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay); + AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const std::string& host, int16_t port, + void connect(Poller::shared_ptr, const std::string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback); uint16_t getPort() const; - std::string getHost() const; private: void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, @@ -61,22 +60,25 @@ class AsynchIOProtocolFactory : public ProtocolFactory { static class TCPIOPlugin : public Plugin { void earlyInitialize(Target&) { } - + void initialize(Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); // Only provide to a Broker if (broker) { const broker::Broker::Options& opts = broker->getOptions(); - ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog, - opts.tcpNoDelay)); - QPID_LOG(notice, "Listening on TCP port " << protocol->getPort()); - broker->registerProtocolFactory("tcp", protocol); + ProtocolFactory::shared_ptr protocolt( + new AsynchIOProtocolFactory( + "", boost::lexical_cast<std::string>(opts.port), + opts.connectionBacklog, + opts.tcpNoDelay)); + QPID_LOG(notice, "Listening on TCP port " << protocolt->getPort()); + broker->registerProtocolFactory("tcp", protocolt); } } } tcpPlugin; -AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay) : - tcpNoDelay(nodelay), listeningPort(listener.listen(port, backlog)) +AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay) : + tcpNoDelay(nodelay), listeningPort(listener.listen(host, port, backlog)) {} void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, @@ -107,10 +109,6 @@ uint16_t AsynchIOProtocolFactory::getPort() const { return listeningPort; // Immutable no need for lock. } -std::string AsynchIOProtocolFactory::getHost() const { - return listener.getSockname(); -} - void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { acceptor.reset( @@ -130,7 +128,7 @@ void AsynchIOProtocolFactory::connectFailed( void AsynchIOProtocolFactory::connect( Poller::shared_ptr poller, - const std::string& host, int16_t port, + const std::string& host, const std::string& port, ConnectionCodec::Factory* fact, ConnectFailedCallback failed) { @@ -139,7 +137,6 @@ void AsynchIOProtocolFactory::connect( // upon connection failure or by the AsynchIO upon connection // shutdown. The allocated AsynchConnector frees itself when it // is no longer needed. - Socket* socket = new Socket(); AsynchConnector* c = AsynchConnector::create( *socket, diff --git a/qpid/cpp/src/qpid/sys/Timer.cpp b/qpid/cpp/src/qpid/sys/Timer.cpp index fdb2e8c6bb..47752e4584 100644 --- a/qpid/cpp/src/qpid/sys/Timer.cpp +++ b/qpid/cpp/src/qpid/sys/Timer.cpp @@ -75,6 +75,12 @@ void TimerTask::cancel() { cancelled = true; } +void TimerTask::setFired() { + // Set nextFireTime to just before now, making readyToFire() true. + nextFireTime = AbsTime(sys::now(), Duration(-1)); +} + + Timer::Timer() : active(false), late(50 * TIME_MSEC), diff --git a/qpid/cpp/src/qpid/sys/Timer.h b/qpid/cpp/src/qpid/sys/Timer.h index 98ba39ce38..fccb17dbc2 100644 --- a/qpid/cpp/src/qpid/sys/Timer.h +++ b/qpid/cpp/src/qpid/sys/Timer.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -64,6 +64,10 @@ class TimerTask : public RefCounted { std::string getName() const { return name; } + // Move the nextFireTime so readyToFire is true. + // Used by the cluster, where tasks are fired on cluster events, not on local time. + QPID_COMMON_EXTERN void setFired(); + protected: // Must be overridden with callback virtual void fire() = 0; diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp index 119a6aa8a4..b5a0b0bf32 100644 --- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -152,8 +152,8 @@ private: public: AsynchConnector(const Socket& socket, - std::string hostname, - uint16_t port, + const std::string& hostname, + const std::string& port, ConnectedCallback connCb, FailedCallback failCb); void start(Poller::shared_ptr poller); @@ -161,8 +161,8 @@ public: }; AsynchConnector::AsynchConnector(const Socket& s, - std::string hostname, - uint16_t port, + const std::string& hostname, + const std::string& port, ConnectedCallback connCb, FailedCallback failCb) : DispatchHandle(s, @@ -174,7 +174,7 @@ AsynchConnector::AsynchConnector(const Socket& s, socket(s) { socket.setNonblocking(); - SocketAddress sa(hostname, boost::lexical_cast<std::string>(port)); + SocketAddress sa(hostname, port); // Note, not catching any exceptions here, also has effect of destructing socket.connect(sa); } @@ -589,8 +589,8 @@ AsynchAcceptor* AsynchAcceptor::create(const Socket& s, } AsynchConnector* AsynchConnector::create(const Socket& s, - std::string hostname, - uint16_t port, + const std::string& hostname, + const std::string& port, ConnectedCallback connCb, FailedCallback failCb) { diff --git a/qpid/cpp/src/qpid/sys/posix/LockFile.cpp b/qpid/cpp/src/qpid/sys/posix/LockFile.cpp index 1862ff6ac9..f5a6c292cb 100755 --- a/qpid/cpp/src/qpid/sys/posix/LockFile.cpp +++ b/qpid/cpp/src/qpid/sys/posix/LockFile.cpp @@ -58,8 +58,7 @@ LockFile::~LockFile() { if (impl) { int f = impl->fd; if (f >= 0) { - int unused_ret; - unused_ret = ::lockf(f, F_ULOCK, 0); // Suppress warnings about ignoring return value. + (void) ::lockf(f, F_ULOCK, 0); // Suppress warnings about ignoring return value. ::close(f); impl->fd = -1; } diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp index 3449a753e3..aa25f8062d 100644 --- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp +++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp @@ -37,13 +37,12 @@ #include <iostream> #include <boost/format.hpp> -#include <boost/lexical_cast.hpp> namespace qpid { namespace sys { namespace { -std::string getName(int fd, bool local, bool includeService = false) +std::string getName(int fd, bool local) { ::sockaddr_storage name; // big enough for any socket address ::socklen_t namelen = sizeof(name); @@ -54,45 +53,15 @@ std::string getName(int fd, bool local, bool includeService = false) } else { result = ::getpeername(fd, (::sockaddr*)&name, &namelen); } - QPID_POSIX_CHECK(result); char servName[NI_MAXSERV]; char dispName[NI_MAXHOST]; - if (includeService) { - if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), - servName, sizeof(servName), - NI_NUMERICHOST | NI_NUMERICSERV) != 0) - throw QPID_POSIX_ERROR(rc); - return std::string(dispName) + ":" + std::string(servName); - - } else { - if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 0, 0, NI_NUMERICHOST) != 0) - throw QPID_POSIX_ERROR(rc); - return dispName; - } -} - -std::string getService(int fd, bool local) -{ - ::sockaddr_storage name; // big enough for any socket address - ::socklen_t namelen = sizeof(name); - - int result = -1; - if (local) { - result = ::getsockname(fd, (::sockaddr*)&name, &namelen); - } else { - result = ::getpeername(fd, (::sockaddr*)&name, &namelen); - } - - QPID_POSIX_CHECK(result); - - char servName[NI_MAXSERV]; - if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0, - servName, sizeof(servName), - NI_NUMERICHOST | NI_NUMERICSERV) != 0) + if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), + servName, sizeof(servName), + NI_NUMERICHOST | NI_NUMERICSERV) != 0) throw QPID_POSIX_ERROR(rc); - return servName; + return std::string(dispName) + ":" + std::string(servName); } } @@ -126,15 +95,6 @@ void Socket::createSocket(const SocketAddress& sa) const } } -void Socket::setTimeout(const Duration& interval) const -{ - const int& socket = impl->fd; - struct timeval tv; - toTimeval(tv, interval); - setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); - setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); -} - void Socket::setNonblocking() const { int& socket = impl->fd; nonblocking = true; @@ -154,15 +114,22 @@ void Socket::setTcpNoDelay() const } } -void Socket::connect(const std::string& host, uint16_t port) const +void Socket::connect(const std::string& host, const std::string& port) const { - SocketAddress sa(host, boost::lexical_cast<std::string>(port)); + SocketAddress sa(host, port); connect(sa); } void Socket::connect(const SocketAddress& addr) const { - connectname = addr.asString(); + // The display name for an outbound connection needs to be the name that was specified + // for the address rather than a resolved IP address as we don't know which of + // the IP addresses is actually the one that will be connected to. + peername = addr.asString(false); + + // However the string we compare with the local port must be numeric or it might not + // match when it should as getLocalAddress() will always be numeric + std::string connectname = addr.asString(); createSocket(addr); @@ -170,7 +137,7 @@ void Socket::connect(const SocketAddress& addr) const // TODO the correct thing to do here is loop on failure until you've used all the returned addresses if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) && (errno != EINPROGRESS)) { - throw Exception(QPID_MSG(strError(errno) << ": " << connectname)); + throw Exception(QPID_MSG(strError(errno) << ": " << peername)); } // When connecting to a port on the same host which no longer has // a process associated with it, the OS occasionally chooses the @@ -185,9 +152,9 @@ void Socket::connect(const SocketAddress& addr) const // Raise an error if we see such a connection, since we know there is // no listener on the peer address. // - if (getLocalAddress() == getPeerAddress()) { + if (getLocalAddress() == connectname) { close(); - throw Exception(QPID_MSG("Connection refused: " << connectname)); + throw Exception(QPID_MSG("Connection refused: " << peername)); } } @@ -200,9 +167,9 @@ Socket::close() const socket = -1; } -int Socket::listen(uint16_t port, int backlog) const +int Socket::listen(const std::string& host, const std::string& port, int backlog) const { - SocketAddress sa("", boost::lexical_cast<std::string>(port)); + SocketAddress sa(host, port); return listen(sa, backlog); } @@ -230,8 +197,11 @@ int Socket::listen(const SocketAddress& sa, int backlog) const Socket* Socket::accept() const { int afd = ::accept(impl->fd, 0, 0); - if ( afd >= 0) - return new Socket(new IOHandlePrivate(afd)); + if ( afd >= 0) { + Socket* s = new Socket(new IOHandlePrivate(afd)); + s->localname = localname; + return s; + } else if (errno == EAGAIN) return 0; else throw QPID_POSIX_ERROR(errno); @@ -247,37 +217,20 @@ int Socket::write(const void *buf, size_t count) const return ::write(impl->fd, buf, count); } -std::string Socket::getSockname() const -{ - return getName(impl->fd, true); -} - -std::string Socket::getPeername() const -{ - return getName(impl->fd, false); -} - std::string Socket::getPeerAddress() const { - if (connectname.empty()) { - connectname = getName(impl->fd, false, true); + if (peername.empty()) { + peername = getName(impl->fd, false); } - return connectname; + return peername; } std::string Socket::getLocalAddress() const { - return getName(impl->fd, true, true); -} - -uint16_t Socket::getLocalPort() const -{ - return std::atoi(getService(impl->fd, true).c_str()); -} - -uint16_t Socket::getRemotePort() const -{ - return std::atoi(getService(impl->fd, true).c_str()); + if (localname.empty()) { + localname = getName(impl->fd, true); + } + return localname; } int Socket::getError() const diff --git a/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp b/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp index 8f5f29d793..10f1c8a563 100644 --- a/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp +++ b/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp @@ -27,6 +27,8 @@ #include <string.h> #include <netdb.h> +#include <algorithm> + namespace qpid { namespace sys { @@ -46,15 +48,9 @@ SocketAddress::SocketAddress(const SocketAddress& sa) : SocketAddress& SocketAddress::operator=(const SocketAddress& sa) { - if (&sa != this) { - host = sa.host; - port = sa.port; + SocketAddress temp(sa); - if (addrInfo) { - ::freeaddrinfo(addrInfo); - addrInfo = 0; - } - } + std::swap(temp, *this); return *this; } @@ -65,9 +61,23 @@ SocketAddress::~SocketAddress() } } -std::string SocketAddress::asString() const +std::string SocketAddress::asString(bool numeric) const { - return host + ":" + port; + if (!numeric) + return host + ":" + port; + // Canonicalise into numeric id + const ::addrinfo& ai = getAddrInfo(*this); + char servName[NI_MAXSERV]; + char dispName[NI_MAXHOST]; + if (int rc=::getnameinfo(ai.ai_addr, ai.ai_addrlen, + dispName, sizeof(dispName), + servName, sizeof(servName), + NI_NUMERICHOST | NI_NUMERICSERV) != 0) + throw QPID_POSIX_ERROR(rc); + std::string s(dispName); + s += ":"; + s += servName; + return s; } const ::addrinfo& getAddrInfo(const SocketAddress& sa) @@ -88,7 +98,7 @@ const ::addrinfo& getAddrInfo(const SocketAddress& sa) int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo); if (n != 0) - throw Exception(QPID_MSG("Cannot resolve " << sa.host << ": " << ::gai_strerror(n))); + throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n))); } return *sa.addrInfo; diff --git a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp index a58a137473..734ebb483a 100644 --- a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp +++ b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp @@ -117,7 +117,7 @@ void SslAcceptor::readable(DispatchHandle& h) { SslConnector::SslConnector(const SslSocket& s, Poller::shared_ptr poller, std::string hostname, - uint16_t port, + std::string port, ConnectedCallback connCb, FailedCallback failCb) : DispatchHandle(s, diff --git a/qpid/cpp/src/qpid/sys/ssl/SslIo.h b/qpid/cpp/src/qpid/sys/ssl/SslIo.h index 53ac69d8d6..8785852c24 100644 --- a/qpid/cpp/src/qpid/sys/ssl/SslIo.h +++ b/qpid/cpp/src/qpid/sys/ssl/SslIo.h @@ -73,7 +73,7 @@ public: SslConnector(const SslSocket& socket, Poller::shared_ptr poller, std::string hostname, - uint16_t port, + std::string port, ConnectedCallback connCb, FailedCallback failCb = 0); diff --git a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp b/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp index 01e2658877..f7483a220c 100644 --- a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp +++ b/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp @@ -158,7 +158,7 @@ void SslSocket::setNonblocking() const PR_SetSocketOption(socket, &option); } -void SslSocket::connect(const std::string& host, uint16_t port) const +void SslSocket::connect(const std::string& host, const std::string& port) const { std::stringstream namestream; namestream << host << ":" << port; @@ -180,7 +180,7 @@ void SslSocket::connect(const std::string& host, uint16_t port) const PRHostEnt hostEntry; PR_CHECK(PR_GetHostByName(host.data(), hostBuffer, PR_NETDB_BUF_SIZE, &hostEntry)); PRNetAddr address; - int value = PR_EnumerateHostEnt(0, &hostEntry, port, &address); + int value = PR_EnumerateHostEnt(0, &hostEntry, boost::lexical_cast<PRUint16>(port), &address); if (value < 0) { throw Exception(QPID_MSG("Error getting address for host: " << ErrorString())); } else if (value == 0) { diff --git a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h b/qpid/cpp/src/qpid/sys/ssl/SslSocket.h index 25712c98d5..993859495b 100644 --- a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h +++ b/qpid/cpp/src/qpid/sys/ssl/SslSocket.h @@ -53,7 +53,7 @@ public: * NSSInit().*/ void setCertName(const std::string& certName); - void connect(const std::string& host, uint16_t port) const; + void connect(const std::string& host, const std::string& port) const; void close() const; diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp index 71138757a5..8d84fdb7b2 100644 --- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -175,20 +175,20 @@ private: FailedCallback failCallback; const Socket& socket; const std::string hostname; - const uint16_t port; + const std::string port; public: AsynchConnector(const Socket& socket, - std::string hostname, - uint16_t port, + const std::string& hostname, + const std::string& port, ConnectedCallback connCb, FailedCallback failCb = 0); void start(Poller::shared_ptr poller); }; AsynchConnector::AsynchConnector(const Socket& sock, - std::string hname, - uint16_t p, + const std::string& hname, + const std::string& p, ConnectedCallback connCb, FailedCallback failCb) : connCallback(connCb), failCallback(failCb), socket(sock), @@ -218,8 +218,8 @@ AsynchAcceptor* AsynchAcceptor::create(const Socket& s, } AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s, - std::string hostname, - uint16_t port, + const std::string& hostname, + const std::string& port, ConnectedCallback connCb, FailedCallback failCb) { diff --git a/qpid/cpp/src/qpid/sys/windows/Socket.cpp b/qpid/cpp/src/qpid/sys/windows/Socket.cpp index 2ce274acc9..baa80f04e0 100755 --- a/qpid/cpp/src/qpid/sys/windows/Socket.cpp +++ b/qpid/cpp/src/qpid/sys/windows/Socket.cpp @@ -89,7 +89,7 @@ namespace sys { namespace { -std::string getName(SOCKET fd, bool local, bool includeService = false) +std::string getName(SOCKET fd, bool local) { sockaddr_in name; // big enough for any socket address socklen_t namelen = sizeof(name); @@ -101,41 +101,12 @@ std::string getName(SOCKET fd, bool local, bool includeService = false) char servName[NI_MAXSERV]; char dispName[NI_MAXHOST]; - if (includeService) { - if (int rc = ::getnameinfo((sockaddr*)&name, namelen, - dispName, sizeof(dispName), - servName, sizeof(servName), - NI_NUMERICHOST | NI_NUMERICSERV) != 0) - throw qpid::Exception(QPID_MSG(gai_strerror(rc))); - return std::string(dispName) + ":" + std::string(servName); - } else { - if (int rc = ::getnameinfo((sockaddr*)&name, namelen, - dispName, sizeof(dispName), - 0, 0, - NI_NUMERICHOST) != 0) - throw qpid::Exception(QPID_MSG(gai_strerror(rc))); - return dispName; - } -} - -std::string getService(SOCKET fd, bool local) -{ - sockaddr_in name; // big enough for any socket address - socklen_t namelen = sizeof(name); - - if (local) { - QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen)); - } else { - QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen)); - } - - char servName[NI_MAXSERV]; if (int rc = ::getnameinfo((sockaddr*)&name, namelen, - 0, 0, + dispName, sizeof(dispName), servName, sizeof(servName), NI_NUMERICHOST | NI_NUMERICSERV) != 0) throw qpid::Exception(QPID_MSG(gai_strerror(rc))); - return servName; + return std::string(dispName) + ":" + std::string(servName); } } // namespace @@ -179,34 +150,22 @@ Socket::createSocket(const SocketAddress& sa) const } } -void Socket::setTimeout(const Duration& interval) const -{ - const SOCKET& socket = impl->fd; - int64_t nanosecs = interval; - nanosecs /= (1000 * 1000); // nsecs -> usec -> msec - int msec = 0; - if (nanosecs > std::numeric_limits<int>::max()) - msec = std::numeric_limits<int>::max(); - else - msec = static_cast<int>(nanosecs); - setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&msec, sizeof(msec)); - setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&msec, sizeof(msec)); -} - void Socket::setNonblocking() const { u_long nonblock = 1; QPID_WINSOCK_CHECK(ioctlsocket(impl->fd, FIONBIO, &nonblock)); } -void Socket::connect(const std::string& host, uint16_t port) const +void Socket::connect(const std::string& host, const std::string& port) const { - SocketAddress sa(host, boost::lexical_cast<std::string>(port)); + SocketAddress sa(host, port); connect(sa); } void Socket::connect(const SocketAddress& addr) const { + peername = addr.asString(false); + const SOCKET& socket = impl->fd; const addrinfo *addrs = &(getAddrInfo(addr)); int error = 0; @@ -221,7 +180,7 @@ Socket::connect(const SocketAddress& addr) const addrs = addrs->ai_next; } if (error) - throw qpid::Exception(QPID_MSG(strError(error) << ": " << connectname)); + throw qpid::Exception(QPID_MSG(strError(error) << ": " << peername)); } void @@ -252,7 +211,7 @@ int Socket::read(void *buf, size_t count) const return received; } -int Socket::listen(uint16_t port, int backlog) const +int Socket::listen(const std::string&, const std::string& port, int backlog) const { const SOCKET& socket = impl->fd; BOOL yes=1; @@ -260,7 +219,7 @@ int Socket::listen(uint16_t port, int backlog) const struct sockaddr_in name; memset(&name, 0, sizeof(name)); name.sin_family = AF_INET; - name.sin_port = htons(port); + name.sin_port = htons(boost::lexical_cast<uint16_t>(port)); name.sin_addr.s_addr = 0; if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) == SOCKET_ERROR) throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(WSAGetLastError()))); @@ -282,36 +241,18 @@ Socket* Socket::accept() const else throw QPID_WINDOWS_ERROR(WSAGetLastError()); } -std::string Socket::getSockname() const -{ - return getName(impl->fd, true); -} - -std::string Socket::getPeername() const -{ - return getName(impl->fd, false); -} - std::string Socket::getPeerAddress() const { - if (!connectname.empty()) - return std::string (connectname); - return getName(impl->fd, false, true); + if (peername.empty()) + peername = getName(impl->fd, false); + return peername; } std::string Socket::getLocalAddress() const { - return getName(impl->fd, true, true); -} - -uint16_t Socket::getLocalPort() const -{ - return atoi(getService(impl->fd, true).c_str()); -} - -uint16_t Socket::getRemotePort() const -{ - return atoi(getService(impl->fd, true).c_str()); + if (localname.empty()) + localname = getName(impl->fd, true); + return localname; } int Socket::getError() const diff --git a/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp index 5efdad0183..ac43cd2d23 100644 --- a/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp +++ b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp @@ -63,7 +63,7 @@ SocketAddress::~SocketAddress() ::freeaddrinfo(addrInfo); } -std::string SocketAddress::asString() const +std::string SocketAddress::asString(bool) const { return host + ":" + port; } diff --git a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp index d0c6668b72..1d5289dc90 100644 --- a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp +++ b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp @@ -599,13 +599,12 @@ namespace qpid { // populate the agent with multiple test objects const size_t objCount = 50; std::vector<TestManageable *> tmv; - uint32_t objLen; for (size_t i = 0; i < objCount; i++) { std::stringstream key; key << "testobj-" << i; TestManageable *tm = new TestManageable(agent, key.str()); - objLen = tm->GetManagementObject()->writePropertiesSize(); + (void) tm->GetManagementObject()->writePropertiesSize(); agent->addObject(tm->GetManagementObject(), key.str()); tmv.push_back(tm); } diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp index 939f8f2b88..3c0cff7350 100644 --- a/qpid/cpp/src/tests/ClientSessionTest.cpp +++ b/qpid/cpp/src/tests/ClientSessionTest.cpp @@ -271,8 +271,12 @@ QPID_AUTO_TEST_CASE(testOpenFailure) { QPID_AUTO_TEST_CASE(testPeriodicExpiration) { Broker::Options opts; opts.queueCleanInterval = 1; + opts.queueFlowStopRatio = 0; + opts.queueFlowResumeRatio = 0; ClientSessionFixture fix(opts); - fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true); + FieldTable args; + args.setInt("qpid.max_count",10); + fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); for (uint i = 0; i < 10; i++) { Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); @@ -283,6 +287,7 @@ QPID_AUTO_TEST_CASE(testPeriodicExpiration) { BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 10u); qpid::sys::sleep(2); BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 5u); + fix.session.messageTransfer(arg::content=Message("Message_11", "my-queue"));//ensure policy is also updated } QPID_AUTO_TEST_CASE(testExpirationOnPop) { diff --git a/qpid/cpp/src/tests/ForkedBroker.cpp b/qpid/cpp/src/tests/ForkedBroker.cpp index 53eaa7e1ce..10674b5175 100644 --- a/qpid/cpp/src/tests/ForkedBroker.cpp +++ b/qpid/cpp/src/tests/ForkedBroker.cpp @@ -68,8 +68,7 @@ ForkedBroker::~ForkedBroker() { } if (!dataDir.empty()) { - int unused_ret; // Suppress warnings about ignoring return value. - unused_ret = ::system(("rm -rf "+dataDir).c_str()); + (void) ::system(("rm -rf "+dataDir).c_str()); } } diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index 6aa4c63ed7..fae45a94d0 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -992,6 +992,78 @@ QPID_AUTO_TEST_CASE(testTtlForever) BOOST_CHECK(in.getTtl() == Duration::FOREVER); } +QPID_AUTO_TEST_CASE(testExclusiveTopicSubscriber) +{ + TopicFixture fix; + std::string address = (boost::format("%1%; { link: { name: 'my-subscription', x-declare: { auto-delete: true, exclusive: true }}}") % fix.topic).str(); + Sender sender = fix.session.createSender(fix.topic); + Receiver receiver1 = fix.session.createReceiver(address); + { + ScopedSuppressLogging sl; + try { + fix.session.createReceiver(address); + fix.session.sync(); + BOOST_FAIL("Expected exception."); + } catch (const MessagingException& /*e*/) {} + } +} + +QPID_AUTO_TEST_CASE(testNonExclusiveSubscriber) +{ + TopicFixture fix; + std::string address = (boost::format("%1%; {node:{type:topic}, link:{name:'my-subscription', x-declare:{auto-delete:true, exclusive:false}}}") % fix.topic).str(); + Receiver receiver1 = fix.session.createReceiver(address); + Receiver receiver2 = fix.session.createReceiver(address); + Sender sender = fix.session.createSender(fix.topic); + sender.send(Message("one"), true); + Message in = receiver1.fetch(Duration::IMMEDIATE); + BOOST_CHECK_EQUAL(in.getContent(), std::string("one")); + sender.send(Message("two"), true); + in = receiver2.fetch(Duration::IMMEDIATE); + BOOST_CHECK_EQUAL(in.getContent(), std::string("two")); + fix.session.acknowledge(); +} + +QPID_AUTO_TEST_CASE(testAcknowledgeUpTo) +{ + QueueFixture fix; + Sender sender = fix.session.createSender(fix.queue); + const uint count(20); + for (uint i = 0; i < count; ++i) { + sender.send(Message((boost::format("Message_%1%") % (i+1)).str())); + } + + Session other = fix.connection.createSession(); + Receiver receiver = other.createReceiver(fix.queue); + std::vector<Message> messages; + for (uint i = 0; i < count; ++i) { + Message msg = receiver.fetch(); + BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str()); + messages.push_back(msg); + } + const uint batch = 10; + other.acknowledgeUpTo(messages[batch-1]);//acknowledge first 10 messages only + + messages.clear(); + other.sync(); + other.close(); + + other = fix.connection.createSession(); + receiver = other.createReceiver(fix.queue); + Message msg; + for (uint i = 0; i < (count-batch); ++i) { + msg = receiver.fetch(); + BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str()); + } + other.acknowledgeUpTo(msg); + other.sync(); + other.close(); + + Message m; + //check queue is empty + BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE)); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index a752e3afec..6372aa93c3 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -681,7 +681,7 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) { addMessagesToQueue(10, queue); BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u); ::usleep(300*1000); - queue.purgeExpired(); + queue.purgeExpired(0); BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u); } @@ -692,7 +692,7 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) { addMessagesToQueue(10, *queue, 200, 400); BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u); - QueueCleaner cleaner(queues, timer); + QueueCleaner cleaner(queues, &timer); cleaner.start(100 * qpid::sys::TIME_MSEC); ::usleep(300*1000); BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u); diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index 0c6f39d62e..d195f11aa9 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -35,6 +35,8 @@ #include "qpid/sys/Mutex.h" #include "qpid/log/Statement.h" +#include <boost/lexical_cast.hpp> + namespace qpid { namespace tests { @@ -62,7 +64,7 @@ class SocketProxy : private qpid::sys::Runnable : closed(false), joined(true), port(listener.listen()), dropClient(), dropServer() { - client.connect(host, connectPort); + client.connect(host, boost::lexical_cast<std::string>(connectPort)); joined = false; thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this)); } diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index a19dd305e5..0415a667a2 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -157,8 +157,13 @@ class Popen(subprocess.Popen): try: self.kill() # Just make sure its dead except: pass elif self.expect == EXPECT_RUNNING: - try: self.kill() - except: self.unexpected("expected running, exit code %d" % self.wait()) + if self.poll() != None: + self.unexpected("expected running, exit code %d" % self.returncode) + else: + try: + self.kill() + except Exception,e: + self.unexpected("exception from kill: %s" % str(e)) else: retry(lambda: self.poll() is not None) if self.returncode is None: # Still haven't stopped @@ -544,6 +549,7 @@ class NumberedSender(Thread): "--broker", "localhost:%s"%broker.port(), "--address", "%s;{create:always}"%queue, "--failover-updates", + "--connection-options", "{reconnect:true}", "--content-stdin" ], expect=EXPECT_RUNNING, @@ -562,6 +568,7 @@ class NumberedSender(Thread): try: self.sent = 0 while not self.stopped: + self.sender.assert_running() if self.max: self.condition.acquire() while not self.stopped and self.sent - self.received > self.max: @@ -604,6 +611,7 @@ class NumberedReceiver(Thread): "--broker", "localhost:%s"%broker.port(), "--address", "%s;{create:always}"%queue, "--failover-updates", + "--connection-options", "{reconnect:true}", "--forever" ], expect=EXPECT_RUNNING, @@ -611,15 +619,16 @@ class NumberedReceiver(Thread): self.lock = Lock() self.error = None self.sender = sender + self.received = 0 def read_message(self): return int(self.receiver.stdout.readline()) def run(self): try: - self.received = 0 m = self.read_message() while m != -1: + self.receiver.assert_running() assert(m <= self.received) # Check for missing messages if (m == self.received): # Ignore duplicates self.received += 1 diff --git a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py index 9f7d1e2f6c..a0ce8fb9c3 100755 --- a/qpid/cpp/src/tests/cluster_test_logs.py +++ b/qpid/cpp/src/tests/cluster_test_logs.py @@ -54,7 +54,7 @@ def filter_log(log): 'caught up', 'active for links|Passivating links|Activating links', 'info Connection.* connected to', # UpdateClient connection - 'warning Connection [\d+ [0-9.:]+] closed', # UpdateClient connection + 'warning Connection \\[[-0-9.: ]+\\] closed', # UpdateClient connection 'warning Broker closed connection: 200, OK', 'task late', 'task overran', diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index caa41fa001..c1d9103f08 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -18,11 +18,12 @@ # under the License. # -import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs +import os, signal, sys, time, imp, re, subprocess, glob, random, logging +import cluster_test_logs from qpid import datatypes, messaging from brokertest import * from qpid.harness import Skipped -from qpid.messaging import Message, Empty, Disposition, REJECTED +from qpid.messaging import Message, Empty, Disposition, REJECTED, util from threading import Thread, Lock, Condition from logging import getLogger from itertools import chain @@ -96,9 +97,15 @@ class ShortTests(BrokerTest): destination="amq.direct", message=qpid.datatypes.Message(props, "content")) + # Try message with TTL and differnet headers/properties + cluster[0].send_message("q", Message(durable=True, ttl=100000)) + cluster[0].send_message("q", Message(durable=True, properties={}, ttl=100000)) + cluster[0].send_message("q", Message(durable=True, properties={"x":10}, ttl=100000)) + # Now update a new member and compare their dumps. cluster.start(args=["--test-store-dump", "updatee.dump"]) assert readfile("direct.dump") == readfile("updatee.dump") + os.remove("direct.dump") os.remove("updatee.dump") @@ -253,6 +260,7 @@ acl allow all all client was attached. """ args=["--mgmt-pub-interval=1","--log-enable=trace+:management"] + # First broker will be killed. cluster0 = self.cluster(1, args=args) cluster1 = self.cluster(1, args=args) assert 0 == subprocess.call( @@ -287,6 +295,7 @@ acl allow all all # Force a change of elder cluster0.start() + cluster0[0].expect=EXPECT_EXIT_FAIL # About to die. cluster0[0].kill() time.sleep(2) # Allow a management interval to pass. # Verify logs are consistent @@ -525,7 +534,7 @@ acl allow all all receiver.wait() q_obj.update() assert not q_obj.flowStopped - assert q_obj.msgDepth == 0 + self.assertEqual(q_obj.msgDepth, 0) # verify that the sender has become unblocked sender.join(timeout=5) @@ -685,6 +694,25 @@ acl allow all all self.assert_browse(s1, "q", ["foo"]) + def test_ttl_consistent(self): + """Ensure we don't get inconsistent errors with message that have TTL very close together""" + messages = [ Message(str(i), ttl=i/1000.0) for i in xrange(0,1000)] + messages.append(Message("x")) + cluster = self.cluster(2) + sender = cluster[0].connect().session().sender("q;{create:always}") + + def fetch(b): + receiver = b.connect().session().receiver("q;{create:always}") + while receiver.fetch().content != "x": pass + + for m in messages: sender.send(m, sync=False) + for m in messages: sender.send(m, sync=False) + fetch(cluster[0]) + fetch(cluster[1]) + for m in messages: sender.send(m, sync=False) + cluster.start() + fetch(cluster[2]) + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): @@ -697,22 +725,28 @@ class LongTests(BrokerTest): # Original cluster will all be killed so expect exit with failure cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL) + for b in cluster: b.ready() # Wait for brokers to be ready for b in cluster: ErrorGenerator(b) # Start sender and receiver threads cluster[0].declare_queue("test-queue") - sender = NumberedSender(cluster[1], 1000) # Max queue depth - receiver = NumberedReceiver(cluster[2], sender) + sender = NumberedSender(cluster[0], 1000) # Max queue depth + receiver = NumberedReceiver(cluster[0], sender) receiver.start() sender.start() + # Wait for sender & receiver to get up and running + retry(lambda: receiver.received > 0) # Kill original brokers, start new ones for the duration. endtime = time.time() + self.duration() i = 0 while time.time() < endtime: + sender.sender.assert_running() + receiver.receiver.assert_running() cluster[i].kill() i += 1 b = cluster.start(expect=EXPECT_EXIT_FAIL) + for b in cluster[i:]: b.ready() ErrorGenerator(b) time.sleep(5) sender.stop() @@ -782,7 +816,7 @@ class LongTests(BrokerTest): args += ["--log-enable=trace+:management"] # Use store if present. if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib] - cluster = self.cluster(3, args) + cluster = self.cluster(3, args, expect=EXPECT_EXIT_FAIL) # brokers will be killed clients = [] # Per-broker list of clients that only connect to one broker. mclients = [] # Management clients that connect to every broker in the cluster. @@ -806,7 +840,7 @@ class LongTests(BrokerTest): endtime = time.time() + self.duration() # For long duration, first run is a quarter of the duration. - runtime = max(5, self.duration() / 4.0) + runtime = min(5.0, self.duration() / 3.0) alive = 0 # First live cluster member for i in range(len(cluster)): start_clients(cluster[i]) start_mclients(cluster[alive]) @@ -817,7 +851,7 @@ class LongTests(BrokerTest): for b in cluster[alive:]: b.ready() # Check if a broker crashed. # Kill the first broker, expect the clients to fail. b = cluster[alive] - b.expect = EXPECT_EXIT_FAIL + b.ready() b.kill() # Stop the brokers clients and all the mclients. for c in clients[alive] + mclients: @@ -827,11 +861,15 @@ class LongTests(BrokerTest): mclients = [] # Start another broker and clients alive += 1 - cluster.start() + cluster.start(expect=EXPECT_EXIT_FAIL) + cluster[-1].ready() # Wait till its ready start_clients(cluster[-1]) start_mclients(cluster[alive]) for c in chain(mclients, *clients): c.stop() + for b in cluster[alive:]: + b.ready() # Verify still alive + b.kill() # Verify that logs are consistent cluster_test_logs.verify_logs() @@ -844,7 +882,7 @@ class LongTests(BrokerTest): end = time.time() + self.duration() while (time.time() < end): # Get a management interval for i in xrange(1000): cluster[0].connect().close() - cluster_test_logs.verify_logs() + cluster_test_logs.verify_logs() def test_flowlimit_failover(self): """Test fail-over during continuous send-receive with flow control @@ -853,34 +891,149 @@ class LongTests(BrokerTest): # Original cluster will all be killed so expect exit with failure cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL) - #for b in cluster: ErrorGenerator(b) + for b in cluster: b.ready() # Wait for brokers to be ready # create a queue with rather draconian flow control settings ssn0 = cluster[0].connect().session() s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}") - receiver = NumberedReceiver(cluster[2]) + receiver = NumberedReceiver(cluster[0]) receiver.start() - senders = [NumberedSender(cluster[i]) for i in range(1,3)] + senders = [NumberedSender(cluster[0]) for i in range(1,3)] for s in senders: s.start() + # Wait for senders & receiver to get up and running + retry(lambda: receiver.received > 2*senders) # Kill original brokers, start new ones for the duration. endtime = time.time() + self.duration(); i = 0 while time.time() < endtime: + for s in senders: s.sender.assert_running() + receiver.receiver.assert_running() + for b in cluster[i:]: b.ready() # Check if any broker crashed. cluster[i].kill() i += 1 b = cluster.start(expect=EXPECT_EXIT_FAIL) - #ErrorGenerator(b) time.sleep(5) - #b = cluster[0] - #b.startQmf() for s in senders: s.stop() receiver.stop() for i in range(i, len(cluster)): cluster[i].kill() + def test_ttl_failover(self): + """Test that messages with TTL don't cause problems in a cluster with failover""" + + class Client(StoppableThread): + + def __init__(self, broker): + StoppableThread.__init__(self) + self.connection = broker.connect(reconnect=True) + self.auto_fetch_reconnect_urls(self.connection) + self.session = self.connection.session() + + def auto_fetch_reconnect_urls(self, conn): + """Replacment for qpid.messaging.util version which is noisy""" + ssn = conn.session("auto-fetch-reconnect-urls") + rcv = ssn.receiver("amq.failover") + rcv.capacity = 10 + + def main(): + while True: + try: + msg = rcv.fetch() + qpid.messaging.util.set_reconnect_urls(conn, msg) + ssn.acknowledge(msg, sync=False) + except messaging.exceptions.LinkClosed: return + except messaging.exceptions.ConnectionError: return + + thread = Thread(name="auto-fetch-reconnect-urls", target=main) + thread.setDaemon(True) + thread.start() + + def stop(self): + StoppableThread.stop(self) + self.connection.detach() + + class Sender(Client): + def __init__(self, broker, address): + Client.__init__(self, broker) + self.sent = 0 # Number of messages _reliably_ sent. + self.sender = self.session.sender(address, capacity=1000) + + def send_counted(self, ttl): + self.sender.send(Message(str(self.sent), ttl=ttl)) + self.sent += 1 + + def run(self): + while not self.stopped: + choice = random.randint(0,4) + if choice == 0: self.send_counted(None) # No ttl + elif choice == 1: self.send_counted(100000) # Large ttl + else: # Small ttl, might expire + self.sender.send(Message("", ttl=random.random()/10)) + self.sender.send(Message("z"), sync=True) # Chaser. + + class Receiver(Client): + + def __init__(self, broker, address): + Client.__init__(self, broker) + self.received = 0 # Number of non-empty (reliable) messages received. + self.receiver = self.session.receiver(address, capacity=1000) + def run(self): + try: + while True: + m = self.receiver.fetch(1) + if m.content == "z": break + if m.content: # Ignore unreliable messages + # Ignore duplicates + if int(m.content) == self.received: self.received += 1 + except Exception,e: self.error = e + + # def test_ttl_failover + + # Original cluster will all be killed so expect exit with failure + # Set small purge interval. + cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["--queue-purge-interval=1"]) + for b in cluster: b.ready() # Wait for brokers to be ready + + # Python client failover produces noisy WARN logs, disable temporarily + logger = logging.getLogger() + log_level = logger.getEffectiveLevel() + logger.setLevel(logging.ERROR) + try: + # Start sender and receiver threads + receiver = Receiver(cluster[0], "q;{create:always}") + receiver.start() + sender = Sender(cluster[0], "q;{create:always}") + sender.start() + # Wait for sender & receiver to get up and running + retry(lambda: receiver.received > 0) + + # Kill brokers in a cycle. + endtime = time.time() + self.duration() + runtime = min(5.0, self.duration() / 4.0) + i = 0 + while time.time() < endtime: + for b in cluster[i:]: b.ready() # Check if any broker crashed. + cluster[i].kill() + i += 1 + b = cluster.start(expect=EXPECT_EXIT_FAIL) + b.ready() + time.sleep(runtime) + sender.stop() + receiver.stop() + for b in cluster[i:]: + b.ready() # Check it didn't crash + b.kill() + self.assertEqual(sender.sent, receiver.received) + cluster_test_logs.verify_logs() + + finally: + # Detach to avoid slow reconnect attempts during shut-down if test fails. + sender.connection.detach() + receiver.connection.detach() + logger.setLevel(log_level) class StoreTests(BrokerTest): """ diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index 201b06a4a2..49bdecdd95 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -649,10 +649,17 @@ class FederationTests(TestBase010): self.verify_cleanup() - def test_dynamic_headers(self): + def test_dynamic_headers_any(self): + self.do_test_dynamic_headers('any') + + def test_dynamic_headers_all(self): + self.do_test_dynamic_headers('all') + + + def do_test_dynamic_headers(self, match_mode): session = self.session r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) - r_session = r_conn.session("test_dynamic_headers") + r_session = r_conn.session("test_dynamic_headers_%s" % match_mode) session.exchange_declare(exchange="fed.headers", type="headers") r_session.exchange_declare(exchange="fed.headers", type="headers") @@ -671,7 +678,7 @@ class FederationTests(TestBase010): sleep(5) session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) - session.exchange_bind(queue="fed1", exchange="fed.headers", binding_key="key1", arguments={'x-match':'any', 'class':'first'}) + session.exchange_bind(queue="fed1", exchange="fed.headers", binding_key="key1", arguments={'x-match':match_mode, 'class':'first'}) self.subscribe(queue="fed1", destination="f1") queue = session.incoming("f1") diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 4d83c5b5de..c33f2e4852 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -8,9 +8,9 @@ - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at -- +- - http://www.apache.org/licenses/LICENSE-2.0 -- +- - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -78,10 +78,6 @@ <field name="left" type="vbin16"/> <!-- packed member-id array --> </control> - <control name="message-expired" code="0x12"> - <field name="id" type="uint64"/> - </control> - <domain name="error-type" type="uint8" label="Types of error"> <enum> <choice name="none" value="0"/> @@ -89,7 +85,7 @@ <choice name="connection" value="2"/> </enum> </domain> - + <!-- Check for error consistency across the cluster --> <control name="error-check" code="0x14"> <field name="type" type="error-type"/> @@ -116,6 +112,11 @@ <field name="message" type="vbin32"/> </control> + <!-- Update the cluster time --> + <control name="clock" code="0x22"> + <field name="time" type="uint64"/> + </control> + </class> <!-- Controls associated with a specific connection. --> @@ -149,7 +150,7 @@ <!-- Abort a connection that is sending invalid data. --> <control name="abort" code="0x4"/> - + <!-- Update controls. Sent to a new broker in joining mode. A connection is updated as followed: - send the shadow's management ID in shadow-perpare on the update connection @@ -183,7 +184,7 @@ <field name="position" type="sequence-no"/> <field name="tag" type="str8"/> <field name="id" type="sequence-no"/> - <field name="acquired" type="bit"/> <!--If not set, message follows. --> + <field name="acquired" type="bit"/> <!--If not set, message is on update queue. --> <field name="accepted" type="bit"/> <field name="cancelled" type="bit"/> <field name="completed" type="bit"/> @@ -192,9 +193,9 @@ <field name="enqueued" type="bit"/> <field name="credit" type="uint32"/> </control> - + <!-- Tx transaction state. --> - <control name="tx-start" code="0x12"/> + <control name="tx-start" code="0x12"/> <control name="tx-accept" code="0x13"> <field name="commands" type="sequence-set"/> </control> <control name="tx-dequeue" code="0x14"> <field name="queue" type="str8"/> </control> <control name="tx-enqueue" code="0x15"> <field name="queue" type="str8"/> </control> @@ -204,7 +205,7 @@ </control> <control name="tx-end" code="0x17"/> <control name="accumulated-ack" code="0x18"> <field name="commands" type="sequence-set"/> </control> - + <!-- Consumers in the connection's output task --> <control name="output-task" code="0x19"> <field name="channel" type="uint16"/> @@ -253,9 +254,6 @@ <!-- Replicate encoded exchanges/queues. --> <control name="exchange" code="0x31"><field name="encoded" type="str32"/></control> - <!-- Set expiry-id for subsequent messages. --> - <control name="expiry-id" code="0x33"><field name="expiry-id" type="uint64"/></control> - <!-- Add a listener to a queue --> <control name="add-queue-listener" code="0x34"> <field name="queue" type="str8"/> @@ -289,6 +287,18 @@ <field name="state" type="map"/> <!-- "name"=value --> </control> + <!-- Update the cluster time --> + <control name="clock" code="0x40"> + <field name="time" type="uint64"/> + </control> + + <!-- Update a queue's dequeue rate --> + <control name="queue-dequeue-since-purge-state" code="0x41"> + <field name="queue" type="str8"/> + <field name="dequeueSincePurge" type="uint32"/> + </control> + + </class> </amqp> |