summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2012-09-13 21:36:27 +0000
committerAndrew Stitcher <astitcher@apache.org>2012-09-13 21:36:27 +0000
commit624203416ec8cc4ca51879f25bf237519c27564d (patch)
tree545aec751a0ab6c6d6f00f4b420e50647ac895f9 /cpp/src
parent48ad8d0334be1f009400d8e30dda0ef17d322830 (diff)
downloadqpid-python-624203416ec8cc4ca51879f25bf237519c27564d.tar.gz
NO-JIRA: Removed now unused cluster specific ClusterSafe code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1384555 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/CMakeLists.txt1
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/broker/Connection.cpp8
-rw-r--r--cpp/src/qpid/broker/Lvq.cpp2
-rw-r--r--cpp/src/qpid/broker/Queue.cpp6
-rw-r--r--cpp/src/qpid/broker/QueueFlowLimit.cpp1
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp13
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp6
-rw-r--r--cpp/src/qpid/sys/ClusterSafe.cpp66
-rw-r--r--cpp/src/qpid/sys/ClusterSafe.h87
10 files changed, 3 insertions, 189 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index 6d32a79e63..21e82a97b9 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -936,7 +936,6 @@ set (qpidcommon_SOURCES
qpid/management/ManagementObject.cpp
qpid/sys/AggregateOutput.cpp
qpid/sys/AsynchIOHandler.cpp
- qpid/sys/ClusterSafe.cpp
qpid/sys/Dispatcher.cpp
qpid/sys/DispatchHandle.cpp
qpid/sys/Runnable.cpp
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 350351ab9c..c39dd8114c 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -462,8 +462,6 @@ libqpidcommon_la_SOURCES += \
qpid/sys/AtomicValue_gcc.h \
qpid/sys/AtomicValue_mutex.h \
qpid/sys/BlockingQueue.h \
- qpid/sys/ClusterSafe.h \
- qpid/sys/ClusterSafe.cpp \
qpid/sys/Codec.h \
qpid/sys/ConnectionCodec.h \
qpid/sys/ConnectionInputHandler.h \
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 5818759cbe..08a9c756d0 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -26,7 +26,6 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/sys/SecuritySettings.h"
-#include "qpid/sys/ClusterSafe.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
@@ -140,7 +139,7 @@ Connection::~Connection()
if (mgmtObject != 0) {
// In a cluster, Connections destroyed during shutdown are in
// a cluster-unsafe context. Don't raise an event in that case.
- if (!link && isClusterSafe())
+ if (!link)
agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId(), mgmtObject->get_remoteProperties()));
QPID_LOG_CAT(debug, model, "Delete connection. user:" << ConnectionState::getUserId()
<< " rhost:" << mgmtId );
@@ -188,7 +187,7 @@ bool isMessage(const AMQMethodBody* method)
void Connection::recordFromServer(const framing::AMQFrame& frame)
{
// Don't record management stats in cluster-unsafe contexts
- if (mgmtObject != 0 && isClusterSafe())
+ if (mgmtObject != 0)
{
qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics();
cStats->framesToClient += 1;
@@ -203,7 +202,7 @@ void Connection::recordFromServer(const framing::AMQFrame& frame)
void Connection::recordFromClient(const framing::AMQFrame& frame)
{
// Don't record management stats in cluster-unsafe contexts
- if (mgmtObject != 0 && isClusterSafe())
+ if (mgmtObject != 0)
{
qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics();
cStats->framesFromClient += 1;
@@ -358,7 +357,6 @@ void Connection::doIoCallbacks() {
ScopedLock<Mutex> l(ioCallbackLock);
// Although IO callbacks execute in the connection thread context, they are
// not cluster safe because they are queued for execution in non-IO threads.
- ClusterUnsafeScope cus;
while (!ioCallbacks.empty()) {
boost::function0<void> cb = ioCallbacks.front();
ioCallbacks.pop();
diff --git a/cpp/src/qpid/broker/Lvq.cpp b/cpp/src/qpid/broker/Lvq.cpp
index d053616c8a..d71c64d2fb 100644
--- a/cpp/src/qpid/broker/Lvq.cpp
+++ b/cpp/src/qpid/broker/Lvq.cpp
@@ -20,7 +20,6 @@
*/
#include "Lvq.h"
#include "MessageMap.h"
-#include "qpid/sys/ClusterSafe.h"
#include "qpid/sys/Monitor.h"
namespace qpid {
@@ -33,7 +32,6 @@ Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings&
void Lvq::push(Message& message, bool isRecovery)
{
- qpid::sys::assertClusterSafe();
QueueListeners::NotificationSet copy;
Message old;
bool removed;
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index b7096b5ea0..8a3847d68c 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -43,7 +43,6 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
-#include "qpid/sys/ClusterSafe.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
#include "qpid/types/Variant.h"
@@ -306,7 +305,6 @@ void Queue::process(Message& msg)
void Queue::release(const QueueCursor& position, bool markRedelivered)
{
- assertClusterSafe();
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
@@ -332,7 +330,6 @@ bool Queue::dequeueMessageAt(const SequenceNumber& position)
boost::intrusive_ptr<PersistableMessage> pmsg;
{
Mutex::ScopedLock locker(messageLock);
- assertClusterSafe();
QPID_LOG(debug, "Attempting to dequeue message at " << position);
QueueCursor cursor;
Message* msg = messages->find(position, &cursor);
@@ -352,7 +349,6 @@ bool Queue::dequeueMessageAt(const SequenceNumber& position)
bool Queue::acquire(const QueueCursor& position, const std::string& consumer)
{
Mutex::ScopedLock locker(messageLock);
- assertClusterSafe();
Message* msg;
msg = messages->find(position);
@@ -479,7 +475,6 @@ bool Queue::find(SequenceNumber pos, Message& msg) const
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
{
- assertClusterSafe();
{
Mutex::ScopedLock locker(messageLock);
// NOTE: consumerCount is actually a count of all
@@ -737,7 +732,6 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
void Queue::push(Message& message, bool /*isRecovery*/)
{
- assertClusterSafe();
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp
index 11b9cbae63..c52cdee6a4 100644
--- a/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -29,7 +29,6 @@
#include "qpid/log/Statement.h"
#include "qpid/sys/Mutex.h"
#include "qpid/broker/SessionState.h"
-#include "qpid/sys/ClusterSafe.h"
#include "qmf/org/apache/qpid/broker/Queue.h"
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 5d96467bbf..65530394a3 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -35,7 +35,6 @@
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/IsInSequenceSet.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/ClusterSafe.h"
#include "qpid/ptr_map.h"
#include "qpid/broker/AclModule.h"
@@ -345,7 +344,6 @@ bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Messa
}
bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer)
{
- assertClusterSafe();
allocateCredit(msg);
DeliveryRecord record(cursor, msg.getSequence(), queue, getTag(),
consumer, acquire, !ackExpected, credit.isWindowMode(), amqp_0_10::MessageTransfer::getRequiredCredit(msg));
@@ -376,7 +374,6 @@ bool SemanticState::ConsumerImpl::filter(const Message&)
bool SemanticState::ConsumerImpl::accept(const Message& msg)
{
- assertClusterSafe();
// TODO aconway 2009-06-08: if we have byte & message credit but
// checkCredit fails because the message is to big, we should
// remain on queue's listener list for possible smaller messages
@@ -400,7 +397,6 @@ ostream& operator<<(ostream& o, const ConsumerName& pc) {
void SemanticState::ConsumerImpl::allocateCredit(const Message& msg)
{
- assertClusterSafe();
Credit original = credit;
credit.consume(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg));
QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
@@ -492,7 +488,6 @@ void SemanticState::requestDispatch()
void SemanticState::ConsumerImpl::requestDispatch()
{
- assertClusterSafe();
if (blocked) {
parent->session.getConnection().outputTasks.addOutputTask(this);
parent->session.getConnection().outputTasks.activateOutput();
@@ -593,7 +588,6 @@ void SemanticState::stop(const std::string& destination)
void SemanticState::ConsumerImpl::setWindowMode()
{
- assertClusterSafe();
credit.setWindowMode(true);
if (mgmtObject){
mgmtObject->set_creditMode("WINDOW");
@@ -602,7 +596,6 @@ void SemanticState::ConsumerImpl::setWindowMode()
void SemanticState::ConsumerImpl::setCreditMode()
{
- assertClusterSafe();
credit.setWindowMode(false);
if (mgmtObject){
mgmtObject->set_creditMode("CREDIT");
@@ -611,13 +604,11 @@ void SemanticState::ConsumerImpl::setCreditMode()
void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
- assertClusterSafe();
credit.addByteCredit(value);
}
void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
- assertClusterSafe();
credit.addMessageCredit(value);
}
@@ -645,7 +636,6 @@ void SemanticState::ConsumerImpl::flush()
void SemanticState::ConsumerImpl::stop()
{
- assertClusterSafe();
credit.cancel();
}
@@ -711,7 +701,6 @@ bool SemanticState::ConsumerImpl::doOutput()
void SemanticState::ConsumerImpl::enableNotify()
{
Mutex::ScopedLock l(lock);
- assertClusterSafe();
notifyEnabled = true;
}
@@ -729,7 +718,6 @@ bool SemanticState::ConsumerImpl::isNotifyEnabled() const {
void SemanticState::ConsumerImpl::notify()
{
Mutex::ScopedLock l(lock);
- assertClusterSafe();
if (notifyEnabled) {
parent->session.getConnection().outputTasks.addOutputTask(this);
parent->session.getConnection().outputTasks.activateOutput();
@@ -754,7 +742,6 @@ isInSequenceSetAnd(const SequenceSet& s, Predicate p) {
}
void SemanticState::accepted(const SequenceSet& commands) {
- assertClusterSafe();
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
//maintain set of acknowledged messages:
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 88cdf7e03a..fe357eb949 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -25,7 +25,6 @@
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/SessionManager.h"
#include "qpid/broker/SessionHandler.h"
-#include "qpid/sys/ClusterSafe.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/framing/AMQMethodBody.h"
@@ -251,11 +250,6 @@ void SessionState::completeRcvMsg(SequenceNumber id,
bool requiresAccept,
bool requiresSync)
{
- // Mark this as a cluster-unsafe scope since it can be called in
- // journal threads or connection threads as part of asynchronous
- // command completion.
- sys::ClusterUnsafeScope cus;
-
bool callSendCompletion = false;
receiverCompleted(id);
if (requiresAccept)
diff --git a/cpp/src/qpid/sys/ClusterSafe.cpp b/cpp/src/qpid/sys/ClusterSafe.cpp
deleted file mode 100644
index dd37615145..0000000000
--- a/cpp/src/qpid/sys/ClusterSafe.cpp
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ClusterSafe.h"
-#include "qpid/log/Statement.h"
-#include "qpid/sys/Thread.h"
-#include <stdlib.h>
-
-namespace qpid {
-namespace sys {
-
-namespace {
-bool inCluster = false;
-QPID_TSS bool inContext = false;
-}
-
-bool isClusterSafe() { return !inCluster || inContext; }
-
-void assertClusterSafe() {
- if (!isClusterSafe()) {
- QPID_LOG(critical, "Modified cluster state outside of cluster context");
- ::abort();
- }
-}
-
-ClusterSafeScope::ClusterSafeScope() {
- save = inContext;
- inContext = true;
-}
-
-ClusterSafeScope::~ClusterSafeScope() {
- assert(inContext);
- inContext = save;
-}
-
-ClusterUnsafeScope::ClusterUnsafeScope() {
- save = inContext;
- inContext = false;
-}
-
-ClusterUnsafeScope::~ClusterUnsafeScope() {
- assert(!inContext);
- inContext = save;
-}
-
-void enableClusterSafe() { inCluster = true; }
-
-}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/ClusterSafe.h b/cpp/src/qpid/sys/ClusterSafe.h
deleted file mode 100644
index 27e4eb46a5..0000000000
--- a/cpp/src/qpid/sys/ClusterSafe.h
+++ /dev/null
@@ -1,87 +0,0 @@
-#ifndef QPID_SYS_CLUSTERSAFE_H
-#define QPID_SYS_CLUSTERSAFE_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/CommonImportExport.h"
-
-namespace qpid {
-namespace sys {
-
-/**
- * Assertion to add to code that modifies clustered state.
- *
- * In a non-clustered broker this is a no-op.
- *
- * In a clustered broker, checks that it is being called
- * in a context where it is safe to modify clustered state.
- * If not it aborts the process as this is a serious bug.
- *
- * This function is in the common library rather than the cluster
- * library because it is called by code in the broker library.
- */
-QPID_COMMON_EXTERN void assertClusterSafe();
-
-/**
- * In a non-clustered broker, returns true.
- *
- * In a clustered broker returns true if we are in a context where it
- * is safe to modify cluster state.
- *
- * This function is in the common library rather than the cluster
- * library because it is called by code in the broker library.
- */
-QPID_COMMON_EXTERN bool isClusterSafe();
-
-/**
- * Mark a scope as cluster safe. Sets isClusterSafe in constructor and resets
- * to previous value in destructor.
- */
-class ClusterSafeScope {
- public:
- ClusterSafeScope();
- ~ClusterSafeScope();
- private:
- bool save;
-};
-
-/**
- * Mark a scope as cluster unsafe. Clears isClusterSafe in constructor and resets
- * to previous value in destructor.
- */
-class ClusterUnsafeScope {
- public:
- QPID_COMMON_EXTERN ClusterUnsafeScope();
- QPID_COMMON_EXTERN ~ClusterUnsafeScope();
- private:
- bool save;
-};
-
-/**
- * Enable cluster-safe assertions. By default they are no-ops.
- * Called by cluster code.
- */
-void enableClusterSafe();
-
-}} // namespace qpid::sys
-
-#endif /*!QPID_SYS_CLUSTERSAFE_H*/