diff options
author | Alan Conway <aconway@apache.org> | 2011-03-07 21:01:49 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-03-07 21:01:49 +0000 |
commit | a53705eb2513e91efdbece8133fe052c261c52d7 (patch) | |
tree | bb51d22f2d9234a4926b452ca3bc4d464e527f0c | |
parent | 92da7592b082b3fc9e847b80d6db8538fb29be29 (diff) | |
download | qpid-python-a53705eb2513e91efdbece8133fe052c261c52d7.tar.gz |
QPID-3121: Cluster management inconsistency when using persistent store.
With the store doing async completions, completion IO callbacks could
be queued differently on different nodes. This led to inconsistent
management changes in a cluster when a connection was modified in an
IO callback.
Fix was to mark IO callback processing as not cluster safe, so
connections don't record management stats during an IO callback.
Test changes:
- enable durable tests in test_management.
- add substitutions to mask known issue of inconsistent "stats changed" messages.
- add transactional client to test_management.
- ignore heartbeat connection close logs in cluster_test_logs.py
- make brokertest.retry more accurate
- fix minor bug in brokertest.log_ready.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1078947 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 59 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/ClusterSafe.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/ClusterSafe.h | 18 | ||||
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 46 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_test_logs.py | 13 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 4 |
7 files changed, 92 insertions, 61 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 9648ffd687..c07e63e68c 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -340,6 +340,9 @@ void Connection::closed(){ // Physically closed, suspend open sessions. void Connection::doIoCallbacks() { { ScopedLock<Mutex> l(ioCallbackLock); + // Although IO callbacks execute in the connection thread context, they are + // not cluster safe because they are queued for execution in non-IO threads. + ClusterUnsafeScope cus; while (!ioCallbacks.empty()) { boost::function0<void> cb = ioCallbacks.front(); ioCallbacks.pop(); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index a1f206e25d..ba1f989f7c 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -88,7 +88,7 @@ void SemanticState::closed() { //prevent requeued messages being redelivered to consumers for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { disable(i->second); - } + } if (dtxBuffer.get()) { dtxBuffer->fail(); } @@ -107,7 +107,7 @@ bool SemanticState::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -void SemanticState::consume(const string& tag, +void SemanticState::consume(const string& tag, Queue::shared_ptr queue, bool ackRequired, bool acquire, bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) { @@ -197,7 +197,7 @@ void SemanticState::endDtx(const std::string& xid, bool fail) dtxBuffer->fail(); } else { dtxBuffer->markEnded(); - } + } dtxBuffer.reset(); } @@ -257,9 +257,9 @@ void SemanticState::record(const DeliveryRecord& delivery) const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); -SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, - const string& _name, - Queue::shared_ptr _queue, +SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, + const string& _name, + Queue::shared_ptr _queue, bool ack, bool _acquire, bool _exclusive, @@ -268,20 +268,20 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, const framing::FieldTable& _arguments -) : +) : Consumer(_acquire), - parent(_parent), - name(_name), - queue(_queue), - ackExpected(ack), + parent(_parent), + name(_name), + queue(_queue), + ackExpected(ack), acquire(_acquire), - blocked(true), + blocked(true), windowing(true), exclusive(_exclusive), resumeId(_resumeId), resumeTtl(_resumeTtl), arguments(_arguments), - msgCredit(0), + msgCredit(0), byteCredit(0), notifyEnabled(true), syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), @@ -292,7 +292,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, { ManagementAgent* agent = parent->session.getBroker().getManagementAgent(); qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session)); - + if (agent != 0) { mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name, @@ -334,7 +334,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered if (windowing || ackExpected || !acquire) { parent->record(record); - } + } if (acquire && !ackExpected) { queue->dequeue(0, msg); } @@ -354,7 +354,7 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) // checkCredit fails because the message is to big, we should // remain on queue's listener list for possible smaller messages // in future. - // + // blocked = !(filter(msg) && checkCredit(msg)); return !blocked; } @@ -375,7 +375,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) { assertClusterSafe(); uint32_t originalMsgCredit = msgCredit; - uint32_t originalByteCredit = byteCredit; + uint32_t originalByteCredit = byteCredit; if (msgCredit != 0xFFFFFFFF) { msgCredit--; } @@ -385,7 +385,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this) << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit << " now bytes: " << byteCredit << " msgs: " << msgCredit); - + } bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) @@ -399,7 +399,7 @@ bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) return enoughCredit; } -SemanticState::ConsumerImpl::~ConsumerImpl() +SemanticState::ConsumerImpl::~ConsumerImpl() { if (mgmtObject != 0) mgmtObject->resourceDestroy (); @@ -417,7 +417,7 @@ void SemanticState::unsubscribe(ConsumerImpl::shared_ptr c) Queue::shared_ptr queue = c->getQueue(); if(queue) { queue->cancel(c); - if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { + if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { Queue::tryAutoDelete(session.getBroker(), queue); } } @@ -460,7 +460,7 @@ const std::string nullstring; void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); - + std::string exchangeName = msg->getExchangeName(); if (!cacheExchange || cacheExchange->getName() != exchangeName) cacheExchange = session.getBroker().getExchanges().get(exchangeName); @@ -469,7 +469,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { /* verify the userid if specified: */ std::string id = msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring; - + if (authMsg && !id.empty() && !(id == userID || (isDefaultRealm && id == userName))) { QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id); @@ -487,7 +487,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { if (!strategy.delivered) { //TODO:if discard-unroutable, just drop it - //TODO:else if accept-mode is explicit, reject it + //TODO:else if accept-mode is explicit, reject it //else route it to alternate exchange if (cacheExchange->getAlternate()) { cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); @@ -516,7 +516,7 @@ void SemanticState::ConsumerImpl::requestDispatch() } bool SemanticState::complete(DeliveryRecord& delivery) -{ +{ ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { i->second->complete(delivery); @@ -544,7 +544,7 @@ void SemanticState::recover(bool requeue) unacked.clear(); for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); }else{ - for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this)); + for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this)); //unconfirmed messages re redelivered and therefore have their //id adjusted, confirmed messages are not and so the ordering //w.r.t id is lost @@ -676,7 +676,7 @@ Queue::shared_ptr SemanticState::getQueue(const string& name) const { } AckRange SemanticState::findRange(DeliveryId first, DeliveryId last) -{ +{ return DeliveryRecord::findRange(unacked, first, last); } @@ -767,13 +767,13 @@ void SemanticState::accepted(const SequenceSet& commands) { //in transactional mode, don't dequeue or remove, just //maintain set of acknowledged messages: accumulatedAck.add(commands); - + if (dtxBuffer.get()) { //if enlisted in a dtx, copy the relevant slice from //unacked and record it against that transaction TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); accumulatedAck.clear(); - dtxBuffer->enlist(txAck); + dtxBuffer->enlist(txAck); //mark the relevant messages as 'ended' in unacked //if the messages are already completed, they can be @@ -795,7 +795,6 @@ void SemanticState::accepted(const SequenceSet& commands) { } void SemanticState::completed(const SequenceSet& commands) { - assertClusterSafe(); DeliveryRecords::iterator removed = remove_if(unacked.begin(), unacked.end(), isInSequenceSetAnd(commands, @@ -806,7 +805,6 @@ void SemanticState::completed(const SequenceSet& commands) { void SemanticState::attached() { - assertClusterSafe(); for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->enableNotify(); session.getConnection().outputTasks.addOutputTask(i->second.get()); @@ -816,7 +814,6 @@ void SemanticState::attached() void SemanticState::detached() { - assertClusterSafe(); for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->disableNotify(); session.getConnection().outputTasks.removeOutputTask(i->second.get()); diff --git a/qpid/cpp/src/qpid/sys/ClusterSafe.cpp b/qpid/cpp/src/qpid/sys/ClusterSafe.cpp index b67b04c267..dd37615145 100644 --- a/qpid/cpp/src/qpid/sys/ClusterSafe.cpp +++ b/qpid/cpp/src/qpid/sys/ClusterSafe.cpp @@ -51,6 +51,16 @@ ClusterSafeScope::~ClusterSafeScope() { inContext = save; } +ClusterUnsafeScope::ClusterUnsafeScope() { + save = inContext; + inContext = false; +} + +ClusterUnsafeScope::~ClusterUnsafeScope() { + assert(!inContext); + inContext = save; +} + void enableClusterSafe() { inCluster = true; } }} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/ClusterSafe.h b/qpid/cpp/src/qpid/sys/ClusterSafe.h index 42e290f4c8..fd3ec32ae6 100644 --- a/qpid/cpp/src/qpid/sys/ClusterSafe.h +++ b/qpid/cpp/src/qpid/sys/ClusterSafe.h @@ -53,10 +53,8 @@ QPID_COMMON_EXTERN void assertClusterSafe(); QPID_COMMON_EXTERN bool isClusterSafe(); /** - * Base class for classes that encapsulate state which is replicated - * to all members of a cluster. Acts as a marker for clustered state - * and provides functions to assist detecting bugs in cluster - * behavior. + * Mark a scope as cluster safe. Sets isClusterSafe in constructor and resets + * to previous value in destructor. */ class ClusterSafeScope { public: @@ -67,6 +65,18 @@ class ClusterSafeScope { }; /** + * Mark a scope as cluster unsafe. Clears isClusterSafe in constructor and resets + * to previous value in destructor. + */ +class ClusterUnsafeScope { + public: + ClusterUnsafeScope(); + ~ClusterUnsafeScope(); + private: + bool save; +}; + +/** * Enable cluster-safe assertions. By default they are no-ops. * Called by cluster code. */ diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 6e771bf5d6..19e97ce7aa 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -67,7 +67,7 @@ class ExceptionWrapper: def __init__(self, obj, msg): self.obj = obj self.msg = msg - + def __getattr__(self, name): func = getattr(self.obj, name) if type(func) != callable: @@ -97,11 +97,12 @@ def retry(function, timeout=10, delay=.01): """Call function until it returns True or timeout expires. Double the delay for each retry. Return True if function returns true, False if timeout expires.""" + deadline = time.time() + timeout while not function(): - if delay > timeout: delay = timeout + remaining = deadline - time.time() + if remaining <= 0: return False + delay = min(delay, remaining) time.sleep(delay) - timeout -= delay - if timeout <= 0: return False delay *= 2 return True @@ -191,7 +192,7 @@ class Popen(subprocess.Popen): def unexpected(self,msg): err = error_line(self.outfile("err")) or error_line(self.outfile("out")) raise BadProcessStatus("%s %s%s" % (self.pname, msg, err)) - + def stop(self): # Clean up at end of test. try: if self.expect == EXPECT_UNKNOWN: @@ -213,7 +214,7 @@ class Popen(subprocess.Popen): self.unexpected("expected error") finally: self.wait() # Clean up the process. - + def communicate(self, input=None): if input: self.stdin.write(input) @@ -231,7 +232,7 @@ class Popen(subprocess.Popen): def poll(self, _deadstate=None): # _deadstate required by base class in python 2.4 if self.returncode is None: # Pass _deadstate only if it has been set, there is no _deadstate - # parameter in Python 2.6 + # parameter in Python 2.6 if _deadstate is None: ret = subprocess.Popen.poll(self) else: ret = subprocess.Popen.poll(self, _deadstate) @@ -255,7 +256,7 @@ class Popen(subprocess.Popen): os.kill( self.pid , signal.SIGTERM) except AttributeError: # no os.kill, using taskkill.. (Windows only) os.popen('TASKKILL /PID ' +str(self.pid) + ' /F') - + def kill(self): try: subprocess.Popen.kill(self) except AttributeError: # No terminate method @@ -289,7 +290,7 @@ class Broker(Popen): while (os.path.exists(self.log)): self.log = "%s-%d.log" % (self.name, i) i += 1 - + def get_log(self): return os.path.abspath(self.log) @@ -319,7 +320,7 @@ class Broker(Popen): cmd += ["--log-to-file", self.log] cmd += ["--log-to-stderr=no"] if log_level != None: - cmd += ["--log-enable=%s" % log_level] + cmd += ["--log-enable=%s" % log_level] self.datadir = self.name cmd += ["--data-dir", self.datadir] Popen.__init__(self, cmd, expect, drain=False) @@ -362,7 +363,7 @@ class Broker(Popen): s = c.session(str(qpid.datatypes.uuid4())) s.queue_declare(queue=queue) c.close() - + def _prep_sender(self, queue, durable, xprops): s = queue + "; {create:always, node:{durable:" + str(durable) if xprops != None: s += ", x-declare:{" + xprops + "}" @@ -406,13 +407,14 @@ class Broker(Popen): def log_ready(self): """Return true if the log file exists and contains a broker ready message""" - if self._log_ready: return True - self._log_ready = find_in_file("notice Broker running", self.log) + if not self._log_ready: + self._log_ready = find_in_file("notice Broker running", self.log) + return self._log_ready def ready(self, **kwargs): """Wait till broker is ready to serve clients""" # First make sure the broker is listening by checking the log. - if not retry(self.log_ready, timeout=30): + if not retry(self.log_ready, timeout=60): raise Exception( "Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5))) # Create a connection and a session. For a cluster broker this will @@ -421,8 +423,8 @@ class Broker(Popen): c = self.connect(**kwargs) try: c.session() finally: c.close() - except: raise RethrownException( - "Broker %s failed ready test%s"%(self.name,error_line(self.log, 5))) + except Exception,e: raise RethrownException( + "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5))) def store_state(self): uuids = open(os.path.join(self.datadir, "cluster", "store.status")).readlines() @@ -431,7 +433,7 @@ class Broker(Popen): if uuids[0] == null_uuid: return "empty" if uuids[1] == null_uuid: return "dirty" return "clean" - + class Cluster: """A cluster of brokers in a test.""" @@ -486,7 +488,7 @@ class BrokerTest(TestCase): rootdir = os.getcwd() def configure(self, config): self.config=config - + def setUp(self): outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp" self.dir = os.path.join(self.rootdir, outdir, self.id()) @@ -561,7 +563,7 @@ class StoppableThread(Thread): self.stopped = True self.join() if self.error: raise self.error - + class NumberedSender(Thread): """ Thread to run a sender client and send numbered messages until stopped. @@ -620,7 +622,7 @@ class NumberedSender(Thread): self.join() self.write_message(-1) # end-of-messages marker. if self.error: raise self.error - + class NumberedReceiver(Thread): """ Thread to run a receiver client and verify it receives @@ -647,7 +649,7 @@ class NumberedReceiver(Thread): def read_message(self): return int(self.receiver.stdout.readline()) - + def run(self): try: self.received = 0 @@ -679,7 +681,7 @@ class ErrorGenerator(StoppableThread): self.broker=broker broker.test.cleanup_stop(self) self.start() - + def run(self): c = self.broker.connect_old() try: diff --git a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py index 25ddb3b74c..9f7d1e2f6c 100755 --- a/qpid/cpp/src/tests/cluster_test_logs.py +++ b/qpid/cpp/src/tests/cluster_test_logs.py @@ -61,10 +61,10 @@ def filter_log(log): 'warning CLOSING .* unsent data', 'Inter-broker link ', 'Running in a cluster, marking store', - 'debug Sending keepalive signal to watchdog', - 'last broker standing joined by 1 replicas, updating queue policies.' + 'debug Sending keepalive signal to watchdog', # Watchdog timer thread + 'last broker standing joined by 1 replicas, updating queue policies.', + 'Connection .* timed out: closing' # heartbeat connection close ]) - skip_re = re.compile(skip) # Regex to match a UUID uuid='\w\w\w\w\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w\w\w\w\w\w\w\w\w' # Substitutions to remove expected differences @@ -82,6 +82,13 @@ def filter_log(log): (r' map={.*_object_name:([^,}]*)[,}].*', r' \1'), # V2 map - just keep name (r'\d+-\d+-\d+--\d+', 'X-X-X--X'), # V1 Object IDs ] + # Substitutions to mask known issue: durable test shows inconsistent "changed stats for com.redhat.rhm.store:journal" messages. + skip += '|Changed V[12] statistics com.redhat.rhm.store:journal' + subs += [(r'to=console.obj.1.0.com.redhat.rhm.store.journal props=\d+ stats=\d+', + 'to=console.obj.1.0.com.redhat.rhm.store.journal props=NN stats=NN')] + + skip_re = re.compile(skip) + subs = [(re.compile(pattern), subst) for pattern, subst in subs] for l in open(log): if skip_re.search(l): continue for pattern,subst in subs: l = re.sub(pattern,subst,l) diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index b8407fbde8..3f19411d19 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -574,8 +574,10 @@ class LongTests(BrokerTest): """Start ordinary clients for a broker.""" cmds=[ ["qpid-tool", "localhost:%s"%(broker.port())], - ["qpid-perftest", "--count", 50000, + ["qpid-perftest", "--count=5000", "--durable=yes", "--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()], + ["qpid-txtest", "--queue-base-name", "tx-%s"%str(qpid.datatypes.uuid4()), + "--port", broker.port()], ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())], ["testagent", "localhost", str(broker.port())] ] clients.append([ClientLoop(broker, cmd) for cmd in cmds]) |