summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-07-08 15:22:37 +0000
committerAlan Conway <aconway@apache.org>2008-07-08 15:22:37 +0000
commit43664d69dc90c128ad3f73327ac8331b02d7a38c (patch)
treee92629756eb899c3ad6cdac129a84e1966b4db40 /cpp
parent50d4782e2f3676723a2df5bf6a420d45fb55467d (diff)
downloadqpid-python-43664d69dc90c128ad3f73327ac8331b02d7a38c.tar.gz
Removed static Cpg::handlers, fixed ForkedBroker shutdown.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@674855 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp1
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp52
-rw-r--r--cpp/src/qpid/cluster/Cpg.h16
-rw-r--r--cpp/src/tests/ForkedBroker.h33
-rw-r--r--cpp/src/tests/cluster_test.cpp10
5 files changed, 53 insertions, 59 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 2727d5af0a..d97a840f82 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -114,6 +114,7 @@ Cluster::~Cluster() {
QPID_LOG(trace, *this << " Leaving cluster.");
try {
cpg.leave(name);
+ cpg.shutdown();
dispatcher.join();
}
catch (const std::exception& e) {
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index 7831f66da1..3118e11e57 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -32,36 +32,14 @@ namespace cluster {
using namespace std;
-// Global vector of Cpg pointers by handle.
-// TODO aconway 2007-06-12: Replace this with cpg_get/set_context,
-// coming in in RHEL 5.1.
-class Cpg::Handles
-{
- public:
- void put(cpg_handle_t handle, Cpg::Handler* handler) {
- sys::Mutex::ScopedLock l(lock);
- uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
- if (index >= handles.size())
- handles.resize(index+1, 0);
- handles[index] = handler;
- }
-
- Cpg::Handler* get(cpg_handle_t handle) {
- sys::Mutex::ScopedLock l(lock);
- uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
- assert(index < handles.size());
- assert(handles[index]);
- return handles[index];
- }
-
- private:
- sys::Mutex lock;
- vector<Cpg::Handler*> handles;
-};
-
-Cpg::Handles Cpg::handles;
+Cpg* Cpg::cpgFromHandle(cpg_handle_t handle) {
+ void* cpg=0;
+ check(cpg_context_get(handle, &cpg), "Cannot get CPG instance.");
+ if (!cpg) throw Exception("Cannot get CPG instance.");
+ return reinterpret_cast<Cpg*>(cpg);
+}
-// Global callback functions call per-object callbacks via handles vector.
+// Global callback functions.
void Cpg::globalDeliver (
cpg_handle_t handle,
struct cpg_name *group,
@@ -70,9 +48,7 @@ void Cpg::globalDeliver (
void* msg,
int msg_len)
{
- Cpg::Handler* handler=handles.get(handle);
- if (handler)
- handler->deliver(handle, group, nodeid, pid, msg, msg_len);
+ cpgFromHandle(handle)->handler.deliver(handle, group, nodeid, pid, msg, msg_len);
}
void Cpg::globalConfigChange(
@@ -83,15 +59,13 @@ void Cpg::globalConfigChange(
struct cpg_address *joined, int nJoined
)
{
- Cpg::Handler* handler=handles.get(handle);
- if (handler)
- handler->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
+ cpgFromHandle(handle)->handler.configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
}
Cpg::Cpg(Handler& h) : handler(h) {
cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
- handles.put(handle, &handler);
+ check(cpg_context_set(handle, this), "Cannot set CPG context");
QPID_LOG(debug, "Initialize CPG handle 0x" << std::hex << handle);
}
@@ -104,10 +78,10 @@ Cpg::~Cpg() {
}
void Cpg::shutdown() {
- if (handles.get(handle)) {
- QPID_LOG(debug, "Finalize CPG handle " << std::hex << handle);
- handles.put(handle, 0);
+ if (handle) {
+ cpg_context_set(handle, 0);
check(cpg_finalize(handle), "Error in shutdown of CPG");
+ handle = 0;
}
}
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h
index a918fb0cbf..d3142efcb6 100644
--- a/cpp/src/qpid/cluster/Cpg.h
+++ b/cpp/src/qpid/cluster/Cpg.h
@@ -38,6 +38,8 @@ namespace cluster {
* Lightweight C++ interface to cpg.h operations.
* Manages a single CPG handle, initialized in ctor, finialzed in destructor.
* On error all functions throw Cpg::Exception
+ *
+ * NOTE: only one at a time can exist per process.
*/
class Cpg : public Dispatchable {
public:
@@ -95,7 +97,7 @@ class Cpg : public Dispatchable {
*/
Cpg(Handler&);
- /** Destructor calls shutdown. */
+ /** Destructor calls shutdown if not already calledx. */
~Cpg();
/** Disconnect from CPG */
@@ -134,22 +136,17 @@ class Cpg : public Dispatchable {
Id self() const;
private:
- class Handles;
- struct ClearHandleOnExit;
- friend class Handles;
- friend struct ClearHandleOnExit;
-
static std::string errorStr(cpg_error_t err, const std::string& msg);
static std::string cantJoinMsg(const Name&);
static std::string cantLeaveMsg(const Name&);
static std::string cantMcastMsg(const Name&);
static void check(cpg_error_t result, const std::string& msg) {
- // TODO aconway 2007-06-01: Logging and exceptions.
- if (result != CPG_OK)
- throw Exception(errorStr(result, msg));
+ if (result != CPG_OK) throw Exception(errorStr(result, msg));
}
+ static Cpg* cpgFromHandle(cpg_handle_t);
+
static void globalDeliver(
cpg_handle_t handle,
struct cpg_name *group,
@@ -166,7 +163,6 @@ class Cpg : public Dispatchable {
struct cpg_address *joined, int nJoined
);
- static Handles handles;
cpg_handle_t handle;
Handler& handler;
};
diff --git a/cpp/src/tests/ForkedBroker.h b/cpp/src/tests/ForkedBroker.h
index c2f3346296..5fb1ce8478 100644
--- a/cpp/src/tests/ForkedBroker.h
+++ b/cpp/src/tests/ForkedBroker.h
@@ -22,6 +22,7 @@
*
*/
+#include "qpid/Exception.h"
#include "qpid/sys/Fork.h"
#include "qpid/log/Logger.h"
#include "qpid/broker/Broker.h"
@@ -48,7 +49,7 @@
*
*/
class ForkedBroker : public qpid::sys::ForkWithMessage {
- pid_t childPid;
+ pid_t pid;
uint16_t port;
qpid::broker::Broker::Options opts;
std::string prefix;
@@ -56,22 +57,33 @@ class ForkedBroker : public qpid::sys::ForkWithMessage {
public:
struct ChildExit {}; // Thrown in child processes.
- ForkedBroker(const qpid::broker::Broker::Options& opts_, const std::string& prefix_=std::string())
- : childPid(0), port(0), opts(opts_), prefix(prefix_) { fork(); }
+ ForkedBroker(const qpid::broker::Broker::Options& opts_=qpid::broker::Broker::Options(),
+ const std::string& prefix_=std::string())
+ : pid(0), port(0), opts(opts_), prefix(prefix_) { fork(); }
- ~ForkedBroker() { stop(); }
+ ~ForkedBroker() {
+ try { stop(); }
+ catch(const std::exception& e) {
+ QPID_LOG(error, e.what());
+ }
+ }
void stop() {
- if (childPid > 0) {
- ::kill(childPid, SIGINT);
- ::waitpid(childPid, 0, 0);
+ if (pid > 0) { // I am the parent, clean up children.
+ if (::kill(pid, SIGINT) < 0)
+ throw qpid::Exception(QPID_MSG("Can't kill process " << pid << ": " << qpid::strError(errno)));
+ int status = 0;
+ if (::waitpid(pid, &status, 0) < 0)
+ throw qpid::Exception(QPID_MSG("Waiting for process " << pid << ": " << qpid::strError(errno)));
+ if (WEXITSTATUS(status) != 0)
+ throw qpid::Exception(QPID_MSG("Process " << pid << " exited with status: " << WEXITSTATUS(status)));
}
}
- void parent(pid_t pid) {
- childPid = pid;
+ void parent(pid_t pid_) {
+ pid = pid_;
qpid::log::Logger::instance().setPrefix("parent");
- std::string portStr = wait(2);
+ std::string portStr = wait(5);
port = boost::lexical_cast<uint16_t>(portStr);
}
@@ -88,6 +100,7 @@ class ForkedBroker : public qpid::sys::ForkWithMessage {
// Force exit in the child process, otherwise we will try to
// carry with parent tests.
+ broker.reset(); // Run broker dtor before we exit.
exit(0);
}
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 2fa7cd325d..82d18aceff 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -147,6 +147,16 @@ QPID_AUTO_TEST_CASE(CpgBasic) {
}
+QPID_AUTO_TEST_CASE(testForkedBroker) {
+ // Verify the ForkedBroker works as expected.
+ Broker::Options opts;
+ opts.auth="no";
+ opts.noDataDir=true;
+ ForkedBroker broker(opts);
+ Client c(broker.getPort());
+ BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("amq.direct").getType());
+}
+
QPID_AUTO_TEST_CASE(testWiringReplication) {
ClusterFixture cluster(2); // FIXME aconway 2008-07-02: 3 brokers
Client c0(cluster[0].getPort());