diff options
Diffstat (limited to 'qpid/cpp')
153 files changed, 2629 insertions, 1281 deletions
diff --git a/qpid/cpp/configure.ac b/qpid/cpp/configure.ac index 90b943f047..144ca3b607 100644 --- a/qpid/cpp/configure.ac +++ b/qpid/cpp/configure.ac @@ -87,13 +87,14 @@ else AC_CHECK_DECL([__SUNPRO_CC], [SUNCC=yes], [SUNCC=no]) # Set up for sun CC compiler - if test x$SUNCC = xno; then + if test x$SUNCC = xyes; then if test "${enableval}" = yes; then WARNING_FLAGS=+w fi CXXFLAGS="$CXXFLAGS -library=stlport4 -mt" LD="$CXX" LDFLAGS="$LDFLAGS -library=stlport4 -mt" + AC_SUBST([SUNCC_RUNTIME_LIBS], [-lCrun]) fi fi @@ -150,8 +151,9 @@ test -z "$RUBY" && AC_MSG_ERROR([Missing ruby installation (try "yum install rub specdir=`pwd`/$srcdir/../specs AMQP_FINAL_XML=$specdir/amqp.0-10-qpid-errata.xml +test -f $AMQP_FINAL_XML || test -d $srcdir/src/gen || AC_MSG_ERROR([Neither AMQP specs nor spec-generated code present; cannot build.]) AC_SUBST(AMQP_FINAL_XML) -AM_CONDITIONAL([GENERATE], [ls $AMQP_FINAL_XML >/dev/null]) +AM_CONDITIONAL([GENERATE], [test -f $AMQP_FINAL_XML]) # URL and download URL for the package. URL=http://rhm.et.redhat.com/qpidc @@ -311,14 +313,19 @@ LIBS=$tmp_LIBS AM_CONDITIONAL([RDMA], [test x$with_RDMA = xyes]) # Setup --with-ssl/--without-ssl as arguments to configure -tmp_LIBS=$LIBS +SSL_CFLAGS="" +SSL_LDFLAGS="" AC_ARG_WITH([ssl], [AS_HELP_STRING([--with-ssl], [Build with support for SSL])], [case ${withval} in yes) with_SSL=yes - PKG_CHECK_MODULES([SSL], [nspr],,[AC_MSG_ERROR([nspr not found])]) - PKG_CHECK_MODULES([SSL], [nss],,[AC_MSG_ERROR([nss not found])]) + AC_PATH_PROG([NSPR_CONFIG], [nspr-config]) + AS_IF([test x$NSPR_CONFIG = x], [AC_MSG_ERROR([libnspr not found])], []) + AC_PATH_PROG([NSS_CONFIG], [nss-config]) + AS_IF([test x$NSS_CONFIG = x], [AC_MSG_ERROR([libnss not found])], []) + SSL_CFLAGS="`$NSPR_CONFIG --cflags` `$NSS_CONFIG --cflags`" + SSL_LDFLAGS="`$NSPR_CONFIG --libs` `$NSS_CONFIG --libs`" ;; no) with_SSL=no @@ -329,13 +336,18 @@ AC_ARG_WITH([ssl], esac], [ with_SSL=yes - PKG_CHECK_MODULES([SSL], [nspr],,[with_SSL=no]) - PKG_CHECK_MODULES([SSL], [nss],,[with_SSL=no]) + AC_PATH_PROG([NSPR_CONFIG], [nspr-config]) + AS_IF([test x$NSPR_CONFIG = x], [with_SSL=no], + [AC_PATH_PROG([NSS_CONFIG], [nss-config]) + AS_IF([test x$NSS_CONFIG = x], [with_SSL=no], + [SSL_CFLAGS="`$NSPR_CONFIG --cflags` `$NSS_CONFIG --cflags`" + SSL_LDFLAGS="`$NSPR_CONFIG --libs` `$NSS_CONFIG --libs`"])]) ] ) # Remove from LIBS, we will link it explicitly in make files. -LIBS=$tmp_LIBS AM_CONDITIONAL([SSL], [test x$with_SSL = xyes]) +AC_SUBST([SSL_CFLAGS]) +AC_SUBST([SSL_LDFLAGS]) poller=no @@ -367,6 +379,37 @@ if test $poller = xno; then AC_MSG_ERROR([Polling mechanism not implemented for $host]) fi +#Guess host architecture, to choose platform-dependent objects +case "$host" in + *sun-solaris*) + arch=solaris + ;; +esac +AM_CONDITIONAL([SUNOS], [test x$arch = xsolaris]) + +# Check for some syslog capabilities not present in all systems +AC_TRY_COMPILE([#include <sys/syslog.h>], + [int v = LOG_AUTHPRIV;], + [AC_DEFINE([HAVE_LOG_AUTHPRIV], [1], [Set to 1 whether LOG_AUTHPRIV is supported.])],) + +AC_TRY_COMPILE([#include <sys/syslog.h>], + [int v = LOG_FTP;], + [AC_DEFINE([HAVE_LOG_FTP], [1], [Set to 1 whether LOG_FTP is supported.])],) + +#Check if we need to include libacl to provide acl API +gl_saved_libs=$LIBS + AC_SEARCH_LIBS(acl, [acl], + [test "$ac_cv_search_acl" = "none required" || + LIB_ACL=$ac_cv_search_acl]) + AC_SUBST([LIB_ACL]) +LIBS=$gl_saved_libs + +SOCKLIBS="" +AC_CHECK_LIB([socket],[socket],[SOCKET_LIB="-lsocket"],[SOCKET_LIB=""],[]) +AC_CHECK_LIB([nsl],[getipnodebyname],[NSL_LIB="-lnsl"],[NSL_LIB=""],[]) +SOCKLIBS="$SOCKET_LIB $NSL_LIB" +AC_SUBST([SOCKLIBS]) + AM_PATH_PYTHON() # Files to generate diff --git a/qpid/cpp/examples/direct/Makefile.am b/qpid/cpp/examples/direct/Makefile.am index 07431f6fe4..8f220d1b7e 100644 --- a/qpid/cpp/examples/direct/Makefile.am +++ b/qpid/cpp/examples/direct/Makefile.am @@ -18,6 +18,7 @@ # examplesdir=$(pkgdatadir)/examples/direct +MAKELDFLAGS=$(CLIENTFLAGS) include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=direct_producer listener declare_queues diff --git a/qpid/cpp/examples/failover/Makefile.am b/qpid/cpp/examples/failover/Makefile.am index c41f26eaba..67d9eb4b95 100644 --- a/qpid/cpp/examples/failover/Makefile.am +++ b/qpid/cpp/examples/failover/Makefile.am @@ -18,6 +18,7 @@ # examplesdir=$(pkgdatadir)/examples/failover +MAKELDFLAGS=$(CLIENTFLAGS) include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=declare_queues resuming_receiver replaying_sender diff --git a/qpid/cpp/examples/fanout/Makefile.am b/qpid/cpp/examples/fanout/Makefile.am index 6cbf2c93ad..395fcec9aa 100644 --- a/qpid/cpp/examples/fanout/Makefile.am +++ b/qpid/cpp/examples/fanout/Makefile.am @@ -18,6 +18,7 @@ # examplesdir=$(pkgdatadir)/examples/fanout +MAKELDFLAGS=$(CLIENTFLAGS) include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=fanout_producer listener diff --git a/qpid/cpp/examples/makedist.mk b/qpid/cpp/examples/makedist.mk index 4345378983..ce34fd31e2 100644 --- a/qpid/cpp/examples/makedist.mk +++ b/qpid/cpp/examples/makedist.mk @@ -3,7 +3,8 @@ AM_CXXFLAGS = $(WARNING_CFLAGS) INCLUDES = -I$(top_srcdir)/src -I$(top_srcdir)/src/gen -I$(top_builddir)/src -I$(top_builddir)/src/gen CLIENT_LIB=$(top_builddir)/src/libqpidclient.la CONSOLE_LIB=$(top_builddir)/src/libqmfconsole.la -MAKELDFLAG ?= qpidclient +CLIENTFLAGS=-lqpidclient +CONSOLEFLAGS=-lqmfconsole # Generate a simple non-automake Makefile for distribution. MAKEDIST=.libs/Makefile @@ -12,7 +13,7 @@ $(MAKEDIST): Makefile mkdir -p .libs @$(ECHO) CXX=$(CXX) > $(MAKEDIST) @$(ECHO) CXXFLAGS=$(CXXFLAGS) >> $(MAKEDIST) - @$(ECHO) LDFLAGS=-l$(MAKELDFLAG) >> $(MAKEDIST) + @$(ECHO) LDFLAGS=-l$(MAKELDFLAGS) >> $(MAKEDIST) @$(ECHO) >> $(MAKEDIST) @$(ECHO) all: $(noinst_PROGRAMS) >> $(MAKEDIST) @$(ECHO) >> $(MAKEDIST) diff --git a/qpid/cpp/examples/pub-sub/Makefile.am b/qpid/cpp/examples/pub-sub/Makefile.am index bdbf0f8d20..1d52daf638 100644 --- a/qpid/cpp/examples/pub-sub/Makefile.am +++ b/qpid/cpp/examples/pub-sub/Makefile.am @@ -18,6 +18,7 @@ # examplesdir=$(pkgdatadir)/examples/pub-sub +MAKELDFLAGS=$(CLIENTFLAGS) include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=topic_listener topic_publisher diff --git a/qpid/cpp/examples/qmf-console/Makefile.am b/qpid/cpp/examples/qmf-console/Makefile.am index 471620f465..04a92c214e 100644 --- a/qpid/cpp/examples/qmf-console/Makefile.am +++ b/qpid/cpp/examples/qmf-console/Makefile.am @@ -19,7 +19,7 @@ examplesdir=$(pkgdatadir)/examples/qmf-console -MAKELDFLAG = qmfconsole +MAKELDFLAGS=$(CONSOLEFLAGS) include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=console printevents ping queuestats diff --git a/qpid/cpp/examples/request-response/Makefile.am b/qpid/cpp/examples/request-response/Makefile.am index 8fb95f24fc..d7e7d692de 100644 --- a/qpid/cpp/examples/request-response/Makefile.am +++ b/qpid/cpp/examples/request-response/Makefile.am @@ -18,6 +18,7 @@ # examplesdir=$(pkgdatadir)/examples/request-response +MAKELDFLAGS=$(CLIENTFLAGS) include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=client server diff --git a/qpid/cpp/examples/tradedemo/Makefile.am b/qpid/cpp/examples/tradedemo/Makefile.am index ef90e04bb0..a3c7fabd65 100644 --- a/qpid/cpp/examples/tradedemo/Makefile.am +++ b/qpid/cpp/examples/tradedemo/Makefile.am @@ -18,6 +18,7 @@ # examplesdir=$(pkgdatadir)/examples/tradedemo +MAKELDFLAGS=$(CLIENTFLAGS) include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=topic_listener topic_publisher declare_queues diff --git a/qpid/cpp/examples/xml-exchange/Makefile.am b/qpid/cpp/examples/xml-exchange/Makefile.am index 0f99aea44e..aa450a32c2 100644 --- a/qpid/cpp/examples/xml-exchange/Makefile.am +++ b/qpid/cpp/examples/xml-exchange/Makefile.am @@ -18,6 +18,7 @@ # examplesdir=$(pkgdatadir)/examples/xml-exchange +MAKELDFLAGS=$(CLIENTFLAGS) include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=declare_queues xml_producer listener diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 05fba55251..0eece459b6 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -127,7 +127,6 @@ posix_plat_src = \ qpid/sys/posix/Time.cpp \ qpid/sys/posix/Thread.cpp \ qpid/sys/posix/Shlib.cpp \ - qpid/sys/posix/SystemInfo.cpp \ qpid/sys/posix/Mutex.cpp \ qpid/sys/posix/Fork.cpp \ qpid/sys/posix/StrError.cpp \ @@ -151,7 +150,13 @@ if HAVE_ECF poller = qpid/sys/solaris/ECFPoller.cpp endif -platform_src = $(posix_plat_src) $(poller) +if SUNOS + systeminfo = qpid/sys/solaris/SystemInfo.cpp +else + systeminfo = qpid/sys/posix/SystemInfo.cpp +endif + +platform_src = $(posix_plat_src) $(poller) $(systeminfo) platform_hdr = $(posix_plat_hdr) posix_broker_src = \ diff --git a/qpid/cpp/src/acl.mk b/qpid/cpp/src/acl.mk index 2b3ab4dfd9..95b47acc1c 100644 --- a/qpid/cpp/src/acl.mk +++ b/qpid/cpp/src/acl.mk @@ -31,4 +31,8 @@ acl_la_SOURCES = \ qpid/acl/AclReader.h acl_la_LIBADD = libqpidbroker.la +if SUNOS + acl_la_LIBADD += libqmfagent.la libqmfconsole.la libqpidcommon.la -lboost_program_options $(SUNCC_RUNTIME_LIBS) +endif + acl_la_LDFLAGS = $(PLUGINLDFLAGS) diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index 0d34788d2e..e2054d75e9 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -40,6 +40,8 @@ cluster_la_SOURCES = \ $(CMAN_SOURCES) \ qpid/cluster/Cluster.cpp \ qpid/cluster/Cluster.h \ + qpid/cluster/Decoder.cpp \ + qpid/cluster/Decoder.h \ qpid/cluster/PollableQueue.h \ qpid/cluster/ClusterMap.cpp \ qpid/cluster/ClusterMap.h \ @@ -49,14 +51,8 @@ cluster_la_SOURCES = \ qpid/cluster/Connection.h \ qpid/cluster/ConnectionCodec.cpp \ qpid/cluster/ConnectionCodec.h \ - qpid/cluster/ConnectionMap.h \ - qpid/cluster/ConnectionMap.cpp \ qpid/cluster/Cpg.cpp \ qpid/cluster/Cpg.h \ - qpid/cluster/Decoder.cpp \ - qpid/cluster/Decoder.h \ - qpid/cluster/ConnectionDecoder.cpp \ - qpid/cluster/ConnectionDecoder.h \ qpid/cluster/Dispatchable.h \ qpid/cluster/UpdateClient.cpp \ qpid/cluster/UpdateClient.h \ @@ -68,8 +64,11 @@ cluster_la_SOURCES = \ qpid/cluster/ExpiryPolicy.cpp \ qpid/cluster/FailoverExchange.cpp \ qpid/cluster/FailoverExchange.h \ + qpid/cluster/UpdateExchange.h \ + qpid/cluster/LockedConnectionMap.h \ qpid/cluster/Multicaster.cpp \ qpid/cluster/Multicaster.h \ + qpid/cluster/McastFrameHandler.h \ qpid/cluster/NoOpConnectionOutputHandler.h \ qpid/cluster/OutputInterceptor.cpp \ qpid/cluster/OutputInterceptor.h \ diff --git a/qpid/cpp/src/common.vcproj b/qpid/cpp/src/common.vcproj index 893c5bfd7b..5ab2dbafde 100644 --- a/qpid/cpp/src/common.vcproj +++ b/qpid/cpp/src/common.vcproj @@ -420,6 +420,9 @@ RelativePath="gen\qpid\framing\ClusterConnectionExchangeBody.cpp">
</File>
<File
+ RelativePath="gen\qpid\framing\ClusterConnectionExpiryIdBody.cpp">
+ </File>
+ <File
RelativePath="gen\qpid\framing\ClusterConnectionMembershipBody.cpp">
</File>
<File
@@ -1126,6 +1129,9 @@ RelativePath="gen\qpid\framing\ClusterConnectionExchangeBody.h">
</File>
<File
+ RelativePath="gen\qpid\framing\ClusterConnectionExpiryIdBody.h">
+ </File>
+ <File
RelativePath="gen\qpid\framing\ClusterConnectionMembershipBody.h">
</File>
<File
diff --git a/qpid/cpp/src/qpid/SessionId.cpp b/qpid/cpp/src/qpid/SessionId.cpp index fce6619f5d..098998015f 100644 --- a/qpid/cpp/src/qpid/SessionId.cpp +++ b/qpid/cpp/src/qpid/SessionId.cpp @@ -35,7 +35,7 @@ bool SessionId::operator==(const SessionId& id) const { } std::ostream& operator<<(std::ostream& o, const SessionId& id) { - return o << id.getName() << "@" << id.getUserId(); + return o << id.getUserId() << "." << id.getName(); } std::string SessionId::str() const { diff --git a/qpid/cpp/src/qpid/SessionState.cpp b/qpid/cpp/src/qpid/SessionState.cpp index ac75b5c5ff..3e844fb24b 100644 --- a/qpid/cpp/src/qpid/SessionState.cpp +++ b/qpid/cpp/src/qpid/SessionState.cpp @@ -113,7 +113,8 @@ SessionState::ReplayRange SessionState::senderExpected(const SessionPoint& expec void SessionState::senderRecord(const AMQFrame& f) { if (isControl(f)) return; // Ignore control frames. - QPID_LOG_IF(debug, f.getMethod(), getId() << ": sent cmd " << sender.sendPoint.command << ": " << *f.getMethod()); + QPID_LOG(trace, getId() << ": sent cmd " << sender.sendPoint.command << ": " << *f.getBody()); + stateful = true; if (timeout) sender.replayList.push_back(f); sender.unflushedSize += f.encodedSize(); @@ -183,6 +184,7 @@ void SessionState::receiverSetCommandPoint(const SessionPoint& point) { } bool SessionState::receiverRecord(const AMQFrame& f) { + if (receiverTrackingDisabled) return true; //Very nasty hack for push bridges if (isControl(f)) return true; // Ignore control frames. stateful = true; receiver.expected.advance(f); @@ -192,12 +194,13 @@ bool SessionState::receiverRecord(const AMQFrame& f) { receiver.received = receiver.expected; receiver.incomplete += receiverGetCurrent(); } - QPID_LOG_IF(debug, f.getMethod(), getId() << ": recv cmd " << receiverGetCurrent() << ": " << *f.getMethod()); - QPID_LOG_IF(debug, !firstTime, "Ignoring duplicate frame: " << receiverGetCurrent() << ": " << f); + QPID_LOG(trace, getId() << ": recv cmd " << receiverGetCurrent() << ": " << *f.getBody()); + if (!firstTime) QPID_LOG(trace, "Ignoring duplicate frame."); return firstTime; } void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) { + if (receiverTrackingDisabled) return; //Very nasty hack for push bridges assert(receiver.incomplete.contains(command)); // Internal error to complete command twice. SequenceNumber first =cumulative ? receiver.incomplete.front() : command; SequenceNumber last = command; @@ -237,7 +240,7 @@ SessionState::Configuration::Configuration(size_t flush, size_t hard) : replayFlushLimit(flush), replayHardLimit(hard) {} SessionState::SessionState(const SessionId& i, const Configuration& c) - : id(i), timeout(), config(c), stateful() + : id(i), timeout(), config(c), stateful(), receiverTrackingDisabled(false) { QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this); } @@ -275,4 +278,7 @@ void SessionState::setState( receiver.bytesSinceKnownCompleted = 0; } +void SessionState::disableReceiverTracking() { receiverTrackingDisabled = true; } +void SessionState::enableReceiverTracking() { receiverTrackingDisabled = false; } + } // namespace qpid diff --git a/qpid/cpp/src/qpid/SessionState.h b/qpid/cpp/src/qpid/SessionState.h index cdf59506a0..a37ce804e4 100644 --- a/qpid/cpp/src/qpid/SessionState.h +++ b/qpid/cpp/src/qpid/SessionState.h @@ -182,6 +182,20 @@ class SessionState { const SequenceSet& receivedIncomplete ); + /** + * So called 'push' bridges work by faking a subscribe request + * (and the accompanyingflows etc) to the local broker to initiate + * the outflow of messages for the bridge. + * + * As the peer doesn't send these it cannot include them in its + * session state. To keep the session state on either side of the + * bridge in sync, this hack allows the tracking of state for + * received messages to be disabled for the faked commands and + * subsequently re-enabled. + */ + void disableReceiverTracking(); + void enableReceiverTracking(); + private: struct SendState { @@ -210,6 +224,7 @@ class SessionState { uint32_t timeout; Configuration config; bool stateful; + bool receiverTrackingDisabled;//very nasty hack for 'push' bridges }; inline bool operator==(const SessionId& id, const SessionState& s) { return s == id; } diff --git a/qpid/cpp/src/qpid/acl/AclPlugin.cpp b/qpid/cpp/src/qpid/acl/AclPlugin.cpp index 7310139041..4c6efa19dd 100644 --- a/qpid/cpp/src/qpid/acl/AclPlugin.cpp +++ b/qpid/cpp/src/qpid/acl/AclPlugin.cpp @@ -62,15 +62,10 @@ struct AclPlugin : public Plugin { if (acl) throw Exception("ACL plugin cannot be initialized twice in one process."); - if (values.aclFile.at(0) == '/') - { - values.aclFile = values.aclFile; - } - else - { - std::ostringstream oss; - oss << b.getDataDir().getPath() << "/" << values.aclFile; - values.aclFile = oss.str(); + if (values.aclFile.at(0) != '/' && !b.getDataDir().getPath().empty()) { + std::ostringstream oss; + oss << b.getDataDir().getPath() << "/" << values.aclFile; + values.aclFile = oss.str(); } acl = new Acl(values, b); diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 1e2f4cdd78..7c70c12213 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -151,7 +151,7 @@ void ManagementAgentImpl::init(const client::ConnectionSettings& settings, // TODO: Abstract the socket calls for portability if (extThread) { int pair[2]; - int result = socketpair(PF_LOCAL, SOCK_STREAM, 0, pair); + int result = socketpair(PF_UNIX, SOCK_STREAM, 0, pair); if (result == -1) { return; } @@ -485,6 +485,9 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + encodeHeader(outBuffer, 'g', sequence); object->writeProperties(outBuffer); object->writeStatistics(outBuffer, true); @@ -508,6 +511,9 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + encodeHeader(outBuffer, 'g', sequence); object->writeProperties(outBuffer); object->writeStatistics(outBuffer, true); diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index f58d59cbd7..db957051d8 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -277,7 +277,6 @@ void SessionHandler::sendCompletion() { } void SessionHandler::sendAttach(bool force) { - CHECK_ATTACHED("session.send-attach"); QPID_LOG(debug, "SessionHandler::sendAttach attach id=" << getState()->getId()); peer.attach(getState()->getId().getName(), force); if (getState()->hasState()) diff --git a/qpid/cpp/src/qpid/assert.cpp b/qpid/cpp/src/qpid/assert.cpp index 5d039da528..bf36d3be86 100644 --- a/qpid/cpp/src/qpid/assert.cpp +++ b/qpid/cpp/src/qpid/assert.cpp @@ -35,7 +35,7 @@ void assert_fail(char const * expr, char const * function, char const * file, lo #ifdef NDEBUG throw framing::InternalErrorException(msg.str()); #else - std::cerr << msg << std::endl; + std::cerr << msg.str() << std::endl; abort(); #endif } diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index cc28213884..4d275b958f 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -22,6 +22,7 @@ #include "ConnectionState.h" #include "Connection.h" #include "LinkRegistry.h" +#include "SessionState.h" #include "qpid/agent/ManagementAgent.h" #include "qpid/framing/FieldTable.h" @@ -80,31 +81,31 @@ Bridge::~Bridge() mgmtObject->resourceDestroy(); } -void Bridge::create(ConnectionState& c) +void Bridge::create(Connection& c) { + connState = &c; FieldTable options; if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync); - connState = &c; + SessionHandler& sessionHandler = c.getChannel(id); if (args.i_srcIsLocal) { if (args.i_dynamic) throw Exception("Dynamic routing not supported for push routes"); // Point the bridging commands at the local connection handler - Connection* conn = dynamic_cast<Connection*>(&c); - if (conn == 0) - return; - pushHandler.reset(new PushHandler(conn)); + pushHandler.reset(new PushHandler(&c)); channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get())); + + session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); + peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); + + session->attach(name, false); + session->commandPoint(0,0); } else { + sessionHandler.attachAs(name); // Point the bridging commands at the remote peer broker - channelHandler.reset(new framing::ChannelHandler(id, &(connState->getOutput()))); + peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out)); } - session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); - peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); - - session->attach(name, false); - session->commandPoint(0,0); - + if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking(); if (args.i_srcIsQueue) { peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options); peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); @@ -116,7 +117,7 @@ void Bridge::create(ConnectionState& c) if (args.i_tag.size()) { queueSettings.setString("qpid.trace.id", args.i_tag); } else { - const string& peerTag = connState->getFederationPeerTag(); + const string& peerTag = c.getFederationPeerTag(); if (peerTag.size()) queueSettings.setString("qpid.trace.id", peerTag); } @@ -129,7 +130,7 @@ void Bridge::create(ConnectionState& c) queueSettings.setString("qpid.trace.exclude", localTag); } - bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues? + bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues? bool autoDelete = !durable;//auto delete transient queues? peer->getQueue().declare(queueName, "", false, durable, true, autoDelete, queueSettings); if (!args.i_dynamic) @@ -148,12 +149,23 @@ void Bridge::create(ConnectionState& c) QPID_LOG(debug, "Activated static route from exchange " << args.i_src << " to " << args.i_dest); } } + if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking(); } -void Bridge::cancel() +void Bridge::cancel(Connection& c) { + if (args.i_srcIsLocal) { + //recreate peer to be sure that the session handler reference + //is valid (it could have been deleted due to a detach) + SessionHandler& sessionHandler = c.getChannel(id); + peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out)); + } peer->getMessage().cancel(args.i_dest); peer->getSession().detach(name); +} + +void Bridge::closed() +{ if (args.i_dynamic) { Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src); if (exchange.get() != 0) diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index c530a5d696..dae28ddeaa 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -52,8 +52,9 @@ public: const qmf::org::apache::qpid::broker::ArgsLinkBridge& args); ~Bridge(); - void create(ConnectionState& c); - void cancel(); + void create(Connection& c); + void cancel(Connection& c); + void closed(); void destroy(); bool isDurable() { return args.i_durable; } diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 95f55bb596..b999a62965 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -199,6 +199,7 @@ Broker::Broker(const Broker::Options& conf) : } QueuePolicy::setDefaultMaxSize(conf.queueLimit); + queues.setQueueEvents(&queueEvents); // Early-Initialize plugins const Plugin::Plugins& plugins=Plugin::getPlugins(); diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index b7446a2220..b06e06d353 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -80,7 +80,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std void Connection::requestIOProcessing(boost::function0<void> callback) { - ioCallback = callback; + ScopedLock<Mutex> l(ioCallbackLock); + ioCallbacks.push(callback); out.activateOutput(); } @@ -221,10 +222,13 @@ bool Connection::hasOutput() { return outputTasks.hasOutput(); } bool Connection::doOutput() { try{ - if (ioCallback) - ioCallback(); // Lend the IO thread for management processing - ioCallback = 0; - + { + ScopedLock<Mutex> l(ioCallbackLock); + while (!ioCallbacks.empty()) { + ioCallbacks.front()(); // Lend the IO thread for management processing + ioCallbacks.pop(); + } + } if (mgmtClosing) close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request"); else diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index b1e1cda973..b659fe6468 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -25,6 +25,7 @@ #include <memory> #include <sstream> #include <vector> +#include <queue> #include <boost/ptr_container/ptr_map.hpp> @@ -47,6 +48,7 @@ #include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/sys/Socket.h" #include "qpid/sys/TimeoutHandler.h" +#include "qpid/sys/Mutex.h" #include <boost/ptr_container/ptr_map.hpp> #include <boost/bind.hpp> @@ -119,7 +121,8 @@ class Connection : public sys::ConnectionInputHandler, const bool isLink; bool mgmtClosing; const std::string mgmtId; - boost::function0<void> ioCallback; + sys::Mutex ioCallbackLock; + std::queue<boost::function0<void> > ioCallbacks; qmf::org::apache::qpid::broker::Connection* mgmtObject; LinkRegistry& links; management::ManagementAgent* agent; diff --git a/qpid/cpp/src/qpid/broker/ConnectionState.h b/qpid/cpp/src/qpid/broker/ConnectionState.h index 0e9d211b56..b712af11f5 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionState.h +++ b/qpid/cpp/src/qpid/broker/ConnectionState.h @@ -48,8 +48,9 @@ class ConnectionState : public ConnectionToken, public management::Manageable heartbeatmax(120), stagingThreshold(broker.getStagingThreshold()), federationLink(true), - clientSupportsThrottling(false) - {} + clientSupportsThrottling(false), + clusterOrderOut(0) + {} virtual ~ConnectionState () {} @@ -75,7 +76,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable const string& getFederationPeerTag() const { return federationPeerTag; } std::vector<Url>& getKnownHosts() { return knownHosts; } - void setClientThrottling() { clientSupportsThrottling = true; } + void setClientThrottling(bool set=true) { clientSupportsThrottling = set; } bool getClientThrottling() const { return clientSupportsThrottling; } Broker& getBroker() { return broker; } @@ -86,11 +87,21 @@ class ConnectionState : public ConnectionToken, public management::Manageable //contained output tasks sys::AggregateOutput outputTasks; - sys::ConnectionOutputHandlerPtr& getOutput() { return out; } + sys::ConnectionOutputHandler& getOutput() { return out; } framing::ProtocolVersion getVersion() const { return version; } - void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out.set(o); } + /** + * If the broker is part of a cluster, this is a handler provided + * by cluster code. It ensures consistent ordering of commands + * that are sent based on criteria that are not predictably + * ordered cluster-wide, e.g. a timer firing. + */ + framing::FrameHandler* getClusterOrderOutput() { return clusterOrderOut; } + void setClusterOrderOutput(framing::FrameHandler& fh) { clusterOrderOut = &fh; } + + virtual void requestIOProcessing (boost::function0<void>) = 0; + protected: framing::ProtocolVersion version; uint32_t framemax; @@ -103,6 +114,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable string federationPeerTag; std::vector<Url> knownHosts; bool clientSupportsThrottling; + framing::FrameHandler* clusterOrderOut; }; }} diff --git a/qpid/cpp/src/qpid/broker/Daemon.h b/qpid/cpp/src/qpid/broker/Daemon.h index 98468debb7..a9cd98bce2 100644 --- a/qpid/cpp/src/qpid/broker/Daemon.h +++ b/qpid/cpp/src/qpid/broker/Daemon.h @@ -19,10 +19,12 @@ * */ -#include <string> +#include "qpid/sys/IntegerTypes.h" #include <boost/scoped_ptr.hpp> #include <boost/function.hpp> #include <boost/noncopyable.hpp> +#include <string> + namespace qpid { namespace broker { diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index f8b9e4b183..dd1fe98b2c 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -102,8 +102,8 @@ static const std::string QPID_MANAGEMENT("qpid.management"); Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent) - : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), - sequence(false), sequenceNo(0), ive(false), mgmtExchange(0) + : name(_name), durable(_durable), alternateUsers(0), persistenceId(0), + args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0) { if (parent != 0) { @@ -275,3 +275,7 @@ bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b) { return b->queue == queue; } + +void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) { + msg->getProperties<DeliveryProperties>()->setExchange(getName()); +} diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index 4591216aaf..45f71a039d 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -59,12 +59,12 @@ public: private: const std::string name; const bool durable; - mutable qpid::framing::FieldTable args; boost::shared_ptr<Exchange> alternate; uint32_t alternateUsers; mutable uint64_t persistenceId; protected: + mutable qpid::framing::FieldTable args; bool sequence; mutable qpid::sys::Mutex sequenceLock; int64_t sequenceNo; @@ -140,13 +140,14 @@ public: virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0; + virtual void setProperties(const boost::intrusive_ptr<Message>&); virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; - + //PersistableExchange: void setPersistenceId(uint64_t id) const; uint64_t getPersistenceId() const { return persistenceId; } uint32_t encodedSize() const; - QPID_BROKER_EXTERN void encode(framing::Buffer& buffer) const; + QPID_BROKER_EXTERN virtual void encode(framing::Buffer& buffer) const; static QPID_BROKER_EXTERN Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer); diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp index 104b34da8b..09fb2d9bef 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp @@ -105,33 +105,42 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){ - if (!args) return;//can't match if there were no headers passed in + if (!args) { + //can't match if there were no headers passed in + if (mgmtExchange != 0) { + mgmtExchange->inc_msgReceives(); + mgmtExchange->inc_byteReceives(msg.contentSize()); + mgmtExchange->inc_msgDrops(); + mgmtExchange->inc_byteDrops(msg.contentSize()); + } + return; + } + PreRoute pr(msg, this); uint32_t count(0); Bindings::ConstPtr p = bindings.snapshot(); if (p.get()){ - for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++) { - if (match((*i)->args, *args)) msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched (); + for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) { + if (match((*i)->args, *args)) { + msg.deliverTo((*i)->queue); + count++; + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched(); + } } } - if (mgmtExchange != 0) - { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - if (count == 0) - { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - else - { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + if (mgmtExchange != 0) { + mgmtExchange->inc_msgReceives(); + mgmtExchange->inc_byteReceives(msg.contentSize()); + if (count == 0) { + mgmtExchange->inc_msgDrops(); + mgmtExchange->inc_byteDrops(msg.contentSize()); + } else { + mgmtExchange->inc_msgRoutes(count); + mgmtExchange->inc_byteRoutes(count * msg.contentSize()); } } } diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index e36635831b..dd1a1fa0b4 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -158,7 +158,7 @@ void Link::closed (int, std::string text) } for (Bridges::iterator i = active.begin(); i != active.end(); i++) { - (*i)->cancel(); + (*i)->closed(); created.push_back(*i); } active.clear(); @@ -217,21 +217,27 @@ void Link::add(Bridge::shared_ptr bridge) void Link::cancel(Bridge::shared_ptr bridge) { - Mutex::ScopedLock mutex(lock); - - for (Bridges::iterator i = created.begin(); i != created.end(); i++) { - if ((*i).get() == bridge.get()) { - created.erase(i); - break; + { + Mutex::ScopedLock mutex(lock); + + for (Bridges::iterator i = created.begin(); i != created.end(); i++) { + if ((*i).get() == bridge.get()) { + created.erase(i); + break; + } } - } - for (Bridges::iterator i = active.begin(); i != active.end(); i++) { - if ((*i).get() == bridge.get()) { - bridge->cancel(); - active.erase(i); - break; + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + if ((*i).get() == bridge.get()) { + cancellations.push_back(bridge); + bridge->closed(); + active.erase(i); + break; + } } } + if (!cancellations.empty()) { + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + } } void Link::ioThreadProcessing() @@ -242,7 +248,7 @@ void Link::ioThreadProcessing() return; QPID_LOG(debug, "Link::ioThreadProcessing()"); - //process any pending creates + //process any pending creates and/or cancellations if (!created.empty()) { for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { active.push_back(*i); @@ -250,6 +256,13 @@ void Link::ioThreadProcessing() } created.clear(); } + if (!cancellations.empty()) { + for (Bridges::iterator i = cancellations.begin(); i != cancellations.end(); ++i) { + active.push_back(*i); + (*i)->cancel(*connection); + } + cancellations.clear(); + } } void Link::setConnection(Connection* c) @@ -284,7 +297,7 @@ void Link::maintenanceVisit () } } } - else if (state == STATE_OPERATIONAL && !created.empty() && connection != 0) + else if (state == STATE_OPERATIONAL && (!created.empty() || !cancellations.empty()) && connection != 0) connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index 8e741c6eb7..39014b0ec0 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -67,6 +67,7 @@ namespace qpid { typedef std::vector<Bridge::shared_ptr> Bridges; Bridges created; // Bridges pending creation Bridges active; // Bridges active + Bridges cancellations; // Bridges pending cancellation uint channelCounter; Connection* connection; management::ManagementAgent* agent; diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index 97b1c32d64..d4ae0aedd8 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -19,19 +19,24 @@ * */ #include "LinkRegistry.h" +#include "Connection.h" #include "qpid/log/Statement.h" #include <iostream> +#include <boost/format.hpp> using namespace qpid::broker; using namespace qpid::sys; using std::pair; using std::stringstream; using boost::intrusive_ptr; +using boost::format; +using boost::str; namespace _qmf = qmf::org::apache::qpid::broker; #define LINK_MAINT_INTERVAL 2 -LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0), passive(false), passiveChanged(false) +LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0), passive(false), passiveChanged(false), + realm(broker ? broker->getOptions().realm : "") { timer.add (intrusive_ptr<TimerTask> (new Periodic(*this))); } @@ -241,6 +246,7 @@ void LinkRegistry::notifyConnection(const std::string& key, Connection* c) { l->second->established(); l->second->setConnection(c); + c->setUserId(str(format("%1%@%2%") % l->second->getUsername() % realm)); } } diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h index 884228bd63..92064692f5 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.h +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h @@ -66,6 +66,7 @@ namespace broker { MessageStore* store; bool passive; bool passiveChanged; + std::string realm; void periodicMaintenance (); bool updateAddress(const std::string& oldKey, const TcpAddress& newAddress); diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 133e2b5ad1..30294b4507 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -382,4 +382,9 @@ void Message::resetEnqueueCompleteCallback() { enqueueCallback = 0; } void Message::setDequeueCompleteCallback(MessageCallback& cb) { dequeueCallback = &cb; } void Message::resetDequeueCompleteCallback() { dequeueCallback = 0; } +framing::FieldTable& Message::getOrInsertHeaders() +{ + return getProperties<MessageProperties>()->getApplicationHeaders(); +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index a41ecd76be..458c6c7d1a 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -73,6 +73,7 @@ public: QPID_BROKER_EXTERN std::string getExchangeName() const; bool isImmediate() const; QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const; + framing::FieldTable& getOrInsertHeaders(); QPID_BROKER_EXTERN bool isPersistent(); bool requiresAccept(); diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 3ae53c8ea9..a1a83926bf 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -93,7 +93,8 @@ Queue::Queue(const string& _name, bool _autodelete, policyExceeded(false), mgmtObject(0), eventMode(0), - eventMgr(0) + eventMgr(0), + insertSeqNo(0) { if (parent != 0) { @@ -176,7 +177,7 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ void Queue::recover(boost::intrusive_ptr<Message>& msg){ - push(msg); + push(msg, true); msg->enqueueComplete(); // mark the message as enqueued mgntEnqStats(msg); @@ -545,12 +546,13 @@ void Queue::popMsg(QueuedMessage& qmsg) ++dequeueTracker; } -void Queue::push(boost::intrusive_ptr<Message>& msg){ +void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); if (policy.get()) policy->tryEnqueue(qm); + if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); LVQ::iterator i; const framing::FieldTable* ft = msg->getApplicationHeaders(); @@ -566,14 +568,21 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){ boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this); if (!old) old = i->second; i->second->setReplacementMessage(msg,this); - dequeued(QueuedMessage(qm.queue, old, qm.position)); + if (isRecovery) { + //can't issue new requests for the store until + //recovery is complete + pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position)); + } else { + dequeue(0, QueuedMessage(qm.queue, old, qm.position)); + } } }else { messages.push_back(qm); listeners.populate(copy); } - if (eventMode && eventMgr) { - eventMgr->enqueued(qm); + if (eventMode) { + if (eventMgr) eventMgr->enqueued(qm); + else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName()); } } copy.notify(); @@ -664,7 +673,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) msg->addTraceId(traceId); } - if (msg->isPersistent() && store && !lastValueQueue) { + if (msg->isPersistent() && store) { msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); store->enqueue(ctxt, pmsg, *this); @@ -676,14 +685,14 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { - if (policy.get() && !policy->isEnqueued(msg)) return false; { Mutex::ScopedLock locker(messageLock); + if (policy.get() && !policy->isEnqueued(msg)) return false; if (!ctxt) { dequeued(msg); } } - if (msg.payload->isPersistent() && store && !lastValueQueue) { + if (msg.payload->isPersistent() && store) { msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); store->dequeue(ctxt, pmsg, *this); @@ -976,3 +985,16 @@ void Queue::setQueueEventManager(QueueEvents& mgr) { eventMgr = &mgr; } + +void Queue::recoveryComplete() +{ + //process any pending dequeues + for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + pendingDequeues.clear(); +} + +void Queue::insertSequenceNumbers(const std::string& key) +{ + seqNoKey = key; + insertSeqNo = !seqNoKey.empty(); +} diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 822e827f35..d1f71581d6 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -87,6 +87,7 @@ namespace qpid { std::vector<std::string> traceExclude; QueueListeners listeners; Messages messages; + Messages pendingDequeues;//used to avoid dequeuing during recovery LVQ lvq; mutable qpid::sys::Mutex consumerLock; mutable qpid::sys::Mutex messageLock; @@ -102,8 +103,10 @@ namespace qpid { RateTracker dequeueTracker; int eventMode; QueueEvents* eventMgr; + bool insertSeqNo; + std::string seqNoKey; - void push(boost::intrusive_ptr<Message>& msg); + void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); bool seek(QueuedMessage& msg, Consumer::shared_ptr position); bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); @@ -298,6 +301,11 @@ namespace qpid { void setPosition(framing::SequenceNumber pos); int getEventMode(); void setQueueEventManager(QueueEvents&); + void insertSequenceNumbers(const std::string& key); + /** + * Notify queue that recovery has completed. + */ + void recoveryComplete(); }; } } diff --git a/qpid/cpp/src/qpid/broker/QueueEvents.cpp b/qpid/cpp/src/qpid/broker/QueueEvents.cpp index a6517e1bfe..7525e4cb76 100644 --- a/qpid/cpp/src/qpid/broker/QueueEvents.cpp +++ b/qpid/cpp/src/qpid/broker/QueueEvents.cpp @@ -20,12 +20,13 @@ */ #include "QueueEvents.h" #include "qpid/Exception.h" +#include "qpid/log/Statement.h" namespace qpid { namespace broker { QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller) : - eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller) + eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true) { eventQueue.start(); } @@ -37,12 +38,12 @@ QueueEvents::~QueueEvents() void QueueEvents::enqueued(const QueuedMessage& m) { - eventQueue.push(Event(ENQUEUE, m)); + if (enabled) eventQueue.push(Event(ENQUEUE, m)); } void QueueEvents::dequeued(const QueuedMessage& m) { - eventQueue.push(Event(DEQUEUE, m)); + if (enabled) eventQueue.push(Event(DEQUEUE, m)); } void QueueEvents::registerListener(const std::string& id, const EventListener& listener) @@ -81,6 +82,18 @@ void QueueEvents::shutdown() if (!eventQueue.empty() && !listeners.empty()) eventQueue.shutdown(); } +void QueueEvents::enable() +{ + enabled = true; + QPID_LOG(debug, "Queue events enabled"); +} + +void QueueEvents::disable() +{ + enabled = false; + QPID_LOG(debug, "Queue events disabled"); +} + QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {} diff --git a/qpid/cpp/src/qpid/broker/QueueEvents.h b/qpid/cpp/src/qpid/broker/QueueEvents.h index 9bb56f888d..82abd3d20a 100644 --- a/qpid/cpp/src/qpid/broker/QueueEvents.h +++ b/qpid/cpp/src/qpid/broker/QueueEvents.h @@ -61,6 +61,8 @@ class QueueEvents QPID_BROKER_EXTERN void registerListener(const std::string& id, const EventListener&); QPID_BROKER_EXTERN void unregisterListener(const std::string& id); + void enable(); + void disable(); //process all outstanding events QPID_BROKER_EXTERN void shutdown(); private: @@ -69,6 +71,7 @@ class QueueEvents EventQueue eventQueue; Listeners listeners; + volatile bool enabled; qpid::sys::Mutex lock;//protect listeners from concurrent access void handle(EventQueue::Queue& e); diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp index 41a6709d27..c59736969f 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp @@ -126,7 +126,7 @@ std::string QueuePolicy::getType(const FieldTable& settings) FieldTable::ValuePtr v = settings.get(typeKey); if (v && v->convertsTo<std::string>()) { std::string t = v->get<std::string>(); - transform(t.begin(), t.end(), t.begin(), tolower); + std::transform(t.begin(), t.end(), t.begin(), tolower); if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t; } return FLOW_TO_DISK; @@ -197,11 +197,12 @@ void RingQueuePolicy::enqueued(const QueuedMessage& m) void RingQueuePolicy::dequeued(const QueuedMessage& m) { qpid::sys::Mutex::ScopedLock l(lock); - QueuePolicy::dequeued(m); //find and remove m from queue - for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) { - if (i->position == m.position) { + for (Messages::iterator i = queue.begin(); i != queue.end(); i++) { + if (i->payload == m.payload) { queue.erase(i); + //now update count and size + QueuePolicy::dequeued(m); break; } } @@ -210,9 +211,11 @@ void RingQueuePolicy::dequeued(const QueuedMessage& m) bool RingQueuePolicy::isEnqueued(const QueuedMessage& m) { qpid::sys::Mutex::ScopedLock l(lock); - //for non-strict ring policy, a message can be dequeued before acked; need to detect this - for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) { - if (i->position == m.position) { + //for non-strict ring policy, a message can be replaced (and + //therefore dequeued) before it is accepted or released by + //subscriber; need to detect this + for (Messages::const_iterator i = queue.begin(); i != queue.end(); i++) { + if (i->payload == m.payload) { return true; } } @@ -236,13 +239,10 @@ bool RingQueuePolicy::checkLimit(const QueuedMessage& m) oldest = queue.front(); } if (oldest.queue->acquire(oldest) || !strict) { - qpid::sys::Mutex::ScopedLock l(lock); - if (oldest.position == queue.front().position) { - queue.pop_front(); - QPID_LOG(debug, "Ring policy triggered in queue " - << (m.queue ? m.queue->getName() : std::string("unknown queue")) - << ": removed message " << oldest.position << " to make way for " << m.position); - } + oldest.queue->dequeue(0, oldest); + QPID_LOG(debug, "Ring policy triggered in queue " + << (m.queue ? m.queue->getName() : std::string("unknown queue")) + << ": removed message " << oldest.position << " to make way for " << m.position); return true; } else { QPID_LOG(debug, "Ring policy could not be triggered in queue " diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp index 2cb801bf83..d079e543c4 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -19,6 +19,7 @@ * */ #include "QueueRegistry.h" +#include "QueueEvents.h" #include "qpid/log/Statement.h" #include <sstream> #include <assert.h> @@ -27,7 +28,7 @@ using namespace qpid::broker; using namespace qpid::sys; QueueRegistry::QueueRegistry() : - counter(1), store(0), parent(0), lastNode(false) {} + counter(1), store(0), events(0), parent(0), lastNode(false) {} QueueRegistry::~QueueRegistry(){} @@ -43,7 +44,8 @@ QueueRegistry::declare(const string& declareName, bool durable, if (i == queues.end()) { Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent)); queues[name] = queue; - if (lastNode) queue->setLastNodeFailure(); + if (lastNode) queue->setLastNodeFailure(); + if (events) queue->setQueueEventManager(*events); return std::pair<Queue::shared_ptr, bool>(queue, true); } else { @@ -105,3 +107,7 @@ void QueueRegistry::updateQueueClusterState(bool _lastNode) lastNode = _lastNode; } +void QueueRegistry::setQueueEvents(QueueEvents* e) +{ + events = e; +} diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h index 07bb550fd4..3c02afedc4 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.h +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h @@ -32,6 +32,8 @@ namespace qpid { namespace broker { +class QueueEvents; + /** * A registry of queues indexed by queue name. * @@ -90,6 +92,8 @@ class QueueRegistry { */ string generateName(); + void setQueueEvents(QueueEvents*); + /** * Set the store to use. May only be called once. */ @@ -124,6 +128,7 @@ private: mutable qpid::sys::RWlock lock; int counter; MessageStore* store; + QueueEvents* events; management::Manageable* parent; bool lastNode; //used to set mode on queue declare diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 8030cf7d0e..5f8b57fa0b 100644 --- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -149,7 +149,8 @@ RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer void RecoveryManagerImpl::recoveryComplete() { - //TODO (finalise binding setup etc) + //notify all queues + queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1)); } bool RecoverableMessageImpl::loadContent(uint64_t available) diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp index 736b051945..edc66444ec 100644 --- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp +++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp @@ -141,21 +141,31 @@ NullAuthenticator::~NullAuthenticator() {} void NullAuthenticator::getMechanisms(Array& mechanisms) { mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("ANONYMOUS"))); + mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("PLAIN")));//useful for testing } void NullAuthenticator::start(const string& mechanism, const string& response) { if (mechanism == "PLAIN") { // Old behavior - if (response.size() > 0 && response[0] == (char) 0) { - string temp = response.substr(1); - string::size_type i = temp.find((char)0); - string uid = temp.substr(0, i); - string pwd = temp.substr(i + 1); - i = uid.find_last_of(realm); - if (i == string::npos || i != (uid.size() - 1)) { - uid = str(format("%1%@%2%") % uid % realm); + if (response.size() > 0) { + string uid; + string::size_type i = response.find((char)0); + if (i == 0 && response.size() > 1) { + //no authorization id; use authentication id + i = response.find((char)0, 1); + if (i != string::npos) uid = response.substr(1, i-1); + } else if (i != string::npos) { + //authorization id is first null delimited field + uid = response.substr(0, i); + }//else not a valid SASL PLAIN response, throw error? + if (!uid.empty()) { + //append realm if it has not already been added + i = uid.find(realm); + if (i == string::npos || realm.size() + i < uid.size()) { + uid = str(format("%1%@%2%") % uid % realm); + } + connection.setUserId(uid); } - connection.setUserId(uid); } } else { connection.setUserId("anonymous"); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 4f751e43b7..3c7c6d9afa 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -355,20 +355,12 @@ const std::string nullstring; } void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { + msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); + std::string exchangeName = msg->getExchangeName(); - //TODO: the following should be hidden behind message (using MessageAdapter or similar) - - if (msg->isA<MessageTransferBody>()) { - // Do not replace the delivery-properties.exchange if it is is already set. - // This is used internally (by the cluster) to force the exchange name on a message. - // The client library ensures this is always empty for messages from normal clients. - if (!msg->hasProperties<DeliveryProperties>() || msg->getProperties<DeliveryProperties>()->getExchange().empty()) - msg->getProperties<DeliveryProperties>()->setExchange(exchangeName); - msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); - } - if (!cacheExchange || cacheExchange->getName() != exchangeName){ + if (!cacheExchange || cacheExchange->getName() != exchangeName) cacheExchange = session.getBroker().getExchanges().get(exchangeName); - } + cacheExchange->setProperties(msg); /* verify the userid if specified: */ std::string id = @@ -516,14 +508,16 @@ void SemanticState::ConsumerImpl::setCreditMode() void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) { if (byteCredit != 0xFFFFFFFF) { - byteCredit += value; + if (value == 0xFFFFFFFF) byteCredit = value; + else byteCredit += value; } } void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) { if (msgCredit != 0xFFFFFFFF) { - msgCredit += value; + if (value == 0xFFFFFFFF) msgCredit = value; + else msgCredit += value; } } diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index ae160fabc7..96c47085f0 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -362,10 +362,6 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& getBroker().getExchanges().getDefault()->bind(queue, name, 0); queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments); - //if event generation is turned on, pass in a pointer to - //the QueueEvents instance to use - if (queue->getEventMode()) queue->setQueueEventManager(getBroker().getQueueEvents()); - //handle automatic cleanup: if (exclusive) { exclusiveQueues.push_back(queue); diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 2c4de478f6..442c3eb34b 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -34,7 +34,8 @@ using namespace qpid::sys; SessionHandler::SessionHandler(Connection& c, ChannelId ch) : amqp_0_10::SessionHandler(&c.getOutput(), ch), connection(c), - proxy(out) + proxy(out), + clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0) {} SessionHandler::~SessionHandler() {} @@ -84,11 +85,23 @@ void SessionHandler::readyToSend() { if (session.get()) session->readyToSend(); } -// TODO aconway 2008-05-12: hacky - handle attached for bridge clients. -// We need to integrate the client code so we can run a real client -// in the bridge. -// -void SessionHandler::attached(const std::string& name) { +/** + * Used by inter-broker bridges to set up session id and attach + */ +void SessionHandler::attachAs(const std::string& name) +{ + SessionId id(connection.getUserId(), name); + SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig(); + session.reset(new SessionState(connection.getBroker(), *this, id, config)); + sendAttach(false); +} + +/** + * TODO: this is a little ugly, fix it; its currently still relied on + * for 'push' bridges + */ +void SessionHandler::attached(const std::string& name) +{ if (session.get()) { amqp_0_10::SessionHandler::attached(name); } else { diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index 7449db1560..ffc032f64c 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -54,10 +54,20 @@ class SessionHandler : public amqp_0_10::SessionHandler { framing::AMQP_ClientProxy& getProxy() { return proxy; } const framing::AMQP_ClientProxy& getProxy() const { return proxy; } + /** + * If commands are sent based on the local time (e.g. in timers), they don't have + * a well-defined ordering across cluster nodes. + * This proxy is for sending such commands. In a clustered broker it will take steps + * to synchronize command order across the cluster. In a stand-alone broker + * it is just a synonym for getProxy() + */ + framing::AMQP_ClientProxy& getClusterOrderProxy() { + return clusterOrderProxy.get() ? *clusterOrderProxy : proxy; + } + virtual void handleDetach(); - - // Overrides - void attached(const std::string& name); + void attached(const std::string& name);//used by 'pushing' inter-broker bridges + void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges protected: virtual void setState(const std::string& sessionName, bool force); @@ -69,9 +79,16 @@ class SessionHandler : public amqp_0_10::SessionHandler { virtual void readyToSend(); private: + struct SetChannelProxy : public framing::AMQP_ClientProxy { // Proxy that sets the channel. + framing::ChannelHandler setChannel; + SetChannelProxy(uint16_t ch, framing::FrameHandler* out) + : framing::AMQP_ClientProxy(setChannel), setChannel(ch, out) {} + }; + Connection& connection; framing::AMQP_ClientProxy proxy; std::auto_ptr<SessionState> session; + std::auto_ptr<SetChannelProxy> clusterOrderProxy; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index dffc7cf6af..7e5f605753 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -66,7 +66,7 @@ SessionState::SessionState( uint32_t maxRate = broker.getOptions().maxSessionRate; if (maxRate) { if (handler->getConnection().getClientThrottling()) { - rateFlowcontrol = new RateFlowcontrol(maxRate); + rateFlowcontrol.reset(new RateFlowcontrol(maxRate)); } else { QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support"); } @@ -210,14 +210,17 @@ struct ScheduledCreditTask : public TimerTask { {} void fire() { - QPID_LOG(critical, "ScheduledCreditTask fired"); // FIXME aconway 2009-02-23: REMOVE // This is the best we can currently do to avoid a destruction/fire race if (!isCancelled()) { - if ( !sessionState.processSendCredit(0) ) { - QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit"); - reset(); - timer.add(this); - } + sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this)); + } + } + + void sendCredit() { + if ( !sessionState.processSendCredit(0) ) { + QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit"); + reset(); + timer.add(this); } } }; @@ -275,7 +278,8 @@ bool SessionState::processSendCredit(uint32_t msgs) if ( msgs > 0 && rateFlowcontrol->flowStopped() ) { QPID_LOG(warning, getId() << ": producer throttling violation"); // TODO: Probably do message.stop("") first time then disconnect - getProxy().getMessage().stop(""); + // See comment on getClusterOrderProxy() in .h file + getClusterOrderProxy().getMessage().stop(""); return true; } AbsTime now = AbsTime::now(); @@ -283,7 +287,7 @@ bool SessionState::processSendCredit(uint32_t msgs) if (mgmtObject) mgmtObject->dec_clientCredit(msgs); if ( sendCredit>0 ) { QPID_LOG(debug, getId() << ": send producer credit " << sendCredit); - getProxy().getMessage().flow("", 0, sendCredit); + getClusterOrderProxy().getMessage().flow("", 0, sendCredit); rateFlowcontrol->sentCredit(now, sendCredit); if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit); return true; @@ -364,8 +368,9 @@ void SessionState::readyToSend() { // Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth uint32_t credit = std::min(rateFlowcontrol->getRate(), 300U); QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit); - getProxy().getMessage().setFlowMode("", 0); - getProxy().getMessage().flow("", 0, credit); + // See comment on getClusterOrderProxy() in .h file + getClusterOrderProxy().getMessage().setFlowMode("", 0); + getClusterOrderProxy().getMessage().flow("", 0, credit); rateFlowcontrol->sentCredit(AbsTime::now(), credit); if (mgmtObject) mgmtObject->inc_clientCredit(credit); } @@ -373,4 +378,8 @@ void SessionState::readyToSend() { Broker& SessionState::getBroker() { return broker; } +framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() { + return handler->getClusterOrderProxy(); +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 1b7f2420a9..bdfed87905 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -125,6 +125,15 @@ class SessionState : public qpid::SessionState, void sendAcceptAndCompletion(); + /** + * If commands are sent based on the local time (e.g. in timers), they don't have + * a well-defined ordering across cluster nodes. + * This proxy is for sending such commands. In a clustered broker it will take steps + * to synchronize command order across the cluster. In a stand-alone broker + * it is just a synonym for getProxy() + */ + framing::AMQP_ClientProxy& getClusterOrderProxy(); + Broker& broker; SessionHandler* handler; sys::AbsTime expiry; // Used by SessionManager. @@ -138,7 +147,7 @@ class SessionState : public qpid::SessionState, // State used for producer flow control (rate limited) qpid::sys::Mutex rateLock; - RateFlowcontrol* rateFlowcontrol; + boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol; boost::intrusive_ptr<TimerTask> flowControlTimer; friend class SessionManager; diff --git a/qpid/cpp/src/qpid/broker/TxAccept.cpp b/qpid/cpp/src/qpid/broker/TxAccept.cpp index c7001e5526..73f365d509 100644 --- a/qpid/cpp/src/qpid/broker/TxAccept.cpp +++ b/qpid/cpp/src/qpid/broker/TxAccept.cpp @@ -50,12 +50,12 @@ void TxAccept::RangeOps::operator()(SequenceNumber start, SequenceNumber end) void TxAccept::RangeOps::prepare(TransactionContext* ctxt) { - for_each(ranges.begin(), ranges.end(), bind(&RangeOp::prepare, _1, ctxt)); + std::for_each(ranges.begin(), ranges.end(), bind(&RangeOp::prepare, _1, ctxt)); } void TxAccept::RangeOps::commit() { - for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1)); + std::for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1)); //now remove if isRedundant(): if (!ranges.empty()) { ack_iterator i = ranges.front().range.start; diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp index 8e27d78479..745bdb63b5 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp @@ -180,6 +180,9 @@ void ConnectionImpl::close() template <class F> void ConnectionImpl::closeInternal(const F& f) { + if (heartbeatTask) { + heartbeatTask->cancel(); + } { Mutex::ScopedUnlock u(lock); connector->close(); diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp index e6355601df..c251233082 100644 --- a/qpid/cpp/src/qpid/client/Connector.cpp +++ b/qpid/cpp/src/qpid/client/Connector.cpp @@ -92,8 +92,6 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable framing::ProtocolVersion version; bool initiated; - - sys::Mutex closedLock; bool closed; bool joined; @@ -185,7 +183,7 @@ TCPConnector::~TCPConnector() { } void TCPConnector::connect(const std::string& host, int port){ - Mutex::ScopedLock l(closedLock); + Mutex::ScopedLock l(lock); assert(closed); try { socket.connect(host, port); @@ -207,7 +205,7 @@ void TCPConnector::connect(const std::string& host, int port){ } void TCPConnector::init(){ - Mutex::ScopedLock l(closedLock); + Mutex::ScopedLock l(lock); assert(joined); ProtocolInitiation init(version); writeDataBlock(init); @@ -216,17 +214,21 @@ void TCPConnector::init(){ } bool TCPConnector::closeInternal() { - Mutex::ScopedLock l(closedLock); - bool ret = !closed; + bool ret; + { + Mutex::ScopedLock l(lock); + ret = !closed; if (!closed) { closed = true; + aio->queueForDeletion(); poller->shutdown(); } - if (!joined && receiver.id() != Thread::current().id()) { - joined = true; - Mutex::ScopedUnlock u(closedLock); - receiver.join(); + if (joined || receiver.id() == Thread::current().id()) { + return ret; } + joined = true; + } + receiver.join(); return ret; } @@ -259,21 +261,19 @@ const std::string& TCPConnector::getIdentifier() const { } void TCPConnector::send(AMQFrame& frame) { + Mutex::ScopedLock l(lock); + frames.push_back(frame); + //only ask to write if this is the end of a frameset or if we + //already have a buffers worth of data + currentSize += frame.encodedSize(); bool notifyWrite = false; - { - Mutex::ScopedLock l(lock); - frames.push_back(frame); - //only ask to write if this is the end of a frameset or if we - //already have a buffers worth of data - currentSize += frame.encodedSize(); - if (frame.getEof()) { - lastEof = frames.size(); - notifyWrite = true; - } else { - notifyWrite = (currentSize >= maxFrameSize); - } + if (frame.getEof()) { + lastEof = frames.size(); + notifyWrite = true; + } else { + notifyWrite = (currentSize >= maxFrameSize); } - if (notifyWrite) aio->notifyPendingWrite(); + if (notifyWrite && !closed) aio->notifyPendingWrite(); } void TCPConnector::handleClosed() { @@ -384,14 +384,13 @@ void TCPConnector::run() { assert(protect); try { Dispatcher d(poller); - + for (int i = 0; i < 32; i++) { aio->queueReadBuffer(new Buff(maxFrameSize)); } - + aio->start(poller); d.run(); - aio->queueForDeletion(); socket.close(); } catch (const std::exception& e) { QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what())); diff --git a/qpid/cpp/src/qpid/client/Dispatcher.cpp b/qpid/cpp/src/qpid/client/Dispatcher.cpp index 27cc4184f9..8d8574520a 100644 --- a/qpid/cpp/src/qpid/client/Dispatcher.cpp +++ b/qpid/cpp/src/qpid/client/Dispatcher.cpp @@ -136,8 +136,7 @@ void Dispatcher::listen(const boost::intrusive_ptr<SubscriptionImpl>& subscripti void Dispatcher::cancel(const std::string& destination) { ScopedLock<Mutex> l(lock); - listeners.erase(destination); - if (autoStop && listeners.empty()) + if (listeners.erase(destination) && running && autoStop && listeners.empty()) queue->close(); } diff --git a/qpid/cpp/src/qpid/client/FailoverManager.cpp b/qpid/cpp/src/qpid/client/FailoverManager.cpp index ab9dbca70f..86b50a0a61 100644 --- a/qpid/cpp/src/qpid/client/FailoverManager.cpp +++ b/qpid/cpp/src/qpid/client/FailoverManager.cpp @@ -21,12 +21,15 @@ #include "FailoverManager.h" #include "qpid/Exception.h" #include "qpid/log/Statement.h" +#include "qpid/sys/Time.h" namespace qpid { namespace client { using qpid::sys::Monitor; +using qpid::sys::AbsTime; +using qpid::sys::Duration; FailoverManager::FailoverManager(const ConnectionSettings& s, ReconnectionStrategy* rs) : settings(s), strategy(rs), state(IDLE) {} @@ -35,15 +38,21 @@ void FailoverManager::execute(Command& c) { bool retry = false; bool completed = false; + AbsTime failed; while (!completed) { try { AsyncSession session = connect().newSession(); + if (retry) { + Duration failoverTime(failed, AbsTime::now()); + QPID_LOG(info, "Failed over for " << &c << " in " << (failoverTime/qpid::sys::TIME_MSEC) << " milliseconds"); + } c.execute(session, retry); session.sync();//TODO: shouldn't be required session.close(); completed = true; } catch(const TransportFailure&) { - retry = true; + retry = true; + failed = AbsTime::now(); } } } diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp index ee542a9cf8..5df376efa0 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp @@ -512,6 +512,7 @@ void SessionImpl::detach(const std::string& _name) if (id.getName() != _name) throw InternalErrorException("Incorrect session name"); setState(DETACHED); QPID_LOG(info, "Session detached by peer: " << id); + proxy.detached(_name, DETACH_CODE_NORMAL); } void SessionImpl::detached(const std::string& _name, uint8_t _code) { @@ -744,7 +745,8 @@ void SessionImpl::assertOpen() const void SessionImpl::handleClosed() { - demux.close(exceptionHolder.empty() ? new ClosedException() : exceptionHolder); + demux.close(exceptionHolder.empty() ? + sys::ExceptionHolder(new ClosedException()) : exceptionHolder); results.close(); } diff --git a/qpid/cpp/src/qpid/client/SslConnector.cpp b/qpid/cpp/src/qpid/client/SslConnector.cpp index 75c3f5677e..a4298dd4ca 100644 --- a/qpid/cpp/src/qpid/client/SslConnector.cpp +++ b/qpid/cpp/src/qpid/client/SslConnector.cpp @@ -221,6 +221,7 @@ bool SslConnector::closeInternal() { bool ret = !closed; if (!closed) { closed = true; + aio->queueForDeletion(); poller->shutdown(); } if (!joined && receiver.id() != Thread::current().id()) { @@ -386,7 +387,6 @@ void SslConnector::run(){ aio->start(poller); d.run(); - aio->queueForDeletion(); socket.close(); } catch (const std::exception& e) { QPID_LOG(error, e.what()); diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 6221b0054c..f8e412f1e6 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -21,7 +21,9 @@ #include "Connection.h" #include "UpdateClient.h" #include "FailoverExchange.h" +#include "UpdateExchange.h" +#include "qpid/assert.h" #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" #include "qmf/org/apache/qpid/cluster/Package.h" #include "qpid/broker/Broker.h" @@ -91,9 +93,10 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : cpg(*this), name(settings.name), myUrl(settings.url.empty() ? Url() : Url(settings.url)), - myId(cpg.self()), + self(cpg.self()), readMax(settings.readMax), writeEstimate(settings.writeEstimate), + expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())), mcast(cpg, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1), @@ -104,15 +107,12 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : boost::bind(&Cluster::leave, this), "Error delivering frames", poller), - connections(*this), - decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1), connections), - expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())), - frameId(0), initialized(false), + decoder(boost::bind(&Cluster::deliverFrame, this, _1)), + discarding(true), state(INIT), lastSize(0), - lastBroker(false), - sequence(0) + lastBroker(false) { mAgent = ManagementAgent::Singleton::getInstance(); if (mAgent != 0){ @@ -122,7 +122,13 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : mgmtObject->set_status("JOINING"); } + // Failover exchange provides membership updates to clients. failoverExchange.reset(new FailoverExchange(this)); + broker.getExchanges().registerExchange(failoverExchange); + + // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange. + broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); + if (settings.quorum) quorum.init(); cpg.join(name); // pump the CPG dispatch manually till we get initialized. @@ -149,21 +155,21 @@ void Cluster::initialize() { // Called in connection thread to insert a client connection. void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { - Lock l(lock); - connections.insert(c); + localConnections.insert(c); } // Called in connection thread to insert an updated shadow connection. void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { - Lock l(lock); - assert(state <= UPDATEE); // Only during update. - connections.insert(c); + // Safe to use connections here because we're pre-catchup, either + // discarding or stalled, so deliveredFrame is not processing any + // connection events. + assert(discarding); + connections.insert(ConnectionMap::value_type(c->getId(), c)); } +// Called by Connection::deliverClose() in deliverFrameQueue thread. void Cluster::erase(const ConnectionId& id) { - // Called only by Connection::deliverClose in deliver thread, no need to lock. connections.erase(id); - decoder.erase(id); } std::vector<string> Cluster::getIds() const { @@ -193,7 +199,6 @@ void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); - connections.clear(); try { broker.shutdown(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error during broker shutdown: " << e.what()); @@ -213,52 +218,88 @@ void Cluster::deliver( MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); - e.setSequence(sequence++); - if (from == myId) // Record self-deliveries for flow control. + if (from == self) // Record self-deliveries for flow control. mcast.selfDeliver(e); - deliver(e); + deliverEvent(e); } -void Cluster::deliver(const Event& e) { - if (state == LEFT) return; - QPID_LATENCY_INIT(e); +void Cluster::deliverEvent(const Event& e) { deliverEventQueue.push(e); } -// Handler for deliverEventQueue +void Cluster::deliverFrame(const EventFrame& e) { + deliverFrameQueue.push(e); +} + +// Handler for deliverEventQueue. +// This thread decodes frames from events. void Cluster::deliveredEvent(const Event& e) { - QPID_LATENCY_RECORD("delivered event queue", e); - Buffer buf(const_cast<char*>(e.getData()), e.getSize()); - if (e.getType() == CONTROL) { - AMQFrame frame; - while (frame.decode(buf)) - deliverFrameQueue.push(EventFrame(e, frame)); + QPID_LOG(trace, *this << " DLVR: " << e); + if (e.isCluster()) { + EventFrame ef(e, e.getFrame()); + // Stop the deliverEventQueue on update offers. + // This preserves the connection decoder fragments for an update. + ClusterUpdateOfferBody* offer = dynamic_cast<ClusterUpdateOfferBody*>(ef.frame.getBody()); + if (offer) + deliverEventQueue.stop(); + deliverFrame(ef); } - else if (e.getType() == DATA) - decoder.decode(e, e.getData()); + else if(!discarding) { + if (e.isControl()) + deliverFrame(EventFrame(e, e.getFrame())); + else + decoder.decode(e, e.getData()); +} + else // Discard connection events if discarding is set. + QPID_LOG(trace, *this << " DROP: " << e); } -// Handler for deliverFrameQueue +// Handler for deliverFrameQueue. +// This thread executes the main logic. void Cluster::deliveredFrame(const EventFrame& e) { Mutex::ScopedLock l(lock); - const_cast<AMQFrame&>(e.frame).setClusterId(frameId++); - QPID_LOG(trace, *this << " DLVR: " << e); - QPID_LATENCY_RECORD("delivered frame queue", e.frame); - if (e.isCluster()) { // Cluster control frame + if (e.isCluster()) { + QPID_LOG(trace, *this << " DLVR: " << e); ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) throw Exception(QPID_MSG("Invalid cluster control")); } - else { // Connection frame. - if (state <= UPDATEE) { - QPID_LOG(trace, *this << " DROP: " << e); - return; - } - boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId); - if (connection) // Ignore frames to closed local connections. + else if (state >= CATCHUP) { + QPID_LOG(trace, *this << " DLVR: " << e); + ConnectionPtr connection = getConnection(e.connectionId, l); + if (connection) connection->deliveredFrame(e); } - QPID_LATENCY_RECORD("processed", e.frame); + else // Drop connection frames while state < CATCHUP + QPID_LOG(trace, *this << " DROP: " << e); +} + +// Called in deliverFrameQueue thread +ConnectionPtr Cluster::getConnection(const ConnectionId& id, Lock&) { + ConnectionPtr cp; + ConnectionMap::iterator i = connections.find(id); + if (i != connections.end()) + cp = i->second; + else { + if(id.getMember() == self) + cp = localConnections.getErase(id); + else { + // New remote connection, create a shadow. + std::ostringstream mgmtId; + mgmtId << id; + cp = new Connection(*this, shadowOut, mgmtId.str(), id); + } + if (cp) + connections.insert(ConnectionMap::value_type(id, cp)); + } + return cp; +} + +Cluster::ConnectionVector Cluster::getConnections(Lock&) { + ConnectionVector result(connections.size()); + std::transform(connections.begin(), connections.end(), result.begin(), + boost::bind(&ConnectionMap::value_type::second, _1)); + return result; } struct AddrList { @@ -306,42 +347,45 @@ void Cluster::configChange ( std::string addresses; for (cpg_address* p = current; p < current+nCurrent; ++p) addresses.append(MemberId(*p).str()); - deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId)); + deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self)); } void Cluster::setReady(Lock&) { state = READY; if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); mcast.release(); + broker.getQueueEvents().enable(); } void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) { bool memberChange = map.configChange(addresses); if (state == LEFT) return; - if (!map.isAlive(myId)) { // Final config change. + if (!map.isAlive(self)) { // Final config change. leave(l); return; } if (state == INIT) { // First configChange if (map.aliveCount() == 1) { - setClusterId(true); + setClusterId(true, l); + discarding = false; setReady(l); - map = ClusterMap(myId, myUrl, true); + map = ClusterMap(self, myUrl, true); memberUpdate(l); QPID_LOG(notice, *this << " first in cluster"); } else { // Joining established group. state = JOINER; QPID_LOG(info, *this << " joining cluster: " << map); - mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId); + mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); elders = map.getAlive(); - elders.erase(myId); + elders.erase(self); broker.getLinks().setPassive(true); + broker.getQueueEvents().disable(); } - } - else if (state >= READY && memberChange) { + } + else if (state >= CATCHUP && memberChange) { memberUpdate(l); elders = ClusterMap::intersection(elders, map.getAlive()); if (elders.empty()) { @@ -351,13 +395,11 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& } } -bool Cluster::isLeader() const { return elders.empty(); } - -void Cluster::tryMakeOffer(const MemberId& id, Lock& ) { +void Cluster::makeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; QPID_LOG(info, *this << " send update-offer to " << id); - mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), myId); + mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), self); } } @@ -367,88 +409,89 @@ void Cluster::tryMakeOffer(const MemberId& id, Lock& ) { // callbacks will be invoked. // void Cluster::brokerShutdown() { - if (state != LEFT) { - try { cpg.shutdown(); } - catch (const std::exception& e) { - QPID_LOG(error, *this << " shutting down CPG: " << e.what()); - } + try { cpg.shutdown(); } + catch (const std::exception& e) { + QPID_LOG(error, *this << " shutting down CPG: " << e.what()); } delete this; } void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) { map.updateRequest(id, url); - tryMakeOffer(id, l); + makeOffer(id, l); } void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { if (map.ready(id, Url(url))) memberUpdate(l); - if (state == CATCHUP && id == myId) { + if (state == CATCHUP && id == self) { setReady(l); QPID_LOG(notice, *this << " caught up, active cluster member"); } } void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) { + // NOTE: deliverEventQueue has been stopped at the update offer by + // deliveredEvent in case an update is required. if (state == LEFT) return; MemberId updatee(updateeInt); boost::optional<Url> url = map.updateOffer(updater, updatee); - if (updater == myId) { + if (updater == self) { assert(state == OFFER); - if (url) { // My offer was first. + if (url) // My offer was first. updateStart(updatee, *url, l); - } else { // Another offer was first. + deliverEventQueue.start(); // Don't need to update setReady(l); QPID_LOG(info, *this << " cancelled update offer to " << updatee); - tryMakeOffer(map.firstJoiner(), l); // Maybe make another offer. + makeOffer(map.firstJoiner(), l); // Maybe make another offer. } } - else if (updatee == myId && url) { + else if (updatee == self && url) { assert(state == JOINER); - setClusterId(uuid); + setClusterId(uuid, l); state = UPDATEE; QPID_LOG(info, *this << " receiving update from " << updater); - deliverFrameQueue.stop(); checkUpdateIn(l); } + else + deliverEventQueue.start(); // Don't need to update } -void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) { +void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { + // NOTE: deliverEventQueue is already stopped at the stall point by deliveredEvent. if (state == LEFT) return; assert(state == OFFER); state = UPDATER; - QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url); - deliverFrameQueue.stop(); - if (updateThread.id()) updateThread.join(); // Join the previous updatethread. + QPID_LOG(info, *this << " sending update to " << updatee << " at " << url); + if (updateThread.id()) + updateThread.join(); // Join the previous updateThread to avoid leaks. client::ConnectionSettings cs; cs.username = settings.username; cs.password = settings.password; cs.mechanism = settings.mechanism; updateThread = Thread( - new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(), + new UpdateClient(self, updatee, url, broker, map, *expiryPolicy, getConnections(l), decoder, boost::bind(&Cluster::updateOutDone, this), boost::bind(&Cluster::updateOutError, this, _1), cs)); } // Called in update thread. -void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) { +void Cluster::updateInDone(const ClusterMap& m) { Lock l(lock); updatedMap = m; - frameId = fid; checkUpdateIn(l); } -void Cluster::checkUpdateIn(Lock& ) { - if (state == LEFT) return; +void Cluster::checkUpdateIn(Lock&) { if (state == UPDATEE && updatedMap) { map = *updatedMap; - mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId); + mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; + discarding = false; // ok to set, we're stalled for update. QPID_LOG(info, *this << " received update, starting catch-up"); - deliverFrameQueue.start(); + deliverEventQueue.start(); } } @@ -462,8 +505,8 @@ void Cluster::updateOutDone(Lock& l) { assert(state == UPDATER); state = READY; mcast.release(); - deliverFrameQueue.start(); - tryMakeOffer(map.firstJoiner(), l); // Try another offer + deliverEventQueue.start(); // Start processing events again. + makeOffer(map.firstJoiner(), l); // Try another offer } void Cluster::updateOutError(const std::exception& e) { @@ -487,7 +530,7 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s { _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args; stringstream stream; - stream << myId; + stream << self; if (iargs.i_brokerId == stream.str()) stopClusterNode(l); } @@ -508,7 +551,7 @@ void Cluster::stopClusterNode(Lock& l) { void Cluster::stopFullCluster(Lock& ) { QPID_LOG(notice, *this << " shutting down cluster " << name); - mcast.mcastControl(ClusterShutdownBody(), myId); + mcast.mcastControl(ClusterShutdownBody(), self); } void Cluster::memberUpdate(Lock& l) { @@ -518,13 +561,13 @@ void Cluster::memberUpdate(Lock& l) { size_t size = urls.size(); failoverExchange->setUrls(urls); - if (size == 1 && lastSize > 1 && state >= READY) { - QPID_LOG(info, *this << " last broker standing, update queue policies"); + if (size == 1 && lastSize > 1 && state >= CATCHUP) { + QPID_LOG(notice, *this << " last broker standing, update queue policies"); lastBroker = true; broker.getQueues().updateQueueClusterState(true); } else if (size > 1 && lastBroker) { - QPID_LOG(info, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size); + QPID_LOG(notice, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size); lastBroker = false; broker.getQueues().updateQueueClusterState(false); } @@ -546,17 +589,23 @@ void Cluster::memberUpdate(Lock& l) { mgmtObject->set_memberIDs(idstr); } - // Close connections belonging to members that have now been excluded - connections.update(myId, map); + // Erase connections belonging to members that have left the cluster. + ConnectionMap::iterator i = connections.begin(); + while (i != connections.end()) { + ConnectionMap::iterator j = i++; + MemberId m = j->second->getId().getMember(); + if (m != self && !map.isMember(m)) + connections.erase(j); + } } std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" }; - return o << cluster.myId << "(" << STATE[cluster.state] << ")"; + return o << cluster.self << "(" << STATE[cluster.state] << ")"; } MemberId Cluster::getId() const { - return myId; // Immutable, no need to lock. + return self; // Immutable, no need to lock. } broker::Broker& Cluster::getBroker() const { @@ -571,11 +620,11 @@ void Cluster::checkQuorum() { } } -void Cluster::setClusterId(const Uuid& uuid) { +void Cluster::setClusterId(const Uuid& uuid, Lock&) { clusterId = uuid; if (mgmtObject) { stringstream stream; - stream << myId; + stream << self; mgmtObject->set_clusterID(clusterId.str()); mgmtObject->set_memberID(stream.str()); } diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 8c5eb06ff7..b716e2d781 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -19,34 +19,34 @@ * */ -#include "ClusterSettings.h" #include "ClusterMap.h" -#include "ConnectionMap.h" +#include "ClusterSettings.h" #include "Cpg.h" +#include "Decoder.h" #include "Event.h" +#include "EventFrame.h" +#include "ExpiryPolicy.h" #include "FailoverExchange.h" +#include "LockedConnectionMap.h" #include "Multicaster.h" -#include "EventFrame.h" #include "NoOpConnectionOutputHandler.h" +#include "PollableQueue.h" #include "PollerDispatch.h" #include "Quorum.h" -#include "Decoder.h" -#include "PollableQueue.h" -#include "ExpiryPolicy.h" +#include "qmf/org/apache/qpid/cluster/Cluster.h" +#include "qpid/Url.h" #include "qpid/broker/Broker.h" -#include "qpid/sys/Monitor.h" #include "qpid/management/Manageable.h" -#include "qpid/Url.h" -#include "qmf/org/apache/qpid/cluster/Cluster.h" +#include "qpid/sys/Monitor.h" -#include <boost/intrusive_ptr.hpp> #include <boost/bind.hpp> +#include <boost/intrusive_ptr.hpp> #include <boost/optional.hpp> #include <algorithm> -#include <vector> #include <map> +#include <vector> namespace qpid { @@ -58,6 +58,7 @@ class Uuid; namespace cluster { class Connection; +class EventFrame; /** * Connection to the cluster @@ -65,82 +66,91 @@ class Connection; class Cluster : private Cpg::Handler, public management::Manageable { public: typedef boost::intrusive_ptr<Connection> ConnectionPtr; - typedef std::vector<ConnectionPtr> Connections; + typedef std::vector<ConnectionPtr> ConnectionVector; - /** Construct the cluster in plugin earlyInitialize */ + // Public functions are thread safe unless otherwise mentioned in a comment. + + // Construct the cluster in plugin earlyInitialize. Cluster(const ClusterSettings&, broker::Broker&); virtual ~Cluster(); - /** Join the cluster in plugin initialize. Requires transport - * plugins to be available.. */ + // Called by plugin initialize: cluster start-up requires transport plugins . + // Thread safety: only called by plugin initialize. void initialize(); - // Connection map - called in connection threads. + // Connection map. void addLocalConnection(const ConnectionPtr&); void addShadowConnection(const ConnectionPtr&); void erase(const ConnectionId&); - // URLs of current cluster members - called in connection threads. + // URLs of current cluster members. std::vector<std::string> getIds() const; std::vector<Url> getUrls() const; boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; } - // Leave the cluster - called in any thread. + // Leave the cluster - called when fatal errors occur. void leave(); // Update completed - called in update thread - void updateInDone(const ClusterMap&, uint64_t frameId); + void updateInDone(const ClusterMap&); MemberId getId() const; broker::Broker& getBroker() const; Multicaster& getMulticast() { return mcast; } - boost::function<bool ()> isQuorate; - void checkQuorum(); // called in connection threads. + void checkQuorum(); size_t getReadMax() { return readMax; } size_t getWriteEstimate() { return writeEstimate; } - bool isLeader() const; // Called in deliver thread. + void deliverFrame(const EventFrame&); + + // Called only during update by Connection::shadowReady + Decoder& getDecoder() { return decoder; } + + ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; } private: typedef sys::Monitor::ScopedLock Lock; typedef PollableQueue<Event> PollableEventQueue; typedef PollableQueue<EventFrame> PollableFrameQueue; + typedef std::map<ConnectionId, ConnectionPtr> ConnectionMap; - // NB: The final Lock& parameter on functions below is used to mark functions - // that should only be called by a function that already holds the lock. - // The parameter makes it hard to forget since you have to have an instance of - // a Lock to call the unlocked functions. - + // NB: A dummy Lock& parameter marks functions that must only be + // called with Cluster::lock locked. + void leave(Lock&); std::vector<std::string> getIds(Lock&) const; std::vector<Url> getUrls(Lock&) const; - // Make an offer if we can - called in deliver thread. - void tryMakeOffer(const MemberId&, Lock&); - - // Called in main thread in ~Broker. + // == Called in main thread from Broker destructor. void brokerShutdown(); + // == Called in deliverEventQueue thread + void deliveredEvent(const Event&); + + // == Called in deliverFrameQueue thread + void deliveredFrame(const EventFrame&); + // Cluster controls implement XML methods from cluster.xml. - // Called in deliver thread. - // void updateRequest(const MemberId&, const std::string&, Lock&); void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& addresses, Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); void shutdown(const MemberId&, Lock&); - void deliveredEvent(const Event&); - void deliveredFrame(const EventFrame&); - // Helper, called in deliver thread. + // Helper functions + ConnectionPtr getConnection(const ConnectionId&, Lock&); + ConnectionVector getConnections(Lock&); void updateStart(const MemberId& updatee, const Url& url, Lock&); - + void makeOffer(const MemberId&, Lock&); void setReady(Lock&); + void memberUpdate(Lock&); + void setClusterId(const framing::Uuid&, Lock&); + // == Called in CPG dispatch thread void deliver( // CPG deliver callback. cpg_handle_t /*handle*/, struct cpg_name *group, @@ -149,7 +159,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void* /*msg*/, int /*msg_len*/); - void deliver(const Event&); + void deliverEvent(const Event&); void configChange( // CPG config change callback. cpg_handle_t /*handle*/, @@ -159,23 +169,21 @@ class Cluster : private Cpg::Handler, public management::Manageable { struct cpg_address */*joined*/, int /*nJoined*/ ); + // == Called in management threads. virtual qpid::management::ManagementObject* GetManagementObject() const; virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); void stopClusterNode(Lock&); void stopFullCluster(Lock&); - void memberUpdate(Lock&); - // Called in connection IO threads . + // == Called in connection IO threads . void checkUpdateIn(Lock&); - // Called in UpdateClient thread. + // == Called in UpdateClient thread. void updateOutDone(); void updateOutError(const std::exception&); void updateOutDone(Lock&); - void setClusterId(const framing::Uuid&); - // Immutable members set on construction, never changed. ClusterSettings settings; broker::Broker& broker; @@ -184,34 +192,38 @@ class Cluster : private Cpg::Handler, public management::Manageable { Cpg cpg; const std::string name; Url myUrl; - const MemberId myId; + const MemberId self; const size_t readMax; const size_t writeEstimate; framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; qpid::management::ManagementAgent* mAgent; + boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; // Thread safe members Multicaster mcast; PollerDispatch dispatcher; PollableEventQueue deliverEventQueue; PollableFrameQueue deliverFrameQueue; - ConnectionMap connections; boost::shared_ptr<FailoverExchange> failoverExchange; Quorum quorum; - - // Used only in delivery thread - Decoder decoder; - ClusterMap::Set elders; - boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; - uint64_t frameId; + LockedConnectionMap localConnections; // Used only during initialization bool initialized; - // Remaining members are protected by lock + // Used only in deliverEventQueue thread or when stalled for update. + Decoder decoder; + bool discarding; + + // Remaining members are protected by lock. + // FIXME aconway 2009-03-06: Most of these members are also only used in + // deliverFrameQueue thread or during stall. Review and separate members + // that require a lock, drop lock when not needed. + // mutable sys::Monitor lock; + // Local cluster state, cluster map enum { INIT, ///< Initial state, no CPG messages received. @@ -223,15 +235,16 @@ class Cluster : private Cpg::Handler, public management::Manageable { UPDATER, ///< Offer accepted, sending a state update. LEFT ///< Final state, left the cluster. } state; + + ConnectionMap connections; ClusterMap map; + ClusterMap::Set elders; size_t lastSize; bool lastBroker; - uint64_t sequence; - - // Update related sys::Thread updateThread; boost::optional<ClusterMap> updatedMap; + friend std::ostream& operator<<(std::ostream&, const Cluster&); friend class ClusterDispatcher; }; diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp index 132043f91a..adb6621caf 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -138,7 +138,6 @@ struct ClusterPlugin : public Plugin { broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); - broker->getExchanges().registerExchange(cluster->getFailoverExchange()); ManagementBroker* mgmt = dynamic_cast<ManagementBroker*>(ManagementAgent::Singleton::getInstance()); if (mgmt) { std::auto_ptr<IdAllocator> allocator(new UpdateClientIdAllocator()); diff --git a/qpid/cpp/src/qpid/cluster/ClusterSettings.h b/qpid/cpp/src/qpid/cluster/ClusterSettings.h index a8f33be75e..88e8829dfe 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterSettings.h +++ b/qpid/cpp/src/qpid/cluster/ClusterSettings.h @@ -35,7 +35,7 @@ struct ClusterSettings { size_t readMax, writeEstimate; std::string username, password, mechanism; - ClusterSettings() : quorum(false), readMax(10), writeEstimate(64), username("guest"), password("guest") {} + ClusterSettings() : quorum(false), readMax(10), writeEstimate(64) {} Url getUrl(uint16_t port) const { if (url.empty()) return Url::getIpAddressesUrl(port); diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 1a3f7c4ef7..aa7d082720 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -40,6 +40,7 @@ #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" #include "qpid/sys/LatencyMetric.h" +#include "qpid/sys/AtomicValue.h" #include <boost/current_function.hpp> @@ -58,27 +59,36 @@ using namespace framing; NoOpConnectionOutputHandler Connection::discardHandler; -// Shadow connections -Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& wrappedId, ConnectionId myId) - : cluster(c), self(myId), catchUp(false), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false) +namespace { +sys::AtomicValue<uint64_t> idCounter; +} + +// Shadow connection +Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id) + : cluster(c), self(id), catchUp(false), output(*this, out), + connection(&output, cluster.getBroker(), logId), expectProtocolHeader(false), + mcastFrameHandler(cluster.getMulticast(), self) { init(); } -// Local connections +// Local connection Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink) - : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0), - expectProtocolHeader(isLink) + const std::string& logId, MemberId member, bool isCatchUp, bool isLink) + : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out), + connection(&output, cluster.getBroker(), logId, isLink, catchUp ? ++catchUpId : 0), + expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self) { init(); } void Connection::init() { QPID_LOG(debug, cluster << " new connection: " << *this); - if (isLocalClient()) { + if (isLocalClient()) { + connection.setClusterOrderOutput(mcastFrameHandler); // Actively send cluster-order frames from local node cluster.addLocalConnection(this); giveReadCredit(cluster.getReadMax()); } + else { // Shadow or catch-up connection + connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames + connection.setClientThrottling(false); // Disable client throttling, done by active node. + } } void Connection::giveReadCredit(int credit) { @@ -140,10 +150,16 @@ bool Connection::checkUnsupported(const AMQBody& body) { void Connection::deliveredFrame(const EventFrame& f) { assert(!catchUp); currentChannel = f.frame.getChannel(); - if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. + if (f.frame.getBody() // frame can be emtpy with just readCredit + && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. && !checkUnsupported(*f.frame.getBody())) // Unsupported operation. { - connection.received(const_cast<AMQFrame&>(f.frame)); // Pass to broker connection. + if (f.type == DATA) // incoming data frames to broker::Connection + connection.received(const_cast<AMQFrame&>(f.frame)); + else { // frame control, send frame via SessionState + broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession(); + if (ss) ss->out(const_cast<AMQFrame&>(f.frame)); + } } giveReadCredit(f.readCredit); } @@ -186,12 +202,12 @@ void Connection::left() { connection.closed(); } -// Decode data from local clients. +// ConnectoinCodec::decode receives read buffers from directly-connected clients. size_t Connection::decode(const char* buffer, size_t size) { if (catchUp) { // Handle catch-up locally. Buffer buf(const_cast<char*>(buffer), size); while (localDecoder.decode(buf)) - received(localDecoder.frame); + received(localDecoder.getFrame()); } else { // Multicast local connections. assert(isLocal()); @@ -242,6 +258,7 @@ void Connection::sessionState( const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete) { + sessionState().setState( replayStart, sendCommandPoint, @@ -253,21 +270,23 @@ void Connection::sessionState( QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); } -void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username) { - ConnectionId shadow = ConnectionId(memberId, connectionId); - QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadow); - self = shadow; +void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment) { + ConnectionId shadowId = ConnectionId(memberId, connectionId); + QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId); + self = shadowId; connection.setUserId(username); + // OK to use decoder here because we are stalled for update. + cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size()); } -void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) { +void Connection::membership(const FieldTable& joiners, const FieldTable& members) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); - cluster.updateInDone(ClusterMap(joiners, members), frameId); + cluster.updateInDone(ClusterMap(joiners, members)); self.second = 0; // Mark this as completed update connection. } bool Connection::isLocal() const { - return self.first == cluster.getId() && self.second == this; + return self.first == cluster.getId() && self.second; } bool Connection::isShadow() const { @@ -333,6 +352,10 @@ void Connection::queuePosition(const string& qname, const SequenceNumber& positi q->setPosition(position); } +void Connection::expiryId(uint64_t id) { + cluster.getExpiryPolicy().setId(id); +} + std::ostream& operator<<(std::ostream& o, const Connection& c) { const char* type="unknown"; if (c.isLocal()) type = "local"; diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 98b47e1bc0..6434f763a8 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -27,14 +27,15 @@ #include "OutputInterceptor.h" #include "NoOpConnectionOutputHandler.h" #include "EventFrame.h" +#include "McastFrameHandler.h" #include "qpid/broker/Connection.h" #include "qpid/amqp_0_10/Connection.h" #include "qpid/sys/AtomicValue.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionOutputHandler.h" -#include "qpid/framing/FrameDecoder.h" #include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/FrameDecoder.h" #include <iosfwd> @@ -63,10 +64,10 @@ class Connection : public: typedef sys::PollableQueue<EventFrame> PollableFrameQueue; - /** Local connection, use this in ConnectionId */ - Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp, bool isLink); - /** Shadow connection */ - Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId); + /** Local connection. */ + Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink); + /** Shadow connection. */ + Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id); ~Connection(); ConnectionId getId() const { return self; } @@ -99,7 +100,7 @@ class Connection : /** Called if the connectors member has left the cluster */ void left(); - // ConnectionCodec methods + // ConnectionCodec methods - called by IO layer with a read buffer. size_t decode(const char* buffer, size_t size); // Called for data delivered from the cluster. @@ -117,9 +118,9 @@ class Connection : const framing::SequenceNumber& received, const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); - void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username); + void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment); - void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId); + void membership(const framing::FieldTable&, const framing::FieldTable&); void deliveryRecord(const std::string& queue, const framing::SequenceNumber& position, @@ -134,6 +135,7 @@ class Connection : uint32_t credit); void queuePosition(const std::string&, const framing::SequenceNumber&); + void expiryId(uint64_t); void txStart(); void txAccept(const framing::SequenceSet&); @@ -148,8 +150,12 @@ class Connection : void exchange(const std::string& encoded); void giveReadCredit(int credit); - + private: + struct NullFrameHandler : public framing::FrameHandler { + void handle(framing::AMQFrame&) {} + }; + void init(); bool checkUnsupported(const framing::AMQBody& body); void deliverClose(); @@ -174,6 +180,8 @@ class Connection : framing::ChannelId currentChannel; boost::shared_ptr<broker::TxBuffer> txBuffer; bool expectProtocolHeader; + McastFrameHandler mcastFrameHandler; + NullFrameHandler nullFrameHandler; static qpid::sys::AtomicValue<uint64_t> catchUpId; diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp index 442ac1438f..007337792b 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -44,18 +44,15 @@ ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, con return 0; } -// Used for outgoing Link connections, we don't care. +// Used for outgoing Link connections sys::ConnectionCodec* -ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) { - return new ConnectionCodec(out, id, cluster, false, true); - //return next->create(out, id); +ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId) { + return new ConnectionCodec(out, logId, cluster, false, true); } -ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp, bool isLink) - : codec(out, id, isLink), - interceptor(new Connection(cluster, codec, id, cluster.getId(), catchUp, isLink)), - id(interceptor->getId()), - localId(id) +ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& cluster, bool catchUp, bool isLink) + : codec(out, logId, isLink), + interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink)) { std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor)); codec.setInputHandler(ih); diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.h b/qpid/cpp/src/qpid/cluster/ConnectionCodec.h index 69c2b0c3c8..ea01b7abb9 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionCodec.h +++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.h @@ -56,7 +56,7 @@ class ConnectionCodec : public sys::ConnectionCodec { sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id); }; - ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c, bool catchUp, bool isLink); + ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& c, bool catchUp, bool isLink); ~ConnectionCodec(); // ConnectionCodec functions. @@ -71,8 +71,6 @@ class ConnectionCodec : public sys::ConnectionCodec { private: amqp_0_10::Connection codec; boost::intrusive_ptr<cluster::Connection> interceptor; - cluster::ConnectionId id; - std::string localId; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp b/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp deleted file mode 100644 index 3c18cf751e..0000000000 --- a/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ConnectionDecoder.h" -#include "EventFrame.h" -#include "ConnectionMap.h" - -namespace qpid { -namespace cluster { - -using namespace framing; - -ConnectionDecoder::ConnectionDecoder(const Handler& h) : handler(h) {} - -void ConnectionDecoder::decode(const EventHeader& eh, const void* data, ConnectionMap& map) { - assert(eh.getType() == DATA); // Only handle connection data events. - const char* cp = static_cast<const char*>(data); - Buffer buf(const_cast<char*>(cp), eh.getSize()); - if (decoder.decode(buf)) { // Decoded a frame - AMQFrame frame(decoder.frame); - while (decoder.decode(buf)) { - handler(EventFrame(eh, frame)); - frame = decoder.frame; - } - // Set read-credit on the last frame ending in this event. - // Credit will be given when this frame is processed. - handler(EventFrame(eh, frame, 1)); - } - else { - // We must give 1 unit read credit per event. - // This event does not complete any frames so - // we give read credit directly. - ConnectionPtr connection = map.getLocal(eh.getConnectionId()); - if (connection) - connection->giveReadCredit(1); - } -} - -}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp b/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp deleted file mode 100644 index b412bb13cc..0000000000 --- a/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp +++ /dev/null @@ -1,96 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "ConnectionMap.h" -#include "Cluster.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/log/Statement.h" - -namespace qpid { -namespace cluster { - -using framing::InternalErrorException; - -void ConnectionMap::insert(ConnectionPtr p) { - std::pair<Map::iterator, bool> ib = map.insert(Map::value_type(p->getId(), p)); - if (!ib.second) { - assert(0); - throw InternalErrorException(QPID_MSG("Duplicate connection replica: " << p->getId())); - } -} - -void ConnectionMap::erase(const ConnectionId& id) { - Map::iterator i = map.find(id); - if (i == map.end()) { - assert(0); - QPID_LOG(warning, "Erase non-existent connection replica: " << id); - } - map.erase(i); -} - -ConnectionMap::ConnectionPtr ConnectionMap::get(const ConnectionId& id) { - Map::const_iterator i = map.find(id); - if (i == map.end()) { - // Deleted local connection. - if(id.getMember() == cluster.getId()) - return 0; - // New remote connection, create a shadow. - std::ostringstream mgmtId; - mgmtId << id; - ConnectionPtr cp = new Connection(cluster, shadowOut, mgmtId.str(), id); - std::pair<Map::iterator, bool> ib = map.insert(Map::value_type(id, cp)); - if (!ib.second) - throw InternalErrorException(QPID_MSG("Duplicate entry in cluster connection map: " << id)); - i = ib.first; - } - return i->second; -} - -ConnectionMap::ConnectionPtr ConnectionMap::getLocal(const ConnectionId& id) { - if (id.getMember() != cluster.getId()) return 0; - Map::const_iterator i = map.find(id); - assert(i != map.end()); // FIXME aconway 2009-02-11: remove or exception. - return i == map.end() ? 0 : i->second; -} - -ConnectionMap::Vector ConnectionMap::values() const { - Vector result(map.size()); - std::transform(map.begin(), map.end(), result.begin(), - boost::bind(&Map::value_type::second, _1)); - return result; -} - -void ConnectionMap::update(MemberId myId, const ClusterMap& cluster) { - for (Map::iterator i = map.begin(); i != map.end(); ) { - MemberId member = i->first.getMember(); - if (member != myId && !cluster.isMember(member)) { - i->second->left(); - map.erase(i++); - } else { - i++; - } - } -} - -void ConnectionMap::clear() { - map.clear(); -} - -}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ConnectionMap.h b/qpid/cpp/src/qpid/cluster/ConnectionMap.h deleted file mode 100644 index f8aa663339..0000000000 --- a/qpid/cpp/src/qpid/cluster/ConnectionMap.h +++ /dev/null @@ -1,88 +0,0 @@ -#ifndef QPID_CLUSTER_CONNECTIONMAP_H -#define QPID_CLUSTER_CONNECTIONMAP_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "types.h" -#include "Connection.h" -#include "ClusterMap.h" -#include "NoOpConnectionOutputHandler.h" -#include "qpid/sys/Mutex.h" -#include <boost/intrusive_ptr.hpp> -#include <map> - -namespace qpid { -namespace cluster { - -class Cluster; - -/** - * Thread safe map of connections. The map is used in: - * - deliver thread to look connections and create new shadow connections. - * - local catch-up connection threads to add a caught-up shadow connections. - * - local client connection threads when local connections are created. - */ -class ConnectionMap { - public: - typedef boost::intrusive_ptr<cluster::Connection> ConnectionPtr; - typedef std::vector<ConnectionPtr> Vector; - - ConnectionMap(Cluster& c) : cluster(c) {} - - /** Insert a local connection or a caught up shadow connection. - * Called in local connection thread. - */ - void insert(ConnectionPtr p); - - /** Erase a closed connection. Called in deliver thread. */ - void erase(const ConnectionId& id); - - /** Get an existing connection. Returns 0 if id is a closed local - * connections, frames for closed connections should be ignored. - */ - ConnectionPtr get(const ConnectionId& id); - - /** If ID is a local connection and in the map return it, else return 0 */ - ConnectionPtr getLocal(const ConnectionId& id); - - /** Get connections for sending an update. */ - Vector values() const; - - /** Remove connections who's members are no longer in the cluster. Deliver thread. */ - void update(MemberId myId, const ClusterMap& cluster); - - - void clear(); - - size_t size() const; - - private: - typedef std::map<ConnectionId, ConnectionPtr> Map; - - Cluster& cluster; - NoOpConnectionOutputHandler shadowOut; - Map map; -}; - - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_CONNECTIONMAP_H*/ diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp index c5a1b72003..92fceba904 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.cpp +++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp @@ -107,17 +107,16 @@ void Cpg::leave() { check(cpg_leave(handle, &group), cantLeaveMsg(group)); } -bool Cpg::isFlowControlEnabled() { - cpg_flow_control_state_t flowState; - check(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status."); - return flowState == CPG_FLOW_CONTROL_ENABLED; -} + + bool Cpg::mcast(const iovec* iov, int iovLen) { - if (isFlowControlEnabled()) { - QPID_LOG(debug, "CPG flow control enabled") + // Check for flow control + cpg_flow_control_state_t flowState; + check(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status."); + if (flowState == CPG_FLOW_CONTROL_ENABLED) return false; - } + cpg_error_t result; do { result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen); diff --git a/qpid/cpp/src/qpid/cluster/Cpg.h b/qpid/cpp/src/qpid/cluster/Cpg.h index 5ac5a5bdbc..ac27a09ae6 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.h +++ b/qpid/cpp/src/qpid/cluster/Cpg.h @@ -114,8 +114,6 @@ class Cpg : public sys::IOHandle { int getFd(); - bool isFlowControlEnabled(); - private: static std::string errorStr(cpg_error_t err, const std::string& msg); static std::string cantJoinMsg(const Name&); diff --git a/qpid/cpp/src/qpid/cluster/Decoder.cpp b/qpid/cpp/src/qpid/cluster/Decoder.cpp index 1ba36bb521..4de586d89f 100644 --- a/qpid/cpp/src/qpid/cluster/Decoder.cpp +++ b/qpid/cpp/src/qpid/cluster/Decoder.cpp @@ -18,33 +18,43 @@ * under the License. * */ - #include "Decoder.h" -#include "Event.h" +#include "EventFrame.h" +#include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/Buffer.h" -#include "qpid/ptr_map.h" +#include "qpid/framing/AMQFrame.h" + namespace qpid { namespace cluster { -using namespace framing; - -Decoder::Decoder(const Handler& h, ConnectionMap& cm) : handler(h), connections(cm) {} - -void Decoder::decode(const EventHeader& eh, const void* data) { - ConnectionId id = eh.getConnectionId(); - Map::iterator i = map.find(id); - if (i == map.end()) { - std::pair<Map::iterator, bool> ib = map.insert(id, new ConnectionDecoder(handler)); - i = ib.first; +void Decoder::decode(const EventHeader& eh, const char* data) { + assert(eh.getType() == DATA); // Only handle connection data events. + const char* cp = static_cast<const char*>(data); + framing::Buffer buf(const_cast<char*>(cp), eh.getSize()); + framing::FrameDecoder& decoder = map[eh.getConnectionId()]; + if (decoder.decode(buf)) { // Decoded a frame + framing::AMQFrame frame(decoder.getFrame()); + while (decoder.decode(buf)) { + process(EventFrame(eh, frame)); + frame = decoder.getFrame(); + } + // Set read-credit on the last frame ending in this event. + // Credit will be given when this frame is processed. + process(EventFrame(eh, frame, 1)); } - ptr_map_ptr(i)->decode(eh, data, connections); + else { + // We must give 1 unit read credit per event. + // This event does not complete any frames so + // send an empty frame with the read credit. + process(EventFrame(EventHeader(), framing::AMQFrame(), 1)); + } } -void Decoder::erase(const ConnectionId& c) { - Map::iterator i = map.find(c); - if (i != map.end()) - map.erase(i); +void Decoder::process(const EventFrame& ef) { + if (ef.frame.getMethod() && ef.frame.getMethod()->isA<framing::ClusterConnectionDeliverCloseBody>()) + map.erase(ef.connectionId); + callback(ef); } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Decoder.h b/qpid/cpp/src/qpid/cluster/Decoder.h index 50f6afa491..acde4258a2 100644 --- a/qpid/cpp/src/qpid/cluster/Decoder.h +++ b/qpid/cpp/src/qpid/cluster/Decoder.h @@ -22,44 +22,36 @@ * */ -#include "ConnectionDecoder.h" #include "types.h" -#include <boost/ptr_container/ptr_map.hpp> +#include "qpid/framing/FrameDecoder.h" +#include <boost/function.hpp> +#include <map> namespace qpid { namespace cluster { +class EventFrame; class EventHeader; -class ConnectionMap; /** - * Holds a map of ConnectionDecoders. Decodes Events into EventFrames - * and forwards EventFrames to a handler. - * - * THREAD UNSAFE: Called sequentially with un-decoded cluster events from CPG. + * A map of decoders for connections. */ class Decoder { public: - typedef boost::function<void(const EventFrame&)> Handler; - - Decoder(const Handler& h, ConnectionMap&); + typedef boost::function<void(const EventFrame&)> FrameHandler; - /** Takes EventHeader + data rather than Event so that the caller can - * pass a pointer to connection data or a CPG buffer directly without copy. - */ - void decode(const EventHeader& eh, const void* data); - - /** Erase the decoder for a connection. */ + Decoder(FrameHandler fh) : callback(fh) {} + void decode(const EventHeader& eh, const char* data); void erase(const ConnectionId&); + framing::FrameDecoder& get(const ConnectionId& c) { return map[c]; } private: - typedef boost::ptr_map<ConnectionId, ConnectionDecoder> Map; - Handler handler; + typedef std::map<ConnectionId, framing::FrameDecoder> Map; Map map; - ConnectionMap& connections; + void process(const EventFrame&); + FrameHandler callback; }; - }} // namespace qpid::cluster #endif /*!QPID_CLUSTER_DECODER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/Event.cpp b/qpid/cpp/src/qpid/cluster/Event.cpp index e30b961b3e..1cb010c266 100644 --- a/qpid/cpp/src/qpid/cluster/Event.cpp +++ b/qpid/cpp/src/qpid/cluster/Event.cpp @@ -23,6 +23,7 @@ #include "Cpg.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/assert.h" #include <ostream> #include <iterator> #include <algorithm> @@ -31,6 +32,7 @@ namespace qpid { namespace cluster { using framing::Buffer; +using framing::AMQFrame; const size_t EventHeader::HEADER_SIZE = sizeof(uint8_t) + // type @@ -42,7 +44,7 @@ const size_t EventHeader::HEADER_SIZE = ; EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s) - : type(t), connectionId(c), size(s), sequence(0) {} + : type(t), connectionId(c), size(s) {} Event::Event() {} @@ -57,7 +59,7 @@ void EventHeader::decode(const MemberId& m, framing::Buffer& buf) { type = (EventType)buf.getOctet(); if(type != DATA && type != CONTROL) throw Exception("Invalid multicast event type"); - connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong())); + connectionId = ConnectionId(m, buf.getLongLong()); size = buf.getLong(); #ifdef QPID_LATENCY_METRIC latency_metric_timestamp = buf.getLongLong(); @@ -74,14 +76,17 @@ Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) { return e; } -Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) { - framing::AMQFrame f(body); +Event Event::control(const framing::AMQFrame& f, const ConnectionId& cid) { Event e(CONTROL, cid, f.encodedSize()); Buffer buf(e); f.encode(buf); return e; } +Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) { + return control(framing::AMQFrame(body), cid); +} + iovec Event::toIovec() { encodeHeader(); iovec iov = { const_cast<char*>(getStore()), getStoreSize() }; @@ -90,7 +95,7 @@ iovec Event::toIovec() { void EventHeader::encode(Buffer& b) const { b.putOctet(type); - b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer())); + b.putLongLong(connectionId.getNumber()); b.putLong(size); #ifdef QPID_LATENCY_METRIC b.putLongLong(latency_metric_timestamp); @@ -108,12 +113,22 @@ Event::operator Buffer() const { return Buffer(const_cast<char*>(getData()), getSize()); } +AMQFrame Event::getFrame() const { + assert(type == CONTROL); + Buffer buf(*this); + AMQFrame frame; + QPID_ASSERT(frame.decode(buf)); + return frame; +} + static const char* EVENT_TYPE_NAMES[] = { "data", "control" }; +std::ostream& operator << (std::ostream& o, EventType t) { + return o << EVENT_TYPE_NAMES[t]; +} + std::ostream& operator << (std::ostream& o, const EventHeader& e) { - o << "[event " << e.getConnectionId() << "/" << e.getSequence() - << " " << EVENT_TYPE_NAMES[e.getType()] - << " " << e.getSize() << " bytes]"; + o << "Event[" << e.getConnectionId() << " " << e.getType() << " " << e.getSize() << " bytes]"; return o; } diff --git a/qpid/cpp/src/qpid/cluster/Event.h b/qpid/cpp/src/qpid/cluster/Event.h index f1de248f89..e05ad60bcf 100644 --- a/qpid/cpp/src/qpid/cluster/Event.h +++ b/qpid/cpp/src/qpid/cluster/Event.h @@ -24,6 +24,7 @@ #include "types.h" #include "qpid/RefCountedBuffer.h" +#include "qpid/framing/AMQFrame.h" #include "qpid/sys/LatencyMetric.h" #include <sys/uio.h> // For iovec #include <iosfwd> @@ -34,6 +35,7 @@ namespace qpid { namespace framing { class AMQBody; +class AMQFrame; class Buffer; } @@ -55,11 +57,9 @@ class EventHeader : public ::qpid::sys::LatencyMetricTimestamp { /** Size of header + payload. */ size_t getStoreSize() { return size + HEADER_SIZE; } - uint64_t getSequence() const { return sequence; } - void setSequence(uint64_t n) { sequence = n; } - - bool isCluster() const { return connectionId.getPointer() == 0; } - bool isConnection() const { return connectionId.getPointer() != 0; } + bool isCluster() const { return connectionId.getNumber() == 0; } + bool isConnection() const { return connectionId.getNumber() != 0; } + bool isControl() const { return type == CONTROL; } protected: static const size_t HEADER_SIZE; @@ -67,7 +67,6 @@ class EventHeader : public ::qpid::sys::LatencyMetricTimestamp { EventType type; ConnectionId connectionId; size_t size; - uint64_t sequence; }; /** @@ -83,8 +82,11 @@ class Event : public EventHeader { /** Create an event copied from delivered data. */ static Event decodeCopy(const MemberId& m, framing::Buffer&); - /** Create an event containing a control */ + /** Create a control event. */ static Event control(const framing::AMQBody&, const ConnectionId&); + + /** Create a control event. */ + static Event control(const framing::AMQFrame&, const ConnectionId&); // Data excluding header. char* getData() { return store + HEADER_SIZE; } @@ -93,6 +95,8 @@ class Event : public EventHeader { // Store including header char* getStore() { return store; } const char* getStore() const { return store; } + + framing::AMQFrame getFrame() const; operator framing::Buffer() const; @@ -105,6 +109,7 @@ class Event : public EventHeader { }; std::ostream& operator << (std::ostream&, const EventHeader&); + }} // namespace qpid::cluster #endif /*!QPID_CLUSTER_EVENT_H*/ diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.cpp b/qpid/cpp/src/qpid/cluster/EventFrame.cpp index ba01c170dd..8259b6da6e 100644 --- a/qpid/cpp/src/qpid/cluster/EventFrame.cpp +++ b/qpid/cpp/src/qpid/cluster/EventFrame.cpp @@ -24,16 +24,18 @@ namespace qpid { namespace cluster { -EventFrame::EventFrame() : sequence(0) {} +EventFrame::EventFrame() {} EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc) - : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()), readCredit(rc) + : connectionId(e.getConnectionId()), frame(f), readCredit(rc), type(e.getType()) { QPID_LATENCY_INIT(frame); } std::ostream& operator<<(std::ostream& o, const EventFrame& e) { - return o << e.connectionId << "/" << e.sequence << " " << e.frame << " rc=" << e.readCredit; + return o << e.frame << " " << e.type << " " << e.connectionId; + if (e.readCredit) o << " read-credit=" << e.readCredit; + return o; } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.h b/qpid/cpp/src/qpid/cluster/EventFrame.h index 7f33cedb5b..d6ff58dd38 100644 --- a/qpid/cpp/src/qpid/cluster/EventFrame.h +++ b/qpid/cpp/src/qpid/cluster/EventFrame.h @@ -42,22 +42,15 @@ struct EventFrame EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc=0); - bool isCluster() const { return !connectionId.getPointer(); } - bool isConnection() const { return connectionId.getPointer(); } + bool isCluster() const { return connectionId.getNumber() == 0; } + bool isConnection() const { return connectionId.getNumber() != 0; } bool isLastInEvent() const { return readCredit; } - // True if this frame follows immediately after frame e. - bool follows(const EventFrame& e) const { - return sequence == e.sequence || (sequence == e.sequence+1 && e.readCredit); - } - - bool operator<(const EventFrame& e) const { return sequence < e.sequence; } - ConnectionId connectionId; framing::AMQFrame frame; - uint64_t sequence; - int readCredit; // last frame in an event, give credit when processed. + int readCredit; ///< last frame in an event, give credit when processed. + EventType type; }; std::ostream& operator<<(std::ostream& o, const EventFrame& e); diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp index 690acfc3ad..409180c499 100644 --- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp +++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp @@ -30,48 +30,46 @@ namespace qpid { namespace cluster { -ExpiryPolicy::ExpiryPolicy(const boost::function<bool()> & f, Multicaster& m, const MemberId& id, broker::Timer& t) - : expiredPolicy(new Expired), isLeader(f), mcast(m), memberId(id), timer(t) {} - -namespace { -uint64_t clusterId(const broker::Message& m) { - assert(m.getFrames().begin() != m.getFrames().end()); - return m.getFrames().begin()->getClusterId(); -} +ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, broker::Timer& t) + : expiryId(0), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {} struct ExpiryTask : public broker::TimerTask { ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when) - : TimerTask(when), expiryPolicy(policy), messageId(id) {} - void fire() { expiryPolicy->sendExpire(messageId); } + : TimerTask(when), expiryPolicy(policy), expiryId(id) {} + void fire() { expiryPolicy->sendExpire(expiryId); } boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; - const uint64_t messageId; + const uint64_t expiryId; }; -} void ExpiryPolicy::willExpire(broker::Message& m) { - timer.add(new ExpiryTask(this, clusterId(m), m.getExpiration())); + uint64_t id = expiryId++; + assert(unexpiredById.find(id) == unexpiredById.end()); + assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end()); + unexpiredById[id] = &m; + unexpiredByMessage[&m] = id; + timer.add(new ExpiryTask(this, id, m.getExpiration())); } bool ExpiryPolicy::hasExpired(broker::Message& m) { - sys::Mutex::ScopedLock l(lock); - IdSet::iterator i = expired.find(clusterId(m)); - if (i != expired.end()) { - expired.erase(i); - const_cast<broker::Message&>(m).setExpiryPolicy(expiredPolicy); // hasExpired() == true; - return true; - } - return false; + return unexpiredByMessage.find(&m) == unexpiredByMessage.end(); } void ExpiryPolicy::sendExpire(uint64_t id) { - sys::Mutex::ScopedLock l(lock); - if (isLeader()) - mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId); + mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId); } void ExpiryPolicy::deliverExpire(uint64_t id) { - sys::Mutex::ScopedLock l(lock); - expired.insert(id); + IdMessageMap::iterator i = unexpiredById.find(id); + if (i != unexpiredById.end()) { + i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true; + unexpiredByMessage.erase(i->second); + unexpiredById.erase(i); + } +} + +boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) { + MessageIdMap::iterator i = unexpiredByMessage.find(&m); + return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second; } bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; } diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h index 7fb63c731e..9f8b1a9236 100644 --- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h +++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h @@ -27,11 +27,15 @@ #include "qpid/sys/Mutex.h" #include <boost/function.hpp> #include <boost/intrusive_ptr.hpp> -#include <set> +#include <boost/optional.hpp> +#include <map> namespace qpid { -namespace broker { class Timer; } +namespace broker { +class Timer; +class Message; +} namespace cluster { class Multicaster; @@ -42,7 +46,7 @@ class Multicaster; class ExpiryPolicy : public broker::ExpiryPolicy { public: - ExpiryPolicy(const boost::function<bool()> & isLeader, Multicaster&, const MemberId&, broker::Timer&); + ExpiryPolicy(Multicaster&, const MemberId&, broker::Timer&); void willExpire(broker::Message&); @@ -54,18 +58,24 @@ class ExpiryPolicy : public broker::ExpiryPolicy // Cluster delivers expiry notice. void deliverExpire(uint64_t); + void setId(uint64_t id) { expiryId = id; } + uint64_t getId() const { return expiryId; } + + boost::optional<uint64_t> getId(broker::Message&); + private: - sys::Mutex lock; - typedef std::set<uint64_t> IdSet; + typedef std::map<broker::Message*, uint64_t> MessageIdMap; + typedef std::map<uint64_t, broker::Message*> IdMessageMap; struct Expired : public broker::ExpiryPolicy { bool hasExpired(broker::Message&); void willExpire(broker::Message&); }; - IdSet expired; + MessageIdMap unexpiredByMessage; + IdMessageMap unexpiredById; + uint64_t expiryId; boost::intrusive_ptr<Expired> expiredPolicy; - boost::function<bool()> isLeader; Multicaster& mcast; MemberId memberId; broker::Timer& timer; diff --git a/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h b/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h index 449387c1cc..8b2f6dae8e 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h +++ b/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h @@ -1,5 +1,5 @@ -#ifndef QPID_CLUSTER_CONNECTIONDECODER_H -#define QPID_CLUSTER_CONNECTIONDECODER_H +#ifndef QPID_CLUSTER_LOCKEDCONNECTIONMAP_H +#define QPID_CLUSTER_LOCKEDCONNECTIONMAP_H /* * @@ -22,40 +22,41 @@ * */ -#include "qpid/framing/FrameDecoder.h" -#include <boost/function.hpp> +#include "types.h" +#include "qpid/sys/Mutex.h" +#include "Connection.h" namespace qpid { namespace cluster { -class EventHeader; -class EventFrame; -class ConnectionMap; - /** - * Decodes delivered connection data Event's as EventFrame's for a - * connection replica, local or shadow. Manages state for frame - * fragments and flow control. - * - * THREAD UNSAFE: connection events are decoded in sequence. + * Thread safe map of connections. */ -class ConnectionDecoder +class LockedConnectionMap { public: - typedef boost::function<void(const EventFrame&)> Handler; - - ConnectionDecoder(const Handler& h); - - /** Takes EventHeader + data rather than Event so that the caller can - * pass a pointer to connection data or a CPG buffer directly without copy. - */ - void decode(const EventHeader& eh, const void* data, ConnectionMap& connections); + void insert(const ConnectionPtr& c) { + sys::Mutex::ScopedLock l(lock); + map[c->getId()] = c; + } + + ConnectionPtr getErase(const ConnectionId& c) { + sys::Mutex::ScopedLock l(lock); + Map::iterator i = map.find(c); + if (i != map.end()) { + ConnectionPtr cp = i->second; + map.erase(i); + return cp; + } + else + return 0; + } private: - Handler handler; - framing::FrameDecoder decoder; + typedef std::map<ConnectionId, ConnectionPtr> Map; + mutable sys::Mutex lock; + Map map; }; - }} // namespace qpid::cluster -#endif /*!QPID_CLUSTER_CONNECTIONDECODER_H*/ +#endif /*!QPID_CLUSTER_LOCKEDCONNECTIONMAP_H*/ diff --git a/qpid/cpp/src/qpid/cluster/McastFrameHandler.h b/qpid/cpp/src/qpid/cluster/McastFrameHandler.h new file mode 100644 index 0000000000..5127c31c84 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/McastFrameHandler.h @@ -0,0 +1,46 @@ +#ifndef QPID_CLUSTER_MCASTFRAMEHANDLER_H +#define QPID_CLUSTER_MCASTFRAMEHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "Multicaster.h" +#include "qpid/framing/FrameHandler.h" + +namespace qpid { +namespace cluster { + +/** + * A frame handler that multicasts frames as CONTROL events. + */ +class McastFrameHandler : public framing::FrameHandler +{ + public: + McastFrameHandler(Multicaster& m, const ConnectionId& cid) : mcast(m), connection(cid) {} + void handle(framing::AMQFrame& frame) { mcast.mcastControl(frame, connection); } + private: + Multicaster& mcast; + ConnectionId connection; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_MCASTFRAMEHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/Multicaster.cpp index 239b3f5f35..f0738ab08f 100644 --- a/qpid/cpp/src/qpid/cluster/Multicaster.cpp +++ b/qpid/cpp/src/qpid/cluster/Multicaster.cpp @@ -24,6 +24,7 @@ #include "qpid/log/Statement.h" #include "qpid/sys/LatencyMetric.h" #include "qpid/framing/AMQBody.h" +#include "qpid/framing/AMQFrame.h" namespace qpid { namespace cluster { @@ -43,6 +44,11 @@ void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& mcast(Event::control(body, id)); } +void Multicaster::mcastControl(const framing::AMQFrame& frame, const ConnectionId& id) { + QPID_LOG(trace, "MCAST " << id << ": " << frame); + mcast(Event::control(frame, id)); +} + void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) { Event e(DATA, id, size); memcpy(e.getData(), data, size); diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.h b/qpid/cpp/src/qpid/cluster/Multicaster.h index 1dfee47bd5..d1c3115977 100644 --- a/qpid/cpp/src/qpid/cluster/Multicaster.h +++ b/qpid/cpp/src/qpid/cluster/Multicaster.h @@ -50,6 +50,7 @@ class Multicaster boost::function<void()> onError ); void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&); + void mcastControl(const framing::AMQFrame& controlFrame, const ConnectionId&); void mcastBuffer(const char*, size_t, const ConnectionId&); void mcast(const Event& e); /** End holding mode, held events are mcast */ diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp index 45a369eea9..cd42446016 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -70,17 +70,12 @@ void OutputInterceptor::giveReadCredit(int32_t credit) { // Called in write thread when the IO layer has no more data to write. // We do nothing in the write thread, we run doOutput only on delivery // of doOutput requests. -bool OutputInterceptor::doOutput() { - QPID_LOG(trace, parent << " write idle."); - return false; -} +bool OutputInterceptor::doOutput() { return false; } // Delivery of doOutput allows us to run the real connection doOutput() // which tranfers frames to the codec for writing. // void OutputInterceptor::deliverDoOutput(size_t requested) { - QPID_LATENCY_RECORD("deliver do-output", *this); - QPID_LATENCY_CLEAR(*this); size_t buf = getBuffered(); if (parent.isLocal()) writeEstimate.delivered(requested, sent, buf); // Update the estimate. @@ -91,9 +86,7 @@ void OutputInterceptor::deliverDoOutput(size_t requested) { moreOutput = parent.getBrokerConnection().doOutput(); } while (sent < requested && moreOutput); sent += buf; // Include buffered data in the sent total. - - QPID_LOG(trace, "Delivered doOutput: requested=" << requested << " output=" << sent << " more=" << moreOutput); - + QPID_LOG(trace, parent << " delivereDoOutput: requested=" << requested << " sent=" << sent << " more=" << moreOutput); if (parent.isLocal() && moreOutput) { QPID_LOG(trace, parent << " deliverDoOutput - sending doOutput, more output available."); sendDoOutput(); diff --git a/qpid/cpp/src/qpid/cluster/PollableQueue.h b/qpid/cpp/src/qpid/cluster/PollableQueue.h index e0422e2449..a44c39ad85 100644 --- a/qpid/cpp/src/qpid/cluster/PollableQueue.h +++ b/qpid/cpp/src/qpid/cluster/PollableQueue.h @@ -52,6 +52,8 @@ template <class T> class PollableQueue : public sys::PollableQueue<T> { } catch (const std::exception& e) { QPID_LOG(error, message << ": " << e.what()); + values.clear(); + this->stop(); error(); } } diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 18746ccb7e..97eae7efa3 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -22,6 +22,8 @@ #include "Cluster.h" #include "ClusterMap.h" #include "Connection.h" +#include "Decoder.h" +#include "ExpiryPolicy.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/client/ConnectionAccess.h" #include "qpid/broker/Broker.h" @@ -86,33 +88,40 @@ void send(client::AsyncSession& s, const AMQBody& body) { // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, - broker::Broker& broker, const ClusterMap& m, uint64_t frameId_, - const Cluster::Connections& cons, + broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_, + const Cluster::ConnectionVector& cons, Decoder& decoder_, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail, const client::ConnectionSettings& cs ) : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), - frameId(frameId_), connections(cons), + expiry(expiry_), connections(cons), decoder(decoder_), connection(catchUpConnection()), shadowConnection(catchUpConnection()), - done(ok), failed(fail) + done(ok), failed(fail), connectionSettings(cs) { connection.open(url, cs); - session = connection.newSession("update_shared"); + session = connection.newSession(UPDATE); } UpdateClient::~UpdateClient() {} // Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. -const std::string UpdateClient::UPDATE("qpid.qpid-update"); +const std::string UpdateClient::UPDATE("qpid.cluster-update"); + +void UpdateClient::run() { + try { + update(); + done(); + } catch (const std::exception& e) { + failed(e); + } + delete this; +} void UpdateClient::update() { QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl); Broker& b = updaterBroker; b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); - - // Update exchange is used to route messages to the proper queue without modifying routing key. - session.exchangeDeclare(arg::exchange=UPDATE, arg::type="fanout", arg::autoDelete=true); b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueue, this, _1)); // Update queue is used to transfer acquired messages that are no longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); @@ -121,25 +130,15 @@ void UpdateClient::update() { std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); + ClusterConnectionProxy(session).expiryId(expiry.getId()); ClusterConnectionMembershipBody membership; map.toMethodBody(membership); - membership.setFrameId(frameId); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->handle(frame); connection.close(); QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl); } -void UpdateClient::run() { - try { - update(); - done(); - } catch (const std::exception& e) { - failed(e); - } - delete this; -} - namespace { template <class T> std::string encode(const T& t) { std::string encoded; @@ -152,8 +151,7 @@ template <class T> std::string encode(const T& t) { void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) { QPID_LOG(debug, updaterId << " updating exchange " << ex->getName()); - ClusterConnectionProxy proxy(session); - proxy.exchange(encode(*ex)); + ClusterConnectionProxy(session).exchange(encode(*ex)); } /** Bind a queue to the update exchange and update messges to it @@ -164,24 +162,40 @@ class MessageUpdater { bool haveLastPos; framing::SequenceNumber lastPos; client::AsyncSession session; - + ExpiryPolicy& expiry; + public: - MessageUpdater(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) { + MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) { session.exchangeBind(queue, UpdateClient::UPDATE); } ~MessageUpdater() { - session.exchangeUnbind(queue, UpdateClient::UPDATE); + try { + session.exchangeUnbind(queue, UpdateClient::UPDATE); + } + catch (const std::exception& e) { + // Don't throw in a destructor. + QPID_LOG(error, "Unbinding update queue " << queue << ": " << e.what()); + } } void updateQueuedMessage(const broker::QueuedMessage& message) { + // Send the queue position if necessary. if (!haveLastPos || message.position - lastPos != 1) { ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1); haveLastPos = true; } lastPos = message.position; + + // Send the expiry ID if necessary. + if (message.payload->getProperties<DeliveryProperties>()->getTtl()) { + boost::optional<uint64_t> expiryId = expiry.getId(*message.payload); + if (!expiryId) return; // Message already expired, don't replicate. + ClusterConnectionProxy(session).expiryId(*expiryId); + } + SessionBase_0_10Access sb(session); framing::MessageTransferBody transfer( framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); @@ -204,16 +218,13 @@ class MessageUpdater { void updateMessage(const boost::intrusive_ptr<broker::Message>& message) { updateQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1)); } - - }; - void UpdateClient::updateQueue(const boost::shared_ptr<Queue>& q) { QPID_LOG(debug, updaterId << " updating queue " << q->getName()); ClusterConnectionProxy proxy(session); proxy.queue(encode(*q)); - MessageUpdater updater(q->getName(), session); + MessageUpdater updater(q->getName(), session, expiry); q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1)); q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, q->getName(), _1)); } @@ -228,13 +239,16 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda shadowConnection = catchUpConnection(); broker::Connection& bc = updateConnection->getBrokerConnection(); - // FIXME aconway 2008-10-20: What authentication info to use on reconnect? - shadowConnection.open(updateeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax()); + connectionSettings.maxFrameSize = bc.getFrameMax(); + shadowConnection.open(updateeUrl, connectionSettings); bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1)); + // Safe to use decoder here because we are stalled for update. + std::pair<const char*, size_t> fragment = decoder.get(updateConnection->getId()).getFragment(); ClusterConnectionProxy(shadowConnection).shadowReady( updateConnection->getId().getMember(), - reinterpret_cast<uint64_t>(updateConnection->getId().getPointer()), - updateConnection->getBrokerConnection().getUserId() + updateConnection->getId().getNumber(), + bc.getUserId(), + string(fragment.first, fragment.second) ); shadowConnection.close(); QPID_LOG(debug, updaterId << " updated connection " << *updateConnection); @@ -269,7 +283,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { SequenceNumber received = ss->receiverGetReceived().command; if (inProgress) --received; - + // Reset command-sequence state. proxy.sessionState( ss->senderGetReplayPoint().command, @@ -285,9 +299,6 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { if (inProgress) { inProgress->getFrames().map(simpl->out); } - - // FIXME aconway 2008-09-23: update session replay list. - QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId()); } @@ -322,7 +333,7 @@ void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { // If the message is acquired then it is no longer on the // updatees queue, put it on the update queue for updatee to pick up. // - MessageUpdater(UPDATE, shadowSession).updateQueuedMessage(dr.getMessage()); + MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage()); } ClusterConnectionProxy(shadowSession).deliveryRecord( dr.getQueue()->getName(), @@ -341,8 +352,8 @@ void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { public: - TxOpUpdater(UpdateClient& dc, client::AsyncSession s) - : MessageUpdater(UpdateClient::UPDATE, s), parent(dc), session(s), proxy(s) {} + TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry) + : MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {} void operator()(const broker::DtxAck& ) { throw InternalErrorException("DTX transactions not currently supported by cluster."); @@ -385,7 +396,7 @@ void UpdateClient::updateTxState(broker::SemanticState& s) { broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); if (txBuffer) { proxy.txStart(); - TxOpUpdater updater(*this, shadowSession); + TxOpUpdater updater(*this, shadowSession, expiry); txBuffer->accept(updater); proxy.txEnd(); } diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h index 23f647c820..23d061b7e4 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.h +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h @@ -46,6 +46,7 @@ class SessionHandler; class DeliveryRecord; class SessionState; class SemanticState; +class Decoder; } // namespace broker @@ -54,6 +55,8 @@ namespace cluster { class Cluster; class Connection; class ClusterMap; +class Decoder; +class ExpiryPolicy; /** * A client that updates the contents of a local broker to a remote one using AMQP. @@ -63,8 +66,8 @@ class UpdateClient : public sys::Runnable { static const std::string UPDATE; // Name for special update queue and exchange. UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&, - broker::Broker& donor, const ClusterMap& map, uint64_t sequence, - const std::vector<boost::intrusive_ptr<Connection> >& , + broker::Broker& donor, const ClusterMap& map, ExpiryPolicy& expiry, + const std::vector<boost::intrusive_ptr<Connection> >&, Decoder&, const boost::function<void()>& done, const boost::function<void(const std::exception&)>& fail, const client::ConnectionSettings& @@ -92,12 +95,14 @@ class UpdateClient : public sys::Runnable { Url updateeUrl; broker::Broker& updaterBroker; ClusterMap map; - uint64_t frameId; + ExpiryPolicy& expiry; std::vector<boost::intrusive_ptr<Connection> > connections; + Decoder& decoder; client::Connection connection, shadowConnection; client::AsyncSession session, shadowSession; boost::function<void()> done; boost::function<void(const std::exception& e)> failed; + client::ConnectionSettings connectionSettings; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/UpdateExchange.h b/qpid/cpp/src/qpid/cluster/UpdateExchange.h new file mode 100644 index 0000000000..7a4a484c8a --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/UpdateExchange.h @@ -0,0 +1,45 @@ +#ifndef QPID_CLUSTER_UPDATEEXCHANGE_H +#define QPID_CLUSTER_UPDATEEXCHANGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "UpdateClient.h" +#include "qpid/broker/FanOutExchange.h" + + +namespace qpid { +namespace cluster { + +/** + * A keyless exchange (like fanout exchange) that does not modify deliver-properties.exchange + * on messages. + */ +class UpdateExchange : public broker::FanOutExchange +{ + public: + UpdateExchange(management::Manageable* parent) : broker::Exchange(UpdateClient::UPDATE, parent), broker::FanOutExchange(UpdateClient::UPDATE, parent) {} + void setProperties(const boost::intrusive_ptr<broker::Message>&) {} +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_UPDATEEXCHANGE_H*/ diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h index d1d6fdc427..c19152e4d8 100644 --- a/qpid/cpp/src/qpid/cluster/types.h +++ b/qpid/cpp/src/qpid/cluster/types.h @@ -68,16 +68,17 @@ inline bool operator==(const cpg_address& caddr, const MemberId& id) { return id std::ostream& operator<<(std::ostream&, const MemberId&); -struct ConnectionId : public std::pair<MemberId, Connection*> { - ConnectionId(const MemberId& m=MemberId(), Connection* c=0) : std::pair<MemberId, Connection*> (m,c) {} - ConnectionId(uint64_t m, uint64_t c) - : std::pair<MemberId, Connection*>(MemberId(m), reinterpret_cast<Connection*>(c)) {} +struct ConnectionId : public std::pair<MemberId, uint64_t> { + ConnectionId(const MemberId& m=MemberId(), uint64_t c=0) : std::pair<MemberId, uint64_t> (m,c) {} + ConnectionId(uint64_t m, uint64_t c) : std::pair<MemberId, uint64_t>(MemberId(m), c) {} MemberId getMember() const { return first; } - Connection* getPointer() const { return second; } + uint64_t getNumber() const { return second; } }; std::ostream& operator<<(std::ostream&, const ConnectionId&); +std::ostream& operator<<(std::ostream&, EventType); + }} // namespace qpid::cluster #endif /*!QPID_CLUSTER_TYPES_H*/ diff --git a/qpid/cpp/src/qpid/console/ClassKey.cpp b/qpid/cpp/src/qpid/console/ClassKey.cpp index 1780b03f94..b97eb3ca44 100644 --- a/qpid/cpp/src/qpid/console/ClassKey.cpp +++ b/qpid/cpp/src/qpid/console/ClassKey.cpp @@ -21,6 +21,7 @@ #include "ClassKey.h" #include <string.h> +#include <cstdio> using namespace std; using namespace qpid::console; diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.cpp b/qpid/cpp/src/qpid/framing/AMQFrame.cpp index 80c8e0b56d..9473b2a513 100644 --- a/qpid/cpp/src/qpid/framing/AMQFrame.cpp +++ b/qpid/cpp/src/qpid/framing/AMQFrame.cpp @@ -41,7 +41,7 @@ AMQFrame::AMQFrame(const boost::intrusive_ptr<AMQBody>& b) : body(b) { init(); } AMQFrame::AMQFrame(const AMQBody& b) : body(b.clone()) { init(); } -AMQFrame::~AMQFrame() {} +AMQFrame::~AMQFrame() { init(); } AMQBody* AMQFrame::getBody() { // Non-const AMQBody* may be used to modify the body. diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h index cf384e61e8..34319e7ed4 100644 --- a/qpid/cpp/src/qpid/framing/AMQFrame.h +++ b/qpid/cpp/src/qpid/framing/AMQFrame.h @@ -93,9 +93,6 @@ class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp /** Must point to at least DECODE_SIZE_MIN bytes of data */ static uint16_t decodeSize(char* data); - uint64_t getClusterId() const { return clusterId; } - void setClusterId(uint64_t id) { clusterId = id; } - private: void init(); @@ -107,7 +104,6 @@ class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp bool bos : 1; bool eos : 1; mutable uint32_t encodedSizeCache; - uint64_t clusterId; // Used to identify frames in a clustered broekr. }; QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const AMQFrame&); diff --git a/qpid/cpp/src/qpid/framing/FrameDecoder.cpp b/qpid/cpp/src/qpid/framing/FrameDecoder.cpp index cbdac181e9..1e73ee1e51 100644 --- a/qpid/cpp/src/qpid/framing/FrameDecoder.cpp +++ b/qpid/cpp/src/qpid/framing/FrameDecoder.cpp @@ -21,8 +21,9 @@ #include "FrameDecoder.h" #include "Buffer.h" #include "qpid/log/Statement.h" -#include <algorithm> #include "qpid/framing/reply_exceptions.h" +#include <algorithm> +#include <string.h> namespace qpid { namespace framing { @@ -67,4 +68,13 @@ bool FrameDecoder::decode(Buffer& buffer) { return false; } +void FrameDecoder::setFragment(const char* data, size_t size) { + fragment.resize(size); + ::memcpy(&fragment[0], data, size); +} + +std::pair<const char*, size_t> FrameDecoder::getFragment() const { + return std::pair<const char*, size_t>(&fragment[0], fragment.size()); +} + }} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/FrameDecoder.h b/qpid/cpp/src/qpid/framing/FrameDecoder.h index 7f974dadc3..961cc666a9 100644 --- a/qpid/cpp/src/qpid/framing/FrameDecoder.h +++ b/qpid/cpp/src/qpid/framing/FrameDecoder.h @@ -35,9 +35,16 @@ class FrameDecoder { public: bool decode(Buffer& buffer); - AMQFrame frame; + const AMQFrame& getFrame() const { return frame; } + AMQFrame& getFrame() { return frame; } + + void setFragment(const char*, size_t); + std::pair<const char*, size_t> getFragment() const; + private: std::vector<char> fragment; + AMQFrame frame; + }; }} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/Uuid.h b/qpid/cpp/src/qpid/framing/Uuid.h index 42dc31a26d..fe0c32dc0b 100644 --- a/qpid/cpp/src/qpid/framing/Uuid.h +++ b/qpid/cpp/src/qpid/framing/Uuid.h @@ -19,8 +19,9 @@ * */ -#include "qpid/sys/uuid.h" #include "qpid/CommonImportExport.h" +#include "qpid/sys/uuid.h" +#include "qpid/sys/IntegerTypes.h" #include <boost/array.hpp> diff --git a/qpid/cpp/src/qpid/log/Selector.cpp b/qpid/cpp/src/qpid/log/Selector.cpp index 4d1c5b6e0c..0a629edc3e 100644 --- a/qpid/cpp/src/qpid/log/Selector.cpp +++ b/qpid/cpp/src/qpid/log/Selector.cpp @@ -20,6 +20,7 @@ #include "Options.h" #include <boost/bind.hpp> #include <algorithm> +#include <string.h> namespace qpid { namespace log { diff --git a/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp b/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp index 9d51358e2e..985cb32c5c 100644 --- a/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp +++ b/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp @@ -42,10 +42,14 @@ public: struct NameValue { const char* name; int value; }; NameValue nameValue[] = { { "AUTH", LOG_AUTH }, +#ifdef HAVE_LOG_AUTHPRIV { "AUTHPRIV", LOG_AUTHPRIV }, +#endif { "CRON", LOG_CRON }, { "DAEMON", LOG_DAEMON }, +#ifdef HAVE_LOG_FTP { "FTP", LOG_FTP }, +#endif { "KERN", LOG_KERN }, { "LOCAL0", LOG_LOCAL0 }, { "LOCAL1", LOG_LOCAL1 }, @@ -72,7 +76,7 @@ public: int value(const string& name) const { string key(name); - transform(key.begin(), key.end(), key.begin(), ::toupper); + std::transform(key.begin(), key.end(), key.begin(), ::toupper); ByName::const_iterator i = byName.find(key); if (i == byName.end()) throw Exception("Not a valid syslog facility: " + name); diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/qpid/cpp/src/qpid/management/ManagementBroker.cpp index 0f96e97fb0..729826e0cd 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.cpp +++ b/qpid/cpp/src/qpid/management/ManagementBroker.cpp @@ -80,7 +80,7 @@ ManagementBroker::RemoteAgent::~RemoteAgent () } ManagementBroker::ManagementBroker () : - threadPoolSize(1), interval(10), broker(0) + threadPoolSize(1), interval(10), broker(0), startTime(uint64_t(Duration(now()))) { nextObjectId = 1; brokerBank = 1; @@ -346,6 +346,9 @@ void ManagementBroker::periodicProcessing (void) string routingKey; list<pair<ObjectId, ManagementObject*> > deleteList; + uint64_t uptime = uint64_t(Duration(now())) - startTime; + static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); + moveNewObjectsLH(); if (clientWasAdded) { @@ -844,6 +847,9 @@ void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, ui Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + encodeHeader(outBuffer, 'g', sequence); object->writeProperties(outBuffer); object->writeStatistics(outBuffer, true); @@ -865,6 +871,9 @@ void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, ui Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + encodeHeader(outBuffer, 'g', sequence); object->writeProperties(outBuffer); object->writeStatistics(outBuffer, true); diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.h b/qpid/cpp/src/qpid/management/ManagementBroker.h index f65d6a345e..a57f73be15 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.h +++ b/qpid/cpp/src/qpid/management/ManagementBroker.h @@ -182,6 +182,7 @@ private: uint32_t nextRemoteBank; uint32_t nextRequestSequence; bool clientWasAdded; + const uint64_t startTime; std::auto_ptr<IdAllocator> allocator; diff --git a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp index 2d8af3b052..e3990a13cc 100644 --- a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp +++ b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp @@ -57,7 +57,6 @@ void ReplicatingEventListener::deliverDequeueMessage(const QueuedMessage& dequeu { FieldTable headers; headers.setString(REPLICATION_TARGET_QUEUE, dequeued.queue->getName()); - headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence); headers.setInt(REPLICATION_EVENT_TYPE, DEQUEUE); headers.setInt(DEQUEUED_MESSAGE_POSITION, dequeued.position); boost::intrusive_ptr<Message> msg(createMessage(headers)); @@ -69,7 +68,6 @@ void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueu boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload)); FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders(); headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName()); - headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence); headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE); queue->deliver(msg); } @@ -131,12 +129,14 @@ void ReplicatingEventListener::initialize(Plugin::Target& target) { Broker* broker = dynamic_cast<broker::Broker*>(&target); if (broker && !options.queue.empty()) { + broker->addFinalizer(boost::bind(&ReplicatingEventListener::shutdown, this)); if (options.createQueue) { queue = broker->getQueues().declare(options.queue).first; } else { queue = broker->getQueues().find(options.queue); } if (queue) { + queue->insertSequenceNumbers(REPLICATION_EVENT_SEQNO); QueueEvents::EventListener callback = boost::bind(&ReplicatingEventListener::handle, this, _1); broker->getQueueEvents().registerListener(options.name, callback); QPID_LOG(info, "Registered replicating queue event listener"); @@ -147,6 +147,7 @@ void ReplicatingEventListener::initialize(Plugin::Target& target) } void ReplicatingEventListener::earlyInitialize(Target&) {} +void ReplicatingEventListener::shutdown() { queue.reset(); } ReplicatingEventListener::PluginOptions::PluginOptions() : Options("Queue Replication Options"), name("replicator"), diff --git a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h index 7616c7ac8a..3d8f23e7ac 100644 --- a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h +++ b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h @@ -58,10 +58,10 @@ class ReplicatingEventListener : public Plugin PluginOptions options; qpid::broker::Queue::shared_ptr queue; - qpid::framing::SequenceNumber sequence; void deliverDequeueMessage(const qpid::broker::QueuedMessage& enqueued); void deliverEnqueueMessage(const qpid::broker::QueuedMessage& enqueued); + void shutdown(); boost::intrusive_ptr<qpid::broker::Message> createMessage(const qpid::framing::FieldTable& headers); boost::intrusive_ptr<qpid::broker::Message> cloneMessage(qpid::broker::Queue& queue, diff --git a/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp b/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp index 639cfb5d2e..88c94ad7ba 100644 --- a/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp +++ b/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp @@ -34,11 +34,13 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::replication::constants; +const std::string SEQUENCE_VALUE("qpid.replication-event.sequence"); ReplicationExchange::ReplicationExchange(const std::string& name, bool durable, const FieldTable& args, QueueRegistry& qr, Manageable* parent) - : Exchange(name, durable, args, parent), queues(qr), init(false) {} + : Exchange(name, durable, args, parent), queues(qr), sequence(args.getAsInt64(SEQUENCE_VALUE)), init(false) + {} std::string ReplicationExchange::getType() const { return typeName; } @@ -68,26 +70,33 @@ void ReplicationExchange::handleEnqueueEvent(const FieldTable* args, Deliverable { std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); Queue::shared_ptr queue = queues.find(queueName); - FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders(); - headers.erase(REPLICATION_TARGET_QUEUE); - headers.erase(REPLICATION_EVENT_SEQNO); - headers.erase(REPLICATION_EVENT_TYPE); - msg.deliverTo(queue); - QPID_LOG(debug, "Enqueued replicated message onto " << queue); + if (queue) { + FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders(); + headers.erase(REPLICATION_TARGET_QUEUE); + headers.erase(REPLICATION_EVENT_SEQNO); + headers.erase(REPLICATION_EVENT_TYPE); + msg.deliverTo(queue); + QPID_LOG(debug, "Enqueued replicated message onto " << queueName); + } else { + QPID_LOG(error, "Cannot enqueue replicated message. Queue " << queueName << " does not exist"); + } } void ReplicationExchange::handleDequeueEvent(const FieldTable* args) { std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); Queue::shared_ptr queue = queues.find(queueName); - SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION)); - - QueuedMessage dequeued; - if (queue->acquireMessageAt(position, dequeued)) { - queue->dequeue(0, dequeued); - QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position); + if (queue) { + SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION)); + QueuedMessage dequeued; + if (queue->acquireMessageAt(position, dequeued)) { + queue->dequeue(0, dequeued); + QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position); + } else { + QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName); + } } else { - QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName); + QPID_LOG(error, "Cannot process replicated 'dequeue' event. Queue " << queueName << " does not exist"); } } @@ -128,6 +137,13 @@ bool ReplicationExchange::isBound(Queue::shared_ptr /*queue*/, const string* con const std::string ReplicationExchange::typeName("replication"); +void ReplicationExchange::encode(Buffer& buffer) const +{ + args.setInt64(std::string(SEQUENCE_VALUE), sequence); + Exchange::encode(buffer); +} + + struct ReplicationExchangePlugin : Plugin { Broker* broker; diff --git a/qpid/cpp/src/qpid/replication/ReplicationExchange.h b/qpid/cpp/src/qpid/replication/ReplicationExchange.h index 897e4a954e..4cc45ed5f5 100644 --- a/qpid/cpp/src/qpid/replication/ReplicationExchange.h +++ b/qpid/cpp/src/qpid/replication/ReplicationExchange.h @@ -22,6 +22,7 @@ * */ #include "qpid/broker/Exchange.h" +#include "qpid/framing/Buffer.h" #include "qpid/framing/SequenceNumber.h" namespace qpid { @@ -58,6 +59,7 @@ class ReplicationExchange : public qpid::broker::Exchange bool isDuplicate(const qpid::framing::FieldTable* args); void handleEnqueueEvent(const qpid::framing::FieldTable* args, qpid::broker::Deliverable& msg); void handleDequeueEvent(const qpid::framing::FieldTable* args); + void encode(framing::Buffer& buffer) const; }; }} // namespace qpid::replication diff --git a/qpid/cpp/src/qpid/sys/Codec.h b/qpid/cpp/src/qpid/sys/Codec.h index f9645f554e..ace721fbcc 100644 --- a/qpid/cpp/src/qpid/sys/Codec.h +++ b/qpid/cpp/src/qpid/sys/Codec.h @@ -38,11 +38,11 @@ class Codec * @return may be less than size if there was incomplete * data at the end of the buffer. */ - virtual size_t decode(const char* buffer, size_t size) = 0; + virtual std::size_t decode(const char* buffer, std::size_t size) = 0; /** Encode into buffer, return number of bytes encoded */ - virtual size_t encode(const char* buffer, size_t size) = 0; + virtual std::size_t encode(const char* buffer, std::size_t size) = 0; /** Return true if we have data to encode */ virtual bool canEncode() = 0; diff --git a/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h index df6de89982..32809d86a1 100644 --- a/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h +++ b/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h @@ -30,7 +30,7 @@ namespace sys { /** * A ConnectionOutputHandler that delegates to another * ConnectionOutputHandler. Allows the "real" ConnectionOutputHandler - * to be changed modified without updating all the pointers/references + * to be changed without updating all the pointers/references * using the ConnectionOutputHandlerPtr */ class ConnectionOutputHandlerPtr : public ConnectionOutputHandler diff --git a/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h b/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h index a09ee9d441..577a475afd 100644 --- a/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h +++ b/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h @@ -80,7 +80,7 @@ public: bool add_unless(T& t, F f) { Mutex::ScopedLock l(lock); - if (array && find_if(array->begin(), array->end(), f) != array->end()) { + if (array && std::find_if(array->begin(), array->end(), f) != array->end()) { return false; } else { ArrayPtr copy(array ? new std::vector<T>(*array) : new std::vector<T>()); diff --git a/qpid/cpp/src/qpid/sys/DispatchHandle.cpp b/qpid/cpp/src/qpid/sys/DispatchHandle.cpp index cbdee7eda6..cd7dec7fa6 100644 --- a/qpid/cpp/src/qpid/sys/DispatchHandle.cpp +++ b/qpid/cpp/src/qpid/sys/DispatchHandle.cpp @@ -21,6 +21,8 @@ #include "DispatchHandle.h" +#include <algorithm> + #include <boost/cast.hpp> #include <assert.h> @@ -29,7 +31,6 @@ namespace qpid { namespace sys { DispatchHandle::~DispatchHandle() { - stopWatch(); } void DispatchHandle::startWatch(Poller::shared_ptr poller0) { @@ -37,13 +38,21 @@ void DispatchHandle::startWatch(Poller::shared_ptr poller0) { bool w = writableCallback; ScopedLock<Mutex> lock(stateLock); - assert(state == IDLE); + assert(state == IDLE || state == DELAYED_IDLE); // If no callbacks set then do nothing (that is what we were asked to do!) // TODO: Maybe this should be an assert instead if (!r && !w) { - state = INACTIVE; - return; + switch (state) { + case IDLE: + state = INACTIVE; + return; + case DELAYED_IDLE: + state = DELAYED_INACTIVE; + return; + default: + assert(state == IDLE || state == DELAYED_IDLE); + } } Poller::Direction d = r ? @@ -53,9 +62,20 @@ void DispatchHandle::startWatch(Poller::shared_ptr poller0) { poller = poller0; poller->addFd(*this, d); - state = r ? - (w ? ACTIVE_RW : ACTIVE_R) : - ACTIVE_W; + switch (state) { + case IDLE: + state = r ? + (w ? ACTIVE_RW : ACTIVE_R) : + ACTIVE_W; + return; + case DELAYED_IDLE: + state = r ? + (w ? DELAYED_RW : DELAYED_R) : + DELAYED_W; + return; + default: + assert(state == IDLE || state == DELAYED_IDLE); + } } void DispatchHandle::rewatch() { @@ -93,6 +113,8 @@ void DispatchHandle::rewatch() { case ACTIVE_RW: // Don't need to do anything already waiting for readable/writable break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } } @@ -130,6 +152,8 @@ void DispatchHandle::rewatchRead() { poller->modFd(*this, Poller::INOUT); state = ACTIVE_RW; break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } } @@ -167,6 +191,8 @@ void DispatchHandle::rewatchWrite() { case ACTIVE_RW: // Nothing to do: already waiting for writable break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } } @@ -203,6 +229,8 @@ void DispatchHandle::unwatchRead() { case ACTIVE_W: case INACTIVE: break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } } @@ -239,6 +267,8 @@ void DispatchHandle::unwatchWrite() { case ACTIVE_R: case INACTIVE: break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } } @@ -261,6 +291,8 @@ void DispatchHandle::unwatch() { poller->modFd(*this, Poller::NONE); state = INACTIVE; break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } } @@ -280,47 +312,72 @@ void DispatchHandle::stopWatch() { default: state = IDLE; break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } assert(poller); poller->delFd(*this); poller.reset(); } +// If we are already in the IDLE state we can't do the callback as we might +// race to delete and callback at the same time +// TODO: might be able to fix this by adding a new state, but would make +// the state machine even more complex void DispatchHandle::call(Callback iCb) { assert(iCb); ScopedLock<Mutex> lock(stateLock); - interruptedCallbacks.push(iCb); - - (void) poller->interrupt(*this); + switch (state) { + case IDLE: + case ACTIVE_DELETE: + assert(false); + return; + default: + interruptedCallbacks.push(iCb); + assert(poller); + (void) poller->interrupt(*this); + } } // The slightly strange switch structure // is to ensure that the lock is released before // we do the delete void DispatchHandle::doDelete() { - // Ensure that we're no longer watching anything - stopWatch(); - - // If we're in the middle of a callback defer the delete { ScopedLock<Mutex> lock(stateLock); + // Ensure that we're no longer watching anything switch (state) { + case DELAYED_R: + case DELAYED_W: + case DELAYED_RW: + case DELAYED_INACTIVE: + assert(poller); + poller->delFd(*this); + poller.reset(); + // Fallthrough case DELAYED_IDLE: - case DELAYED_DELETE: state = DELAYED_DELETE; + // Fallthrough + case DELAYED_DELETE: + case ACTIVE_DELETE: return; case IDLE: break; default: - // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states - assert(false); + state = ACTIVE_DELETE; + assert(poller); + (void) poller->interrupt(*this); + poller->delFd(*this); + return; } } - // If we're not then do it right away + // If we're IDLE we can do this right away delete this; } void DispatchHandle::processEvent(Poller::EventType type) { + CallbackQueue callbacks; + // Note that we are now doing the callbacks { ScopedLock<Mutex> lock(stateLock); @@ -336,6 +393,16 @@ void DispatchHandle::processEvent(Poller::EventType type) { case ACTIVE_RW: state = DELAYED_RW; break; + case ACTIVE_DELETE: + // Need to make sure we clean up any pending callbacks in this case + std::swap(callbacks, interruptedCallbacks); + goto saybyebye; + // Can get here in idle if we are stopped in a different thread + // just after we return with this handle in Poller::wait + case IDLE: + // Can get here in INACTIVE if a non connection thread unwatches + // whilst we were stuck in the above lock + case INACTIVE: // Can only get here in a DELAYED_* state in the rare case // that we're already here for reading and we get activated for // writing and we can write (it might be possible the other way @@ -348,9 +415,9 @@ void DispatchHandle::processEvent(Poller::EventType type) { case DELAYED_IDLE: case DELAYED_DELETE: return; - default: - assert(false); } + + std::swap(callbacks, interruptedCallbacks); } // Do callbacks - whilst we are doing the callbacks we are prevented from processing @@ -378,8 +445,8 @@ void DispatchHandle::processEvent(Poller::EventType type) { break; case Poller::INTERRUPTED: { - ScopedLock<Mutex> lock(stateLock); - assert(interruptedCallbacks.size() > 0); + // We could only be interrupted if we also had a callback to do + assert(callbacks.size() > 0); // We'll actually do the interrupt below } break; @@ -387,16 +454,18 @@ void DispatchHandle::processEvent(Poller::EventType type) { assert(false); } - { - ScopedLock<Mutex> lock(stateLock); - // If we've got a pending interrupt do it now - while (interruptedCallbacks.size() > 0) { - Callback cb = interruptedCallbacks.front(); + // If we have any callbacks do them now - + // (because we use a copy from before the previous callbacks we won't + // do anything yet that was just added) + while (callbacks.size() > 0) { + Callback cb = callbacks.front(); assert(cb); cb(*this); - interruptedCallbacks.pop(); + callbacks.pop(); } + { + ScopedLock<Mutex> lock(stateLock); // If any of the callbacks re-enabled reading/writing then actually // do it now switch (state) { @@ -425,7 +494,9 @@ void DispatchHandle::processEvent(Poller::EventType type) { case DELAYED_DELETE: break; } - } + } + +saybyebye: delete this; } diff --git a/qpid/cpp/src/qpid/sys/DispatchHandle.h b/qpid/cpp/src/qpid/sys/DispatchHandle.h index 63cd25f8fd..bc9f98775e 100644 --- a/qpid/cpp/src/qpid/sys/DispatchHandle.h +++ b/qpid/cpp/src/qpid/sys/DispatchHandle.h @@ -65,6 +65,7 @@ private: Mutex stateLock; enum { IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW, + ACTIVE_DELETE, DELAYED_IDLE, DELAYED_INACTIVE, DELAYED_R, DELAYED_W, DELAYED_RW, DELAYED_DELETE } state; diff --git a/qpid/cpp/src/qpid/sys/PollableQueue.h b/qpid/cpp/src/qpid/sys/PollableQueue.h index a23cc5137a..f8acf0a5f6 100644 --- a/qpid/cpp/src/qpid/sys/PollableQueue.h +++ b/qpid/cpp/src/qpid/sys/PollableQueue.h @@ -53,7 +53,7 @@ class PollableQueue { * @param values Queue of values to process. Any items remaining * on return from Callback are put back on the queue. */ - typedef boost::function<void (Queue& values)> Callback; + typedef boost::function<void (Queue&)> Callback; /** * Constructor; sets necessary parameters. diff --git a/qpid/cpp/src/qpid/sys/Poller.h b/qpid/cpp/src/qpid/sys/Poller.h index 49d7b321d7..825ad8bfed 100644 --- a/qpid/cpp/src/qpid/sys/Poller.h +++ b/qpid/cpp/src/qpid/sys/Poller.h @@ -86,8 +86,9 @@ public: // with the handle and the INTERRUPTED event type // if it returns false then the handle is not being monitored by the poller // - This can either be because it has just received an event which has been - // reported and has not been reenabled since. Or because it was removed - // from the monitoring set + // reported and has not been reenabled since. + // - Because it was removed from the monitoring set + // - Or because it is already being interrupted QPID_COMMON_EXTERN bool interrupt(PollerHandle& handle); // Poller run loop diff --git a/qpid/cpp/src/qpid/sys/Socket.h b/qpid/cpp/src/qpid/sys/Socket.h index a81d823d0c..e6555f5774 100644 --- a/qpid/cpp/src/qpid/sys/Socket.h +++ b/qpid/cpp/src/qpid/sys/Socket.h @@ -27,8 +27,6 @@ #include "qpid/CommonImportExport.h" #include <string> -struct sockaddr; - namespace qpid { namespace sys { @@ -93,7 +91,7 @@ public: /** Accept a connection from a socket that is already listening * and has an incoming connection */ - QPID_COMMON_EXTERN Socket* accept(struct sockaddr *addr, socklen_t *addrlen) const; + QPID_COMMON_EXTERN Socket* accept() const; // TODO The following are raw operations, maybe they need better wrapping? QPID_COMMON_EXTERN int read(void *buf, size_t count) const; diff --git a/qpid/cpp/src/qpid/sys/Thread.h b/qpid/cpp/src/qpid/sys/Thread.h index 14e2fef1c0..b532d4d80a 100644 --- a/qpid/cpp/src/qpid/sys/Thread.h +++ b/qpid/cpp/src/qpid/sys/Thread.h @@ -28,6 +28,8 @@ # define QPID_TSS __declspec(thread) #elif defined (__GNUC__) # define QPID_TSS __thread +#elif defined (__SUNPRO_CC) +# define QPID_TSS __thread #else # error "Dont know how to define QPID_TSS for this platform" #endif diff --git a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp index 10705e12da..42b5d8b1aa 100644 --- a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -54,17 +54,20 @@ class PollerHandlePrivate { INACTIVE, HUNGUP, MONITORED_HUNGUP, + INTERRUPTED, DELETED }; int fd; ::__uint32_t events; + PollerHandle* pollerHandle; FDStat stat; Mutex lock; - PollerHandlePrivate(int f) : + PollerHandlePrivate(int f, PollerHandle* p) : fd(f), events(0), + pollerHandle(p), stat(ABSENT) { } @@ -101,6 +104,14 @@ class PollerHandlePrivate { stat = HUNGUP; } + bool isInterrupted() const { + return stat == INTERRUPTED; + } + + void setInterrupted() { + stat = INTERRUPTED; + } + bool isDeleted() const { return stat == DELETED; } @@ -111,7 +122,7 @@ class PollerHandlePrivate { }; PollerHandle::PollerHandle(const IOHandle& h) : - impl(new PollerHandlePrivate(toFd(h.impl))) + impl(new PollerHandlePrivate(toFd(h.impl), this)) {} PollerHandle::~PollerHandle() { @@ -120,6 +131,10 @@ PollerHandle::~PollerHandle() { if (impl->isDeleted()) { return; } + if (impl->isInterrupted()) { + impl->setDeleted(); + return; + } if (impl->isActive()) { impl->setDeleted(); } @@ -243,23 +258,21 @@ class PollerPrivate { ::close(epollFd); } - void interrupt(bool all=false) { + void interrupt() { ::epoll_event epe; - if (all) { - // Not EPOLLONESHOT, so we eventually get all threads - epe.events = ::EPOLLIN; - epe.data.u64 = 0; // Keep valgrind happy - } else { - // Use EPOLLONESHOT so we only wake a single thread - epe.events = ::EPOLLIN | ::EPOLLONESHOT; - epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle); - } + // Use EPOLLONESHOT so we only wake a single thread + epe.events = ::EPOLLIN | ::EPOLLONESHOT; + epe.data.u64 = 0; // Keep valgrind happy + epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle); QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe)); } void interruptAll() { - interrupt(true); + ::epoll_event epe; + // Not EPOLLONESHOT, so we eventually get all threads + epe.events = ::EPOLLIN; + epe.data.u64 = 0; // Keep valgrind happy + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe)); } }; @@ -281,7 +294,7 @@ void Poller::addFd(PollerHandle& handle, Direction dir) { epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir); } epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &handle; + epe.data.ptr = &eh; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe)); @@ -312,7 +325,7 @@ void Poller::modFd(PollerHandle& handle, Direction dir) { ::epoll_event epe; epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &handle; + epe.data.ptr = &eh; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); @@ -329,7 +342,7 @@ void Poller::rearmFd(PollerHandle& handle) { ::epoll_event epe; epe.events = eh.events; epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &handle; + epe.data.ptr = &eh; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); @@ -355,15 +368,14 @@ bool Poller::interrupt(PollerHandle& handle) { { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); - if (eh.isInactive()) { + if (!eh.isActive()) { return false; } ::epoll_event epe; epe.events = 0; epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &eh; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); - eh.setInactive(); + eh.setInterrupted(); } PollerPrivate::InterruptHandle& ih = impl->interruptHandle; @@ -422,37 +434,54 @@ Poller::Event Poller::wait(Duration timeout) { #else int rc = ::epoll_pwait(impl->epollFd, &epe, 1, timeoutMs, &impl->sigMask); #endif - // Check for shutdown - if (impl->isShutdown) { - PollerHandleDeletionManager.markAllUnusedInThisThread(); - return Event(0, SHUTDOWN); - } if (rc ==-1 && errno != EINTR) { QPID_POSIX_CHECK(rc); } else if (rc > 0) { assert(rc == 1); - PollerHandle* handle = static_cast<PollerHandle*>(epe.data.ptr); + void* dataPtr = epe.data.ptr; + + // Check if this is an interrupt + PollerPrivate::InterruptHandle& interruptHandle = impl->interruptHandle; + if (dataPtr == &interruptHandle) { + PollerHandle* wrappedHandle = 0; + { + ScopedLock<Mutex> l(interruptHandle.impl->lock); + if (interruptHandle.impl->isActive()) { + wrappedHandle = interruptHandle.getHandle(); + // If there is an interrupt queued behind this one we need to arm it + // We do it this way so that another thread can pick it up + if (interruptHandle.queuedHandles()) { + impl->interrupt(); + interruptHandle.impl->setActive(); + } else { + interruptHandle.impl->setInactive(); + } + } + } + if (wrappedHandle) { + ScopedLock<Mutex> l(wrappedHandle->impl->lock); + if (!wrappedHandle->impl->isDeleted()) { + wrappedHandle->impl->setInactive(); + return Event(wrappedHandle, INTERRUPTED); + } + PollerHandleDeletionManager.markForDeletion(wrappedHandle->impl); + } + continue; + } + + // Check for shutdown + if (impl->isShutdown) { + PollerHandleDeletionManager.markAllUnusedInThisThread(); + return Event(0, SHUTDOWN); + } - PollerHandlePrivate& eh = *handle->impl; + PollerHandlePrivate& eh = *static_cast<PollerHandlePrivate*>(dataPtr); ScopedLock<Mutex> l(eh.lock); // the handle could have gone inactive since we left the epoll_wait if (eh.isActive()) { - - // Check if this is an interrupt - if (handle == &impl->interruptHandle) { - PollerHandle* wrappedHandle = impl->interruptHandle.getHandle(); - // If there is an interrupt queued behind this one we need to arm it - // We do it this way so that another thread can pick it up - if (impl->interruptHandle.queuedHandles()) { - impl->interrupt(); - eh.setActive(); - } else { - eh.setInactive(); - } - return Event(wrappedHandle, INTERRUPTED); - } + PollerHandle* handle = eh.pollerHandle; // If the connection has been hungup we could still be readable // (just not writable), allow us to readable until we get here again diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp index a356a72650..a914dc817a 100644 --- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -123,7 +123,7 @@ void AsynchAcceptorPrivate::readable(DispatchHandle& h) { // TODO: Currently we ignore the peers address, perhaps we should // log it or use it for connection acceptance. try { - s = socket.accept(0, 0); + s = socket.accept(); if (s) { acceptedCallback(*s); } else { @@ -474,7 +474,7 @@ void AsynchIO::readable(DispatchHandle& h) { break; } else { // Report error then just treat as a socket disconnect - QPID_LOG(error, "Error reading socket: " << qpid::sys::strError(rc) << "(" << rc << ")" ); + QPID_LOG(error, "Error reading socket: " << qpid::sys::strError(errno) << "(" << errno << ")" ); eofCallback(*this); h.unwatchRead(); break; diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp index 415d5293ef..ab0c28c48c 100644 --- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp +++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp @@ -108,7 +108,7 @@ void Socket::createTcp() const { int& socket = impl->fd; if (socket != -1) Socket::close(); - int s = ::socket (PF_INET, SOCK_STREAM, 0); + int s = ::socket (AF_INET, SOCK_STREAM, 0); if (s < 0) throw QPID_POSIX_ERROR(errno); socket = s; } @@ -138,25 +138,30 @@ const char* h_errstr(int e) { } } -void Socket::connect(const std::string& host, uint16_t port) const +void Socket::connect(const std::string& host, uint16_t p) const { - std::stringstream namestream; - namestream << host << ":" << port; - connectname = namestream.str(); + std::stringstream portstream; + portstream << p; + std::string port = portstream.str(); + connectname = host + ":" + port; const int& socket = impl->fd; - struct sockaddr_in name; - name.sin_family = AF_INET; - name.sin_port = htons(port); - // TODO: Be good to make this work for IPv6 as well as IPv4 - // Use more modern lookup functions - struct hostent* hp = gethostbyname ( host.c_str() ); - if (hp == 0) - throw Exception(QPID_MSG("Cannot resolve " << host << ": " << h_errstr(h_errno))); - ::memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length); - if ((::connect(socket, (struct sockaddr*)(&name), sizeof(name)) < 0) && - (errno != EINPROGRESS)) + + ::addrinfo *res; + ::addrinfo hints; + ::memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well + hints.ai_socktype = SOCK_STREAM; + int n = ::getaddrinfo(host.c_str(), port.c_str(), &hints, &res); + if (n != 0) + throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n))); + // TODO the correct thing to do here is loop on failure until you've used all the returned addresses + if ((::connect(socket, res->ai_addr, res->ai_addrlen) < 0) && + (errno != EINPROGRESS)) { + ::freeaddrinfo(res); throw qpid::Exception(QPID_MSG(strError(errno) << ": " << host << ":" << port)); + } + ::freeaddrinfo(res); } void @@ -189,9 +194,9 @@ int Socket::listen(uint16_t port, int backlog) const return ntohs(name.sin_port); } -Socket* Socket::accept(struct sockaddr *addr, socklen_t *addrlen) const +Socket* Socket::accept() const { - int afd = ::accept(impl->fd, addr, addrlen); + int afd = ::accept(impl->fd, 0, 0); if ( afd >= 0) return new Socket(new IOHandlePrivate(afd)); else if (errno == EAGAIN) @@ -238,7 +243,7 @@ uint16_t Socket::getLocalPort() const uint16_t Socket::getRemotePort() const { - return atoi(getService(impl->fd, true).c_str()); + return std::atoi(getService(impl->fd, true).c_str()); } int Socket::getError() const diff --git a/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp b/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp index 783f84576b..71fbea27c2 100644 --- a/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp +++ b/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp @@ -30,9 +30,11 @@ #include <port.h> #include <poll.h> #include <errno.h> +#include <pthread.h> +#include <signal.h> #include <assert.h> -#include <vector> +#include <queue> #include <exception> @@ -43,11 +45,11 @@ namespace qpid { namespace sys { // Deletion manager to handle deferring deletion of PollerHandles to when they definitely aren't being used -DeletionManager<PollerHandle> PollerHandleDeletionManager; +DeletionManager<PollerHandlePrivate> PollerHandleDeletionManager; // Instantiate (and define) class static for DeletionManager template <> -DeletionManager<PollerHandle>::AllThreadsStatuses DeletionManager<PollerHandle>::allThreadsStatuses(0); +DeletionManager<PollerHandlePrivate>::AllThreadsStatuses DeletionManager<PollerHandlePrivate>::allThreadsStatuses(0); class PollerHandlePrivate { friend class Poller; @@ -58,7 +60,8 @@ class PollerHandlePrivate { MONITORED, INACTIVE, HUNGUP, - MONITORED_HUNGUP + MONITORED_HUNGUP, + DELETED }; int fd; @@ -104,6 +107,14 @@ class PollerHandlePrivate { assert(stat == MONITORED); stat = HUNGUP; } + + bool isDeleted() const { + return stat == DELETED; + } + + void setDeleted() { + stat = DELETED; + } }; PollerHandle::PollerHandle(const IOHandle& h) : @@ -111,11 +122,16 @@ PollerHandle::PollerHandle(const IOHandle& h) : {} PollerHandle::~PollerHandle() { - delete impl; -} - -void PollerHandle::deferDelete() { - PollerHandleDeletionManager.markForDeletion(this); + { + ScopedLock<Mutex> l(impl->lock); + if (impl->isDeleted()) { + return; + } + if (impl->isActive()) { + impl->setDeleted(); + } + } + PollerHandleDeletionManager.markForDeletion(impl); } /** @@ -125,35 +141,80 @@ void PollerHandle::deferDelete() { class PollerPrivate { friend class Poller; - const int portId; + class InterruptHandle: public PollerHandle { + std::queue<PollerHandle*> handles; + + void processEvent(Poller::EventType) { + PollerHandle* handle = handles.front(); + handles.pop(); + assert(handle); + + //Synthesise event + Poller::Event event(handle, Poller::INTERRUPTED); + + //Process synthesised event + event.process(); + } + public: + InterruptHandle() : PollerHandle(DummyIOHandle) {} + + void addHandle(PollerHandle& h) { + handles.push(&h); + } + + PollerHandle *getHandle() { + PollerHandle* handle = handles.front(); + handles.pop(); + return handle; + } + + bool queuedHandles() { + return handles.size() > 0; + } + }; + + const int portId; + bool isShutdown; + InterruptHandle interruptHandle; + static uint32_t directionToPollEvent(Poller::Direction dir) { switch (dir) { - case Poller::INPUT: return POLLIN; - case Poller::OUTPUT: return POLLOUT; - case Poller::INOUT: return POLLIN | POLLOUT; - default: return 0; + case Poller::INPUT: return POLLIN; + case Poller::OUTPUT: return POLLOUT; + case Poller::INOUT: return POLLIN | POLLOUT; + default: return 0; } } static Poller::EventType pollToDirection(uint32_t events) { uint32_t e = events & (POLLIN | POLLOUT); switch (e) { - case POLLIN: return Poller::READABLE; - case POLLOUT: return Poller::WRITABLE; - case POLLIN | POLLOUT: return Poller::READ_WRITABLE; - default: - return (events & (POLLHUP | POLLERR)) ? - Poller::DISCONNECTED : Poller::INVALID; + case POLLIN: return Poller::READABLE; + case POLLOUT: return Poller::WRITABLE; + case POLLIN | POLLOUT: return Poller::READ_WRITABLE; + default: + return (events & (POLLHUP | POLLERR)) ? + Poller::DISCONNECTED : Poller::INVALID; } } - + PollerPrivate() : - portId(::port_create()) { + portId(::port_create()), + isShutdown(false) { + QPID_POSIX_CHECK(portId); } ~PollerPrivate() { } + + void interrupt() { + //Send an Alarm to the port + //We need to send a nonzero event mask, using POLLHUP, + //nevertheless the wait method will only look for a PORT_ALERT_SET + QPID_POSIX_CHECK(::port_alert(portId, PORT_ALERT_SET, POLLHUP, + &static_cast<PollerHandle&>(interruptHandle))); + } }; void Poller::addFd(PollerHandle& handle, Direction dir) { @@ -177,7 +238,6 @@ void Poller::addFd(PollerHandle& handle, Direction dir) { QPID_LOG(trace, "Poller::addFd(handle=" << &handle << "[" << typeid(&handle).name() << "], fd=" << eh.fd << ")"); - //assert(dynamic_cast<DispatchHandle*>(&handle)); } void Poller::delFd(PollerHandle& handle) { @@ -223,17 +283,56 @@ void Poller::rearmFd(PollerHandle& handle) { } void Poller::shutdown() { - //Send an Alarm to the port - //We need to send a nonzero event mask, using POLLHUP, but - //The wait method will only look for a PORT_ALERT_SET - QPID_POSIX_CHECK(::port_alert(impl->portId, PORT_ALERT_SET, POLLHUP, NULL)); - QPID_LOG(trace, "Poller::shutdown"); + //Allow sloppy code to shut us down more than once + if (impl->isShutdown) + return; + + impl->isShutdown = true; + impl->interrupt(); +} + +bool Poller::interrupt(PollerHandle& handle) { + PollerPrivate::InterruptHandle& ih = impl->interruptHandle; + PollerHandlePrivate& eh = *static_cast<PollerHandle&>(ih).impl; + ScopedLock<Mutex> l(eh.lock); + ih.addHandle(handle); + impl->interrupt(); + eh.setActive(); + return true; +} + +void Poller::run() { + // Make sure we can't be interrupted by signals at a bad time + ::sigset_t ss; + ::sigfillset(&ss); + ::pthread_sigmask(SIG_SETMASK, &ss, 0); + + do { + Event event = wait(); + + // If can read/write then dispatch appropriate callbacks + if (event.handle) { + event.process(); + } else { + // Handle shutdown + switch (event.type) { + case SHUTDOWN: + return; + default: + // This should be impossible + assert(false); + } + } + } while (true); } Poller::Event Poller::wait(Duration timeout) { timespec_t tout; timespec_t* ptout = NULL; port_event_t pe; + + AbsTime targetTimeout = (timeout == TIME_INFINITE) ? FAR_FUTURE : + AbsTime(now(), timeout); if (timeout != TIME_INFINITE) { tout.tv_sec = 0; @@ -246,9 +345,15 @@ Poller::Event Poller::wait(Duration timeout) { QPID_LOG(trace, "About to enter port_get. Thread " << pthread_self() << ", timeout=" << timeout); - + + int rc = ::port_get(impl->portId, &pe, ptout); + if (impl->isShutdown) { + PollerHandleDeletionManager.markAllUnusedInThisThread(); + return Event(0, SHUTDOWN); + } + if (rc < 0) { switch (errno) { case EINTR: @@ -259,33 +364,60 @@ Poller::Event Poller::wait(Duration timeout) { QPID_POSIX_CHECK(rc); } } else { - //We use alert mode to notify the shutdown of the Poller - if (pe.portev_source == PORT_SOURCE_ALERT) { - return Event(0, SHUTDOWN); - } - if (pe.portev_source == PORT_SOURCE_FD) { - PollerHandle *handle = static_cast<PollerHandle*>(pe.portev_user); - PollerHandlePrivate& eh = *handle->impl; - ScopedLock<Mutex> l(eh.lock); - QPID_LOG(trace, "About to send handle: " << handle); + PollerHandle* handle = static_cast<PollerHandle*>(pe.portev_user); + PollerHandlePrivate& eh = *handle->impl; + ScopedLock<Mutex> l(eh.lock); + + if (eh.isActive()) { + //We use alert mode to notify interrupts + if (pe.portev_source == PORT_SOURCE_ALERT && + handle == &impl->interruptHandle) { + PollerHandle* wrappedHandle = impl->interruptHandle.getHandle(); + + if (impl->interruptHandle.queuedHandles()) { + impl->interrupt(); + eh.setActive(); + } else { + eh.setInactive(); + } + return Event(wrappedHandle, INTERRUPTED); + } - if (eh.isActive()) { - if (pe.portev_events & POLLHUP) { - if (eh.isHungup()) { - return Event(handle, DISCONNECTED); + return Event(0, SHUTDOWN); + + if (pe.portev_source == PORT_SOURCE_FD) { + QPID_LOG(trace, "About to send handle: " << handle); + if (pe.portev_events & POLLHUP) { + if (eh.isHungup()) { + return Event(handle, DISCONNECTED); + } + eh.setHungup(); + } else { + eh.setInactive(); } - eh.setHungup(); - } else { - eh.setInactive(); - } - QPID_LOG(trace, "Sending event (thread: " - << pthread_self() << ") for handle " << handle - << ", direction= " - << PollerPrivate::pollToDirection(pe.portev_events)); - return Event(handle, PollerPrivate::pollToDirection(pe.portev_events)); + QPID_LOG(trace, "Sending event (thread: " + << pthread_self() << ") for handle " << handle + << ", direction= " + << PollerPrivate::pollToDirection(pe.portev_events)); + return Event(handle, PollerPrivate::pollToDirection(pe.portev_events)); + } + } else if (eh.isDeleted()) { + //Remove the handle from the poller + int rc = ::port_dissociate(impl->portId, PORT_SOURCE_FD, + (uintptr_t) eh.fd); + if (rc == -1 && errno != EBADFD) { + QPID_POSIX_CHECK(rc); } } } + + if (timeout == TIME_INFINITE) { + continue; + } + if (rc == 0 && now() > targetTimeout) { + PollerHandleDeletionManager.markAllUnusedInThisThread(); + return Event(0, TIMEOUT); + } } while (true); } diff --git a/qpid/cpp/src/qpid/sys/solaris/SystemInfo.cpp b/qpid/cpp/src/qpid/sys/solaris/SystemInfo.cpp new file mode 100755 index 0000000000..0075a89021 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/solaris/SystemInfo.cpp @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/SystemInfo.h" + +#define BSD_COMP +#include <sys/ioctl.h> +#include <netdb.h> +#undef BDS_COMP + + +#include <unistd.h> +#include <net/if.h> +#include <sys/types.h> +#include <sys/utsname.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <stdio.h> +#include <errno.h> +#include <limits.h> +#include <procfs.h> +#include <fcntl.h> +#include <sys/types.h> + +using namespace std; + +namespace qpid { +namespace sys { + +long SystemInfo::concurrency() { + return sysconf(_SC_NPROCESSORS_ONLN); +} + +bool SystemInfo::getLocalHostname(TcpAddress &address) { + char name[MAXHOSTNAMELEN]; + if (::gethostname(name, sizeof(name)) != 0) + return false; + address.host = name; + return true; +} + +static const string LOCALHOST("127.0.0.1"); + +void SystemInfo::getLocalIpAddresses(uint16_t port, + std::vector<Address> &addrList) { + int s = socket(PF_INET, SOCK_STREAM, 0); + for (int i=1;;i++) { + struct lifreq ifr; + ifr.lifr_index = i; + if (::ioctl(s, SIOCGIFADDR, &ifr) < 0) { + break; + } + struct sockaddr_in *sin = (struct sockaddr_in *) &ifr.lifr_addr; + std::string addr(inet_ntoa(sin->sin_addr)); + if (addr != LOCALHOST) + addrList.push_back(TcpAddress(addr, port)); + } + if (addrList.empty()) { + addrList.push_back(TcpAddress(LOCALHOST, port)); + } + close (s); +} + +void SystemInfo::getSystemId(std::string &osName, + std::string &nodeName, + std::string &release, + std::string &version, + std::string &machine) { + struct utsname _uname; + if (uname (&_uname) == 0) { + osName = _uname.sysname; + nodeName = _uname.nodename; + release = _uname.release; + version = _uname.version; + machine = _uname.machine; + } +} + +uint32_t SystemInfo::getProcessId() +{ + return (uint32_t) ::getpid(); +} + +uint32_t SystemInfo::getParentProcessId() +{ + return (uint32_t) ::getppid(); +} + +string SystemInfo::getProcessName() +{ + psinfo processInfo; + char procfile[PATH_MAX]; + int fd; + string value; + + snprintf(procfile, PATH_MAX, "/proc/%d/psinfo", getProcessId()); + if ((fd = open(procfile, O_RDONLY)) >= 0) { + if (read(fd, (void *) &processInfo, sizeof(processInfo)) == sizeof(processInfo)) { + value = processInfo.pr_fname; + } + } + return value; +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp index 9be75af47d..624683ae7d 100644 --- a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp +++ b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp @@ -90,7 +90,7 @@ void SslAcceptor::readable(DispatchHandle& h) { // TODO: Currently we ignore the peers address, perhaps we should // log it or use it for connection acceptance. try { - s = socket.accept(0, 0); + s = socket.accept(); if (s) { acceptedCallback(*s); } else { diff --git a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp b/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp index 597fbe57db..dc816b403b 100644 --- a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp +++ b/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp @@ -201,9 +201,9 @@ int SslSocket::listen(uint16_t port, int backlog, const std::string& certName, b return ntohs(name.sin_port); } -SslSocket* SslSocket::accept(struct sockaddr *addr, socklen_t *addrlen) const +SslSocket* SslSocket::accept() const { - int afd = ::accept(impl->fd, addr, addrlen); + int afd = ::accept(impl->fd, 0, 0); if ( afd >= 0) { return new SslSocket(new IOHandlePrivate(afd), prototype); } else if (errno == EAGAIN) { diff --git a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h b/qpid/cpp/src/qpid/sys/ssl/SslSocket.h index a82e9133e8..7434667b78 100644 --- a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h +++ b/qpid/cpp/src/qpid/sys/ssl/SslSocket.h @@ -64,7 +64,7 @@ public: * Accept a connection from a socket that is already listening * and has an incoming connection */ - SslSocket* accept(struct sockaddr *addr, socklen_t *addrlen) const; + SslSocket* accept() const; // TODO The following are raw operations, maybe they need better wrapping? int read(void *buf, size_t count) const; diff --git a/qpid/cpp/src/qpid/sys/windows/IntegerTypes.h b/qpid/cpp/src/qpid/sys/windows/IntegerTypes.h index 80168fab88..4b72154cd0 100755 --- a/qpid/cpp/src/qpid/sys/windows/IntegerTypes.h +++ b/qpid/cpp/src/qpid/sys/windows/IntegerTypes.h @@ -32,7 +32,6 @@ typedef __int64 int64_t; // Visual Studio doesn't define other common types, so set them up here too. typedef int pid_t; -typedef int socklen_t; typedef unsigned int size_t; typedef int ssize_t; typedef unsigned int uint; diff --git a/qpid/cpp/src/qpid/sys/windows/Socket.cpp b/qpid/cpp/src/qpid/sys/windows/Socket.cpp index a9959bf43e..93059d03ef 100755 --- a/qpid/cpp/src/qpid/sys/windows/Socket.cpp +++ b/qpid/cpp/src/qpid/sys/windows/Socket.cpp @@ -262,9 +262,9 @@ int Socket::listen(uint16_t port, int backlog) const return ntohs(name.sin_port); } -Socket* Socket::accept(struct sockaddr *addr, socklen_t *addrlen) const +Socket* Socket::accept() const { - SOCKET afd = ::accept(impl->fd, addr, addrlen); + SOCKET afd = ::accept(impl->fd, 0, 0); if (afd != INVALID_SOCKET) return new Socket(new IOHandlePrivate(afd)); else if (WSAGetLastError() == EAGAIN) diff --git a/qpid/cpp/src/qpid/xml/XmlExchange.cpp b/qpid/cpp/src/qpid/xml/XmlExchange.cpp index 5197b239d0..4e9de49ad5 100644 --- a/qpid/cpp/src/qpid/xml/XmlExchange.cpp +++ b/qpid/cpp/src/qpid/xml/XmlExchange.cpp @@ -33,6 +33,9 @@ #include "qpid/Plugin.h" #include <xercesc/framework/MemBufInputSource.hpp> + +#include <xqilla/ast/XQGlobalVariable.hpp> + #include <xqilla/context/ItemFactory.hpp> #include <xqilla/xqilla-simple.hpp> @@ -62,17 +65,6 @@ XmlExchange::XmlExchange(const std::string& _name, bool _durable, mgmtExchange->set_type (typeName); } -/* - * Use the name of the query as the binding key. - * - * The first time a given name is used in a binding, the query body - * must be provided.After that, no query body should be present. - * - * To modify an installed query, the user must first unbind the - * existing query, then replace it by binding again with the same - * name. - * - */ // #### TODO: The Binding should take the query text // #### only. Consider encapsulating the entire block, including @@ -94,6 +86,21 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const bindings.add(binding); QPID_LOG(trace, "Bound successfully with query: " << queryText ); + binding->parse_message_content = false; + + if (query->getQueryBody()->getStaticAnalysis().areContextFlagsUsed()) { + binding->parse_message_content = true; + } + else { + GlobalVariables &vars = const_cast<GlobalVariables&>(query->getVariables()); + for(GlobalVariables::iterator it = vars.begin(); it != vars.end(); ++it) { + if ((*it)->getStaticAnalysis().areContextFlagsUsed()) { + binding->parse_message_content = true; + break; + } + } + } + if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); @@ -126,59 +133,74 @@ bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, cons } } -bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args) +bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content) { - // ### TODO: Need istream for frameset - // Hack alert - the following code does not work for really large messages - string msgContent; try { - msg.getMessage().getFrames().getContent(msgContent); - - QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]"); - QPID_LOG(trace, "matches: message content is [" << msgContent << "]"); - - boost::scoped_ptr<DynamicContext> context(query->createDynamicContext()); - if (!context.get()) { - throw InternalErrorException(QPID_MSG("Query context looks munged ...")); - } - - XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(), - msgContent.length(), "input" ); - Sequence seq(context->parseDocument(xml)); - - if (args) { - FieldTable::ValueMap::const_iterator v = args->begin(); - for(; v != args->end(); ++v) { - // ### TODO: Do types properly - if (v->second->convertsTo<std::string>()) { - QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str()); - Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get()); - context->setExternalVariable(X(v->first.c_str()), value); - } - } - } - - if(!seq.isEmpty() && seq.first()->isNode()) { - context->setContextItem(seq.first()); - context->setContextPosition(1); - context->setContextSize(1); - } - Result result = query->execute(context.get()); - return result->getEffectiveBooleanValue(context.get(), 0); + QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]"); + + boost::scoped_ptr<DynamicContext> context(query->createDynamicContext()); + if (!context.get()) { + throw InternalErrorException(QPID_MSG("Query context looks munged ...")); + } + + if (parse_message_content) { + + msg.getMessage().getFrames().getContent(msgContent); + + QPID_LOG(trace, "matches: message content is [" << msgContent << "]"); + + XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(), + msgContent.length(), "input" ); + + // This will parse the document using either Xerces or FastXDM, depending + // on your XQilla configuration. FastXDM can be as much as 10x faster. + + Sequence seq(context->parseDocument(xml)); + + if(!seq.isEmpty() && seq.first()->isNode()) { + context->setContextItem(seq.first()); + context->setContextPosition(1); + context->setContextSize(1); + } + } + + if (args) { + FieldTable::ValueMap::const_iterator v = args->begin(); + for(; v != args->end(); ++v) { + // ### TODO: Do types properly + if (v->second->convertsTo<std::string>()) { + QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str()); + Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get()); + context->setExternalVariable(X(v->first.c_str()), value); + } + } + } + + Result result = query->execute(context.get()); + return result->getEffectiveBooleanValue(context.get(), 0); } catch (XQException& e) { - QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent); - return 0; + QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent); } catch (...) { - QPID_LOG(warning, "Unexpected error routing message: " << msgContent); - return 0; + QPID_LOG(warning, "Unexpected error routing message: " << msgContent); } return 0; } +// Future optimization: If any query in a binding for a given routing key requires +// message content, parse the message once, and use that parsed form for all bindings. +// +// Future optimization: XQilla does not currently do document projection for data +// accessed via the context item. If there is a single query for a given routing key, +// and it accesses document data, this could be a big win. +// +// Document projection often is not a win if you have multiple queries on the same data. +// But for very large messages, if all these queries are on the first part of the data, +// it could still be a big win. + void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* args) { PreRoute pr(msg, this); @@ -192,7 +214,7 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT int count(0); for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) { - if ((*i)->xquery && matches((*i)->xquery, msg, args)) { // Overly defensive? There should always be a query ... + if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) { msg.deliverTo((*i)->queue); count++; QPID_LOG(trace, "Delivered to queue" ); diff --git a/qpid/cpp/src/qpid/xml/XmlExchange.h b/qpid/cpp/src/qpid/xml/XmlExchange.h index 066a26489d..7bec480bde 100644 --- a/qpid/cpp/src/qpid/xml/XmlExchange.h +++ b/qpid/cpp/src/qpid/xml/XmlExchange.h @@ -46,9 +46,10 @@ class XmlExchange : public virtual Exchange { typedef qpid::sys::CopyOnWriteArray<XmlBinding::shared_ptr> vector; boost::shared_ptr<XQQuery> xquery; + bool parse_message_content; XmlBinding(const std::string& key, const Queue::shared_ptr queue, Exchange* parent, Query query): - Binding(key, queue, parent), xquery(query) {} + Binding(key, queue, parent), xquery(query), parse_message_content(true) {} }; @@ -58,7 +59,7 @@ class XmlExchange : public virtual Exchange { XQilla xqilla; qpid::sys::RWlock lock; - bool matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args); + bool matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content); public: static const std::string typeName; diff --git a/qpid/cpp/src/replication.mk b/qpid/cpp/src/replication.mk index 6830f99d36..c65a7ae3b9 100644 --- a/qpid/cpp/src/replication.mk +++ b/qpid/cpp/src/replication.mk @@ -29,6 +29,9 @@ replicating_listener_la_SOURCES = \ qpid/replication/ReplicatingEventListener.h replicating_listener_la_LIBADD = libqpidbroker.la +if SUNOS + replicating_listener_la_LIBADD += libqpidcommon.la -lboost_program_options -luuid $(SUNCC_RUNTIME_LIBS) +endif replicating_listener_la_LDFLAGS = $(PLUGINLDFLAGS) @@ -43,4 +46,7 @@ replication_exchange_la_SOURCES = \ replication_exchange_la_LIBADD = libqpidbroker.la +if SUNOS + replication_exchange_la_LIBADD += libqpidcommon.la -lboost_program_options -lCrun -luuid +endif replication_exchange_la_LDFLAGS = $(PLUGINLDFLAGS) diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp index 1658f3d4ec..589e1154e1 100644 --- a/qpid/cpp/src/tests/ClientSessionTest.cpp +++ b/qpid/cpp/src/tests/ClientSessionTest.cpp @@ -428,8 +428,8 @@ QPID_AUTO_TEST_CASE(testConcurrentSenders) for (size_t i = 0; i < 5; i++) { publishers.push_back(new Publisher(connection, message, 100)); } - for_each(publishers.begin(), publishers.end(), boost::bind(&Publisher::start, _1)); - for_each(publishers.begin(), publishers.end(), boost::bind(&Publisher::join, _1)); + std::for_each(publishers.begin(), publishers.end(), boost::bind(&Publisher::start, _1)); + std::for_each(publishers.begin(), publishers.end(), boost::bind(&Publisher::join, _1)); connection.close(); } @@ -564,6 +564,24 @@ QPID_AUTO_TEST_CASE(testSessionManagerSetFlowControl) { BOOST_CHECK_EQUAL("my-message", got.getData()); } +QPID_AUTO_TEST_CASE(testGetThenSubscribe) { + ClientSessionFixture fix; + std::string name("myqueue"); + fix.session.queueDeclare(arg::queue=name, arg::exclusive=true, arg::autoDelete=true); + fix.session.messageTransfer(arg::content=Message("one", name)); + fix.session.messageTransfer(arg::content=Message("two", name)); + Message got; + BOOST_CHECK(fix.subs.get(got, name)); + BOOST_CHECK_EQUAL("one", got.getData()); + + DummyListener listener(fix.session, name, 1); + listener.run(); + BOOST_CHECK_EQUAL(1u, listener.messages.size()); + if (!listener.messages.empty()) { + BOOST_CHECK_EQUAL("two", listener.messages[0].getData()); + } +} + QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/ClusterFixture.cpp b/qpid/cpp/src/tests/ClusterFixture.cpp index 3a0ea74098..5658957b48 100644 --- a/qpid/cpp/src/tests/ClusterFixture.cpp +++ b/qpid/cpp/src/tests/ClusterFixture.cpp @@ -109,7 +109,7 @@ void ClusterFixture::addLocal() { Args args(makeArgs(prefix)); vector<const char*> argv(args.size()); transform(args.begin(), args.end(), argv.begin(), boost::bind(&string::c_str, _1)); - qpid::log::Logger::instance().setPrefix(os.str()); + qpid::log::Logger::instance().setPrefix(prefix); localBroker.reset(new BrokerFixture(parseOpts(argv.size(), &argv[0]))); push_back(localBroker->getPort()); forkedBrokers.push_back(shared_ptr<ForkedBroker>()); diff --git a/qpid/cpp/src/tests/DispatcherTest.cpp b/qpid/cpp/src/tests/DispatcherTest.cpp index c2f6bca12a..c619a36438 100644 --- a/qpid/cpp/src/tests/DispatcherTest.cpp +++ b/qpid/cpp/src/tests/DispatcherTest.cpp @@ -78,9 +78,6 @@ void reader(DispatchHandle& h, int fd) { h.rewatch(); } -DispatchHandle* rh = 0; -DispatchHandle* wh = 0; - void rInterrupt(DispatchHandle&) { cerr << "R"; } @@ -92,9 +89,27 @@ void wInterrupt(DispatchHandle&) { DispatchHandle::Callback rcb = rInterrupt; DispatchHandle::Callback wcb = wInterrupt; +DispatchHandleRef *volatile rh = 0; +DispatchHandleRef *volatile wh = 0; + +volatile bool stopWait = false; +volatile bool phase1finished = false; + +timer_t timer; + +void stop_handler(int /*signo*/, siginfo_t* /*info*/, void* /*context*/) { + stopWait = true; +} + void timer_handler(int /*signo*/, siginfo_t* /*info*/, void* /*context*/) { - rh->call(rcb); - wh->call(wcb); + static int count = 0; + if (count++ < 10) { + rh->call(rcb); + wh->call(wcb); + } else { + phase1finished = true; + assert(::timer_delete(timer) == 0); + } } int main(int /*argc*/, char** /*argv*/) @@ -114,7 +129,7 @@ int main(int /*argc*/, char** /*argv*/) // Setup sender and receiver int sv[2]; - int rc = ::socketpair(AF_LOCAL, SOCK_STREAM, 0, sv); + int rc = ::socketpair(AF_UNIX, SOCK_STREAM, 0, sv); assert(rc >= 0); // Set non-blocking @@ -132,8 +147,8 @@ int main(int /*argc*/, char** /*argv*/) PosixIOHandle f0(sv[0]); PosixIOHandle f1(sv[1]); - rh = new DispatchHandle(f0, boost::bind(reader, _1, sv[0]), 0, 0); - wh = new DispatchHandle(f1, 0, boost::bind(writer, _1, sv[1], testString), 0); + rh = new DispatchHandleRef(f0, boost::bind(reader, _1, sv[0]), 0, 0); + wh = new DispatchHandleRef(f1, 0, boost::bind(writer, _1, sv[1], testString), 0); rh->startWatch(poller); wh->startWatch(poller); @@ -155,10 +170,9 @@ int main(int /*argc*/, char** /*argv*/) assert(rc == 0); ::sigevent se; + ::memset(&se, 0, sizeof(se)); // Clear to make valgrind happy (this *is* the neatest way to do this portably - sigh) se.sigev_notify = SIGEV_SIGNAL; se.sigev_signo = SIGRTMIN; - se.sigev_value.sival_ptr = 0; - timer_t timer; rc = ::timer_create(CLOCK_REALTIME, &se, &timer); assert(rc == 0); @@ -169,11 +183,52 @@ int main(int /*argc*/, char** /*argv*/) rc = ::timer_settime(timer, 0, &ts, 0); assert(rc == 0); - // wait 2 minutes then shutdown - ::sleep(60); + // wait + while (!phase1finished) { + ::sleep(1); + } + + // Now test deleting/creating DispatchHandles in tight loop, so that we are likely to still be using the + // attached PollerHandles after deleting the DispatchHandle + DispatchHandleRef* t = wh; + wh = 0; + delete t; + t = rh; + rh = 0; + delete t; + + sa.sa_sigaction = stop_handler; + rc = ::sigaction(SIGRTMIN, &sa,0); + assert(rc == 0); + + itimerspec nts = { + /*.it_value = */ {30, 0}, // s, ns + /*.it_interval = */ {30, 0}}; // s, ns + + rc = ::timer_create(CLOCK_REALTIME, &se, &timer); + assert(rc == 0); + rc = ::timer_settime(timer, 0, &nts, 0); + assert(rc == 0); + + DispatchHandleRef* rh1; + DispatchHandleRef* wh1; + + struct timespec w = {0, 1000000}; + while (!stopWait) { + rh1 = new DispatchHandleRef(f0, boost::bind(reader, _1, sv[0]), 0, 0); + wh1 = new DispatchHandleRef(f1, 0, boost::bind(writer, _1, sv[1], testString), 0); + rh1->startWatch(poller); + wh1->startWatch(poller); + + ::nanosleep(&w, 0); + + delete wh1; + delete rh1; + } rc = ::timer_delete(timer); assert(rc == 0); + poller->shutdown(); dt.join(); dt1.join(); diff --git a/qpid/cpp/src/tests/ForkedBroker.cpp b/qpid/cpp/src/tests/ForkedBroker.cpp index 2b90068549..f90f76aeb2 100644 --- a/qpid/cpp/src/tests/ForkedBroker.cpp +++ b/qpid/cpp/src/tests/ForkedBroker.cpp @@ -22,6 +22,9 @@ #include "ForkedBroker.h" #include <boost/bind.hpp> #include <algorithm> +#include <stdlib.h> +#include <sys/types.h> +#include <signal.h> ForkedBroker::ForkedBroker(const Args& args) { init(args); } @@ -41,9 +44,9 @@ void ForkedBroker::kill(int sig) { if (::kill(savePid, sig) < 0) throw ErrnoException("kill failed"); int status; - if (::waitpid(savePid, &status, 0) < 0) + if (::waitpid(savePid, &status, 0) < 0 && sig != 9) throw ErrnoException("wait for forked process failed"); - if (WEXITSTATUS(status) != 0) + if (WEXITSTATUS(status) != 0 && sig != 9) throw qpid::Exception(QPID_MSG("Forked broker exited with: " << WEXITSTATUS(status))); } diff --git a/qpid/cpp/src/tests/FrameDecoder.cpp b/qpid/cpp/src/tests/FrameDecoder.cpp new file mode 100644 index 0000000000..f5db66d5fe --- /dev/null +++ b/qpid/cpp/src/tests/FrameDecoder.cpp @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "unit_test.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FrameDecoder.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/Buffer.h" +#include <string> + + +QPID_AUTO_TEST_SUITE(FrameDecoderTest) + +using namespace std; +using namespace qpid::framing; + + +string makeData(int size) { + string data; + data.resize(size); + for (int i =0; i < size; ++i) + data[i] = 'a' + (i%26); + return data; +} +string encodeFrame(string data) { + AMQFrame f((AMQContentBody(data))); + string encoded; + encoded.resize(f.encodedSize()); + Buffer b(&encoded[0], encoded.size()); + f.encode(b); + return encoded; +} + +string getData(const AMQFrame& frame) { + const AMQContentBody* content = dynamic_cast<const AMQContentBody*>(frame.getBody()); + BOOST_CHECK(content); + return content->getData(); +} + +QPID_AUTO_TEST_CASE(testByteFragments) { + string data = makeData(42); + string encoded = encodeFrame(data); + FrameDecoder decoder; + for (size_t i = 0; i < encoded.size()-1; ++i) { + Buffer buf(&encoded[i], 1); + BOOST_CHECK(!decoder.decode(buf)); + } + Buffer buf(&encoded[encoded.size()-1], 1); + BOOST_CHECK(decoder.decode(buf)); + BOOST_CHECK_EQUAL(data, getData(decoder.getFrame())); +} + + + +QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 9da85eaad4..7a33a7257b 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -94,7 +94,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ QueueEvents.cpp \ ProxyTest.cpp \ RetryList.cpp \ - RateFlowcontrolTest.cpp + RateFlowcontrolTest.cpp \ + FrameDecoder.cpp if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp @@ -163,7 +164,7 @@ header_test_LDADD=$(lib_client) check_PROGRAMS+=failover_soak failover_soak_SOURCES=failover_soak.cpp ForkedBroker.h ForkedBroker.cpp -failover_soak_LDADD=$(lib_client) +failover_soak_LDADD=$(lib_client) $(lib_broker) check_PROGRAMS+=declare_queues declare_queues_SOURCES=declare_queues.cpp @@ -195,11 +196,11 @@ sender_LDADD=$(lib_client) check_PROGRAMS+=PollerTest PollerTest_SOURCES=PollerTest.cpp -PollerTest_LDADD=$(lib_common) +PollerTest_LDADD=$(lib_common) $(SOCKLIBS) check_PROGRAMS+=DispatcherTest DispatcherTest_SOURCES=DispatcherTest.cpp -DispatcherTest_LDADD=$(lib_common) +DispatcherTest_LDADD=$(lib_common) $(SOCKLIBS) TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test @@ -255,9 +256,11 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers) # Longer running stability tests, not run by default check: target. # Not run under valgrind, too slow -LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak stop_broker reliable_replication_test +LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak stop_broker \ + reliable_replication_test federated_cluster_test_with_node_failure -EXTRA_DIST+=fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak reliable_replication_test +EXTRA_DIST+=fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak reliable_replication_test \ + federated_cluster_test_with_node_failure check-long: $(MAKE) check TESTS="$(LONG_TESTS)" VALGRIND= diff --git a/qpid/cpp/src/tests/MessageUtils.h b/qpid/cpp/src/tests/MessageUtils.h index 67a852aa10..6a12c72007 100644 --- a/qpid/cpp/src/tests/MessageUtils.h +++ b/qpid/cpp/src/tests/MessageUtils.h @@ -33,7 +33,7 @@ struct MessageUtils static boost::intrusive_ptr<Message> createMessage(const string& exchange="", const string& routingKey="", const Uuid& messageId=Uuid(true), uint64_t contentSize = 0) { - boost::intrusive_ptr<Message> msg(new Message()); + boost::intrusive_ptr<broker::Message> msg(new broker::Message()); AMQFrame method(( MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); AMQFrame header((AMQHeaderBody())); diff --git a/qpid/cpp/src/tests/PollerTest.cpp b/qpid/cpp/src/tests/PollerTest.cpp index 4f11dc5901..5b6b09ef65 100644 --- a/qpid/cpp/src/tests/PollerTest.cpp +++ b/qpid/cpp/src/tests/PollerTest.cpp @@ -74,7 +74,7 @@ int main(int /*argc*/, char** /*argv*/) try { int sv[2]; - int rc = ::socketpair(AF_LOCAL, SOCK_STREAM, 0, sv); + int rc = ::socketpair(AF_UNIX, SOCK_STREAM, 0, sv); assert(rc >= 0); // Set non-blocking diff --git a/qpid/cpp/src/tests/QueueEvents.cpp b/qpid/cpp/src/tests/QueueEvents.cpp index df3c937e33..cd9439355e 100644 --- a/qpid/cpp/src/tests/QueueEvents.cpp +++ b/qpid/cpp/src/tests/QueueEvents.cpp @@ -38,7 +38,7 @@ using namespace qpid::broker; using namespace qpid::sys; using qpid::framing::SequenceNumber; -struct DummyListener +struct EventChecker { typedef std::deque<QueueEvents::Event> Events; @@ -71,9 +71,9 @@ QPID_AUTO_TEST_CASE(testBasicEventProcessing) sys::Dispatcher dispatcher(poller); Thread dispatchThread(dispatcher); QueueEvents events(poller); - DummyListener listener; + EventChecker listener; listener.poller = poller; - events.registerListener("dummy", boost::bind(&DummyListener::handle, &listener, _1)); + events.registerListener("dummy", boost::bind(&EventChecker::handle, &listener, _1)); //signal occurence of some events: Queue queue("queue1"); SequenceNumber id; diff --git a/qpid/cpp/src/tests/QueuePolicyTest.cpp b/qpid/cpp/src/tests/QueuePolicyTest.cpp index 6c650169c7..7c7f8b7a10 100644 --- a/qpid/cpp/src/tests/QueuePolicyTest.cpp +++ b/qpid/cpp/src/tests/QueuePolicyTest.cpp @@ -158,6 +158,15 @@ QPID_AUTO_TEST_CASE(testRingPolicy) BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData()); } BOOST_CHECK(!f.subs.get(msg, q)); + + for (int i = 10; i < 20; i++) { + f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q)); + } + for (int i = 15; i < 20; i++) { + BOOST_CHECK(f.subs.get(msg, q, qpid::sys::TIME_SEC)); + BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData()); + } + BOOST_CHECK(!f.subs.get(msg, q)); } QPID_AUTO_TEST_CASE(testStrictRingPolicy) diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index 9722359d82..d2a93c902b 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -94,7 +94,7 @@ class SocketProxy : private qpid::sys::Runnable throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "SocketProxy: Accept failed"); poller.delFd(listenerHandle); - server.reset(listener.accept(0, 0)); + server.reset(listener.accept()); // Pump data between client & server sockets qpid::sys::PollerHandle clientHandle(client); diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk index c3100ac968..5d115de5a5 100644 --- a/qpid/cpp/src/tests/cluster.mk +++ b/qpid/cpp/src/tests/cluster.mk @@ -29,8 +29,9 @@ if HAVE_LIBCPG # ais_check checks pre-requisites for cluster tests and runs them if ok. -TESTS+=ais_check federated_cluster_test -EXTRA_DIST+=ais_check start_cluster stop_cluster restart_cluster cluster_python_tests cluster_python_tests_failing.txt federated_cluster_test +TESTS+=ais_check federated_cluster_test clustered_replication_test +EXTRA_DIST+=ais_check start_cluster stop_cluster restart_cluster cluster_python_tests cluster_python_tests_failing.txt \ + federated_cluster_test clustered_replication_test check_PROGRAMS+=cluster_test cluster_test_SOURCES=unit_test.cpp cluster_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index c880f30e6b..eee2df58cc 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -27,6 +27,7 @@ #include "qpid/client/ConnectionAccess.h" #include "qpid/client/Session.h" #include "qpid/client/FailoverListener.h" +#include "qpid/client/FailoverManager.h" #include "qpid/cluster/Cluster.h" #include "qpid/cluster/Cpg.h" #include "qpid/cluster/UpdateClient.h" @@ -35,6 +36,8 @@ #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/enum.h" #include "qpid/log/Logger.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Thread.h" #include <boost/bind.hpp> #include <boost/shared_ptr.hpp> @@ -148,49 +151,65 @@ vector<string> browse(Client& c, const string& q, int n) { c.subs.subscribe(lq, q, browseSettings); vector<string> result; for (int i = 0; i < n; ++i) { - result.push_back(lq.get(TIMEOUT).getData()); + Message m; + if (!lq.get(m, TIMEOUT)) + break; + result.push_back(m.getData()); } c.subs.getSubscription(q).cancel(); return result; } +ConnectionSettings aclSettings(int port, const std::string& id) { + ConnectionSettings settings; + settings.port = port; + settings.mechanism = "PLAIN"; + settings.username = id; + settings.password = id; + return settings; +} + +#if 0 +// FIXME aconway 2009-03-10: This test passes but exposes a memory leak in the SASL client code. +// Enable it when the leak is fixed. + +QPID_AUTO_TEST_CASE(testAcl) { + ofstream policyFile("cluster_test.acl"); + // FIXME aconway 2009-02-12: guest -> qpidd? + policyFile << "acl allow foo@QPID create queue name=foo" << endl + << "acl allow foo@QPID create queue name=foo2" << endl + << "acl deny foo@QPID create queue name=bar" << endl + << "acl allow all all" << endl; + policyFile.close(); + char cwd[1024]; + BOOST_CHECK(::getcwd(cwd, sizeof(cwd))); + ClusterFixture cluster(2,-1, list_of<string> + ("--no-data-dir") + ("--auth=no") + ("--acl-file="+string(cwd)+"/cluster_test.acl") + ("--cluster-mechanism=PLAIN") + ("--cluster-username=cluster") + ("--cluster-password=cluster") + ("--load-module=../.libs/acl.so")); + + Client c0(aclSettings(cluster[0], "c0"), "c0"); + Client c1(aclSettings(cluster[1], "c1"), "c1"); + Client foo(aclSettings(cluster[1], "foo"), "foo"); + + foo.session.queueDeclare("foo"); + BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo"); + + BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), framing::NotAllowedException); + BOOST_CHECK(c0.session.queueQuery("bar").getQueue().empty()); + BOOST_CHECK(c1.session.queueQuery("bar").getQueue().empty()); -// FIXME aconway 2009-02-12: need to figure out how to test this properly. -// Current problems: -// - all brokers share the same data-dir (set ACL without data dir?) -// - updater's user name not making it through to updatee for ACL checks. -// -// QPID_AUTO_TEST_CASE(testAcl) { -// ofstream policyFile("cluster_test.acl"); -// // FIXME aconway 2009-02-12: guest -> qpidd? -// policyFile << "acl allow guest@QPID all all" << endl -// << "acl allow foo@QPID create queue name=foo" << endl -// << "acl allow bar@QPID create queue name=bar" << endl -// << "acl deny all create queue" << endl -// << "acl allow all all" << endl; -// policyFile.close(); -// ClusterFixture cluster(2,-1, list_of<string> -// ("--data-dir=.") ("--auth=no") -// ("--acl-file=cluster_test.acl") -// ("--cluster-mechanism=PLAIN") -// ("--load-module=../.libs/acl.so")); -// Client c0(cluster[0], "c0"); -// Client c1(cluster[1], "c1"); - -// ConnectionSettings settings; -// settings.port = cluster[0]; -// settings.username = "foo"; -// Client foo(settings, "foo"); - -// foo.session.queueDeclare("foo"); -// BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo"); -// BOOST_CHECK_EQUAL(c1.session.queueQuery("foo").getQueue(), "foo"); - -// BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), int); -// BOOST_CHECK_EQUAL(c0.session.queueQuery("bar").getQueue(), ""); -// BOOST_CHECK_EQUAL(c1.session.queueQuery("bar").getQueue(), ""); -// } + cluster.add(); + Client c2(aclSettings(cluster[2], "c2"), "c2"); + BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), framing::NotAllowedException); + BOOST_CHECK(c2.session.queueQuery("bar").getQueue().empty()); +} +#endif QPID_AUTO_TEST_CASE(testMessageTimeToLive) { // Note: this doesn't actually test for cluster race conditions around TTL, @@ -199,13 +218,23 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) { ClusterFixture cluster(2); Client c0(cluster[0], "c0"); Client c1(cluster[1], "c1"); + c0.session.queueDeclare("p"); c0.session.queueDeclare("q"); c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200)); c0.session.messageTransfer(arg::content=Message("b", "q")); - BOOST_CHECK_EQUAL(browse(c1, "q", 2), list_of<string>("a")("b")); - sys::usleep(300*1000); + c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000)); + c0.session.messageTransfer(arg::content=Message("y", "p")); + cluster.add(); + Client c2(cluster[1], "c2"); + + BOOST_CHECK_EQUAL(browse(c0, "p", 2), list_of<string>("x")("y")); + BOOST_CHECK_EQUAL(browse(c1, "p", 2), list_of<string>("x")("y")); + BOOST_CHECK_EQUAL(browse(c2, "p", 2), list_of<string>("x")("y")); + + sys::usleep(200*1000); BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<string>("b")); BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<string>("b")); + BOOST_CHECK_EQUAL(browse(c2, "q", 1), list_of<string>("b")); } QPID_AUTO_TEST_CASE(testSequenceOptions) { @@ -506,10 +535,11 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { c0.session.queueDeclare("q"); c0.session.messageTransfer(arg::content=Message("foo","q")); c0.session.messageTransfer(arg::content=Message("bar","q")); + while (c0.session.queueQuery("q").getMessageCount() != 2) sys::usleep(1000); // Wait for message to show up on broker 0. - // Add a new broker, it should catch up. + // Add a new broker, it will catch up. cluster.add(); // Do some work post-add @@ -527,6 +557,7 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "foo"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), ""); BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "bar"); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); @@ -628,4 +659,86 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); } +QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover) +{ + struct Sender : FailoverManager::Command + { + std::string queue; + std::string content; + + Sender(const std::string& q, const std::string& c) : queue(q), content(c) {} + + void execute(AsyncSession& session, bool) + { + session.messageTransfer(arg::content=Message(content, queue)); + } + }; + + struct Receiver : FailoverManager::Command, MessageListener, qpid::sys::Runnable + { + FailoverManager& mgr; + std::string queue; + std::string expectedContent; + qpid::client::Subscription subscription; + qpid::sys::Monitor lock; + bool ready; + + Receiver(FailoverManager& m, const std::string& q, const std::string& c) : mgr(m), queue(q), expectedContent(c), ready(false) {} + + void received(Message& message) + { + BOOST_CHECK_EQUAL(expectedContent, message.getData()); + subscription.cancel(); + } + + void execute(AsyncSession& session, bool) + { + session.queueDeclare(arg::queue=queue); + SubscriptionManager subs(session); + subscription = subs.subscribe(*this, queue); + session.sync(); + setReady(); + subs.run(); + //cleanup: + session.queueDelete(arg::queue=queue); + } + + void run() + { + mgr.execute(*this); + } + + void waitForReady() + { + qpid::sys::Monitor::ScopedLock l(lock); + while (!ready) { + lock.wait(); + } + } + + void setReady() + { + qpid::sys::Monitor::ScopedLock l(lock); + ready = true; + lock.notify(); + } + }; + + ClusterFixture cluster(2); + ConnectionSettings settings; + settings.port = cluster[1]; + settings.heartbeat = 1; + FailoverManager fmgr(settings); + Sender sender("my-queue", "my-data"); + Receiver receiver(fmgr, "my-queue", "my-data"); + qpid::sys::Thread runner(receiver); + receiver.waitForReady(); + cluster.kill(1); + //sleep for 2 secs to allow the heartbeat task to fire on the now dead connection: + ::usleep(2*1000*1000); + fmgr.execute(sender); + runner.join(); + fmgr.close(); +} + QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/clustered_replication_test b/qpid/cpp/src/tests/clustered_replication_test new file mode 100755 index 0000000000..2a3e742632 --- /dev/null +++ b/qpid/cpp/src/tests/clustered_replication_test @@ -0,0 +1,127 @@ +#!/bin/sh + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Test reliability of the replication feature in the face of link +# failures: +srcdir=`dirname $0` +PYTHON_DIR=$srcdir/../../../python + +trap stop_brokers INT EXIT + +fail() { + echo $1 + exit 1 +} +with_ais_group() { + id -nG | grep '\<ais\>' >/dev/null || { echo "You are not a member of the ais group." 1>&2; exit 1; } + echo $* | newgrp ais +} + +stop_brokers() { + if [[ $PRIMARY1 ]] ; then + ../qpidd -q --port $PRIMARY1 + unset PRIMARY1 + fi + if [[ $PRIMARY2 ]] ; then + ../qpidd -q --port $PRIMARY2 + unset PRIMARY2 + fi + if [[ $DR1 ]] ; then + ../qpidd -q --port $DR1 + unset DR1 + fi + if [[ $DR2 ]] ; then + ../qpidd -q --port $DR2 + unset DR2 + fi +} + +if test -d ${PYTHON_DIR}; then + id -nG | grep '\<ais\>' >/dev/null || \ + NOGROUP="You are not a member of the ais group." + ps -u root | grep 'aisexec\|corosync' >/dev/null || \ + NOAISEXEC="The aisexec or corosync daemon is not running as root" + + if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then + cat <<EOF +Not running federation to cluster test because: + $NOGROUP + $NOAISEXEC +EOF + exit 0; + fi + + #todo: these cluster names need to be unique to prevent clashes + PRIMARY_CLUSTER=PRIMARY_$(hostname)_$(pwd) + DR_CLUSTER=DR_$(hostname)_$(pwd) + + GENERAL_OPTS="--auth no --no-module-dir --no-data-dir --daemon --port 0 --log-enable notice+ --log-to-stderr false" + PRIMARY_OPTS="--load-module ../.libs/replicating_listener.so --create-replication-queue true --replication-queue REPLICATION_QUEUE --load-module ../.libs/cluster.so --cluster-name $PRIMARY_CLUSTER" + DR_OPTS="--load-module ../.libs/replication_exchange.so --load-module ../.libs/cluster.so --cluster-name $DR_CLUSTER" + + rm -f repl*.tmp #cleanup any files left from previous run + + #start first node of primary cluster and set up test queue + echo Starting primary cluster + PRIMARY1=$(with_ais_group ../qpidd $GENERAL_OPTS $PRIMARY_OPTS --log-to-file repl.primary.1.tmp) || fail "Could not start node" + $PYTHON_DIR/commands/qpid-config -a "localhost:$PRIMARY1" add queue test-queue --generate-queue-events 2 + $PYTHON_DIR/commands/qpid-config -a "localhost:$PRIMARY1" add queue control-queue --generate-queue-events 1 + + #send 10 messages, consume 5 of them + for i in `seq 1 10`; do echo Message$i; done | ./sender --port $PRIMARY1 + ./receiver --port $PRIMARY1 --messages 5 > /dev/null + + #add new node to primary cluster, testing correct transfer of state: + echo Adding node to primary cluster + PRIMARY2=$(with_ais_group ../qpidd $GENERAL_OPTS $PRIMARY_OPTS --log-to-file repl.primary.2.tmp) + + #start DR cluster, set up test queue there and establish replication bridge + echo Starting DR cluster + DR1=$(with_ais_group ../qpidd $GENERAL_OPTS $DR_OPTS --log-to-file repl.dr.1.tmp) + DR2=$(with_ais_group ../qpidd $GENERAL_OPTS $DR_OPTS --log-to-file repl.dr.2.tmp) + + $PYTHON_DIR/commands/qpid-config -a "localhost:$DR1" add queue test-queue + $PYTHON_DIR/commands/qpid-config -a "localhost:$DR1" add queue control-queue + $PYTHON_DIR/commands/qpid-config -a "localhost:$DR1" add exchange replication REPLICATION_EXCHANGE + $PYTHON_DIR/commands/qpid-route queue add localhost:$DR2 localhost:$PRIMARY2 REPLICATION_EXCHANGE REPLICATION_QUEUE + + #send more messages to primary + for i in `seq 11 20`; do echo Message$i; done | ./sender --port $PRIMARY1 --send-eos 1 + + #wait for replication events to all be processed: + echo Waiting for replication to complete + echo Done | ./sender --port $PRIMARY1 --routing-key control-queue --send-eos 1 + ./receiver --queue control-queue --port $DR1 > /dev/null + + #verify contents of test queue on dr cluster: + echo Verifying... + ./receiver --port $DR2 > repl.out.tmp + for i in `seq 6 20`; do echo Message$i; done | diff repl.out.tmp - || FAIL=1 + + if [[ $FAIL ]]; then + echo Clustered replication test failed: expectations not met! + exit 1 + else + echo Clustered replication test passed + rm -f repl*.tmp + fi + +fi diff --git a/qpid/cpp/src/tests/failover_soak.cpp b/qpid/cpp/src/tests/failover_soak.cpp index 129c6b9745..4f16e469b8 100644 --- a/qpid/cpp/src/tests/failover_soak.cpp +++ b/qpid/cpp/src/tests/failover_soak.cpp @@ -26,6 +26,8 @@ #include <sys/wait.h> #include <sys/time.h> #include <string.h> +#include <sys/types.h> +#include <signal.h> #include <string> #include <iostream> @@ -34,7 +36,11 @@ #include <boost/assign.hpp> +#include "qpid/framing/Uuid.h" + #include <ForkedBroker.h> +#include <qpid/client/Connection.h> + @@ -42,6 +48,9 @@ using namespace std; using boost::assign::list_of; +using namespace qpid::framing; +using namespace qpid::client; + @@ -56,11 +65,34 @@ typedef enum childStatus; +typedef enum +{ + NO_TYPE, + DECLARING_CLIENT, + SENDING_CLIENT, + RECEIVING_CLIENT +} +childType; + + +ostream& operator<< ( ostream& os, const childType& ct ) { + switch ( ct ) { + case DECLARING_CLIENT: os << "Declaring Client"; break; + case SENDING_CLIENT: os << "Sending Client"; break; + case RECEIVING_CLIENT: os << "Receiving Client"; break; + default: os << "No Client"; break; + } + + return os; +} + + + struct child { - child ( string & name, pid_t pid ) - : name(name), pid(pid), retval(-999), status(RUNNING) + child ( string & name, pid_t pid, childType type ) + : name(name), pid(pid), retval(-999), status(RUNNING), type(type) { gettimeofday ( & startTime, 0 ); } @@ -75,10 +107,18 @@ struct child } + void + setType ( childType t ) + { + type = t; + } + + string name; pid_t pid; int retval; childStatus status; + childType type; struct timeval startTime, stopTime; }; @@ -88,10 +128,11 @@ struct child struct children : public vector<child *> { + void - add ( string & name, pid_t pid ) + add ( string & name, pid_t pid, childType type ) { - push_back(new child ( name, pid )); + push_back ( new child ( name, pid, type ) ); } @@ -113,7 +154,7 @@ struct children : public vector<child *> child * kid = get ( pid ); if(! kid) { - if ( verbosity > 0 ) + if ( verbosity > 1 ) { cerr << "children::exited warning: Can't find child with pid " << pid @@ -143,10 +184,15 @@ struct children : public vector<child *> int checkChildren ( ) { - vector<child *>::iterator i; + vector<child *>::iterator i; for ( i = begin(); i != end(); ++ i ) if ( (COMPLETED == (*i)->status) && (0 != (*i)->retval) ) - return (*i)->retval; + { + cerr << "checkChildren: error on child of type " + << (*i)->type + << endl; + return (*i)->retval; + } return 0; } @@ -181,22 +227,43 @@ struct children : public vector<child *> If it has been at least that long since a shild stopped running, we judge the system to have hung. */ - bool + int hanging ( int hangTime ) { struct timeval now, duration; gettimeofday ( &now, 0 ); + int how_many_hanging = 0; + vector<child *>::iterator i; for ( i = begin(); i != end(); ++ i ) { - timersub ( & now, &((*i)->startTime), & duration ); - if ( duration.tv_sec >= hangTime ) - return true; + //Not in POSIX + //timersub ( & now, &((*i)->startTime), & duration ); + duration.tv_sec = now.tv_sec - (*i)->startTime.tv_sec; + duration.tv_usec = now.tv_usec - (*i)->startTime.tv_usec; + if (duration.tv_usec < 0) { + --duration.tv_sec; + duration.tv_usec += 1000000; + } + + if ( (COMPLETED != (*i)->status) // child isn't done running + && + ( duration.tv_sec >= hangTime ) // it's been too long + ) + { + std::cerr << "Child of type " + << (*i)->type + << " hanging. " + << "PID is " + << (*i)->pid + << endl; + ++ how_many_hanging; + } } - return false; + return how_many_hanging; } @@ -211,9 +278,8 @@ children allMyChildren; void -childExit ( int signalNumber ) +childExit ( int ) { - signalNumber ++; // Now maybe the compiler willleave me alone? int childReturnCode; pid_t pid = waitpid ( 0, & childReturnCode, WNOHANG); @@ -240,10 +306,9 @@ mrand ( int minDesiredVal, int maxDesiredVal ) { void -makeClusterName ( string & s, int & num ) { - num = mrand(1000); +makeClusterName ( string & s ) { stringstream ss; - ss << "soakTestCluster_" << num; + ss << "soakTestCluster_" << Uuid(true).str(); s = ss.str(); } @@ -268,64 +333,136 @@ printBrokers ( brokerVector & brokers ) +ForkedBroker * newbie = 0; +int newbie_port = 0; + + + +bool +wait_for_newbie ( ) +{ + if ( ! newbie ) + return true; + + try + { + Connection connection; + connection.open ( "127.0.0.1", newbie_port ); + connection.close(); + newbie = 0; // He's no newbie anymore! + return true; + } + catch ( const std::exception& error ) + { + std::cerr << "wait_for_newbie error: " + << error.what() + << endl; + return false; + } +} + + + void startNewBroker ( brokerVector & brokers, char const * srcRoot, char const * moduleDir, - string const clusterName ) + string const clusterName, + int verbosity ) { static int brokerId = 0; stringstream path, prefix, module; module << moduleDir << "/cluster.so"; path << srcRoot << "/qpidd"; - prefix << "soak-" << brokerId++; - - std::vector<std::string> argv = - list_of<string> ("qpidd") - ("--no-module-dir") - ("--load-module=cluster.so") - ("--cluster-name") - (clusterName) - ("--auth=no") - ("--no-data-dir") - ("--mgmt-enable=no") - ("--log-prefix") - (prefix.str()) - ("--log-to-file") - ("/tmp/qpidd.log"); - - brokers.push_back ( new ForkedBroker ( argv ) ); + prefix << "soak-" << brokerId; + std::vector<std::string> argv = list_of<string> + ("qpidd") + ("--no-module-dir") + ("--load-module=cluster.so") + ("--cluster-name") + (clusterName) + ("--auth=no") + ("--no-data-dir") + ("--mgmt-enable=no") + ("--log-prefix") + (prefix.str()) + ("--log-to-file") + (prefix.str()+".log"); + + newbie = new ForkedBroker ( argv ); + newbie_port = newbie->getPort(); + ForkedBroker * broker = newbie; + + if ( verbosity > 0 ) + std::cerr << "new broker created: pid == " + << broker->getPID() + << " log-prefix == " + << "soak-" << brokerId + << endl; + brokers.push_back ( broker ); + + ++ brokerId; } -void +bool killFrontBroker ( brokerVector & brokers, int verbosity ) { + cerr << "killFrontBroker: waiting for newbie sync...\n"; + if ( ! wait_for_newbie() ) + return false; + cerr << "killFrontBroker: newbie synced.\n"; + if ( verbosity > 0 ) cout << "killFrontBroker pid: " << brokers[0]->getPID() << " on port " << brokers[0]->getPort() << endl; try { brokers[0]->kill(9); } catch ( const exception& error ) { if ( verbosity > 0 ) - cout << "error killing broker: " << error.what() << endl; + { + cout << "error killing broker: " + << error.what() + << endl; + } + + return false; } delete brokers[0]; brokers.erase ( brokers.begin() ); + return true; } +/* + * The optional delay is to avoid killing newbie brokers that have just + * been added and are still in the process of updating. This causes + * spurious, test-generated errors that scare everybody. + */ void -killAllBrokers ( brokerVector & brokers ) +killAllBrokers ( brokerVector & brokers, int delay ) { + if ( delay > 0 ) + { + std::cerr << "Killing all brokers after delay of " << delay << endl; + sleep ( delay ); + } + for ( uint i = 0; i < brokers.size(); ++ i ) try { brokers[i]->kill(9); } - catch ( ... ) { } + catch ( const exception& error ) + { + std::cerr << "killAllBrokers Warning: exception during kill on broker " + << i + << " " + << error.what() + << endl; + } } @@ -342,7 +479,7 @@ runDeclareQueuesClient ( brokerVector brokers, string name("declareQueues"); int port = brokers[0]->getPort ( ); - if ( verbosity > 0 ) + if ( verbosity > 1 ) cout << "startDeclareQueuesClient: host: " << host << " port: " @@ -360,11 +497,11 @@ runDeclareQueuesClient ( brokerVector brokers, if ( ! pid ) { execv ( path, const_cast<char * const *>(&argv[0]) ); - perror ( "error executing dq: " ); + perror ( "error executing declareQueues: " ); return 0; } - allMyChildren.add ( name, pid ); + allMyChildren.add ( name, pid, DECLARING_CLIENT ); return pid; } @@ -383,12 +520,16 @@ startReceivingClient ( brokerVector brokers, string name("receiver"); int port = brokers[0]->getPort ( ); - if ( verbosity > 0 ) + if ( verbosity > 1 ) cout << "startReceivingClient: port " << port << endl; + + // verbosity has to be > 1 to let clients talk. + int client_verbosity = (verbosity > 1 ) ? 1 : 0; + char portStr[100]; char verbosityStr[100]; sprintf(portStr, "%d", port); - sprintf(verbosityStr, "%d", verbosity); + sprintf(verbosityStr, "%d", client_verbosity); vector<const char*> argv; @@ -407,7 +548,7 @@ startReceivingClient ( brokerVector brokers, return 0; } - allMyChildren.add ( name, pid ); + allMyChildren.add ( name, pid, RECEIVING_CLIENT ); return pid; } @@ -427,13 +568,16 @@ startSendingClient ( brokerVector brokers, string name("sender"); int port = brokers[0]->getPort ( ); - if ( verbosity ) + if ( verbosity > 1) cout << "startSenderClient: port " << port << endl; char portStr[100]; char verbosityStr[100]; + // + // verbosity has to be > 1 to let clients talk. + int client_verbosity = (verbosity > 1 ) ? 1 : 0; sprintf ( portStr, "%d", port); - sprintf ( verbosityStr, "%d", verbosity); + sprintf ( verbosityStr, "%d", client_verbosity); vector<const char*> argv; argv.push_back ( "replayingSender" ); @@ -452,19 +596,21 @@ startSendingClient ( brokerVector brokers, return 0; } - allMyChildren.add ( name, pid ); + allMyChildren.add ( name, pid, SENDING_CLIENT ); return pid; } -#define HUNKY_DORY 0 -#define BAD_ARGS 1 -#define CANT_FORK_DQ 2 -#define CANT_FORK_RECEIVER 3 -#define DQ_FAILED 4 -#define ERROR_ON_CHILD 5 -#define HANGING 6 +#define HUNKY_DORY 0 +#define BAD_ARGS 1 +#define CANT_FORK_DQ 2 +#define CANT_FORK_RECEIVER 3 +#define CANT_FORK_SENDER 4 +#define DQ_FAILED 5 +#define ERROR_ON_CHILD 6 +#define HANGING 7 +#define ERROR_KILLING_BROKER 8 int @@ -492,16 +638,15 @@ main ( int argc, char const ** argv ) allMyChildren.verbosity = verbosity; - int clusterNum; string clusterName; srand ( getpid() ); - makeClusterName ( clusterName, clusterNum ); + makeClusterName ( clusterName ); brokerVector brokers; - if ( verbosity > 0 ) + if ( verbosity > 1 ) cout << "Starting initial cluster...\n"; int nBrokers = 3; @@ -509,7 +654,8 @@ main ( int argc, char const ** argv ) startNewBroker ( brokers, srcRoot, moduleDir, - clusterName ); + clusterName, + verbosity ); } @@ -521,14 +667,14 @@ main ( int argc, char const ** argv ) pid_t dqClientPid = runDeclareQueuesClient ( brokers, host, declareQueuesPath, verbosity ); if ( -1 == dqClientPid ) { - cerr << "failoverSoak error: Couldn't fork declareQueues.\n"; + cerr << "END_OF_TEST ERROR_START_DECLARE_1\n"; return CANT_FORK_DQ; } // Don't continue until declareQueues is finished. pid_t retval = waitpid ( dqClientPid, & childStatus, 0); if ( retval != dqClientPid) { - cerr << "failoverSoak error: waitpid on declareQueues returned value " << retval << endl; + cerr << "END_OF_TEST ERROR_START_DECLARE_2\n"; return DQ_FAILED; } allMyChildren.exited ( dqClientPid, childStatus ); @@ -543,7 +689,7 @@ main ( int argc, char const ** argv ) reportFrequency, verbosity ); if ( -1 == receivingClientPid ) { - cerr << "failoverSoak error: Couldn't fork receiver.\n"; + cerr << "END_OF_TEST ERROR_START_RECEIVER\n"; return CANT_FORK_RECEIVER; } @@ -557,13 +703,13 @@ main ( int argc, char const ** argv ) reportFrequency, verbosity ); if ( -1 == sendingClientPid ) { - cerr << "failoverSoak error: Couldn't fork sender.\n"; - return CANT_FORK_RECEIVER; + cerr << "END_OF_TEST ERROR_START_SENDER\n"; + return CANT_FORK_SENDER; } - int minSleep = 3, - maxSleep = 6; + int minSleep = 2, + maxSleep = 4; for ( int totalBrokers = 3; @@ -581,11 +727,16 @@ main ( int argc, char const ** argv ) sleep ( sleepyTime ); // Kill the oldest broker. -------------------------- - killFrontBroker ( brokers, verbosity ); + if ( ! killFrontBroker ( brokers, verbosity ) ) + { + allMyChildren.killEverybody(); + std::cerr << "END_OF_TEST ERROR_BROKER\n"; + return ERROR_KILLING_BROKER; + } // Sleep for a while. ------------------------- sleepyTime = mrand ( minSleep, maxSleep ); - if ( verbosity > 0 ) + if ( verbosity > 1 ) cerr << "Sleeping for " << sleepyTime << " seconds.\n"; sleep ( sleepyTime ); @@ -596,22 +747,33 @@ main ( int argc, char const ** argv ) startNewBroker ( brokers, srcRoot, moduleDir, - clusterName ); + clusterName, + verbosity ); - if ( verbosity > 0 ) + if ( verbosity > 1 ) printBrokers ( brokers ); // If all children have exited, quit. int unfinished = allMyChildren.unfinished(); if ( ! unfinished ) { - killAllBrokers ( brokers ); + killAllBrokers ( brokers, 5 ); - if ( verbosity > 0 ) + if ( verbosity > 1 ) cout << "failoverSoak: all children have exited.\n"; int retval = allMyChildren.checkChildren(); - if ( verbosity > 0 ) + if ( verbosity > 1 ) std::cerr << "failoverSoak: checkChildren: " << retval << endl; - return retval ? ERROR_ON_CHILD : HUNKY_DORY; + + if ( retval ) + { + std::cerr << "END_OF_TEST ERROR_CLIENT\n"; + return ERROR_ON_CHILD; + } + else + { + std::cerr << "END_OF_TEST SUCCESSFUL\n"; + return HUNKY_DORY; + } } // Even if some are still running, if there's an error, quit. @@ -620,35 +782,38 @@ main ( int argc, char const ** argv ) if ( verbosity > 0 ) cout << "failoverSoak: error on child.\n"; allMyChildren.killEverybody(); - killAllBrokers ( brokers ); + killAllBrokers ( brokers, 5 ); + std::cerr << "END_OF_TEST ERROR_CLIENT\n"; return ERROR_ON_CHILD; } // If one is hanging, quit. if ( allMyChildren.hanging ( 120 ) ) { - if ( verbosity > 0 ) - cout << "failoverSoak: child hanging.\n"; - allMyChildren.killEverybody(); - killAllBrokers ( brokers ); + /* + * Don't kill any processes. Leave alive for questioning. + * */ + std::cerr << "END_OF_TEST ERROR_HANGING\n"; return HANGING; } - if ( verbosity > 0 ) { + if ( verbosity > 1 ) { std::cerr << "------- next kill-broker loop --------\n"; allMyChildren.print(); } } retval = allMyChildren.checkChildren(); - if ( verbosity > 0 ) + if ( verbosity > 1 ) std::cerr << "failoverSoak: checkChildren: " << retval << endl; - if ( verbosity > 0 ) + if ( verbosity > 1 ) cout << "failoverSoak: maxBrokers reached.\n"; allMyChildren.killEverybody(); - killAllBrokers ( brokers ); + killAllBrokers ( brokers, 5 ); + + std::cerr << "END_OF_TEST SUCCESSFUL\n"; return retval ? ERROR_ON_CHILD : HUNKY_DORY; } diff --git a/qpid/cpp/src/tests/federated_cluster_test b/qpid/cpp/src/tests/federated_cluster_test index ef0db4cb8e..575a5d9c9b 100755 --- a/qpid/cpp/src/tests/federated_cluster_test +++ b/qpid/cpp/src/tests/federated_cluster_test @@ -22,7 +22,7 @@ # Test reliability of the replication feature in the face of link # failures: srcdir=`dirname $0` -PYTHON_DIR=${srcdir}/../../../python +PYTHON_DIR=$srcdir/../../../python trap stop_brokers EXIT @@ -37,9 +37,14 @@ stop_brokers() { unset BROKER_A fi if [[ $NODE_1 ]] ; then - ./stop_cluster + ../qpidd -q --port $NODE_1 unset NODE_1 fi + if [[ $NODE_2 ]] ; then + ../qpidd -q --port $NODE_2 + unset NODE_2 + fi + rm cluster.ports } start_brokers() { @@ -60,8 +65,8 @@ setup() { $PYTHON_DIR/commands/qpid-config -a "localhost:$NODE_1" add exchange direct test-exchange #create dynamic routes for test exchange - $PYTHON_DIR/commands/qpid-route dynamic add "localhost:$NODE_1" "localhost:$BROKER_A" test-exchange - $PYTHON_DIR/commands/qpid-route dynamic add "localhost:$BROKER_A" "localhost:$NODE_1" test-exchange + $PYTHON_DIR/commands/qpid-route dynamic add "localhost:$NODE_2" "localhost:$BROKER_A" test-exchange + $PYTHON_DIR/commands/qpid-route dynamic add "localhost:$BROKER_A" "localhost:$NODE_2" test-exchange #create test queue on cluster and bind it to the test exchange $PYTHON_DIR/commands/qpid-config -a "localhost:$NODE_1" add queue test-queue @@ -72,7 +77,7 @@ setup() { $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" bind test-exchange test-queue from-cluster } -run_test_pull_to_cluster() { +run_test_pull_to_cluster_two_consumers() { #start consumers on each of the two nodes of the cluster ./receiver --port $NODE_1 --queue test-queue --credit-window 1 > fed1.out.tmp & ./receiver --port $NODE_2 --queue test-queue --credit-window 1 > fed2.out.tmp & @@ -89,6 +94,20 @@ run_test_pull_to_cluster() { rm -f fed*.tmp #cleanup } +run_test_pull_to_cluster() { + #send stream of messages to test exchange on single broker + for i in `seq 1 1000`; do echo Message $i >> fed.in.tmp; done + ./sender --port $BROKER_A --exchange test-exchange --routing-key to-cluster --send-eos 1 < fed.in.tmp + + #consume from remaining node of the cluster + ./receiver --port $NODE_2 --queue test-queue > fed.out.tmp + + #verify all messages are received + diff fed.in.tmp fed.out.tmp || fail "federated link to cluster failed: expectations not met!" + + rm -f fed*.tmp #cleanup +} + run_test_pull_from_cluster() { #start consumer on single broker ./receiver --port $BROKER_A --queue test-queue --credit-window 1 > fed.out.tmp & @@ -125,8 +144,19 @@ EOF echo "brokers started" setup echo "setup completed" - run_test_pull_to_cluster + run_test_pull_to_cluster_two_consumers echo "federated link to cluster verified" run_test_pull_from_cluster echo "federated link from cluster verified" + if [[ $TEST_NODE_FAILURE ]] ; then + #kill first cluster node and retest + kill -9 $(../qpidd --check --port $NODE_1) && unset NODE_1 + echo "killed first cluster node; waiting for links to re-establish themselves..." + sleep 5 + echo "retesting..." + run_test_pull_to_cluster + echo "federated link to cluster verified" + run_test_pull_from_cluster + echo "federated link from cluster verified" + fi fi diff --git a/qpid/cpp/src/tests/federated_cluster_test_with_node_failure b/qpid/cpp/src/tests/federated_cluster_test_with_node_failure new file mode 100755 index 0000000000..f144a676de --- /dev/null +++ b/qpid/cpp/src/tests/federated_cluster_test_with_node_failure @@ -0,0 +1,23 @@ +#!/bin/sh + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +srcdir=`dirname $0` +TEST_NODE_FAILURE=1 $srcdir/federated_cluster_test diff --git a/qpid/cpp/src/tests/receiver.cpp b/qpid/cpp/src/tests/receiver.cpp index 1ecaece51e..49f7ff0338 100644 --- a/qpid/cpp/src/tests/receiver.cpp +++ b/qpid/cpp/src/tests/receiver.cpp @@ -23,7 +23,7 @@ #include <qpid/client/Session.h> #include <qpid/client/Message.h> #include <qpid/client/SubscriptionManager.h> -#include <qpid/client/SubscriptionManager.h> +#include <qpid/client/SubscriptionSettings.h> #include "TestOptions.h" #include <iostream> @@ -43,15 +43,17 @@ struct Args : public qpid::TestOptions bool ignoreDuplicates; uint creditWindow; uint ackFrequency; + bool browse; - Args() : queue("test-queue"), messages(0), ignoreDuplicates(false), creditWindow(0), ackFrequency(1) + Args() : queue("test-queue"), messages(0), ignoreDuplicates(false), creditWindow(0), ackFrequency(1), browse(false) { addOptions() ("queue", qpid::optValue(queue, "QUEUE NAME"), "Queue from which to request messages") ("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely") ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)") ("credit-window", qpid::optValue(creditWindow, "N"), "Credit window (0 implies infinite window)") - ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)"); + ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)") + ("browse", qpid::optValue(browse), "Browse rather than consuming"); } }; @@ -60,7 +62,7 @@ const string EOS("eos"); class Receiver : public MessageListener, public FailoverManager::Command { public: - Receiver(const string& queue, uint messages, bool ignoreDuplicates, uint creditWindow, uint ackFrequency); + Receiver(const string& queue, uint messages, bool ignoreDuplicates, uint creditWindow, uint ackFrequency, bool browse); void received(Message& message); void execute(AsyncSession& session, bool isRetry); private: @@ -75,9 +77,10 @@ class Receiver : public MessageListener, public FailoverManager::Command bool isDuplicate(Message& message); }; -Receiver::Receiver(const string& q, uint messages, bool ignoreDuplicates, uint creditWindow, uint ackFrequency) : +Receiver::Receiver(const string& q, uint messages, bool ignoreDuplicates, uint creditWindow, uint ackFrequency, bool browse) : queue(q), count(messages), skipDups(ignoreDuplicates), processed(0), lastSn(0) { + if (browse) settings.acquireMode = ACQUIRE_MODE_NOT_ACQUIRED; if (creditWindow) settings.flowControl = FlowControl::messageWindow(creditWindow); settings.autoAck = ackFrequency; } @@ -107,6 +110,9 @@ void Receiver::execute(AsyncSession& session, bool /*isRetry*/) SubscriptionManager subs(session); subscription = subs.subscribe(*this, queue, settings); subs.run(); + if (settings.autoAck) { + subscription.accept(subscription.getUnaccepted()); + } } int main(int argc, char ** argv) @@ -115,7 +121,7 @@ int main(int argc, char ** argv) try { opts.parse(argc, argv); FailoverManager connection(opts.con); - Receiver receiver(opts.queue, opts.messages, opts.ignoreDuplicates, opts.creditWindow, opts.ackFrequency); + Receiver receiver(opts.queue, opts.messages, opts.ignoreDuplicates, opts.creditWindow, opts.ackFrequency, opts.browse); connection.execute(receiver); connection.close(); return 0; diff --git a/qpid/cpp/src/tests/replaying_sender.cpp b/qpid/cpp/src/tests/replaying_sender.cpp index 7e148e277f..ea2a13bd54 100644 --- a/qpid/cpp/src/tests/replaying_sender.cpp +++ b/qpid/cpp/src/tests/replaying_sender.cpp @@ -60,6 +60,8 @@ Sender::Sender(const std::string& queue, uint count_, uint reportFreq ) : sender void Sender::execute(AsyncSession& session, bool isRetry) { + if (verbosity > 0) + std::cout << "replaying_sender " << (isRetry ? "first " : "re-") << "connect." << endl; if (isRetry) sender.replay(session); else sender.init(session); while (sent < count) { @@ -70,7 +72,7 @@ void Sender::execute(AsyncSession& session, bool isRetry) sender.send(message); if (count > reportFrequency && !(sent % reportFrequency)) { if ( verbosity > 0 ) - std::cout << "sent " << sent << " of " << count << std::endl; + std::cout << "Sender sent " << sent << " of " << count << std::endl; } } message.setData("That's all, folks!"); diff --git a/qpid/cpp/src/tests/replication_test b/qpid/cpp/src/tests/replication_test index 9b6e5cfb29..605ee1376c 100755 --- a/qpid/cpp/src/tests/replication_test +++ b/qpid/cpp/src/tests/replication_test @@ -55,10 +55,12 @@ if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-a --generate-queue-events 2 $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-b --generate-queue-events 2 $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-c --generate-queue-events 1 + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-d --generate-queue-events 2 $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-a $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-b $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-c + #queue-d deliberately not declared on DR; this error case should be handled #publish and consume from test queus on broker A: for i in `seq 1 10`; do echo Message $i for A >> queue-a-input.repl; done @@ -68,10 +70,12 @@ if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e ./sender --port $BROKER_A --routing-key queue-a --send-eos 1 < queue-a-input.repl ./sender --port $BROKER_A --routing-key queue-b --send-eos 1 < queue-b-input.repl ./sender --port $BROKER_A --routing-key queue-c --send-eos 1 < queue-c-input.repl + echo dummy | ./sender --port $BROKER_A --routing-key queue-d --send-eos 1 ./receiver --port $BROKER_A --queue queue-a --messages 5 > /dev/null ./receiver --port $BROKER_A --queue queue-b --messages 10 > /dev/null ./receiver --port $BROKER_A --queue queue-c --messages 10 > /dev/null + ./receiver --port $BROKER_A --queue queue-d > /dev/null #shutdown broker A then check that broker Bs versions of the queues are as expected ../qpidd -q --port $BROKER_A @@ -90,6 +94,8 @@ if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e diff queue-b-backup.repl queue-b-expected.repl || FAIL=1 diff queue-c-backup.repl queue-c-input.repl || FAIL=1 + grep 'queue-d does not exist' replication-dest.log > /dev/null || echo "WARNING: Expected error to be logged!" + if [[ $FAIL ]]; then echo replication test failed: expectations not met! exit 1 diff --git a/qpid/cpp/src/tests/resuming_receiver.cpp b/qpid/cpp/src/tests/resuming_receiver.cpp index f49a115e1e..ef559a009d 100644 --- a/qpid/cpp/src/tests/resuming_receiver.cpp +++ b/qpid/cpp/src/tests/resuming_receiver.cpp @@ -53,6 +53,7 @@ class Listener : public MessageListener, bool gaps; uint reportFrequency; int verbosity; + bool done; }; @@ -62,7 +63,8 @@ Listener::Listener(int freq, int verbosity) lastSn(0), gaps(false), reportFrequency(freq), - verbosity(verbosity) + verbosity(verbosity), + done(false) {} @@ -70,6 +72,7 @@ void Listener::received(Message & message) { if (message.getData() == "That's all, folks!") { + done = true; if(verbosity > 0 ) { std::cout << "Shutting down listener for " @@ -111,14 +114,14 @@ void Listener::check() if (gaps) throw Exception("Detected gaps in sequence; messages appear to have been lost."); } -void Listener::execute(AsyncSession& session, bool isRetry) -{ - if (isRetry) { - // std::cout << "Resuming from " << count << std::endl; +void Listener::execute(AsyncSession& session, bool isRetry) { + if (verbosity > 0) + std::cout << "resuming_receiver " << (isRetry ? "first " : "re-") << "connect." << endl; + if (!done) { + SubscriptionManager subs(session); + subscription = subs.subscribe(*this, "message_queue"); + subs.run(); } - SubscriptionManager subs(session); - subscription = subs.subscribe(*this, "message_queue"); - subs.run(); } void Listener::editUrlList(std::vector<Url>& urls) diff --git a/qpid/cpp/src/tests/run_failover_soak b/qpid/cpp/src/tests/run_failover_soak index 9dddf59cf1..36dfed79a6 100755 --- a/qpid/cpp/src/tests/run_failover_soak +++ b/qpid/cpp/src/tests/run_failover_soak @@ -22,8 +22,8 @@ # Check AIS requirements and run tests if found. id -ng | grep '\<ais\>' >/dev/null || \ NOGROUP="The ais group is not your primary group." -ps -u root | grep aisexec >/dev/null || \ - NOAISEXEC="The aisexec daemon is not running as root" +ps -u root | grep 'aisexec\|corosync' >/dev/null || \ + NOAISEXEC="The aisexec/corosync daemon is not running as root" if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then cat <<EOF @@ -47,10 +47,10 @@ host=127.0.0.1 src_root=.. module_dir=$src_root/.libs -n_messages=300000 -report_frequency=10000 -verbosity=1 +MESSAGES=${MESSAGES:-300000} +REPORT_FREQUENCY=${REPORT_FREQUENCY:-`expr $MESSAGES / 20`} +VERBOSITY=${VERBOSITY:-1} -exec `dirname $0`/failover_soak $src_root $module_dir $host ./declare_queues ./replaying_sender ./resuming_receiver $n_messages $report_frequency $verbosity +exec ./failover_soak $src_root $module_dir $host ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY diff --git a/qpid/cpp/src/tests/sender.cpp b/qpid/cpp/src/tests/sender.cpp index 48062315fe..9d9e5be99d 100644 --- a/qpid/cpp/src/tests/sender.cpp +++ b/qpid/cpp/src/tests/sender.cpp @@ -24,6 +24,7 @@ #include <qpid/client/AsyncSession.h> #include <qpid/client/Message.h> #include <qpid/client/MessageReplayTracker.h> +#include <qpid/client/QueueOptions.h> #include <qpid/Exception.h> #include "TestOptions.h" @@ -40,13 +41,17 @@ struct Args : public qpid::TestOptions string destination; string key; uint sendEos; + bool durable; + string lvqMatchValue; - Args() : key("test-queue"), sendEos(0) + Args() : key("test-queue"), sendEos(0), durable(false) { - addOptions() + addOptions() ("exchange", qpid::optValue(destination, "EXCHANGE"), "Exchange to send messages to") ("routing-key", qpid::optValue(key, "KEY"), "Routing key to add to messages") - ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input"); + ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input") + ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.") + ("lvq-match-value", qpid::optValue(lvqMatchValue, "KEY"), "The value to set for the LVQ match key property"); } }; @@ -55,7 +60,7 @@ const string EOS("eos"); class Sender : public FailoverManager::Command { public: - Sender(const std::string& destination, const std::string& key, uint sendEos); + Sender(const std::string& destination, const std::string& key, uint sendEos, bool durable, const std::string& lvqMatchValue); void execute(AsyncSession& session, bool isRetry); private: const std::string destination; @@ -65,8 +70,17 @@ class Sender : public FailoverManager::Command uint sent; }; -Sender::Sender(const std::string& dest, const std::string& key, uint eos) : - destination(dest), sender(10), message("", key), sendEos(eos), sent(0) {} +Sender::Sender(const std::string& dest, const std::string& key, uint eos, bool durable, const std::string& lvqMatchValue) : + destination(dest), sender(10), message("", key), sendEos(eos), sent(0) +{ + if (durable){ + message.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + } + + if (!lvqMatchValue.empty()) { + message.getHeaders().setString(QueueOptions::strLVQMatchProperty, lvqMatchValue); + } +} void Sender::execute(AsyncSession& session, bool isRetry) { @@ -90,7 +104,7 @@ int main(int argc, char ** argv) try { opts.parse(argc, argv); FailoverManager connection(opts.con); - Sender sender(opts.destination, opts.key, opts.sendEos); + Sender sender(opts.destination, opts.key, opts.sendEos, opts.durable, opts.lvqMatchValue); connection.execute(sender); connection.close(); return 0; diff --git a/qpid/cpp/src/tests/ssl_test b/qpid/cpp/src/tests/ssl_test index c7b59b62ef..13965f3a03 100755 --- a/qpid/cpp/src/tests/ssl_test +++ b/qpid/cpp/src/tests/ssl_test @@ -39,8 +39,7 @@ create_certs() { } start_broker() { - ../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module ../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE > qpidd.port - PORT=`cat qpidd.port` + PORT=`../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module ../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE` } stop_broker() { diff --git a/qpid/cpp/src/tests/start_cluster b/qpid/cpp/src/tests/start_cluster index 4f0516500c..053b23da33 100755 --- a/qpid/cpp/src/tests/start_cluster +++ b/qpid/cpp/src/tests/start_cluster @@ -28,11 +28,10 @@ with_ais_group() { echo $* | newgrp ais } -test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; } rm -f cluster*.log SIZE=${1:-1}; shift CLUSTER=`pwd` # Cluster name=pwd, avoid clashes. -OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $*" +OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $@" for (( i=0; i<SIZE; ++i )); do PORT=`with_ais_group ../qpidd -p0 --log-to-file=cluster$i.log $OPTS` || exit 1 diff --git a/qpid/cpp/src/tests/txshift.cpp b/qpid/cpp/src/tests/txshift.cpp index 5f9c4f17fa..dd67710526 100644 --- a/qpid/cpp/src/tests/txshift.cpp +++ b/qpid/cpp/src/tests/txshift.cpp @@ -174,8 +174,8 @@ int main(int argc, char** argv) for (size_t i = 0; i < opts.workers; i++) { workers.push_back(new Worker(connection, opts.workQueue)); } - for_each(workers.begin(), workers.end(), boost::bind(&Worker::start, _1)); - for_each(workers.begin(), workers.end(), boost::bind(&Worker::join, _1)); + std::for_each(workers.begin(), workers.end(), boost::bind(&Worker::start, _1)); + std::for_each(workers.begin(), workers.end(), boost::bind(&Worker::join, _1)); } return 0; diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 2cf4e915b6..8fdde0ada6 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -125,13 +125,13 @@ <field name="member-id" type="uint64"/> <field name="connection-id" type="uint64"/> <field name="user-name" type="str8"/> + <field name="fragment" type="str32"/> </control> <!-- Complete a cluster state update. --> <control name="membership" code="0x21" label="Cluster membership details."> <field name="joiners" type="map"/> <!-- member-id -> URL --> <field name="members" type="map"/> <!-- member-id -> state --> - <field name="frame-id" type="uint64"/>> <!-- Frame id counter value --> </control> <!-- Set the position of a replicated queue. --> @@ -139,11 +139,12 @@ <field name="queue" type="str8"/> <field name="position" type="sequence-no"/> </control> - + <!-- Replicate encoded exchanges/queues. --> <control name="exchange" code="0x31"><field name="encoded" type="str32"/></control> <control name="queue" code="0x32"><field name="encoded" type="str32"/></control> - + <!-- Set expiry-id for subsequent messages. --> + <control name="expiry-id" code="0x33"><field name="expiry-id" type="uint64"/></control> </class> </amqp> |