summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-08-11 18:41:42 +0000
committerAlan Conway <aconway@apache.org>2008-08-11 18:41:42 +0000
commitebed79208a920e4986611e4b31f97921dbc93945 (patch)
tree6514a1f06a02e03a9b81a718e09012800c28c707
parent5c2e3052815e76e7565038f771cdb235e0516816 (diff)
downloadqpid-python-ebed79208a920e4986611e4b31f97921dbc93945.tar.gz
Integrate CPG file descriptor into broker polling.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@684865 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/Makefile.am4
-rw-r--r--cpp/src/qpid/broker/Broker.cpp2
-rw-r--r--cpp/src/qpid/broker/Broker.h2
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp60
-rw-r--r--cpp/src/qpid/cluster/Cluster.h9
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp1
-rw-r--r--cpp/src/qpid/cluster/ConnectionInterceptor.cpp16
-rw-r--r--cpp/src/qpid/cluster/ConnectionInterceptor.h2
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp19
-rw-r--r--cpp/src/qpid/cluster/Cpg.h19
-rw-r--r--cpp/src/tests/cluster_test.cpp25
11 files changed, 95 insertions, 64 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 22c92101c4..bb82551de0 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -43,16 +43,12 @@ endif # GENERATE
include $(srcdir)/rubygen.mk
include $(srcdir)/managementgen.mk
-DISTCLEANFILES=$(srcdir)/rubygen.mk $(srcdir)/managementgen.mk
-
# Code generated by C++
noinst_PROGRAMS=generate_MaxMethodBodySize_h
generate_MaxMethodBodySize_h_SOURCES=gen/generate_MaxMethodBodySize_h.cpp
qpid/framing/MaxMethodBodySize.h: generate_MaxMethodBodySize_h
./generate_MaxMethodBodySize_h
BUILT_SOURCES=qpid/framing/MaxMethodBodySize.h
-DISTCLEANFILES+=qpid/framing/MaxMethodBodySize.h
-
## Compiler flags
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index c7250d354c..4d7c07649b 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -395,5 +395,7 @@ void Broker::connect(
connect(addr.host, addr.port, false, failed, f);
}
+boost::shared_ptr<sys::Poller> Broker::getPoller() { return poller; }
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index b5f5aca8ba..f7399c375f 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -177,6 +177,8 @@ class Broker : public sys::Runnable, public Plugin::Target,
// For the present just return the first ProtocolFactory registered.
boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const;
+ /** Expose poller so plugins can register their descriptors. */
+ boost::shared_ptr<sys::Poller> getPoller();
};
}}
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 1d81a50e1c..3a0d11b5d1 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -28,6 +28,7 @@
#include "qpid/log/Statement.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
+
#include <boost/bind.hpp>
#include <boost/cast.hpp>
#include <algorithm>
@@ -57,25 +58,32 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
broker(&b),
+ poller(b.getPoller()),
cpg(*this),
name(name_),
url(url_),
- self(cpg.self())
+ self(cpg.self()),
+ cpgDispatchHandle(cpg,
+ boost::bind(&Cluster::dispatch, this, _1), // read
+ 0, // write
+ boost::bind(&Cluster::disconnect, this, _1)) // disconnect
{
broker->addFinalizer(boost::bind(&Cluster::leave, this));
QPID_LOG(trace, "Joining cluster: " << name_);
cpg.join(name);
notify();
- dispatcher=Thread(*this);
- // Wait till we show up in the cluster map.
- {
- Mutex::ScopedLock l(lock);
- while (empty())
- lock.wait();
- }
+
+ // FIXME aconway 2008-08-11: can we remove this loop?
+ // Dispatch till we show up in the cluster map.
+ while (empty())
+ cpg.dispatchOne();
+
+ // Start dispatching from the poller.
+ cpgDispatchHandle.startWatch(poller);
}
-Cluster::~Cluster() {}
+Cluster::~Cluster() {
+}
// local connection initializes plugins
void Cluster::initialize(broker::Connection& c) {
@@ -87,14 +95,19 @@ void Cluster::initialize(broker::Connection& c) {
void Cluster::leave() {
Mutex::ScopedLock l(lock);
if (!broker) return; // Already left.
- assert(Thread::current().id() != dispatcher.id()); // Must not be called in the dispatch thread.
+ // At this point the poller has already been shut down so
+ // no dispatches can occur thru the cpgDispatchHandle.
+ //
+ // FIXME aconway 2008-08-11: assert this is the cae.
+
QPID_LOG(debug, "Leaving cluster " << *this);
cpg.leave(name);
- // The dispatch thread sets broker=0 when the final config-change
- // is delivered.
- while(broker) lock.wait();
+ // broker= is set to 0 when the final config-change is delivered.
+ while(broker) {
+ Mutex::ScopedUnlock u(lock);
+ cpg.dispatchAll();
+ }
cpg.shutdown();
- dispatcher.join();
}
template <class T> void decodePtr(Buffer& buf, T*& ptr) {
@@ -134,6 +147,13 @@ Cluster::MemberList Cluster::getMembers() const {
return result;
}
+// ################ HERE - leaking shadow connections.
+// FIXME aconway 2008-08-11: revisit memory management for shadow
+// connections, what if the Connection is closed other than via
+// disconnect? Dangling pointer in shadow map. Use ptr_map for shadow
+// map, add deleted state to ConnectionInterceptor? Interceptors need
+// to know about map? Check how Connections can be deleted.
+
ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) {
ShadowConnectionId id(member, remotePtr);
ShadowConnectionMap::iterator i = shadowConnectionMap.find(id);
@@ -233,8 +253,16 @@ void Cluster::configChange(
lock.notifyAll(); // Threads waiting for membership changes.
}
-void Cluster::run() {
- cpg.dispatchBlocking();
+void Cluster::dispatch(sys::DispatchHandle& h) {
+ cpg.dispatchAll();
+ h.rewatch();
+}
+
+void Cluster::disconnect(sys::DispatchHandle& h) {
+ h.stopWatch();
+ // FIXME aconway 2008-08-11: error handling if we are disconnected.
+ // Kill the broker?
+ assert(0);
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index f5a695de24..1018684e7e 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -24,6 +24,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
+#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
@@ -47,7 +48,7 @@ class ConnectionInterceptor;
* Connection to the cluster.
* Keeps cluster membership data.
*/
-class Cluster : private sys::Runnable, private Cpg::Handler, public RefCounted
+class Cluster : private Cpg::Handler, public RefCounted
{
public:
typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId;
@@ -115,7 +116,8 @@ class Cluster : private sys::Runnable, private Cpg::Handler, public RefCounted
struct cpg_address */*joined*/, int /*nJoined*/
);
- void run();
+ void dispatch(sys::DispatchHandle&);
+ void disconnect(sys::DispatchHandle&);
void handleMethod(Id from, ConnectionInterceptor* connection, framing::AMQMethodBody& method);
@@ -123,14 +125,15 @@ class Cluster : private sys::Runnable, private Cpg::Handler, public RefCounted
mutable sys::Monitor lock; // Protect access to members.
broker::Broker* broker;
+ boost::shared_ptr<sys::Poller> poller;
Cpg cpg;
Cpg::Name name;
Url url;
MemberMap members;
- sys::Thread dispatcher;
Id self;
ShadowConnectionMap shadowConnectionMap;
ShadowConnectionOutputHandler shadowOut;
+ sys::DispatchHandle cpgDispatchHandle;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index a2c66e3790..1d07660455 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -18,7 +18,6 @@
#include "ConnectionInterceptor.h"
-
#include "qpid/broker/Broker.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/Plugin.h"
diff --git a/cpp/src/qpid/cluster/ConnectionInterceptor.cpp b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
index 656f05e685..81d496597a 100644
--- a/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
+++ b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
@@ -52,6 +52,7 @@ void ConnectionInterceptor::received(framing::AMQFrame& f) {
}
void ConnectionInterceptor::deliver(framing::AMQFrame& f) {
+ // ostringstream os; os << f; printf("Received: %s\n", os.str().c_str()); // FIXME aconway 2008-08-08: remove
receivedNext(f);
}
@@ -83,16 +84,21 @@ void ConnectionInterceptor::deliverClosed() {
bool ConnectionInterceptor::doOutput() {
if (connection->hasOutput()) {
- printf("doOutput send %p\n", (void*)this);
+ QPID_LOG(debug, "Intercept doOutput, call doOutputNext"); // FIXME aconway 2008-08-08: remove
cluster.send(AMQFrame(in_place<ClusterConnectionDoOutputBody>()), this);
- }
-
+ return doOutputNext();
+ }
return false;
}
void ConnectionInterceptor::deliverDoOutput() {
- printf("doOutput deliver %p\n", (void*)this);
- doOutputNext();
+ if (isShadow()) {
+ QPID_LOG(debug, "Shadow deliver do output, call doOutputNext"); // FIXME aconway 2008-08-08: remove
+ doOutputNext();
+ }
+ else {
+ QPID_LOG(debug, "Primary deliver doOutput, ignore."); // FIXME aconway 2008-08-08: remove
+ }
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ConnectionInterceptor.h b/cpp/src/qpid/cluster/ConnectionInterceptor.h
index 7a955ddd80..a256738aeb 100644
--- a/cpp/src/qpid/cluster/ConnectionInterceptor.h
+++ b/cpp/src/qpid/cluster/ConnectionInterceptor.h
@@ -55,6 +55,8 @@ class ConnectionInterceptor {
void doOutput() {}
void activateOutput() {}
};
+
+ bool isShadow() { return shadowId != Cluster::ShadowConnectionId(0,0); }
// Functions to intercept to Connection extension points.
void received(framing::AMQFrame&);
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index 6b01d73197..2ffd3509bf 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -17,8 +17,9 @@
*/
#include "Cpg.h"
-
#include "qpid/sys/Mutex.h"
+// Note cpg is currently unix-specific. Refactor if availble on other platforms.
+#include "qpid/sys/posix/PrivatePosix.h"
#include "qpid/log/Statement.h"
#include <vector>
@@ -62,11 +63,21 @@ void Cpg::globalConfigChange(
cpgFromHandle(handle)->handler.configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
}
-Cpg::Cpg(Handler& h) : handler(h), isShutdown(false) {
+int Cpg::getFd() {
+ int fd;
+ check(cpg_fd_get(handle, &fd), "Cannot get CPG file descriptor");
+ return fd;
+}
+
+Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdown(false) {
cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
check(cpg_context_set(handle, this), "Cannot set CPG context");
- QPID_LOG(debug, "Initialize CPG handle 0x" << std::hex << handle);
+ // Note: CPG is currently unix-specific. If CPG is ported to
+ // windows then this needs to be refactored into
+ // qpid::sys::<platform>
+ IOHandle::impl->fd = getFd();
+ QPID_LOG(debug, "Initialized CPG handle 0x" << std::hex << handle);
}
Cpg::~Cpg() {
@@ -93,6 +104,7 @@ bool Cpg::isFlowControlEnabled() {
// TODO aconway 2008-08-07: better handling of flow control.
// Wait for flow control to be disabled.
+// FIXME aconway 2008-08-08: does flow control check involve a round-trip? If so maybe remove...
void Cpg::waitForFlowControl() {
int delayNs=1000; // one millisecond
int tries=8; // double the delay on each try.
@@ -178,7 +190,6 @@ ostream& operator <<(ostream& out, const cpg_name& name) {
return out << string(name.value, name.length);
}
-
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h
index ab5af16b3d..96fd692a77 100644
--- a/cpp/src/qpid/cluster/Cpg.h
+++ b/cpp/src/qpid/cluster/Cpg.h
@@ -20,11 +20,15 @@
*/
#include "qpid/Exception.h"
+#include "qpid/sys/IOHandle.h"
#include "qpid/cluster/Dispatchable.h"
#include <boost/tuple/tuple.hpp>
#include <boost/tuple/tuple_comparison.hpp>
+#include <boost/scoped_ptr.hpp>
+
#include <cassert>
+
#include <string.h>
extern "C" {
@@ -34,14 +38,15 @@ extern "C" {
namespace qpid {
namespace cluster {
+
/**
- * Lightweight C++ interface to cpg.h operations.
+ * Lightweight C++ interface to cpg.h operations.
+ *
* Manages a single CPG handle, initialized in ctor, finialzed in destructor.
- * On error all functions throw Cpg::Exception
+ * On error all functions throw Cpg::Exception.
*
- * NOTE: only one at a time can exist per process.
*/
-class Cpg : public Dispatchable {
+class Cpg : public sys::IOHandle {
public:
struct Exception : public ::qpid::Exception {
Exception(const std::string& msg) : ::qpid::Exception(msg) {}
@@ -60,7 +65,6 @@ class Cpg : public Dispatchable {
std::string str() const { return std::string(value, length); }
};
-
// boost::tuple gives us == and < for free.
struct Id : public boost::tuple<uint32_t, uint32_t> {
Id(uint32_t n=0, uint32_t p=0) : boost::tuple<uint32_t, uint32_t>(n, p) {}
@@ -125,11 +129,13 @@ class Cpg : public Dispatchable {
Id self() const;
+ int getFd();
+
private:
static std::string errorStr(cpg_error_t err, const std::string& msg);
static std::string cantJoinMsg(const Name&);
static std::string cantLeaveMsg(const Name&); std::string cantMcastMsg(const Name&);
-
+
static void check(cpg_error_t result, const std::string& msg) {
if (result != CPG_OK) throw Exception(errorStr(result, msg));
}
@@ -172,5 +178,4 @@ inline bool operator!=(const cpg_name& a, const cpg_name& b) { return !(a == b);
}} // namespace qpid::cluster
-
#endif /*!CPG_H*/
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index da542352d9..49c5264990 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -162,28 +162,6 @@ struct Callback : public Cpg::Handler {
}
};
-#if 0 // FIXME aconway 2008-08-06:
-
-QPID_AUTO_TEST_CASE(CpgBasic) {
- // Verify basic functionality of cpg. This will catch any
- // openais configuration or permission errors.
- //
- Cpg::Name group("CpgBasic");
- Callback cb(group.str());
- Cpg cpg(cb);
- cpg.join(group);
- iovec iov = { (void*)"Hello!", 6 };
- cpg.mcast(group, &iov, 1);
- cpg.leave(group);
- cpg.dispatchSome();
-
- BOOST_REQUIRE_EQUAL(1u, cb.delivered.size());
- BOOST_CHECK_EQUAL("Hello!", cb.delivered.front());
- BOOST_REQUIRE_EQUAL(2u, cb.configChanges.size());
- BOOST_CHECK_EQUAL(1, cb.configChanges[0]);
- BOOST_CHECK_EQUAL(0, cb.configChanges[1]);
-}
-
QPID_AUTO_TEST_CASE(testForkedBroker) {
// Verify the ForkedBroker works as expected.
const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" };
@@ -250,7 +228,7 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) {
BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount());
BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
}
-#endif
+
QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
ClusterFixture cluster(3);
// First start a subscription.
@@ -276,6 +254,5 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
}
-// TODO aconway 2008-06-25: failover.
QPID_AUTO_TEST_SUITE_END()