diff options
author | Alan Conway <aconway@apache.org> | 2008-07-08 15:22:37 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-07-08 15:22:37 +0000 |
commit | 43664d69dc90c128ad3f73327ac8331b02d7a38c (patch) | |
tree | e92629756eb899c3ad6cdac129a84e1966b4db40 /cpp | |
parent | 50d4782e2f3676723a2df5bf6a420d45fb55467d (diff) | |
download | qpid-python-43664d69dc90c128ad3f73327ac8331b02d7a38c.tar.gz |
Removed static Cpg::handlers, fixed ForkedBroker shutdown.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@674855 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 52 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.h | 16 | ||||
-rw-r--r-- | cpp/src/tests/ForkedBroker.h | 33 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 10 |
5 files changed, 53 insertions, 59 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 2727d5af0a..d97a840f82 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -114,6 +114,7 @@ Cluster::~Cluster() { QPID_LOG(trace, *this << " Leaving cluster."); try { cpg.leave(name); + cpg.shutdown(); dispatcher.join(); } catch (const std::exception& e) { diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 7831f66da1..3118e11e57 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -32,36 +32,14 @@ namespace cluster { using namespace std; -// Global vector of Cpg pointers by handle. -// TODO aconway 2007-06-12: Replace this with cpg_get/set_context, -// coming in in RHEL 5.1. -class Cpg::Handles -{ - public: - void put(cpg_handle_t handle, Cpg::Handler* handler) { - sys::Mutex::ScopedLock l(lock); - uint32_t index=uint32_t(handle); // Lower 32 bits is an array index. - if (index >= handles.size()) - handles.resize(index+1, 0); - handles[index] = handler; - } - - Cpg::Handler* get(cpg_handle_t handle) { - sys::Mutex::ScopedLock l(lock); - uint32_t index=uint32_t(handle); // Lower 32 bits is an array index. - assert(index < handles.size()); - assert(handles[index]); - return handles[index]; - } - - private: - sys::Mutex lock; - vector<Cpg::Handler*> handles; -}; - -Cpg::Handles Cpg::handles; +Cpg* Cpg::cpgFromHandle(cpg_handle_t handle) { + void* cpg=0; + check(cpg_context_get(handle, &cpg), "Cannot get CPG instance."); + if (!cpg) throw Exception("Cannot get CPG instance."); + return reinterpret_cast<Cpg*>(cpg); +} -// Global callback functions call per-object callbacks via handles vector. +// Global callback functions. void Cpg::globalDeliver ( cpg_handle_t handle, struct cpg_name *group, @@ -70,9 +48,7 @@ void Cpg::globalDeliver ( void* msg, int msg_len) { - Cpg::Handler* handler=handles.get(handle); - if (handler) - handler->deliver(handle, group, nodeid, pid, msg, msg_len); + cpgFromHandle(handle)->handler.deliver(handle, group, nodeid, pid, msg, msg_len); } void Cpg::globalConfigChange( @@ -83,15 +59,13 @@ void Cpg::globalConfigChange( struct cpg_address *joined, int nJoined ) { - Cpg::Handler* handler=handles.get(handle); - if (handler) - handler->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined); + cpgFromHandle(handle)->handler.configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined); } Cpg::Cpg(Handler& h) : handler(h) { cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange }; check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG"); - handles.put(handle, &handler); + check(cpg_context_set(handle, this), "Cannot set CPG context"); QPID_LOG(debug, "Initialize CPG handle 0x" << std::hex << handle); } @@ -104,10 +78,10 @@ Cpg::~Cpg() { } void Cpg::shutdown() { - if (handles.get(handle)) { - QPID_LOG(debug, "Finalize CPG handle " << std::hex << handle); - handles.put(handle, 0); + if (handle) { + cpg_context_set(handle, 0); check(cpg_finalize(handle), "Error in shutdown of CPG"); + handle = 0; } } diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h index a918fb0cbf..d3142efcb6 100644 --- a/cpp/src/qpid/cluster/Cpg.h +++ b/cpp/src/qpid/cluster/Cpg.h @@ -38,6 +38,8 @@ namespace cluster { * Lightweight C++ interface to cpg.h operations. * Manages a single CPG handle, initialized in ctor, finialzed in destructor. * On error all functions throw Cpg::Exception + * + * NOTE: only one at a time can exist per process. */ class Cpg : public Dispatchable { public: @@ -95,7 +97,7 @@ class Cpg : public Dispatchable { */ Cpg(Handler&); - /** Destructor calls shutdown. */ + /** Destructor calls shutdown if not already calledx. */ ~Cpg(); /** Disconnect from CPG */ @@ -134,22 +136,17 @@ class Cpg : public Dispatchable { Id self() const; private: - class Handles; - struct ClearHandleOnExit; - friend class Handles; - friend struct ClearHandleOnExit; - static std::string errorStr(cpg_error_t err, const std::string& msg); static std::string cantJoinMsg(const Name&); static std::string cantLeaveMsg(const Name&); static std::string cantMcastMsg(const Name&); static void check(cpg_error_t result, const std::string& msg) { - // TODO aconway 2007-06-01: Logging and exceptions. - if (result != CPG_OK) - throw Exception(errorStr(result, msg)); + if (result != CPG_OK) throw Exception(errorStr(result, msg)); } + static Cpg* cpgFromHandle(cpg_handle_t); + static void globalDeliver( cpg_handle_t handle, struct cpg_name *group, @@ -166,7 +163,6 @@ class Cpg : public Dispatchable { struct cpg_address *joined, int nJoined ); - static Handles handles; cpg_handle_t handle; Handler& handler; }; diff --git a/cpp/src/tests/ForkedBroker.h b/cpp/src/tests/ForkedBroker.h index c2f3346296..5fb1ce8478 100644 --- a/cpp/src/tests/ForkedBroker.h +++ b/cpp/src/tests/ForkedBroker.h @@ -22,6 +22,7 @@ * */ +#include "qpid/Exception.h" #include "qpid/sys/Fork.h" #include "qpid/log/Logger.h" #include "qpid/broker/Broker.h" @@ -48,7 +49,7 @@ * */ class ForkedBroker : public qpid::sys::ForkWithMessage { - pid_t childPid; + pid_t pid; uint16_t port; qpid::broker::Broker::Options opts; std::string prefix; @@ -56,22 +57,33 @@ class ForkedBroker : public qpid::sys::ForkWithMessage { public: struct ChildExit {}; // Thrown in child processes. - ForkedBroker(const qpid::broker::Broker::Options& opts_, const std::string& prefix_=std::string()) - : childPid(0), port(0), opts(opts_), prefix(prefix_) { fork(); } + ForkedBroker(const qpid::broker::Broker::Options& opts_=qpid::broker::Broker::Options(), + const std::string& prefix_=std::string()) + : pid(0), port(0), opts(opts_), prefix(prefix_) { fork(); } - ~ForkedBroker() { stop(); } + ~ForkedBroker() { + try { stop(); } + catch(const std::exception& e) { + QPID_LOG(error, e.what()); + } + } void stop() { - if (childPid > 0) { - ::kill(childPid, SIGINT); - ::waitpid(childPid, 0, 0); + if (pid > 0) { // I am the parent, clean up children. + if (::kill(pid, SIGINT) < 0) + throw qpid::Exception(QPID_MSG("Can't kill process " << pid << ": " << qpid::strError(errno))); + int status = 0; + if (::waitpid(pid, &status, 0) < 0) + throw qpid::Exception(QPID_MSG("Waiting for process " << pid << ": " << qpid::strError(errno))); + if (WEXITSTATUS(status) != 0) + throw qpid::Exception(QPID_MSG("Process " << pid << " exited with status: " << WEXITSTATUS(status))); } } - void parent(pid_t pid) { - childPid = pid; + void parent(pid_t pid_) { + pid = pid_; qpid::log::Logger::instance().setPrefix("parent"); - std::string portStr = wait(2); + std::string portStr = wait(5); port = boost::lexical_cast<uint16_t>(portStr); } @@ -88,6 +100,7 @@ class ForkedBroker : public qpid::sys::ForkWithMessage { // Force exit in the child process, otherwise we will try to // carry with parent tests. + broker.reset(); // Run broker dtor before we exit. exit(0); } diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 2fa7cd325d..82d18aceff 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -147,6 +147,16 @@ QPID_AUTO_TEST_CASE(CpgBasic) { } +QPID_AUTO_TEST_CASE(testForkedBroker) { + // Verify the ForkedBroker works as expected. + Broker::Options opts; + opts.auth="no"; + opts.noDataDir=true; + ForkedBroker broker(opts); + Client c(broker.getPort()); + BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("amq.direct").getType()); +} + QPID_AUTO_TEST_CASE(testWiringReplication) { ClusterFixture cluster(2); // FIXME aconway 2008-07-02: 3 brokers Client c0(cluster[0].getPort()); |