diff options
author | Alan Conway <aconway@apache.org> | 2008-08-11 18:41:42 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-08-11 18:41:42 +0000 |
commit | ebed79208a920e4986611e4b31f97921dbc93945 (patch) | |
tree | 6514a1f06a02e03a9b81a718e09012800c28c707 | |
parent | 5c2e3052815e76e7565038f771cdb235e0516816 (diff) | |
download | qpid-python-ebed79208a920e4986611e4b31f97921dbc93945.tar.gz |
Integrate CPG file descriptor into broker polling.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@684865 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/Makefile.am | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 60 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionInterceptor.cpp | 16 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionInterceptor.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.h | 19 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 25 |
11 files changed, 95 insertions, 64 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 22c92101c4..bb82551de0 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -43,16 +43,12 @@ endif # GENERATE include $(srcdir)/rubygen.mk include $(srcdir)/managementgen.mk -DISTCLEANFILES=$(srcdir)/rubygen.mk $(srcdir)/managementgen.mk - # Code generated by C++ noinst_PROGRAMS=generate_MaxMethodBodySize_h generate_MaxMethodBodySize_h_SOURCES=gen/generate_MaxMethodBodySize_h.cpp qpid/framing/MaxMethodBodySize.h: generate_MaxMethodBodySize_h ./generate_MaxMethodBodySize_h BUILT_SOURCES=qpid/framing/MaxMethodBodySize.h -DISTCLEANFILES+=qpid/framing/MaxMethodBodySize.h - ## Compiler flags diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index c7250d354c..4d7c07649b 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -395,5 +395,7 @@ void Broker::connect( connect(addr.host, addr.port, false, failed, f); } +boost::shared_ptr<sys::Poller> Broker::getPoller() { return poller; } + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index b5f5aca8ba..f7399c375f 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -177,6 +177,8 @@ class Broker : public sys::Runnable, public Plugin::Target, // For the present just return the first ProtocolFactory registered. boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const; + /** Expose poller so plugins can register their descriptors. */ + boost::shared_ptr<sys::Poller> getPoller(); }; }} diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 1d81a50e1c..3a0d11b5d1 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -28,6 +28,7 @@ #include "qpid/log/Statement.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" + #include <boost/bind.hpp> #include <boost/cast.hpp> #include <algorithm> @@ -57,25 +58,32 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : broker(&b), + poller(b.getPoller()), cpg(*this), name(name_), url(url_), - self(cpg.self()) + self(cpg.self()), + cpgDispatchHandle(cpg, + boost::bind(&Cluster::dispatch, this, _1), // read + 0, // write + boost::bind(&Cluster::disconnect, this, _1)) // disconnect { broker->addFinalizer(boost::bind(&Cluster::leave, this)); QPID_LOG(trace, "Joining cluster: " << name_); cpg.join(name); notify(); - dispatcher=Thread(*this); - // Wait till we show up in the cluster map. - { - Mutex::ScopedLock l(lock); - while (empty()) - lock.wait(); - } + + // FIXME aconway 2008-08-11: can we remove this loop? + // Dispatch till we show up in the cluster map. + while (empty()) + cpg.dispatchOne(); + + // Start dispatching from the poller. + cpgDispatchHandle.startWatch(poller); } -Cluster::~Cluster() {} +Cluster::~Cluster() { +} // local connection initializes plugins void Cluster::initialize(broker::Connection& c) { @@ -87,14 +95,19 @@ void Cluster::initialize(broker::Connection& c) { void Cluster::leave() { Mutex::ScopedLock l(lock); if (!broker) return; // Already left. - assert(Thread::current().id() != dispatcher.id()); // Must not be called in the dispatch thread. + // At this point the poller has already been shut down so + // no dispatches can occur thru the cpgDispatchHandle. + // + // FIXME aconway 2008-08-11: assert this is the cae. + QPID_LOG(debug, "Leaving cluster " << *this); cpg.leave(name); - // The dispatch thread sets broker=0 when the final config-change - // is delivered. - while(broker) lock.wait(); + // broker= is set to 0 when the final config-change is delivered. + while(broker) { + Mutex::ScopedUnlock u(lock); + cpg.dispatchAll(); + } cpg.shutdown(); - dispatcher.join(); } template <class T> void decodePtr(Buffer& buf, T*& ptr) { @@ -134,6 +147,13 @@ Cluster::MemberList Cluster::getMembers() const { return result; } +// ################ HERE - leaking shadow connections. +// FIXME aconway 2008-08-11: revisit memory management for shadow +// connections, what if the Connection is closed other than via +// disconnect? Dangling pointer in shadow map. Use ptr_map for shadow +// map, add deleted state to ConnectionInterceptor? Interceptors need +// to know about map? Check how Connections can be deleted. + ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) { ShadowConnectionId id(member, remotePtr); ShadowConnectionMap::iterator i = shadowConnectionMap.find(id); @@ -233,8 +253,16 @@ void Cluster::configChange( lock.notifyAll(); // Threads waiting for membership changes. } -void Cluster::run() { - cpg.dispatchBlocking(); +void Cluster::dispatch(sys::DispatchHandle& h) { + cpg.dispatchAll(); + h.rewatch(); +} + +void Cluster::disconnect(sys::DispatchHandle& h) { + h.stopWatch(); + // FIXME aconway 2008-08-11: error handling if we are disconnected. + // Kill the broker? + assert(0); } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index f5a695de24..1018684e7e 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -24,6 +24,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" +#include "qpid/sys/Dispatcher.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" @@ -47,7 +48,7 @@ class ConnectionInterceptor; * Connection to the cluster. * Keeps cluster membership data. */ -class Cluster : private sys::Runnable, private Cpg::Handler, public RefCounted +class Cluster : private Cpg::Handler, public RefCounted { public: typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId; @@ -115,7 +116,8 @@ class Cluster : private sys::Runnable, private Cpg::Handler, public RefCounted struct cpg_address */*joined*/, int /*nJoined*/ ); - void run(); + void dispatch(sys::DispatchHandle&); + void disconnect(sys::DispatchHandle&); void handleMethod(Id from, ConnectionInterceptor* connection, framing::AMQMethodBody& method); @@ -123,14 +125,15 @@ class Cluster : private sys::Runnable, private Cpg::Handler, public RefCounted mutable sys::Monitor lock; // Protect access to members. broker::Broker* broker; + boost::shared_ptr<sys::Poller> poller; Cpg cpg; Cpg::Name name; Url url; MemberMap members; - sys::Thread dispatcher; Id self; ShadowConnectionMap shadowConnectionMap; ShadowConnectionOutputHandler shadowOut; + sys::DispatchHandle cpgDispatchHandle; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index a2c66e3790..1d07660455 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -18,7 +18,6 @@ #include "ConnectionInterceptor.h" - #include "qpid/broker/Broker.h" #include "qpid/cluster/Cluster.h" #include "qpid/Plugin.h" diff --git a/cpp/src/qpid/cluster/ConnectionInterceptor.cpp b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp index 656f05e685..81d496597a 100644 --- a/cpp/src/qpid/cluster/ConnectionInterceptor.cpp +++ b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp @@ -52,6 +52,7 @@ void ConnectionInterceptor::received(framing::AMQFrame& f) { } void ConnectionInterceptor::deliver(framing::AMQFrame& f) { + // ostringstream os; os << f; printf("Received: %s\n", os.str().c_str()); // FIXME aconway 2008-08-08: remove receivedNext(f); } @@ -83,16 +84,21 @@ void ConnectionInterceptor::deliverClosed() { bool ConnectionInterceptor::doOutput() { if (connection->hasOutput()) { - printf("doOutput send %p\n", (void*)this); + QPID_LOG(debug, "Intercept doOutput, call doOutputNext"); // FIXME aconway 2008-08-08: remove cluster.send(AMQFrame(in_place<ClusterConnectionDoOutputBody>()), this); - } - + return doOutputNext(); + } return false; } void ConnectionInterceptor::deliverDoOutput() { - printf("doOutput deliver %p\n", (void*)this); - doOutputNext(); + if (isShadow()) { + QPID_LOG(debug, "Shadow deliver do output, call doOutputNext"); // FIXME aconway 2008-08-08: remove + doOutputNext(); + } + else { + QPID_LOG(debug, "Primary deliver doOutput, ignore."); // FIXME aconway 2008-08-08: remove + } } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ConnectionInterceptor.h b/cpp/src/qpid/cluster/ConnectionInterceptor.h index 7a955ddd80..a256738aeb 100644 --- a/cpp/src/qpid/cluster/ConnectionInterceptor.h +++ b/cpp/src/qpid/cluster/ConnectionInterceptor.h @@ -55,6 +55,8 @@ class ConnectionInterceptor { void doOutput() {} void activateOutput() {} }; + + bool isShadow() { return shadowId != Cluster::ShadowConnectionId(0,0); } // Functions to intercept to Connection extension points. void received(framing::AMQFrame&); diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 6b01d73197..2ffd3509bf 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -17,8 +17,9 @@ */ #include "Cpg.h" - #include "qpid/sys/Mutex.h" +// Note cpg is currently unix-specific. Refactor if availble on other platforms. +#include "qpid/sys/posix/PrivatePosix.h" #include "qpid/log/Statement.h" #include <vector> @@ -62,11 +63,21 @@ void Cpg::globalConfigChange( cpgFromHandle(handle)->handler.configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined); } -Cpg::Cpg(Handler& h) : handler(h), isShutdown(false) { +int Cpg::getFd() { + int fd; + check(cpg_fd_get(handle, &fd), "Cannot get CPG file descriptor"); + return fd; +} + +Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdown(false) { cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange }; check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG"); check(cpg_context_set(handle, this), "Cannot set CPG context"); - QPID_LOG(debug, "Initialize CPG handle 0x" << std::hex << handle); + // Note: CPG is currently unix-specific. If CPG is ported to + // windows then this needs to be refactored into + // qpid::sys::<platform> + IOHandle::impl->fd = getFd(); + QPID_LOG(debug, "Initialized CPG handle 0x" << std::hex << handle); } Cpg::~Cpg() { @@ -93,6 +104,7 @@ bool Cpg::isFlowControlEnabled() { // TODO aconway 2008-08-07: better handling of flow control. // Wait for flow control to be disabled. +// FIXME aconway 2008-08-08: does flow control check involve a round-trip? If so maybe remove... void Cpg::waitForFlowControl() { int delayNs=1000; // one millisecond int tries=8; // double the delay on each try. @@ -178,7 +190,6 @@ ostream& operator <<(ostream& out, const cpg_name& name) { return out << string(name.value, name.length); } - }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h index ab5af16b3d..96fd692a77 100644 --- a/cpp/src/qpid/cluster/Cpg.h +++ b/cpp/src/qpid/cluster/Cpg.h @@ -20,11 +20,15 @@ */ #include "qpid/Exception.h" +#include "qpid/sys/IOHandle.h" #include "qpid/cluster/Dispatchable.h" #include <boost/tuple/tuple.hpp> #include <boost/tuple/tuple_comparison.hpp> +#include <boost/scoped_ptr.hpp> + #include <cassert> + #include <string.h> extern "C" { @@ -34,14 +38,15 @@ extern "C" { namespace qpid { namespace cluster { + /** - * Lightweight C++ interface to cpg.h operations. + * Lightweight C++ interface to cpg.h operations. + * * Manages a single CPG handle, initialized in ctor, finialzed in destructor. - * On error all functions throw Cpg::Exception + * On error all functions throw Cpg::Exception. * - * NOTE: only one at a time can exist per process. */ -class Cpg : public Dispatchable { +class Cpg : public sys::IOHandle { public: struct Exception : public ::qpid::Exception { Exception(const std::string& msg) : ::qpid::Exception(msg) {} @@ -60,7 +65,6 @@ class Cpg : public Dispatchable { std::string str() const { return std::string(value, length); } }; - // boost::tuple gives us == and < for free. struct Id : public boost::tuple<uint32_t, uint32_t> { Id(uint32_t n=0, uint32_t p=0) : boost::tuple<uint32_t, uint32_t>(n, p) {} @@ -125,11 +129,13 @@ class Cpg : public Dispatchable { Id self() const; + int getFd(); + private: static std::string errorStr(cpg_error_t err, const std::string& msg); static std::string cantJoinMsg(const Name&); static std::string cantLeaveMsg(const Name&); std::string cantMcastMsg(const Name&); - + static void check(cpg_error_t result, const std::string& msg) { if (result != CPG_OK) throw Exception(errorStr(result, msg)); } @@ -172,5 +178,4 @@ inline bool operator!=(const cpg_name& a, const cpg_name& b) { return !(a == b); }} // namespace qpid::cluster - #endif /*!CPG_H*/ diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index da542352d9..49c5264990 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -162,28 +162,6 @@ struct Callback : public Cpg::Handler { } }; -#if 0 // FIXME aconway 2008-08-06: - -QPID_AUTO_TEST_CASE(CpgBasic) { - // Verify basic functionality of cpg. This will catch any - // openais configuration or permission errors. - // - Cpg::Name group("CpgBasic"); - Callback cb(group.str()); - Cpg cpg(cb); - cpg.join(group); - iovec iov = { (void*)"Hello!", 6 }; - cpg.mcast(group, &iov, 1); - cpg.leave(group); - cpg.dispatchSome(); - - BOOST_REQUIRE_EQUAL(1u, cb.delivered.size()); - BOOST_CHECK_EQUAL("Hello!", cb.delivered.front()); - BOOST_REQUIRE_EQUAL(2u, cb.configChanges.size()); - BOOST_CHECK_EQUAL(1, cb.configChanges[0]); - BOOST_CHECK_EQUAL(0, cb.configChanges[1]); -} - QPID_AUTO_TEST_CASE(testForkedBroker) { // Verify the ForkedBroker works as expected. const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" }; @@ -250,7 +228,7 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) { BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount()); BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); } -#endif + QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { ClusterFixture cluster(3); // First start a subscription. @@ -276,6 +254,5 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); } -// TODO aconway 2008-06-25: failover. QPID_AUTO_TEST_SUITE_END() |