diff options
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 40 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Multicaster.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/PollableQueue.h | 2 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 25 |
4 files changed, 27 insertions, 51 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index d049001eb0..738a9fc5c4 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -258,15 +258,14 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // Load my store status before we go into initialization if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { store.load(); + if (store.getState() == STORE_STATE_DIRTY_STORE) + broker.setRecovery(false); // Ditch my current store. if (store.getClusterId()) clusterId = store.getClusterId(); // Use stored ID if there is one. QPID_LOG(notice, "Cluster store state: " << store) } cpg.join(name); - // Pump the CPG dispatch manually till we get initialized. - while (state == INIT) - cpg.dispatchOne(); } Cluster::~Cluster() { @@ -277,18 +276,6 @@ void Cluster::initialize() { if (settings.quorum) quorum.start(poller); if (myUrl.empty()) myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT)); - // Cluster constructor will leave us in either READY or JOINER state. - switch (state) { - case READY: - mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); - break; - case JOINER: - mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); - break; - default: - assert(0); - } - QPID_LOG(notice, *this << (state == READY ? " joined" : " joining") << " cluster " << name); broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); @@ -389,23 +376,9 @@ void Cluster::deliver( deliverEvent(e); } -void Cluster::deliverEvent(const Event& e) { - // During initialization, execute events directly in the same thread. - // Once initialized, push to pollable queue to be processed in another thread. - if (state == INIT) - deliveredEvent(e); - else - deliverEventQueue.push(e); -} +void Cluster::deliverEvent(const Event& e) { deliverEventQueue.push(e); } -void Cluster::deliverFrame(const EventFrame& e) { - // During initialization, execute events directly in the same thread. - // Once initialized, push to pollable queue to be processed in another thread. - if (state == INIT) - deliveredFrame(e); - else - deliverFrameQueue.push(e); -} +void Cluster::deliverFrame(const EventFrame& e) { deliverFrameQueue.push(e); } const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) { return (body && body->getMethod() && @@ -621,13 +594,16 @@ void Cluster::initMapCompleted(Lock& l) { broker.setRecovery(false); // Ditch my current store. broker.setClusterUpdatee(true); state = JOINER; + mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); + QPID_LOG(notice, *this << " joining cluster " << name); } else { // I can go ready. discarding = false; setReady(l); memberUpdate(l); + mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); + QPID_LOG(notice, *this << " joined cluster " << name); } - QPID_LOG(debug, *this << "Initialization complete"); } } diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/Multicaster.cpp index 229d7edb1e..4a8195438f 100644 --- a/qpid/cpp/src/qpid/cluster/Multicaster.cpp +++ b/qpid/cpp/src/qpid/cluster/Multicaster.cpp @@ -55,15 +55,8 @@ void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& void Multicaster::mcast(const Event& e) { { sys::Mutex::ScopedLock l(lock); - if (!ready) { - if (e.isConnection()) holdingQueue.push_back(e); - else { - iovec iov = e.toIovec(); - // FIXME aconway 2009-11-23: configurable retry --cluster-retry - if (!cpg.mcast(&iov, 1)) - throw Exception("CPG flow control error during initialization"); - QPID_LOG(trace, "MCAST (direct) " << e); - } + if (!ready && e.isConnection()) { + holdingQueue.push_back(e); return; } } diff --git a/qpid/cpp/src/qpid/cluster/PollableQueue.h b/qpid/cpp/src/qpid/cluster/PollableQueue.h index 65f615d8b6..2aed6de5b9 100644 --- a/qpid/cpp/src/qpid/cluster/PollableQueue.h +++ b/qpid/cpp/src/qpid/cluster/PollableQueue.h @@ -55,7 +55,7 @@ template <class T> class PollableQueue : public sys::PollableQueue<T> { return i; } catch (const std::exception& e) { - QPID_LOG(error, message << ": " << e.what()); + QPID_LOG(critical, message << ": " << e.what()); this->stop(); error(); return values.end(); diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 178271e977..4bfbca415a 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -193,7 +193,8 @@ class StoreTests(BrokerTest): a.terminate() cluster2 = self.cluster(1, args=self.args()) try: - a = cluster2.start("a", expect=EXPECT_EXIT_FAIL) + a = cluster2.start("a", expect=EXPECT_EXIT_OK) + a.ready() self.fail("Expected exception") except: pass @@ -214,8 +215,10 @@ class StoreTests(BrokerTest): self.assertEqual(c.wait(), 0) # Mix members from both shutdown events, they should fail - a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False) - b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False) + a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) + b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) + self.assertRaises(Exception, lambda: a.ready()) + self.assertRaises(Exception, lambda: b.ready()) def test_total_failure(self): # Verify we abort with sutiable error message if no clean stores. @@ -224,10 +227,14 @@ class StoreTests(BrokerTest): b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=True) a.kill() b.kill() - a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False) - b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False) - assert a.wait() != 0 - assert b.wait() != 0 + a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) + b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) + self.assertRaises(Exception, lambda: a.ready()) + self.assertRaises(Exception, lambda: b.ready()) msg = re.compile("critical.*no clean store") - assert msg.search(file(a.log).read()) - assert msg.search(file(b.log).read()) + assert a.search_log(msg) + assert b.search_log(msg) + # FIXME aconway 2009-12-03: verify correct store ID in log message + # FIXME aconway 2009-12-03: verify manual restore procedure + + |