summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-01-06 17:00:57 +0000
committerAlan Conway <aconway@apache.org>2010-01-06 17:00:57 +0000
commit828c153afcdc15766cf71945cdf5f98f471a08aa (patch)
treea8429d974abf5c1b75574c43d633254207caedce
parent7e3e4d5e664c52fbc7faf16cf49771d62ccd53e1 (diff)
downloadqpid-python-828c153afcdc15766cf71945cdf5f98f471a08aa.tar.gz
Don't hold up broker initialization for cluster initialization.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@896536 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp40
-rw-r--r--qpid/cpp/src/qpid/cluster/Multicaster.cpp11
-rw-r--r--qpid/cpp/src/qpid/cluster/PollableQueue.h2
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py25
-rw-r--r--qpid/python/qpid/brokertest.py30
5 files changed, 52 insertions, 56 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index d049001eb0..738a9fc5c4 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -258,15 +258,14 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
store.load();
+ if (store.getState() == STORE_STATE_DIRTY_STORE)
+ broker.setRecovery(false); // Ditch my current store.
if (store.getClusterId())
clusterId = store.getClusterId(); // Use stored ID if there is one.
QPID_LOG(notice, "Cluster store state: " << store)
}
cpg.join(name);
- // Pump the CPG dispatch manually till we get initialized.
- while (state == INIT)
- cpg.dispatchOne();
}
Cluster::~Cluster() {
@@ -277,18 +276,6 @@ void Cluster::initialize() {
if (settings.quorum) quorum.start(poller);
if (myUrl.empty())
myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
- // Cluster constructor will leave us in either READY or JOINER state.
- switch (state) {
- case READY:
- mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
- break;
- case JOINER:
- mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
- break;
- default:
- assert(0);
- }
- QPID_LOG(notice, *this << (state == READY ? " joined" : " joining") << " cluster " << name);
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
@@ -389,23 +376,9 @@ void Cluster::deliver(
deliverEvent(e);
}
-void Cluster::deliverEvent(const Event& e) {
- // During initialization, execute events directly in the same thread.
- // Once initialized, push to pollable queue to be processed in another thread.
- if (state == INIT)
- deliveredEvent(e);
- else
- deliverEventQueue.push(e);
-}
+void Cluster::deliverEvent(const Event& e) { deliverEventQueue.push(e); }
-void Cluster::deliverFrame(const EventFrame& e) {
- // During initialization, execute events directly in the same thread.
- // Once initialized, push to pollable queue to be processed in another thread.
- if (state == INIT)
- deliveredFrame(e);
- else
- deliverFrameQueue.push(e);
-}
+void Cluster::deliverFrame(const EventFrame& e) { deliverFrameQueue.push(e); }
const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
return (body && body->getMethod() &&
@@ -621,13 +594,16 @@ void Cluster::initMapCompleted(Lock& l) {
broker.setRecovery(false); // Ditch my current store.
broker.setClusterUpdatee(true);
state = JOINER;
+ mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
+ QPID_LOG(notice, *this << " joining cluster " << name);
}
else { // I can go ready.
discarding = false;
setReady(l);
memberUpdate(l);
+ mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
+ QPID_LOG(notice, *this << " joined cluster " << name);
}
- QPID_LOG(debug, *this << "Initialization complete");
}
}
diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/Multicaster.cpp
index 229d7edb1e..4a8195438f 100644
--- a/qpid/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Multicaster.cpp
@@ -55,15 +55,8 @@ void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId&
void Multicaster::mcast(const Event& e) {
{
sys::Mutex::ScopedLock l(lock);
- if (!ready) {
- if (e.isConnection()) holdingQueue.push_back(e);
- else {
- iovec iov = e.toIovec();
- // FIXME aconway 2009-11-23: configurable retry --cluster-retry
- if (!cpg.mcast(&iov, 1))
- throw Exception("CPG flow control error during initialization");
- QPID_LOG(trace, "MCAST (direct) " << e);
- }
+ if (!ready && e.isConnection()) {
+ holdingQueue.push_back(e);
return;
}
}
diff --git a/qpid/cpp/src/qpid/cluster/PollableQueue.h b/qpid/cpp/src/qpid/cluster/PollableQueue.h
index 65f615d8b6..2aed6de5b9 100644
--- a/qpid/cpp/src/qpid/cluster/PollableQueue.h
+++ b/qpid/cpp/src/qpid/cluster/PollableQueue.h
@@ -55,7 +55,7 @@ template <class T> class PollableQueue : public sys::PollableQueue<T> {
return i;
}
catch (const std::exception& e) {
- QPID_LOG(error, message << ": " << e.what());
+ QPID_LOG(critical, message << ": " << e.what());
this->stop();
error();
return values.end();
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 178271e977..4bfbca415a 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -193,7 +193,8 @@ class StoreTests(BrokerTest):
a.terminate()
cluster2 = self.cluster(1, args=self.args())
try:
- a = cluster2.start("a", expect=EXPECT_EXIT_FAIL)
+ a = cluster2.start("a", expect=EXPECT_EXIT_OK)
+ a.ready()
self.fail("Expected exception")
except: pass
@@ -214,8 +215,10 @@ class StoreTests(BrokerTest):
self.assertEqual(c.wait(), 0)
# Mix members from both shutdown events, they should fail
- a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
- b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
+ a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+ b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+ self.assertRaises(Exception, lambda: a.ready())
+ self.assertRaises(Exception, lambda: b.ready())
def test_total_failure(self):
# Verify we abort with sutiable error message if no clean stores.
@@ -224,10 +227,14 @@ class StoreTests(BrokerTest):
b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=True)
a.kill()
b.kill()
- a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
- b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
- assert a.wait() != 0
- assert b.wait() != 0
+ a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+ b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+ self.assertRaises(Exception, lambda: a.ready())
+ self.assertRaises(Exception, lambda: b.ready())
msg = re.compile("critical.*no clean store")
- assert msg.search(file(a.log).read())
- assert msg.search(file(b.log).read())
+ assert a.search_log(msg)
+ assert b.search_log(msg)
+ # FIXME aconway 2009-12-03: verify correct store ID in log message
+ # FIXME aconway 2009-12-03: verify manual restore procedure
+
+
diff --git a/qpid/python/qpid/brokertest.py b/qpid/python/qpid/brokertest.py
index 83d6c44d84..f09d1ec15c 100644
--- a/qpid/python/qpid/brokertest.py
+++ b/qpid/python/qpid/brokertest.py
@@ -20,7 +20,7 @@
# Support library for tests that start multiple brokers, e.g. cluster
# or federation
-import os, signal, string, tempfile, popen2, socket, threading, time, imp
+import os, signal, string, tempfile, popen2, socket, threading, time, imp, re
import qpid, traceback
from qpid import connection, messaging, util
from qpid.compat import format_exc
@@ -163,6 +163,13 @@ class Broker(Popen):
"A broker process. Takes care of start, stop and logging."
_broker_count = 0
+ def find_log(self):
+ self.log = "%s.log" % self.name
+ i = 1
+ while (os.path.exists(self.log)):
+ self.log = "%s-%d.log" % (self.name, i)
+ i += 1
+
def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING):
"""Start a broker daemon. name determines the data-dir and log
file names."""
@@ -174,7 +181,7 @@ class Broker(Popen):
else:
self.name = "broker%d" % Broker._broker_count
Broker._broker_count += 1
- self.log = self.name+".log"
+ self.find_log()
cmd += ["--log-to-file", self.log, "--log-prefix", self.name]
cmd += ["--log-to-stderr=no"]
self.datadir = self.name
@@ -182,7 +189,7 @@ class Broker(Popen):
Popen.__init__(self, cmd, expect)
test.cleanup_stop(self)
self.host = "localhost"
- log.debug("Started broker %s (%s)" % (self.name, self.pname))
+ log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
def port(self):
# Read port from broker process stdout if not already read.
@@ -240,8 +247,21 @@ class Broker(Popen):
return m
def host_port(self): return "%s:%s" % (self.host, self.port())
+
+ def search_log(self, regex):
+ """Search for regular expression in broker log, return match"""
+ return regex.search(file(self.log).read())
+
+ def get_member_id(self):
+ """Search log file for cluster member ID"""
+ match = self.search_log(re.compile(r"cluster\(([0-9.:]*) INIT\)"))
+ if not match: raise Exception("No cluster member-id found in "+log)
+ return match.group(1)
-
+ def ready(self):
+ """Wait till broker is ready to serve clients"""
+ self.connect().close()
+
class Cluster:
"""A cluster of brokers in a test."""
@@ -333,7 +353,7 @@ class BrokerTest(TestCase):
def wait():
"""Wait for all brokers in the cluster to be ready"""
for b in _brokers: b.connect().close()
-
+
class RethrownException(Exception):
"""Captures the original stack trace to be thrown later"""
def __init__(self, e, msg=""):