summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-01-06 17:00:57 +0000
committerAlan Conway <aconway@apache.org>2010-01-06 17:00:57 +0000
commit828c153afcdc15766cf71945cdf5f98f471a08aa (patch)
treea8429d974abf5c1b75574c43d633254207caedce /qpid/cpp/src
parent7e3e4d5e664c52fbc7faf16cf49771d62ccd53e1 (diff)
downloadqpid-python-828c153afcdc15766cf71945cdf5f98f471a08aa.tar.gz
Don't hold up broker initialization for cluster initialization.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@896536 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp40
-rw-r--r--qpid/cpp/src/qpid/cluster/Multicaster.cpp11
-rw-r--r--qpid/cpp/src/qpid/cluster/PollableQueue.h2
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py25
4 files changed, 27 insertions, 51 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index d049001eb0..738a9fc5c4 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -258,15 +258,14 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
store.load();
+ if (store.getState() == STORE_STATE_DIRTY_STORE)
+ broker.setRecovery(false); // Ditch my current store.
if (store.getClusterId())
clusterId = store.getClusterId(); // Use stored ID if there is one.
QPID_LOG(notice, "Cluster store state: " << store)
}
cpg.join(name);
- // Pump the CPG dispatch manually till we get initialized.
- while (state == INIT)
- cpg.dispatchOne();
}
Cluster::~Cluster() {
@@ -277,18 +276,6 @@ void Cluster::initialize() {
if (settings.quorum) quorum.start(poller);
if (myUrl.empty())
myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
- // Cluster constructor will leave us in either READY or JOINER state.
- switch (state) {
- case READY:
- mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
- break;
- case JOINER:
- mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
- break;
- default:
- assert(0);
- }
- QPID_LOG(notice, *this << (state == READY ? " joined" : " joining") << " cluster " << name);
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
@@ -389,23 +376,9 @@ void Cluster::deliver(
deliverEvent(e);
}
-void Cluster::deliverEvent(const Event& e) {
- // During initialization, execute events directly in the same thread.
- // Once initialized, push to pollable queue to be processed in another thread.
- if (state == INIT)
- deliveredEvent(e);
- else
- deliverEventQueue.push(e);
-}
+void Cluster::deliverEvent(const Event& e) { deliverEventQueue.push(e); }
-void Cluster::deliverFrame(const EventFrame& e) {
- // During initialization, execute events directly in the same thread.
- // Once initialized, push to pollable queue to be processed in another thread.
- if (state == INIT)
- deliveredFrame(e);
- else
- deliverFrameQueue.push(e);
-}
+void Cluster::deliverFrame(const EventFrame& e) { deliverFrameQueue.push(e); }
const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
return (body && body->getMethod() &&
@@ -621,13 +594,16 @@ void Cluster::initMapCompleted(Lock& l) {
broker.setRecovery(false); // Ditch my current store.
broker.setClusterUpdatee(true);
state = JOINER;
+ mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
+ QPID_LOG(notice, *this << " joining cluster " << name);
}
else { // I can go ready.
discarding = false;
setReady(l);
memberUpdate(l);
+ mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
+ QPID_LOG(notice, *this << " joined cluster " << name);
}
- QPID_LOG(debug, *this << "Initialization complete");
}
}
diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/Multicaster.cpp
index 229d7edb1e..4a8195438f 100644
--- a/qpid/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Multicaster.cpp
@@ -55,15 +55,8 @@ void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId&
void Multicaster::mcast(const Event& e) {
{
sys::Mutex::ScopedLock l(lock);
- if (!ready) {
- if (e.isConnection()) holdingQueue.push_back(e);
- else {
- iovec iov = e.toIovec();
- // FIXME aconway 2009-11-23: configurable retry --cluster-retry
- if (!cpg.mcast(&iov, 1))
- throw Exception("CPG flow control error during initialization");
- QPID_LOG(trace, "MCAST (direct) " << e);
- }
+ if (!ready && e.isConnection()) {
+ holdingQueue.push_back(e);
return;
}
}
diff --git a/qpid/cpp/src/qpid/cluster/PollableQueue.h b/qpid/cpp/src/qpid/cluster/PollableQueue.h
index 65f615d8b6..2aed6de5b9 100644
--- a/qpid/cpp/src/qpid/cluster/PollableQueue.h
+++ b/qpid/cpp/src/qpid/cluster/PollableQueue.h
@@ -55,7 +55,7 @@ template <class T> class PollableQueue : public sys::PollableQueue<T> {
return i;
}
catch (const std::exception& e) {
- QPID_LOG(error, message << ": " << e.what());
+ QPID_LOG(critical, message << ": " << e.what());
this->stop();
error();
return values.end();
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 178271e977..4bfbca415a 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -193,7 +193,8 @@ class StoreTests(BrokerTest):
a.terminate()
cluster2 = self.cluster(1, args=self.args())
try:
- a = cluster2.start("a", expect=EXPECT_EXIT_FAIL)
+ a = cluster2.start("a", expect=EXPECT_EXIT_OK)
+ a.ready()
self.fail("Expected exception")
except: pass
@@ -214,8 +215,10 @@ class StoreTests(BrokerTest):
self.assertEqual(c.wait(), 0)
# Mix members from both shutdown events, they should fail
- a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
- b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
+ a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+ b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+ self.assertRaises(Exception, lambda: a.ready())
+ self.assertRaises(Exception, lambda: b.ready())
def test_total_failure(self):
# Verify we abort with sutiable error message if no clean stores.
@@ -224,10 +227,14 @@ class StoreTests(BrokerTest):
b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=True)
a.kill()
b.kill()
- a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
- b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
- assert a.wait() != 0
- assert b.wait() != 0
+ a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+ b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+ self.assertRaises(Exception, lambda: a.ready())
+ self.assertRaises(Exception, lambda: b.ready())
msg = re.compile("critical.*no clean store")
- assert msg.search(file(a.log).read())
- assert msg.search(file(b.log).read())
+ assert a.search_log(msg)
+ assert b.search_log(msg)
+ # FIXME aconway 2009-12-03: verify correct store ID in log message
+ # FIXME aconway 2009-12-03: verify manual restore procedure
+
+