diff options
author | Alan Conway <aconway@apache.org> | 2010-01-20 17:07:54 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-01-20 17:07:54 +0000 |
commit | cd3166280e53b8587d4d257b7898577b65edc0b7 (patch) | |
tree | fabdc0bf29f6c025648d84349faadb317cfa2e68 /cpp/src | |
parent | 8d124f581b0571a9edb5603e6c282a2ecc081b5b (diff) | |
download | qpid-python-cd3166280e53b8587d4d257b7898577b65edc0b7.tar.gz |
Cluster-safe assertions.
Assert that replicated data structures are modified in a cluster-safe
context - in cluster delivery thread or during update. Assertions
added to Queue.cpp and SemanticState.cpp.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@901282 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ClusterSafe.cpp | 47 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ClusterSafe.h | 60 |
7 files changed, 147 insertions, 4 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index da3d27cbe9..7c00a73a47 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -433,6 +433,8 @@ libqpidcommon_la_SOURCES += \ qpid/sys/AtomicValue_gcc.h \ qpid/sys/AtomicValue_mutex.h \ qpid/sys/BlockingQueue.h \ + qpid/sys/ClusterSafe.h \ + qpid/sys/ClusterSafe.cpp \ qpid/sys/Codec.h \ qpid/sys/ConnectionCodec.h \ qpid/sys/ConnectionInputHandler.h \ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 849bf6d1f5..9b05373144 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -92,7 +92,7 @@ Broker::Options::Options(const std::string& name) : tcpNoDelay(false), requireEncrypted(false), maxSessionRate(0), - asyncQueueEvents(true) + asyncQueueEvents(false) // Must be false in a cluster. { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index dcc5116afa..3eb714186c 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -33,6 +33,7 @@ #include "qpid/management/ManagementAgent.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/FieldTable.h" +#include "qpid/sys/ClusterSafe.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h" @@ -44,6 +45,7 @@ #include <boost/bind.hpp> #include <boost/intrusive_ptr.hpp> + using namespace qpid::broker; using namespace qpid::sys; using namespace qpid::framing; @@ -144,7 +146,6 @@ bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg) } void Queue::deliver(boost::intrusive_ptr<Message>& msg){ - if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); @@ -165,7 +166,7 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ push(msg); } mgntEnqStats(msg); - QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); + QPID_LOG(debug, "Message " << msg << " enqueued on " << name); } } @@ -202,6 +203,7 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ } void Queue::requeue(const QueuedMessage& msg){ + assertClusterSafe(); QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); @@ -222,6 +224,7 @@ void Queue::requeue(const QueuedMessage& msg){ } void Queue::clearLVQIndex(const QueuedMessage& msg){ + assertClusterSafe(); const framing::FieldTable* ft = msg.payload ? msg.payload->getApplicationHeaders() : 0; if (lastValueQueue && ft){ string key = ft->getAsString(qpidVQMatchProperty); @@ -232,6 +235,7 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { Mutex::ScopedLock locker(messageLock); + assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); Messages::iterator i = findAt(position); @@ -251,6 +255,8 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess bool Queue::acquire(const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); + assertClusterSafe(); + QPID_LOG(debug, "attempting to acquire " << msg.position); Messages::iterator i = findAt(msg.position); if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set @@ -272,6 +278,7 @@ bool Queue::acquire(const QueuedMessage& msg) { void Queue::notifyListener() { + assertClusterSafe(); QueueListeners::NotificationSet set; { Mutex::ScopedLock locker(messageLock); @@ -366,6 +373,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) void Queue::removeListener(Consumer::shared_ptr c) { + assertClusterSafe(); QueueListeners::NotificationSet set; { Mutex::ScopedLock locker(messageLock); @@ -440,6 +448,7 @@ QueuedMessage Queue::find(SequenceNumber pos) const { } void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ + assertClusterSafe(); Mutex::ScopedLock locker(consumerLock); if(exclusive) { throw ResourceLockedException( @@ -539,6 +548,7 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { void Queue::popMsg(QueuedMessage& qmsg) { + assertClusterSafe(); const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders(); if (lastValueQueue && ft){ string key = ft->getAsString(qpidVQMatchProperty); @@ -549,6 +559,7 @@ void Queue::popMsg(QueuedMessage& qmsg) } void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ + assertClusterSafe(); QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index d579f15279..68c62a72ef 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -34,6 +34,7 @@ #include "qpid/framing/SequenceSet.h" #include "qpid/framing/IsInSequenceSet.h" #include "qpid/log/Statement.h" +#include "qpid/sys/ClusterSafe.h" #include "qpid/ptr_map.h" #include "qpid/broker/AclModule.h" @@ -47,7 +48,6 @@ #include <assert.h> - namespace qpid { namespace broker { @@ -308,6 +308,7 @@ OwnershipToken* SemanticState::ConsumerImpl::getSession() bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { + assertClusterSafe(); allocateCredit(msg.payload); DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing); bool sync = syncFrequency && ++deliveryCount >= syncFrequency; @@ -331,6 +332,7 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>) bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) { + assertClusterSafe(); // FIXME aconway 2009-06-08: if we have byte & message credit but // checkCredit fails because the message is to big, we should // remain on queue's listener list for possible smaller messages @@ -354,6 +356,7 @@ ostream& operator<<(ostream& o, const ConsumerName& pc) { void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) { + assertClusterSafe(); uint32_t originalMsgCredit = msgCredit; uint32_t originalByteCredit = byteCredit; if (msgCredit != 0xFFFFFFFF) { @@ -387,6 +390,7 @@ SemanticState::ConsumerImpl::~ConsumerImpl() void SemanticState::cancel(ConsumerImpl::shared_ptr c) { + assertClusterSafe(); c->disableNotify(); if (session.isAttached()) session.getConnection().outputTasks.removeOutputTask(c.get()); @@ -468,6 +472,7 @@ void SemanticState::requestDispatch() void SemanticState::ConsumerImpl::requestDispatch() { + assertClusterSafe(); if (blocked) { parent->session.getConnection().outputTasks.addOutputTask(this); parent->session.getConnection().outputTasks.activateOutput(); @@ -565,6 +570,7 @@ void SemanticState::stop(const std::string& destination) void SemanticState::ConsumerImpl::setWindowMode() { + assertClusterSafe(); windowing = true; if (mgmtObject){ mgmtObject->set_creditMode("WINDOW"); @@ -573,6 +579,7 @@ void SemanticState::ConsumerImpl::setWindowMode() void SemanticState::ConsumerImpl::setCreditMode() { + assertClusterSafe(); windowing = false; if (mgmtObject){ mgmtObject->set_creditMode("CREDIT"); @@ -581,6 +588,7 @@ void SemanticState::ConsumerImpl::setCreditMode() void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) { + assertClusterSafe(); if (byteCredit != 0xFFFFFFFF) { if (value == 0xFFFFFFFF) byteCredit = value; else byteCredit += value; @@ -589,6 +597,7 @@ void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) { + assertClusterSafe(); if (msgCredit != 0xFFFFFFFF) { if (value == 0xFFFFFFFF) msgCredit = value; else msgCredit += value; @@ -614,6 +623,7 @@ void SemanticState::ConsumerImpl::flush() void SemanticState::ConsumerImpl::stop() { + assertClusterSafe(); msgCredit = 0; byteCredit = 0; } @@ -667,12 +677,14 @@ bool SemanticState::ConsumerImpl::doOutput() void SemanticState::ConsumerImpl::enableNotify() { Mutex::ScopedLock l(lock); + assertClusterSafe(); notifyEnabled = true; } void SemanticState::ConsumerImpl::disableNotify() { Mutex::ScopedLock l(lock); + assertClusterSafe(); notifyEnabled = false; } @@ -684,6 +696,7 @@ bool SemanticState::ConsumerImpl::isNotifyEnabled() const { void SemanticState::ConsumerImpl::notify() { Mutex::ScopedLock l(lock); + assertClusterSafe(); if (notifyEnabled) { parent->session.getConnection().outputTasks.addOutputTask(this); parent->session.getConnection().outputTasks.activateOutput(); @@ -708,6 +721,7 @@ isInSequenceSetAnd(const SequenceSet& s, Predicate p) { } void SemanticState::accepted(const SequenceSet& commands) { + assertClusterSafe(); if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just //maintain set of acknowledged messages: @@ -740,6 +754,7 @@ void SemanticState::accepted(const SequenceSet& commands) { } void SemanticState::completed(const SequenceSet& commands) { + assertClusterSafe(); DeliveryRecords::iterator removed = remove_if(unacked.begin(), unacked.end(), isInSequenceSetAnd(commands, @@ -750,6 +765,7 @@ void SemanticState::completed(const SequenceSet& commands) { void SemanticState::attached() { + assertClusterSafe(); for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->enableNotify(); session.getConnection().outputTasks.addOutputTask(i->second.get()); @@ -759,6 +775,7 @@ void SemanticState::attached() void SemanticState::detached() { + assertClusterSafe(); for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->disableNotify(); session.getConnection().outputTasks.removeOutputTask(i->second.get()); diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index cc245d2f3f..53100fa0c1 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -105,6 +105,7 @@ */ #include "qpid/Exception.h" #include "qpid/cluster/Cluster.h" +#include "qpid/sys/ClusterSafe.h" #include "qpid/cluster/ClusterSettings.h" #include "qpid/cluster/Connection.h" #include "qpid/cluster/UpdateClient.h" @@ -152,6 +153,7 @@ #include <map> #include <ostream> + namespace qpid { namespace cluster { using namespace qpid; @@ -357,6 +359,7 @@ void Cluster::leave(Lock&) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); // Finalize connections now now to avoid problems later in destructor. + ClusterSafeScope css; // Don't trigger cluster-safe assertions. LEAVE_TRY(localConnections.clear()); LEAVE_TRY(connections.clear()); LEAVE_TRY(broker::SignalHandler::shutdown()); @@ -440,6 +443,7 @@ void Cluster::flagError( // Handler for deliverFrameQueue. // This thread executes the main logic. void Cluster::deliveredFrame(const EventFrame& efConst) { + sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts. Mutex::ScopedLock l(lock); if (state == LEFT) return; EventFrame e(efConst); @@ -560,6 +564,7 @@ void Cluster::setReady(Lock&) { if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); mcast.setReady(); broker.getQueueEvents().enable(); + enableClusterSafe(); // Enable cluster-safe assertions. } void Cluster::initMapCompleted(Lock& l) { @@ -650,6 +655,7 @@ void Cluster::makeOffer(const MemberId& id, Lock& ) { // callbacks will be invoked. // void Cluster::brokerShutdown() { + sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts. try { cpg.shutdown(); } catch (const std::exception& e) { QPID_LOG(error, *this << " shutting down CPG: " << e.what()); diff --git a/cpp/src/qpid/sys/ClusterSafe.cpp b/cpp/src/qpid/sys/ClusterSafe.cpp new file mode 100644 index 0000000000..498a46d865 --- /dev/null +++ b/cpp/src/qpid/sys/ClusterSafe.cpp @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ClusterSafe.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/Thread.h" +#include <stdlib.h> + +namespace qpid { +namespace sys { + +namespace { +bool inCluster = false; +QPID_TSS bool inContext = false; +} + +void assertClusterSafe() { + if (inCluster && !inContext) { + QPID_LOG(critical, "Modified cluster state outside of cluster context"); + ::abort(); + } +} + +ClusterSafeScope::ClusterSafeScope() { inContext = true; } +ClusterSafeScope::~ClusterSafeScope() { inContext = false; } + +void enableClusterSafe() { inCluster = true; } + +}} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/ClusterSafe.h b/cpp/src/qpid/sys/ClusterSafe.h new file mode 100644 index 0000000000..70f07a958a --- /dev/null +++ b/cpp/src/qpid/sys/ClusterSafe.h @@ -0,0 +1,60 @@ +#ifndef QPID_SYS_CLUSTERSAFE_H +#define QPID_SYS_CLUSTERSAFE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +namespace qpid { +namespace sys { + +/** + * Assertion to add to code that modifies clustered state. + * + * In a non-clustered broker this is a no-op. + * + * In a clustered broker, checks that it is being called + * in a context where it is safe to modify clustered state. + * If not it aborts the process as this is a serious bug. + * + * This function is in the common library rather than the cluster + * library because it is called by code in the broker library. + */ +void assertClusterSafe(); + +/** + * Base class for classes that encapsulate state which is replicated + * to all members of a cluster. Acts as a marker for clustered state + * and provides functions to assist detecting bugs in cluster + * behavior. + */ +struct ClusterSafeScope { + ClusterSafeScope(); + ~ClusterSafeScope(); +}; + +/** + * Enable cluster-safe assertions. By defaul they are no-ops. + */ +void enableClusterSafe(); + +}} // namespace qpid::sys + +#endif /*!QPID_SYS_CLUSTERSAFE_H*/ |