From c42d9df9b8af5dc7d5decdcb5818a100ee8df0a3 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 28 Jul 2011 21:38:06 +0000 Subject: QPID-2920: Broken checkpoint: passing dequeue mutex test with issues - handler/context/replica convention (see overview.h doc notes) - rename BrokerHandler to BrokerContext - notify Cluster (BrokerContext) on queue stopped or empty (need empty?) - Implementing Stoppable & stoppable scopes in Queue.cpp - Move queue ownership logic from dequeue to acquire Releasing on message count will not work, switch to timer based release. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1152008 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/Makefile.am | 1 + qpid/cpp/src/cluster.mk | 10 +- qpid/cpp/src/qpid/broker/Cluster.h | 4 + qpid/cpp/src/qpid/broker/NullCluster.h | 6 + qpid/cpp/src/qpid/broker/Queue.cpp | 80 +++++---- qpid/cpp/src/qpid/broker/Queue.h | 30 ++-- qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp | 205 +++++++++++++++++++++++ qpid/cpp/src/qpid/cluster/exp/BrokerContext.h | 93 ++++++++++ qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp | 154 ----------------- qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h | 86 ---------- qpid/cpp/src/qpid/cluster/exp/Core.cpp | 20 ++- qpid/cpp/src/qpid/cluster/exp/Core.h | 9 +- qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp | 2 +- qpid/cpp/src/qpid/cluster/exp/EventHandler.h | 6 +- qpid/cpp/src/qpid/cluster/exp/HandlerBase.h | 3 +- qpid/cpp/src/qpid/cluster/exp/LockedMap.h | 2 +- qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp | 44 +++-- qpid/cpp/src/qpid/cluster/exp/MessageHandler.h | 8 +- qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp | 105 ++++++++++++ qpid/cpp/src/qpid/cluster/exp/QueueContext.h | 93 ++++++++++ qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp | 79 +++++++++ qpid/cpp/src/qpid/cluster/exp/QueueHandler.h | 82 +++++++++ qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp | 115 +++++++++++++ qpid/cpp/src/qpid/cluster/exp/QueueReplica.h | 85 ++++++++++ qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp | 48 ++++-- qpid/cpp/src/qpid/cluster/exp/WiringHandler.h | 6 +- qpid/cpp/src/qpid/cluster/exp/overview.h | 13 ++ qpid/cpp/src/qpid/sys/Stoppable.h | 49 ++++-- qpid/cpp/src/tests/BrokerClusterCalls.cpp | 16 +- qpid/cpp/src/tests/cluster2_tests.py | 67 +++++++- qpid/cpp/src/tests/qpid-test-cluster | 3 +- qpid/cpp/xml/cluster.xml | 53 +++++- 32 files changed, 1217 insertions(+), 360 deletions(-) create mode 100644 qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp create mode 100644 qpid/cpp/src/qpid/cluster/exp/BrokerContext.h delete mode 100644 qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp delete mode 100644 qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h create mode 100644 qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp create mode 100644 qpid/cpp/src/qpid/cluster/exp/QueueContext.h create mode 100644 qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp create mode 100644 qpid/cpp/src/qpid/cluster/exp/QueueHandler.h create mode 100644 qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp create mode 100644 qpid/cpp/src/qpid/cluster/exp/QueueReplica.h create mode 100644 qpid/cpp/src/qpid/cluster/exp/overview.h diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 8ede09fa79..8d22850360 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -452,6 +452,7 @@ libqpidcommon_la_SOURCES += \ qpid/sys/AtomicValue_gcc.h \ qpid/sys/AtomicValue_mutex.h \ qpid/sys/BlockingQueue.h \ + qpid/sys/BusyThreads.h \ qpid/sys/ClusterSafe.h \ qpid/sys/ClusterSafe.cpp \ qpid/sys/Codec.h \ diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index 69b0228126..1809c87ca8 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -110,8 +110,8 @@ cluster2_la_SOURCES = \ qpid/cluster/Cpg.h \ qpid/cluster/PollerDispatch.cpp \ qpid/cluster/PollerDispatch.h \ - qpid/cluster/exp/BrokerHandler.cpp \ - qpid/cluster/exp/BrokerHandler.h \ + qpid/cluster/exp/BrokerContext.cpp \ + qpid/cluster/exp/BrokerContext.h \ qpid/cluster/exp/BufferFactory.h \ qpid/cluster/exp/Cluster2Plugin.cpp \ qpid/cluster/exp/Core.cpp \ @@ -124,6 +124,12 @@ cluster2_la_SOURCES = \ qpid/cluster/exp/MessageHandler.h \ qpid/cluster/exp/Multicaster.cpp \ qpid/cluster/exp/Multicaster.h \ + qpid/cluster/exp/QueueContext.cpp \ + qpid/cluster/exp/QueueContext.h \ + qpid/cluster/exp/QueueHandler.cpp \ + qpid/cluster/exp/QueueHandler.h \ + qpid/cluster/exp/QueueReplica.cpp \ + qpid/cluster/exp/QueueReplica.h \ qpid/cluster/exp/WiringHandler.cpp \ qpid/cluster/exp/WiringHandler.h diff --git a/qpid/cpp/src/qpid/broker/Cluster.h b/qpid/cpp/src/qpid/broker/Cluster.h index c927d35ba3..193332692b 100644 --- a/qpid/cpp/src/qpid/broker/Cluster.h +++ b/qpid/cpp/src/qpid/broker/Cluster.h @@ -80,6 +80,10 @@ class Cluster virtual void consume(Queue&, size_t consumerCount) = 0; /** A consumer cancels its subscription to a queue */ virtual void cancel(Queue&, size_t consumerCount) = 0; + /** A queue becomes empty */ + virtual void empty(Queue&) = 0; + /** A queue has been stopped */ + virtual void stopped(Queue&) = 0; // Wiring diff --git a/qpid/cpp/src/qpid/broker/NullCluster.h b/qpid/cpp/src/qpid/broker/NullCluster.h index efda8bb1ab..399e2a3ca6 100644 --- a/qpid/cpp/src/qpid/broker/NullCluster.h +++ b/qpid/cpp/src/qpid/broker/NullCluster.h @@ -49,6 +49,11 @@ class NullCluster : public Cluster virtual void consume(Queue&, size_t) {} virtual void cancel(Queue&, size_t) {} + // Queues + + virtual void stopped(Queue&) {} + virtual void empty(Queue&) {} + // Wiring virtual void create(Queue&) {} @@ -59,6 +64,7 @@ class NullCluster : public Cluster const std::string&, const framing::FieldTable&) {} virtual void unbind(Queue&, Exchange&, const std::string&, const framing::FieldTable&) {} + }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index f593d7e443..84f025824c 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -112,7 +112,8 @@ Queue::Queue(const string& _name, bool _autodelete, broker(b), deleted(false), barrier(*this), - autoDeleteTimeout(0) + autoDeleteTimeout(0), + dispatching(boost::bind(&Queue::acquireStopped,this)) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -231,29 +232,40 @@ void Queue::requeue(const QueuedMessage& msg){ copy.notify(); } -// Inform the cluster of an acquired message on exit from a function -// that does the acquiring. ClusterAcquireOnExit is declared *before* -// any locks are taken. The calling function sets qmsg to the acquired -// message with a lock held, but the call to Cluster::acquire() will -// be outside the lock. -struct ClusterAcquireOnExit { +/** Mark a scope that acquires a message. + * + * ClusterAcquireScope is declared before are taken. The calling + * function sets qmsg with the lock held, but the call to + * Cluster::acquire() will happen after the lock is released. + * + * Also marks a Stoppable as busy for the duration of the scope. + **/ +struct ClusterAcquireScope { Broker* broker; + Queue& queue; QueuedMessage qmsg; - ClusterAcquireOnExit(Broker* b) : broker(b) {} - ~ClusterAcquireOnExit() { - if (broker && qmsg.queue) broker->getCluster().acquire(qmsg); + + ClusterAcquireScope(Queue& q) : broker(q.getBroker()), queue(q) {} + + ~ClusterAcquireScope() { + if (broker) { + // FIXME aconway 2011-06-27: Move to QueueContext. + // Avoid the indirection via queuename. + if (qmsg.queue) broker->getCluster().acquire(qmsg); + else broker->getCluster().empty(queue); + } } }; bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { - ClusterAcquireOnExit willAcquire(broker); // Outside lock + ClusterAcquireScope acquireScope(*this); // Outside lock Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); if (messages->remove(position, message)) { QPID_LOG(debug, "Acquired message at " << position << " from " << name); - willAcquire.qmsg = message; + acquireScope.qmsg = message; return true; } else { QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); @@ -300,9 +312,15 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { while (true) { - ClusterAcquireOnExit willAcquire(broker); // Outside the lock + Stoppable::Scope stopper(dispatching); // FIXME aconway 2011-06-28: rename consuming + if (!stopper) { + QPID_LOG(trace, "Queue is stopped: " << name); + listeners.addListener(c); + return NO_MESSAGES; + } + ClusterAcquireScope acquireScope(*this); // Outside the lock Mutex::ScopedLock locker(messageLock); - if (messages->empty()) { + if (messages->empty()) { // FIXME aconway 2011-06-07: ugly QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); return NO_MESSAGES; @@ -317,7 +335,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ if (c->filter(msg.payload)) { if (c->accept(msg.payload)) { m = msg; - willAcquire.qmsg = msg; + acquireScope.qmsg = msg; pop(); return CONSUMED; } else { @@ -374,18 +392,11 @@ void Queue::removeListener(Consumer::shared_ptr c) bool Queue::dispatch(Consumer::shared_ptr c) { - Stoppable::Scope doDispatch(dispatching); - if (doDispatch) { - QueuedMessage msg(this); - if (getNextMessage(msg, c)) { - c->deliver(msg); - return true; - } else { - return false; - } - } else { // Dispatching is stopped - Mutex::ScopedLock locker(messageLock); - listeners.addListener(c); // FIXME aconway 2011-05-05: + QueuedMessage msg(this); + if (getNextMessage(msg, c)) { + c->deliver(msg); + return true; + } else { return false; } } @@ -450,10 +461,10 @@ void Queue::cancel(Consumer::shared_ptr c){ } QueuedMessage Queue::get(){ - ClusterAcquireOnExit willAcquire(broker); // Outside lock + ClusterAcquireScope acquireScope(*this); // Outside lock Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); - if (messages->pop(msg)) willAcquire.qmsg = msg; + if (messages->pop(msg)) acquireScope.qmsg = msg; return msg; } @@ -704,7 +715,9 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) if (!isEnqueued(msg)) return false; if (!ctxt) dequeued(msg); } + if (!ctxt && broker) broker->getCluster().dequeue(msg); // Outside lock + // This check prevents messages which have been forced persistent on one queue from dequeuing // from another on which no forcing has taken place and thus causing a store error. bool fp = msg.payload->isForcedPersistent(); @@ -902,6 +915,10 @@ void Queue::notifyDeleted() set.notifyAll(); } +void Queue::acquireStopped() { + if (broker) broker->getCluster().stopped(*this); +} + void Queue::bound(const string& exchange, const string& key, const FieldTable& args) { @@ -1234,7 +1251,7 @@ bool Queue::bind(boost::shared_ptr exchange, const std::string& key, } -const Broker* Queue::getBroker() +Broker* Queue::getBroker() { return broker; } @@ -1268,10 +1285,13 @@ void Queue::UsageBarrier::destroy() // FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()? void Queue::stop() { + // FIXME aconway 2011-05-25: rename dispatching - acquiring? dispatching.stop(); } void Queue::start() { + QPID_LOG(critical, "FIXME start context=" << clusterContext); + assert(clusterContext); // FIXME aconway 2011-06-08: XXX dispatching.start(); notifyListener(); } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 1588ae1171..0ba7b362e9 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -21,6 +21,7 @@ * under the License. * */ +#include "qpid/log/Statement.h" // FIXME XXX aconway 2011-06-08: remove #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/OwnershipToken.h" @@ -130,8 +131,9 @@ class Queue : public boost::enable_shared_from_this, UsageBarrier barrier; int autoDeleteTimeout; boost::intrusive_ptr autoDeleteTask; - // Allow dispatching consumer threads to be stopped. - sys::Stoppable dispatching; + // Allow dispatching consumer threads to be stopped. Used by cluster + sys::Stoppable dispatching; // FIXME aconway 2011-06-07: name: acquiring? + boost::intrusive_ptr clusterContext; void push(boost::intrusive_ptr& msg, bool isRecovery=false); void setPolicy(std::auto_ptr policy); @@ -179,6 +181,7 @@ class Queue : public boost::enable_shared_from_this, void checkNotDeleted(); void notifyDeleted(); + void acquireStopped(); public: @@ -379,20 +382,25 @@ class Queue : public boost::enable_shared_from_this, void flush(); - const Broker* getBroker(); + Broker* getBroker(); - /** Stop consumers. Return when all consumer threads are stopped. - *@pre Queue is active and not already stopping. - */ + /** Stop consumers. Return when all consumer threads are stopped. */ void stop(); - /** Start consumers. - *@pre Queue is stopped and idle: no thread in dispatch. - */ + /** Start consumers. */ void start(); - /** Context data attached and used by cluster code. */ - boost::intrusive_ptr clusterContext; + /** Context information used in a cluster. */ + boost::intrusive_ptr getClusterContext() { + // FIXME aconway 2011-06-08: XXX + QPID_LOG(critical, "FIXME q get context " << name << clusterContext); + return clusterContext; + } + void setClusterContext(boost::intrusive_ptr context) { + // FIXME aconway 2011-06-08: XXX + clusterContext = context; + QPID_LOG(critical, "FIXME q set context " << name << clusterContext); + } }; }} // qpid::broker diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp new file mode 100644 index 0000000000..465a5de021 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp @@ -0,0 +1,205 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Core.h" +#include "BrokerContext.h" +#include "QueueContext.h" +#include "QueueHandler.h" +#include "qpid/framing/ClusterMessageRoutingBody.h" +#include "qpid/framing/ClusterMessageRoutedBody.h" +#include "qpid/framing/ClusterMessageEnqueueBody.h" +#include "qpid/framing/ClusterMessageAcquireBody.h" +#include "qpid/framing/ClusterMessageDequeueBody.h" +#include "qpid/framing/ClusterMessageReleaseBody.h" +#include "qpid/framing/ClusterWiringCreateQueueBody.h" +#include "qpid/framing/ClusterWiringCreateExchangeBody.h" +#include "qpid/framing/ClusterWiringDestroyQueueBody.h" +#include "qpid/framing/ClusterWiringDestroyExchangeBody.h" +#include "qpid/framing/ClusterWiringBindBody.h" +#include "qpid/framing/ClusterWiringUnbindBody.h" +#include "qpid/framing/ClusterQueueSubscribeBody.h" +#include "qpid/sys/Thread.h" +#include "qpid/broker/QueuedMessage.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/Exchange.h" +#include "qpid/framing/Buffer.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace cluster { + +using namespace framing; +using namespace broker; + +namespace { +// noReplicate means the current thread is handling a message +// received from the cluster so it should not be replciated. +QPID_TSS bool tssNoReplicate = false; + +// Routing ID of the message being routed in the current thread. +// 0 if we are not currently routing a message. +QPID_TSS RoutingId tssRoutingId = 0; +} + +BrokerContext::ScopedSuppressReplication::ScopedSuppressReplication() { + assert(!tssNoReplicate); + tssNoReplicate = true; +} + +BrokerContext::ScopedSuppressReplication::~ScopedSuppressReplication() { + assert(tssNoReplicate); + tssNoReplicate = false; +} + +BrokerContext::BrokerContext(Core& c, boost::intrusive_ptr q) + : core(c), queueHandler(q) {} + +RoutingId BrokerContext::nextRoutingId() { + RoutingId id = ++routingId; + if (id == 0) id = ++routingId; // Avoid 0 on wrap-around. + return id; +} + +void BrokerContext::routing(const boost::intrusive_ptr&) { } + +bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr& msg) +{ + if (tssNoReplicate) return true; + if (!tssRoutingId) { // This is the first enqueue, so send the message + tssRoutingId = nextRoutingId(); + // FIXME aconway 2010-10-20: replicate message in fixed size buffers. + std::string data(msg->encodedSize(),char()); + framing::Buffer buf(&data[0], data.size()); + msg->encode(buf); + core.mcast(ClusterMessageRoutingBody(ProtocolVersion(), tssRoutingId, data)); + core.getRoutingMap().put(tssRoutingId, msg); + } + core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), tssRoutingId, queue.getName())); + // TODO aconway 2010-10-21: configable option for strict (wait + // for CPG deliver to do local deliver) vs. loose (local deliver + // immediately). + return false; +} + +void BrokerContext::routed(const boost::intrusive_ptr&) { + if (tssRoutingId) { // we enqueued at least one message. + core.mcast(ClusterMessageRoutedBody(ProtocolVersion(), tssRoutingId)); + // Note: routingMap is cleaned up on CPG delivery in MessageHandler. + tssRoutingId = 0; + } +} + +void BrokerContext::acquire(const broker::QueuedMessage& qm) { + if (tssNoReplicate) return; + QueueContext::get(*qm.queue)->acquire(); + core.mcast(ClusterMessageAcquireBody( + ProtocolVersion(), qm.queue->getName(), qm.position)); +} + +// FIXME aconway 2011-05-24: need to handle acquire and release. +// Dequeue in the wrong place? +void BrokerContext::dequeue(const broker::QueuedMessage& qm) { + if (tssNoReplicate) return; + core.mcast(ClusterMessageDequeueBody( + ProtocolVersion(), qm.queue->getName(), qm.position)); +} + +void BrokerContext::release(const broker::QueuedMessage& ) { + // FIXME aconway 2011-05-24: TODO +} + +// FIXME aconway 2011-06-08: should be be using shared_ptr to q here? +void BrokerContext::create(broker::Queue& q) { + if (tssNoReplicate) return; // FIXME aconway 2011-06-08: revisit + // FIXME aconway 2011-06-08: error handling- if already set... + // Create local context immediately, queue will be stopped until replicated. + boost::intrusive_ptr context( + new QueueContext(q,core.getMulticaster())); + std::string data(q.encodedSize(), '\0'); + framing::Buffer buf(&data[0], data.size()); + q.encode(buf); + core.mcast(ClusterWiringCreateQueueBody(ProtocolVersion(), data)); + QPID_LOG(critical, "FIXME BrokerContext create " << q.getName() << q.getClusterContext().get()); +} + +void BrokerContext::destroy(broker::Queue& q) { + if (tssNoReplicate) return; + core.mcast(ClusterWiringDestroyQueueBody(ProtocolVersion(), q.getName())); +} + +void BrokerContext::create(broker::Exchange& ex) { + if (tssNoReplicate) return; + std::string data(ex.encodedSize(), '\0'); + framing::Buffer buf(&data[0], data.size()); + ex.encode(buf); + core.mcast(ClusterWiringCreateExchangeBody(ProtocolVersion(), data)); +} + +void BrokerContext::destroy(broker::Exchange& ex) { + if (tssNoReplicate) return; + core.mcast(ClusterWiringDestroyExchangeBody(ProtocolVersion(), ex.getName())); +} + +void BrokerContext::bind(broker::Queue& q, broker::Exchange& ex, + const std::string& key, const framing::FieldTable& args) +{ + if (tssNoReplicate) return; + core.mcast(ClusterWiringBindBody( + ProtocolVersion(), q.getName(), ex.getName(), key, args)); +} + +void BrokerContext::unbind(broker::Queue& q, broker::Exchange& ex, + const std::string& key, const framing::FieldTable& args) +{ + if (tssNoReplicate) return; + core.mcast(ClusterWiringUnbindBody( + ProtocolVersion(), q.getName(), ex.getName(), key, args)); +} + +// n is the number of consumers including the one just added. +// FIXME aconway 2011-06-27: rename, conflicting terms. +void BrokerContext::consume(broker::Queue& q, size_t n) { + if (n == 1) { + // FIXME aconway 2011-06-27: should be on QueueContext for symmetry? + core.mcast(ClusterQueueSubscribeBody(ProtocolVersion(), q.getName())); + } +} + +// n is the number of consumers after the cancel. +void BrokerContext::cancel(broker::Queue& q, size_t n) { + if (n == 0) QueueContext::get(q)->unsubscribed(); +} + +void BrokerContext::empty(broker::Queue& ) { + // FIXME aconway 2011-06-28: is this needed? +} + +void BrokerContext::stopped(broker::Queue& q) { + boost::intrusive_ptr qc = QueueContext::get(q); + // Don't forward the stopped call if the queue does not yet have a cluster context + // this when the queue is first created locally. + if (qc){ + QPID_LOG(critical, "FIXME BrokerContext::stopped " << q.getName()); + qc->stopped(); + } +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h new file mode 100644 index 0000000000..fc19d6487b --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h @@ -0,0 +1,93 @@ +#ifndef QPID_CLUSTER_BROKERCONTEXT_H +#define QPID_CLUSTER_BROKERCONTEXT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Cluster.h" +#include "qpid/sys/AtomicValue.h" + +namespace qpid { +namespace cluster { +class Core; +class QueueHandler; +class QueueContext; + +// TODO aconway 2010-10-19: experimental cluster code. + +/** + * Implements broker::Cluster interface, handles events in broker code. + */ +class BrokerContext : public broker::Cluster +{ + public: + /** Suppress replication while in scope. + * Used to prevent re-replication of messages received from the cluster. + */ + struct ScopedSuppressReplication { + ScopedSuppressReplication(); + ~ScopedSuppressReplication(); + }; + + BrokerContext(Core&, boost::intrusive_ptr); + + // FIXME aconway 2010-10-20: implement all points. + + // Messages + + void routing(const boost::intrusive_ptr&); + bool enqueue(broker::Queue&, const boost::intrusive_ptr&); + void routed(const boost::intrusive_ptr&); + void acquire(const broker::QueuedMessage&); + void dequeue(const broker::QueuedMessage&); + void release(const broker::QueuedMessage&); + + // Consumers + + void consume(broker::Queue&, size_t); + void cancel(broker::Queue&, size_t); + + // Queues + void empty(broker::Queue&); + void stopped(broker::Queue&); + + // Wiring + + void create(broker::Queue&); + void destroy(broker::Queue&); + void create(broker::Exchange&); + void destroy(broker::Exchange&); + void bind(broker::Queue&, broker::Exchange&, + const std::string&, const framing::FieldTable&); + void unbind(broker::Queue&, broker::Exchange&, + const std::string&, const framing::FieldTable&); + + + private: + uint32_t nextRoutingId(); + + Core& core; + boost::intrusive_ptr queueHandler; + sys::AtomicValue routingId; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_BROKERCONTEXT_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp deleted file mode 100644 index 269e0b2ba3..0000000000 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp +++ /dev/null @@ -1,154 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Core.h" -#include "BrokerHandler.h" -#include "qpid/framing/ClusterMessageRoutingBody.h" -#include "qpid/framing/ClusterMessageRoutedBody.h" -#include "qpid/framing/ClusterMessageEnqueueBody.h" -#include "qpid/framing/ClusterMessageDequeueBody.h" -#include "qpid/framing/ClusterWiringCreateQueueBody.h" -#include "qpid/framing/ClusterWiringCreateExchangeBody.h" -#include "qpid/framing/ClusterWiringDestroyQueueBody.h" -#include "qpid/framing/ClusterWiringDestroyExchangeBody.h" -#include "qpid/framing/ClusterWiringBindBody.h" -#include "qpid/framing/ClusterWiringUnbindBody.h" -#include "qpid/sys/Thread.h" -#include "qpid/broker/QueuedMessage.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/Exchange.h" -#include "qpid/framing/Buffer.h" -#include "qpid/log/Statement.h" - -namespace qpid { -namespace cluster { - -using namespace framing; -using namespace broker; - -namespace { -// noReplicate means the current thread is handling a message -// received from the cluster so it should not be replciated. -QPID_TSS bool tssNoReplicate = false; - -// Routing ID of the message being routed in the current thread. -// 0 if we are not currently routing a message. -QPID_TSS RoutingId tssRoutingId = 0; -} - -BrokerHandler::ScopedSuppressReplication::ScopedSuppressReplication() { - assert(!tssNoReplicate); - tssNoReplicate = true; -} - -BrokerHandler::ScopedSuppressReplication::~ScopedSuppressReplication() { - assert(tssNoReplicate); - tssNoReplicate = false; -} - -BrokerHandler::BrokerHandler(Core& c) : core(c) {} - -RoutingId BrokerHandler::nextRoutingId() { - RoutingId id = ++routingId; - if (id == 0) id = ++routingId; // Avoid 0 on wrap-around. - return id; -} - -void BrokerHandler::routing(const boost::intrusive_ptr&) { } - -bool BrokerHandler::enqueue(Queue& queue, const boost::intrusive_ptr& msg) -{ - if (tssNoReplicate) return true; - if (!tssRoutingId) { // This is the first enqueue, so send the message - tssRoutingId = nextRoutingId(); - // FIXME aconway 2010-10-20: replicate message in fixed size buffers. - std::string data(msg->encodedSize(),char()); - framing::Buffer buf(&data[0], data.size()); - msg->encode(buf); - core.mcast(ClusterMessageRoutingBody(ProtocolVersion(), tssRoutingId, data)); - core.getRoutingMap().put(tssRoutingId, msg); - } - core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), tssRoutingId, queue.getName())); - // TODO aconway 2010-10-21: configable option for strict (wait - // for CPG deliver to do local deliver) vs. loose (local deliver - // immediately). - return false; -} - -void BrokerHandler::routed(const boost::intrusive_ptr&) { - if (tssRoutingId) { // we enqueued at least one message. - core.mcast(ClusterMessageRoutedBody(ProtocolVersion(), tssRoutingId)); - // Note: routingMap is cleaned up on CPG delivery in MessageHandler. - tssRoutingId = 0; - } -} - -void BrokerHandler::dequeue(const broker::QueuedMessage& qm) { - if (tssNoReplicate) return; - // FIXME aconway 2010-10-28: we also need to delay completion of the - // ack that caused this dequeue until self-delivery of the mcast below. - core.mcast(ClusterMessageDequeueBody( - ProtocolVersion(), qm.queue->getName(), qm.position)); -} - -void BrokerHandler::create(broker::Queue& q) { - if (tssNoReplicate) return; - std::string data(q.encodedSize(), '\0'); - framing::Buffer buf(&data[0], data.size()); - q.encode(buf); - core.mcast(ClusterWiringCreateQueueBody(ProtocolVersion(), data)); -} - -void BrokerHandler::destroy(broker::Queue& q) { - if (tssNoReplicate) return; - core.mcast(ClusterWiringDestroyQueueBody(ProtocolVersion(), q.getName())); -} - -void BrokerHandler::create(broker::Exchange& ex) { - if (tssNoReplicate) return; - std::string data(ex.encodedSize(), '\0'); - framing::Buffer buf(&data[0], data.size()); - ex.encode(buf); - core.mcast(ClusterWiringCreateExchangeBody(ProtocolVersion(), data)); -} - -void BrokerHandler::destroy(broker::Exchange& ex) { - if (tssNoReplicate) return; - core.mcast(ClusterWiringDestroyExchangeBody(ProtocolVersion(), ex.getName())); -} - -void BrokerHandler::bind(broker::Queue& q, broker::Exchange& ex, - const std::string& key, const framing::FieldTable& args) -{ - if (tssNoReplicate) return; - core.mcast(ClusterWiringBindBody( - ProtocolVersion(), q.getName(), ex.getName(), key, args)); -} - -void BrokerHandler::unbind(broker::Queue& q, broker::Exchange& ex, - const std::string& key, const framing::FieldTable& args) -{ - if (tssNoReplicate) return; - core.mcast(ClusterWiringUnbindBody( - ProtocolVersion(), q.getName(), ex.getName(), key, args)); -} - -}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h b/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h deleted file mode 100644 index 1cfcc75863..0000000000 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h +++ /dev/null @@ -1,86 +0,0 @@ -#ifndef QPID_CLUSTER_BROKERHANDLER_H -#define QPID_CLUSTER_BROKERHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/broker/Cluster.h" -#include "qpid/sys/AtomicValue.h" - -namespace qpid { -namespace cluster { -class Core; - -// TODO aconway 2010-10-19: experimental cluster code. - -/** - * Implements broker::Cluster interface, handles events in broker code. - */ -class BrokerHandler : public broker::Cluster -{ - public: - /** Suppress replication while in scope. - * Used to prevent re-replication of messages received from the cluster. - */ - struct ScopedSuppressReplication { - ScopedSuppressReplication(); - ~ScopedSuppressReplication(); - }; - - BrokerHandler(Core&); - - // FIXME aconway 2010-10-20: implement all points. - - // Messages - - void routing(const boost::intrusive_ptr&); - bool enqueue(broker::Queue&, const boost::intrusive_ptr&); - void routed(const boost::intrusive_ptr&); - void acquire(const broker::QueuedMessage&) {} - void release(const broker::QueuedMessage&) {} - void dequeue(const broker::QueuedMessage&); - - // Consumers - - void consume(broker::Queue&, size_t) {} - void cancel(broker::Queue&, size_t) {} - - // Wiring - - void create(broker::Queue&); - void destroy(broker::Queue&); - void create(broker::Exchange&); - void destroy(broker::Exchange&); - void bind(broker::Queue&, broker::Exchange&, - const std::string&, const framing::FieldTable&); - void unbind(broker::Queue&, broker::Exchange&, - const std::string&, const framing::FieldTable&); - - - private: - uint32_t nextRoutingId(); - - Core& core; - sys::AtomicValue routingId; -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_BROKERHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp index e1dba349a1..7bcc068120 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Core.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp @@ -21,9 +21,11 @@ #include "Core.h" #include "EventHandler.h" -#include "BrokerHandler.h" +#include "BrokerContext.h" #include "WiringHandler.h" #include "MessageHandler.h" +#include "QueueContext.h" +#include "QueueHandler.h" #include "qpid/broker/Broker.h" #include "qpid/broker/SignalHandler.h" #include "qpid/framing/AMQFrame.h" @@ -39,12 +41,17 @@ Core::Core(const Settings& s, broker::Broker& b) : eventHandler(new EventHandler(*this)), multicaster(eventHandler->getCpg(), b.getPoller(), boost::bind(&Core::fatal, this)) { - eventHandler->add(boost::shared_ptr(new WiringHandler(*eventHandler))); - eventHandler->add(boost::shared_ptr(new MessageHandler(*eventHandler))); + boost::intrusive_ptr queueHandler( + new QueueHandler(*eventHandler, multicaster)); + eventHandler->add(queueHandler); + eventHandler->add(boost::intrusive_ptr( + new WiringHandler(*eventHandler, queueHandler))); + eventHandler->add(boost::intrusive_ptr( + new MessageHandler(*eventHandler))); - std::auto_ptr bh(new BrokerHandler(*this)); + std::auto_ptr bh(new BrokerContext(*this, queueHandler)); brokerHandler = bh.get(); - // BrokerHandler belongs to Broker + // BrokerContext belongs to Broker broker.setCluster(std::auto_ptr(bh)); eventHandler->start(); eventHandler->getCpg().join(s.name); @@ -62,8 +69,7 @@ void Core::fatal() { void Core::mcast(const framing::AMQBody& body) { QPID_LOG(trace, "cluster multicast: " << body); - framing::AMQFrame f(body); - multicaster.mcast(f); + multicaster.mcast(body); } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.h b/qpid/cpp/src/qpid/cluster/exp/Core.h index 8b83a0004d..d0dc8e57a8 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Core.h +++ b/qpid/cpp/src/qpid/cluster/exp/Core.h @@ -44,7 +44,7 @@ class Broker; namespace cluster { class EventHandler; -class BrokerHandler; +class BrokerContext; /** * Cluster core state machine. @@ -77,16 +77,17 @@ class Core broker::Broker& getBroker() { return broker; } EventHandler& getEventHandler() { return *eventHandler; } - BrokerHandler& getBrokerHandler() { return *brokerHandler; } + BrokerContext& getBrokerContext() { return *brokerHandler; } + Multicaster& getMulticaster() { return multicaster; } /** Map of messages that are currently being routed. - * Used to pass messages being routed from BrokerHandler to MessageHandler + * Used to pass messages being routed from BrokerContext to MessageHandler */ RoutingMap& getRoutingMap() { return routingMap; } private: broker::Broker& broker; std::auto_ptr eventHandler; // Handles CPG events. - BrokerHandler* brokerHandler; // Handles broker events. + BrokerContext* brokerHandler; // Handles broker events. RoutingMap routingMap; Multicaster multicaster; }; diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp index 2138004380..beebe9fc16 100644 --- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp @@ -41,7 +41,7 @@ EventHandler::EventHandler(Core& c) : EventHandler::~EventHandler() {} -void EventHandler::add(const boost::shared_ptr& handler) { +void EventHandler::add(const boost::intrusive_ptr& handler) { handlers.push_back(handler); } diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h index b946c27084..93423778f1 100644 --- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h @@ -27,7 +27,7 @@ #include "qpid/cluster/Cpg.h" #include "qpid/cluster/PollerDispatch.h" #include "qpid/cluster/types.h" -#include +#include #include namespace qpid { @@ -52,7 +52,7 @@ class EventHandler : public Cpg::Handler ~EventHandler(); /** Add a handler */ - void add(const boost::shared_ptr&); + void add(const boost::intrusive_ptr&); /** Start polling */ void start(); @@ -87,7 +87,7 @@ class EventHandler : public Cpg::Handler MemberId sender; // sender of current event. MemberId self; - typedef std::vector > Handlers; + typedef std::vector > Handlers; Handlers handlers; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h b/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h index b153f56a01..f0c6650994 100644 --- a/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h +++ b/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h @@ -21,6 +21,7 @@ * under the License. * */ +#include "qpid/RefCounted.h" #include "qpid/cluster/types.h" namespace qpid { @@ -35,7 +36,7 @@ class EventHandler; /** * Base class for handlers of events, children of the EventHandler. */ -class HandlerBase +class HandlerBase : public RefCounted { public: HandlerBase(EventHandler&); diff --git a/qpid/cpp/src/qpid/cluster/exp/LockedMap.h b/qpid/cpp/src/qpid/cluster/exp/LockedMap.h index 0736e7ac35..c0afe740f8 100644 --- a/qpid/cpp/src/qpid/cluster/exp/LockedMap.h +++ b/qpid/cpp/src/qpid/cluster/exp/LockedMap.h @@ -54,7 +54,7 @@ class LockedMap */ bool add(const Key& key, const Value& value) { sys::RWlock::ScopedWlock w(lock); - return map.insert(key, value).second; + return map.insert(std::make_pair(key, value)).second; } /** Erase the value associated with key if any. Return true if a value was erased. */ diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp index d4095e5bc1..86894b9dd9 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp @@ -21,7 +21,7 @@ #include "Core.h" #include "MessageHandler.h" -#include "BrokerHandler.h" +#include "BrokerContext.h" #include "EventHandler.h" #include "qpid/broker/Message.h" #include "qpid/broker/Broker.h" @@ -73,7 +73,7 @@ void MessageHandler::enqueue(RoutingId routingId, const std::string& q) { msg = memberMap[sender()].routingMap[routingId]; if (!msg) throw Exception(QPID_MSG("Cluster enqueue on " << q << " failed: unknown message")); - BrokerHandler::ScopedSuppressReplication ssr; + BrokerContext::ScopedSuppressReplication ssr; queue->deliver(msg); } @@ -84,22 +84,40 @@ void MessageHandler::routed(RoutingId routingId) { memberMap[sender()].routingMap.erase(routingId); } -void MessageHandler::dequeue(const std::string& q, uint32_t position) { +void MessageHandler::acquire(const std::string& q, uint32_t position) { + // Note acquires from other members. My own acquires were exeuted in + // the connection thread + if (sender() != self()) { + // FIXME aconway 2010-10-28: need to store acquired messages on QueueContext + // by broker for possible re-queuing if a broker leaves. + boost::shared_ptr queue = findQueue(q, "Cluster dequeue failed"); + QueuedMessage qm; + BrokerContext::ScopedSuppressReplication ssr; + bool ok = queue->acquireMessageAt(position, qm); + (void)ok; // Avoid unused variable warnings. + assert(ok); + assert(qm.position.getValue() == position); + assert(qm.payload); + } +} + +void MessageHandler::dequeue(const std::string& q, uint32_t /*position*/) { if (sender() == self()) { // FIXME aconway 2010-10-28: we should complete the ack that initiated - // the dequeue at this point, see BrokerHandler::dequeue + // the dequeue at this point, see BrokerContext::dequeue return; } boost::shared_ptr queue = findQueue(q, "Cluster dequeue failed"); - BrokerHandler::ScopedSuppressReplication ssr; - QueuedMessage qm; - // FIXME aconway 2010-10-28: when we replicate acquires, the acquired - // messages will be stored by MessageHandler::acquire. - if (queue->acquireMessageAt(position, qm)) { - assert(qm.position.getValue() == position); - assert(qm.payload); - queue->dequeue(0, qm); - } + BrokerContext::ScopedSuppressReplication ssr; + // FIXME aconway 2011-05-12: Remove the acquired message from QueueContext. + // Do we need to call this? Review with gsim. + // QueuedMessage qm; + // Get qm from QueueContext? + // queue->dequeue(0, qm); +} + +void MessageHandler::release(const std::string& /*queue*/ , uint32_t /*position*/) { + // FIXME aconway 2011-05-24: } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h index f87f22a1ec..0a010a8ecf 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h @@ -39,7 +39,10 @@ class Queue; namespace cluster { class EventHandler; -class BrokerHandler; +class BrokerContext; + +// FIXME aconway 2011-06-28: doesn't follow the same Handler/Replica/Context pattern as for queue. +// Make this consistent. /** * Handler for message disposition events. @@ -55,7 +58,10 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler void routing(uint32_t routingId, const std::string& message); void enqueue(uint32_t routingId, const std::string& queue); void routed(uint32_t routingId); + void acquire(const std::string& queue, uint32_t position); void dequeue(const std::string& queue, uint32_t position); + void release(const std::string& queue, uint32_t position); + private: struct Member { typedef std::map > RoutingMap; diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp new file mode 100644 index 0000000000..6c97c906e8 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "QueueContext.h" +#include "Multicaster.h" +#include "qpid/framing/ClusterQueueResubscribeBody.h" +#include "qpid/framing/ClusterQueueUnsubscribeBody.h" +#include "qpid/broker/Queue.h" +#include "qpid/log/Statement.h" + + +namespace qpid { +namespace cluster { + +QueueContext::QueueContext(broker::Queue& q, Multicaster& m) + : owner(NOT_OWNER), count(0), queue(q), mcast(m) +{ + QPID_LOG(debug, "Assign cluster context to queue " << q.getName()); + q.stop(); // Initially stopped. Must all before setClusterContext + q.setClusterContext(boost::intrusive_ptr(this)); + +} + +// Called by QueueReplica in deliver thread. +void QueueContext::sharedOwner(size_t limit) { + QPID_LOG(critical, "FIXME QueueContext::sharedOwner " << queue.getName() << queue.getClusterContext().get()); + sys::Mutex::ScopedLock l(lock); + count = limit; + if (owner == NOT_OWNER) queue.start(); // FIXME aconway 2011-06-09: ok inside mutex? + owner = SHARED_OWNER; +} + +// Called by QueueReplica in deliver thread. +void QueueContext::soleOwner() { + QPID_LOG(critical, "FIXME QueueContext::soleOwner " << queue.getName() << queue.getClusterContext().get()); + sys::Mutex::ScopedLock l(lock); + count = 0; + if (owner == NOT_OWNER) queue.start(); // FIXME aconway 2011-06-09: ok inside mutex? + owner = SOLE_OWNER; +} + +// Called by BrokerContext in connection thread(s) on acquiring a message +void QueueContext::acquire() { + bool stop = false; + { + sys::Mutex::ScopedLock l(lock); + assert(owner != NOT_OWNER); // Can't acquire on a queue we don't own. + QPID_LOG(critical, "FIXME QueueContext::acquire " << queue.getName() + << " owner=" << owner << " count=" << count); + if (owner == SHARED_OWNER) { + // Note count could be 0 if there are concurrent calls to acquire. + if (count && --count == 0) { + stop = true; + } + } + } + // FIXME aconway 2011-06-28: could have multiple stop() threads... + if (stop) queue.stop(); +} + +// Callback set up by queue.stop() +void QueueContext::stopped() { + sys::Mutex::ScopedLock l(lock); + if (owner == NOT_OWNER) { + mcast.mcast(framing::ClusterQueueUnsubscribeBody( + framing::ProtocolVersion(), queue.getName())); + } else { + owner = NOT_OWNER; + mcast.mcast(framing::ClusterQueueResubscribeBody( + framing::ProtocolVersion(), queue.getName())); + } +} + +void QueueContext::unsubscribed() { + QPID_LOG(critical, "FIXME QueueContext unsubscribed, stopping " << queue.getName()); + queue.stop(); + sys::Mutex::ScopedLock l(lock); + owner = NOT_OWNER; +} + +boost::intrusive_ptr QueueContext::get(broker::Queue& q) { + return boost::intrusive_ptr( + static_cast(q.getClusterContext().get())); +} + + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h new file mode 100644 index 0000000000..5bafb5eb0f --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h @@ -0,0 +1,93 @@ +#ifndef QPID_CLUSTER_EXP_QUEUESTATE_H +#define QPID_CLUSTER_EXP_QUEUESTATE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include +#include +#include + + +// FIXME aconway 2011-06-08: refactor broker::Cluster to put queue ups on +// class broker::Cluster::Queue. This becomes the cluster context. + +namespace qpid { +namespace broker { +class Queue; +} +namespace cluster { + +class Multicaster; + + /** + * Queue state that is not replicated to the cluster. + * Manages the local queue start/stop status + * + * Thread safe: Called by connection and dispatch threads. + */ +class QueueContext : public RefCounted { + // FIXME aconway 2011-06-07: consistent use of shared vs. intrusive ptr? + public: + QueueContext(broker::Queue& q, Multicaster& m); + + /** Sharing ownership of queue, can acquire up to limit before releasing. + * Called in deliver thread. + */ + void sharedOwner(size_t limit); + + /** Sole owner of queue, no limits to acquiring */ + void soleOwner(); + + /** + * Count an acquired message against the limit. + * Called from connection threads while consuming messages + */ + void acquire(); + + /** Called if the queue becomes empty, from connection thread. */ + void empty(); + + /** Called when queue is stopped, connection or deliver thread. */ + void stopped(); + + /** Called when the last subscription to a queue is cancelled */ + void unsubscribed(); + + /** Get the context for a broker queue. */ + static boost::intrusive_ptr get(broker::Queue&); + + private: + void release(); + + sys::Mutex lock; + enum { NOT_OWNER, SOLE_OWNER, SHARED_OWNER } owner; + size_t count; // Count of dequeues remaining, 0 means no limit. + broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr? + Multicaster& mcast; + + // FIXME aconway 2011-06-28: need to store acquired messages for possible re-queueing. +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_EXP_QUEUESTATE_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp new file mode 100644 index 0000000000..7d56025fb8 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "QueueHandler.h" +#include "EventHandler.h" +#include "QueueReplica.h" +#include "QueueContext.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueuedMessage.h" +#include "qpid/framing/AllInvoker.h" +#include "qpid/Exception.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace cluster { + +// FIXME aconway 2011-05-11: make Multicaster+EventHandler available as Group, clean this up? +QueueHandler::QueueHandler(EventHandler& eh, Multicaster& m) + : HandlerBase(eh), multicaster(m) {} + +bool QueueHandler::invoke(const framing::AMQBody& body) { + return framing::invoke(*this, body).wasHandled(); +} + +void QueueHandler::subscribe(const std::string& queue) { + find(queue)->subscribe(sender()); +} +void QueueHandler::unsubscribe(const std::string& queue) { + find(queue)->unsubscribe(sender()); +} +void QueueHandler::resubscribe(const std::string& queue) { + find(queue)->resubscribe(sender()); +} + +void QueueHandler::left(const MemberId& member) { + // Unsubscribe for members that leave. + // FIXME aconway 2011-06-28: also need to re-queue acquired messages. + for (QueueMap::iterator i = queues.begin(); i != queues.end(); ++i) + i->second->unsubscribe(member); +} + +// FIXME aconway 2011-06-08: do we need to hold on to the shared pointer for lifecycle? +void QueueHandler::add(boost::shared_ptr q) { + // FIXME aconway 2011-06-08: move create operation from Wiring to Queue handler. + // FIXME aconway 2011-05-10: assert not already in map. + + // Local queues already have a context, remote queues need one. + if (!QueueContext::get(*q)) + new QueueContext(*q, multicaster); // Context attaches itself to the Queue + queues[q->getName()] = boost::intrusive_ptr( + new QueueReplica(q, self())); +} + +boost::intrusive_ptr QueueHandler::find(const std::string& queue) { + QueueMap::iterator i = queues.find(queue); + if (i == queues.end()) + throw Exception(QPID_MSG("Unknown queue " << queue << " in cluster queue handler")); + return i->second; +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h new file mode 100644 index 0000000000..6494efb1b3 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h @@ -0,0 +1,82 @@ +#ifndef QPID_CLUSTER_QUEUEHANDLER_H +#define QPID_CLUSTER_QUEUEHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "HandlerBase.h" +#include "LockedMap.h" +#include "qpid/framing/AMQP_AllOperations.h" +#include "boost/shared_ptr.hpp" +#include "boost/intrusive_ptr.hpp" +#include + +namespace qpid { + +namespace broker { +class Queue; +class QueuedMessage; +} + +namespace cluster { + +class EventHandler; +class QueueReplica; +class Multicaster; + +/** + * Handler for queue subscription events. + * + * THREAD UNSAFE: only accessed in cluster deliver thread, on delivery + * of queue controls and also from WiringHandler on delivery of queue + * create. + */ +class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler, + public HandlerBase +{ + public: + QueueHandler(EventHandler&, Multicaster&); + + bool invoke(const framing::AMQBody& body); + + // Events + void subscribe(const std::string& queue); + void unsubscribe(const std::string& queue); + void resubscribe(const std::string& queue); + void left(const MemberId&); + + void add(boost::shared_ptr); + + // NB: These functions ar called in connection threads, not deliver threads. + void acquired(const broker::QueuedMessage& qm); + void empty(const broker::Queue& q); + + private: + typedef std::map > QueueMap; + + boost::intrusive_ptr find(const std::string& queue); + + QueueMap queues; + Multicaster& multicaster; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_QUEUEHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp new file mode 100644 index 0000000000..551477a920 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp @@ -0,0 +1,115 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "QueueReplica.h" +#include "QueueContext.h" +#include "qpid/broker/Queue.h" +#include "qpid/log/Statement.h" +#include + +namespace qpid { +namespace cluster { + +QueueReplica::QueueReplica(boost::shared_ptr q, + const MemberId& self_) + : queue(q), self(self_), context(QueueContext::get(*q)) +{ + // q is initially stopped. +} + +struct PrintSubscribers { + const QueueReplica::MemberQueue& mq; + PrintSubscribers(const QueueReplica::MemberQueue& m) : mq(m) {} +}; + +std::ostream& operator<<(std::ostream& o, const PrintSubscribers& ps) { + copy(ps.mq.begin(), ps.mq.end(), std::ostream_iterator(o, " ")); + return o; +} + +std::ostream& operator<<(std::ostream& o, QueueReplica::State s) { + static char* tags[] = { "UNSUBSCRIBED", "SUBSCRIBED", "SOLE_OWNER", "SHARED_OWNER" }; + return o << tags[s]; +} + +std::ostream& operator<<(std::ostream& o, const QueueReplica& qr) { + o << qr.queue->getName() << "(" << qr.getState() << "): " + << PrintSubscribers(qr.subscribers); + return o; +} + +// FIXME aconway 2011-05-17: error handling for asserts. + +void QueueReplica::subscribe(const MemberId& member) { + State before = getState(); + subscribers.push_back(member); + update(before); +} + +void QueueReplica::unsubscribe(const MemberId& member) { + State before = getState(); + MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member); + if (i != subscribers.end()) { + subscribers.erase(i, subscribers.end()); + update(before); + } +} + +void QueueReplica::resubscribe(const MemberId& member) { + assert (member == subscribers.front()); // FIXME aconway 2011-06-27: error handling + State before = getState(); + subscribers.pop_front(); + subscribers.push_back(member); + update(before); +} + +void QueueReplica::update(State before) { + const int acquireLimit = 10; // FIXME aconway 2011-06-23: configurable + State after = getState(); + if (before == after) return; + QPID_LOG(trace, "QueueReplica " << *this << " (was " << before << ")"); + switch (after) { + case UNSUBSCRIBED: break; + case SUBSCRIBED: break; + case SOLE_OWNER: + context->soleOwner(); + break; + case SHARED_OWNER: + context->sharedOwner(acquireLimit); + break; + } +} + +QueueReplica::State QueueReplica::getState() const { + if (isOwner()) + return (subscribers.size() > 1) ? SHARED_OWNER : SOLE_OWNER; + return (isSubscriber(self)) ? SUBSCRIBED : UNSUBSCRIBED; +} + +bool QueueReplica::isOwner() const { + return !subscribers.empty() && subscribers.front() == self; +} + +bool QueueReplica::isSubscriber(const MemberId& member) const { + // FIXME aconway 2011-06-27: linear search here, is it a performance issue? + return std::find(subscribers.begin(), subscribers.end(), member) != subscribers.end(); +} + +}} // namespace qpid::cluster::exp diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h new file mode 100644 index 0000000000..a322a8b9c0 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h @@ -0,0 +1,85 @@ +#ifndef QPID_CLUSTER_QUEUEMODEL_H +#define QPID_CLUSTER_QUEUEMODEL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qpid/cluster/types.h" +#include +#include +#include + +namespace qpid { + +namespace broker { +class Queue; +} + +namespace cluster { +class QueueHandler; +class QueueContext; + +/** + * Queue state that is replicated among all cluster members. + * + * Handles queue subscription controls by starting/stopping the queue. + * + * THREAD UNSAFE: only used in cluster deliver thread, on delivery + * of queue controls and also from WiringHandler on delivery of queue + * create. + */ +class QueueReplica : public RefCounted +{ + public: + QueueReplica(boost::shared_ptr , const MemberId& ); + void subscribe(const MemberId&); + void unsubscribe(const MemberId&); + void resubscribe(const MemberId&); + + private: + enum State { + UNSUBSCRIBED, + SUBSCRIBED, + SOLE_OWNER, + SHARED_OWNER + }; + + friend class PrintSubscribers; + friend std::ostream& operator<<(std::ostream&, State); + friend std::ostream& operator<<(std::ostream&, const QueueReplica&); + + typedef std::deque MemberQueue; + + boost::shared_ptr queue; + MemberQueue subscribers; + MemberId self; + boost::intrusive_ptr context; + + State getState() const; + bool isOwner() const; + bool isSubscriber(const MemberId&) const; + void update(State before); +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_QUEUEMODEL_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp index 04a76b9758..1b3286792f 100644 --- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp @@ -22,7 +22,8 @@ #include "Core.h" #include "WiringHandler.h" #include "EventHandler.h" -#include "BrokerHandler.h" +#include "QueueHandler.h" +#include "BrokerContext.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/Queue.h" @@ -32,18 +33,20 @@ #include "qpid/framing/Buffer.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" -#include +#include namespace qpid { namespace cluster { using namespace broker; using framing::FieldTable; -WiringHandler::WiringHandler(EventHandler& e) : +WiringHandler::WiringHandler(EventHandler& e, + const boost::intrusive_ptr& qh) : HandlerBase(e), broker(e.getCore().getBroker()), recovery(broker.getQueues(), broker.getExchanges(), - broker.getLinks(), broker.getDtxManager()) + broker.getLinks(), broker.getDtxManager()), + queueHandler(qh) {} bool WiringHandler::invoke(const framing::AMQBody& body) { @@ -51,24 +54,39 @@ bool WiringHandler::invoke(const framing::AMQBody& body) { } void WiringHandler::createQueue(const std::string& data) { - if (sender() == self()) return; - BrokerHandler::ScopedSuppressReplication ssr; - framing::Buffer buf(const_cast(&data[0]), data.size()); - // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*() - RecoverableQueue::shared_ptr queue = recovery.recoverQueue(buf); - QPID_LOG(debug, "cluster: create queue " << queue->getName()); + // FIXME aconway 2011-05-25: Needs async completion. + std::string name; + if (sender() != self()) { // Created by another member, need to create locally. + BrokerContext::ScopedSuppressReplication ssr; + framing::Buffer buf(const_cast(&data[0]), data.size()); + // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*() + RecoverableQueue::shared_ptr rq = recovery.recoverQueue(buf); + name = rq->getName(); + } + else { // Created locally, Queue and QueueContext already exist. + framing::Buffer buffer(const_cast(&data[0]), data.size()); + // FIXME aconway 2011-05-10: implicit knowledge of queue encoding. + buffer.getShortString(name); + } + boost::shared_ptr q = broker.getQueues().find(name); + assert(q); // FIXME aconway 2011-05-10: error handling. + // TODO aconway 2011-05-10: if we implement multi-group for queues then + // this call is a problem: comes from wiring delivery thread, not queues. + // FIXME aconway 2011-06-08: move wiring ops to Queue and Exchange handlers.. + queueHandler->add(q); + QPID_LOG(debug, "cluster: create queue " << q->getName()); } void WiringHandler::destroyQueue(const std::string& name) { if (sender() == self()) return; QPID_LOG(debug, "cluster: destroy queue " << name); - BrokerHandler::ScopedSuppressReplication ssr; + BrokerContext::ScopedSuppressReplication ssr; broker.deleteQueue(name, std::string(), std::string()); } void WiringHandler::createExchange(const std::string& data) { if (sender() == self()) return; - BrokerHandler::ScopedSuppressReplication ssr; + BrokerContext::ScopedSuppressReplication ssr; framing::Buffer buf(const_cast(&data[0]), data.size()); // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*() RecoverableExchange::shared_ptr exchange = recovery.recoverExchange(buf); @@ -78,7 +96,7 @@ void WiringHandler::createExchange(const std::string& data) { void WiringHandler::destroyExchange(const std::string& name) { if (sender() == self()) return; QPID_LOG(debug, "cluster: destroy exchange " << name); - BrokerHandler::ScopedSuppressReplication ssr; + BrokerContext::ScopedSuppressReplication ssr; broker.getExchanges().destroy(name); } @@ -91,7 +109,7 @@ void WiringHandler::bind( << " exchange=" << exchangeName << " key=" << routingKey << " arguments=" << arguments); - BrokerHandler::ScopedSuppressReplication ssr; + BrokerContext::ScopedSuppressReplication ssr; broker.bind(queueName, exchangeName, routingKey, arguments, std::string(), std::string()); } @@ -104,7 +122,7 @@ void WiringHandler::unbind( << " exchange=" << exchangeName << " key=" << routingKey << " arguments=" << arguments); - BrokerHandler::ScopedSuppressReplication ssr; + BrokerContext::ScopedSuppressReplication ssr; broker.unbind(queueName, exchangeName, routingKey, std::string(), std::string()); } diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h index e375cf6a95..71aa6e52e9 100644 --- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h @@ -42,7 +42,7 @@ class Broker; namespace cluster { class EventHandler; - +class QueueHandler; /** * Handler for wiring disposition events. @@ -51,7 +51,7 @@ class WiringHandler : public framing::AMQP_AllOperations::ClusterWiringHandler, public HandlerBase { public: - WiringHandler(EventHandler&); + WiringHandler(EventHandler&, const boost::intrusive_ptr& qh); bool invoke(const framing::AMQBody& body); @@ -66,8 +66,10 @@ class WiringHandler : public framing::AMQP_AllOperations::ClusterWiringHandler, private: + broker::Broker& broker; broker::RecoveryManagerImpl recovery; + boost::intrusive_ptr queueHandler; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/overview.h b/qpid/cpp/src/qpid/cluster/exp/overview.h new file mode 100644 index 0000000000..3a0189d750 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/overview.h @@ -0,0 +1,13 @@ +// This file is documentation in doxygen format. +/** + +

New cluster implementation overview + +There are 3 areas indicated by a suffix on class names: + +- Replica: State that is replicated to the entire cluster. Only called by Handlers in the deliver thread. +- Context: State that is private to this member. Called by both Replia and broker objects in deliver and connection threads. +- Handler: Dispatch CPG messages by calling Replica objects in the deliver thread. + + +**/ diff --git a/qpid/cpp/src/qpid/sys/Stoppable.h b/qpid/cpp/src/qpid/sys/Stoppable.h index af21af46ba..6bb02bc6af 100644 --- a/qpid/cpp/src/qpid/sys/Stoppable.h +++ b/qpid/cpp/src/qpid/sys/Stoppable.h @@ -21,17 +21,27 @@ * under the License. * */ + +#include + namespace qpid { namespace sys { +// FIXME aconway 2011-05-25: needs better name + /** * An activity that may be executed by multiple threads, and can be stopped. - * Stopping prevents new threads from entering and waits till exiting busy threads leave. + * + * Stopping prevents new threads from entering and calls a callback + * when all busy threads leave. */ class Stoppable { public: - Stoppable() : busy(0), stopped(false) {} - ~Stoppable() { stop(); } + /** + *@param stoppedCallback: called when all threads have stopped. + */ + Stoppable(boost::function stoppedCallback) + : busy(0), stopped(false), notify(stoppedCallback) {} /** Mark the scope of a busy thread like this: *
@@ -52,38 +62,49 @@ class Stoppable {
 
   friend class Scope;
 
-    /** Mark  stopped, wait for all threads to leave their busy scope. */
+    /**
+     * Set state to "stopped", so no new threads can enter.
+     * Call notify function when all busy threads have left.
+     */
+    // FIXME aconway 2011-06-27: not guaranteed that stopped will be called,
+    // deadlock?
     void stop() {
         sys::Monitor::ScopedLock l(lock);
         stopped = true;
-        while (busy > 0) lock.wait();
+        check();
     }
 
-    /** Set the state to started.
-     *@pre state is stopped and no theads are busy.
+    /** Set the state to "started", allow threads to enter.
      */
     void start() {
         sys::Monitor::ScopedLock l(lock);
-        assert(stopped && busy == 0); // FIXME aconway 2011-05-06: error handling.
         stopped = false;
     }
 
-  private:
-    uint busy;
-    bool stopped;
-    sys::Monitor lock;
-
+    // Busy thread enters scope
     bool enter() {
         sys::Monitor::ScopedLock l(lock);
         if (!stopped) ++busy;
         return !stopped;
     }
 
+    // Busy thread exits scope
     void exit() {
         sys::Monitor::ScopedLock l(lock);
         assert(busy > 0);
-        if (--busy == 0) lock.notifyAll();
+        --busy;
+        check();
+    }
+
+  private:
+    void check() {
+        if (stopped && busy == 0 && notify) notify();
     }
+
+    uint busy;
+    bool stopped;
+    sys::Monitor lock;
+    boost::function< void() > notify;
 };
 
 }} // namespace qpid::sys
diff --git a/qpid/cpp/src/tests/BrokerClusterCalls.cpp b/qpid/cpp/src/tests/BrokerClusterCalls.cpp
index 53d0f2102a..4311cf51cf 100644
--- a/qpid/cpp/src/tests/BrokerClusterCalls.cpp
+++ b/qpid/cpp/src/tests/BrokerClusterCalls.cpp
@@ -56,13 +56,18 @@ class DummyCluster : public broker::Cluster
      */
     bool isRouting;
 
+    // Record a QueuedMessage
     void recordQm(const string& op, const broker::QueuedMessage& qm) {
         history += (format("%s(%s, %d, %s)") % op % qm.queue->getName()
                     % qm.position % qm.payload->getFrames().getContent()).str();
     }
+
+    // Record a message
     void recordMsg(const string& op, broker::Queue& q, intrusive_ptr msg) {
         history += (format("%s(%s, %s)") % op % q.getName() % msg->getFrames().getContent()).str();
     }
+
+    // Record a string
     void recordStr(const string& op, const string& name) {
         history += (format("%s(%s)") % op % name).str();
     }
@@ -102,6 +107,11 @@ class DummyCluster : public broker::Cluster
         history += (format("cancel(%s, %d)") % q.getName() % n).str();
     }
 
+    // Queues
+    // FIXME aconway 2011-05-18: update test to exercise empty()
+    virtual void empty(broker::Queue& q) { recordStr("empty", q.getName()); }
+    virtual void stopped(broker::Queue& q) { recordStr("stopped", q.getName()); }
+
     // Wiring
 
     virtual void create(broker::Queue& q) { recordStr("createq", q.getName()); }
@@ -230,7 +240,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
     h.clear();
     i = 0;
     m = Message("t");
-    m.setTtl(Duration(1));                // Timeout 1ms
+    m.setTtl(Duration(1));      // Timeout 1ms
     sender.send(m);
     usleep(2000);               // Sleep 2ms
     bool received = receiver.fetch(m, Duration::IMMEDIATE);
@@ -239,6 +249,10 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
     BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)");
     BOOST_CHECK_EQUAL(h.at(i++), "routed(t)");
     BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)");
+    // Note: empty is called once for each receiver.
+    BOOST_CHECK_EQUAL(h.at(i++), "empty(q)");
+    BOOST_CHECK_EQUAL(h.at(i++), "empty(q)");
+    BOOST_CHECK_EQUAL(h.at(i++), "empty(q)");
     BOOST_CHECK_EQUAL(h.size(), i);
 
     // Message replaced on LVQ
diff --git a/qpid/cpp/src/tests/cluster2_tests.py b/qpid/cpp/src/tests/cluster2_tests.py
index f17dfe2961..1cf749cdb4 100755
--- a/qpid/cpp/src/tests/cluster2_tests.py
+++ b/qpid/cpp/src/tests/cluster2_tests.py
@@ -33,8 +33,27 @@ log = getLogger("qpid.cluster_tests")
 class Cluster2Tests(BrokerTest):
     """Tests for new cluster code."""
 
-    def verify_content(self, content, receiver):
-        for c in content: self.assertEqual(c, receiver.fetch(1).content)
+    def queue_exists(self, queue, connection):
+        s = connection.session()
+        try:
+            s.sender(queue)
+            return True
+        except qpid.messaging.exceptions.NotFound:
+            return False
+
+    # FIXME aconway 2011-06-22: needed to compensate for
+    # async wiring in early cluster2 prototype
+    def wait_for_queue(self, queue, connections, timeout=10):
+        deadline = time.time() + timeout
+        for c in connections:
+            while not self.queue_exists(queue,c):
+                if time.time() >  timeout: fail("Time out in wait_for_queue(%s))"%queue)
+                time.sleep(0.01)
+
+    # FIXME aconway 2011-05-17: remove, use assert_browse.
+    def verify_content(self, expect, receiver):
+        actual = [receiver.fetch(1).content for x in expect]
+        self.assertEqual(expect, actual)
         self.assertRaises(Empty, receiver.fetch, 0)
 
     def test_message_enqueue(self):
@@ -74,12 +93,15 @@ class Cluster2Tests(BrokerTest):
         s0 = sn0.sender("q;{create:always,delete:always}")
         r0 = sn0.receiver("q")
         sn1 = cluster[1].connect().session()
-        r1 = sn1.receiver("q;{create:always}") # Not yet replicating wiring.
+        r1 = sn1.receiver("q;{create:always}")
 
         content = ["a","b","c"]
         for m in content: s0.send(Message(m))
-         # Verify enqueued on cluster[1]
+        # Verify enqueued on members 0 and 1
+        # FIXME aconway 2011-05-13:
+        self.verify_content(content, sn0.receiver("q;{mode:browse}"))
         self.verify_content(content, sn1.receiver("q;{mode:browse}"))
+
         # Dequeue on cluster[0]
         self.assertEqual(r0.fetch(1).content, "a")
         sn0.acknowledge(sync=True)
@@ -114,3 +136,40 @@ class Cluster2Tests(BrokerTest):
         self.assertRaises(NotFound, cluster[1].connect().session().receiver, "ex")
 
         # FIXME aconway 2010-10-29: test unbind, may need to use old API.
+
+    def test_dequeue_mutex(self):
+        """Ensure that one and only one consumer receives each dequeued message."""
+        class Receiver(Thread):
+            def __init__(self, session):
+                self.session = session
+                self.receiver = session.receiver("q")
+                self.messages = []
+                Thread.__init__(self)
+
+            def run(self):
+                try:
+                    while True:
+                        self.messages.append(self.receiver.fetch(1))
+                        self.session.acknowledge()
+                except Empty: pass
+
+        cluster = self.cluster(3, cluster2=True, args=["-t"]) # FIXME aconway 2011-05-13: -t
+        connections = [ b.connect() for  b in cluster]
+        sessions = [ c.session() for c in connections ]
+        sender = sessions[0].sender("q;{create:always}")
+        self.wait_for_queue("q", connections)
+
+        receivers = [ Receiver(s) for s in sessions ]
+        for r in receivers: r.start()
+
+        n = 0
+        t = time.time() + 1             # Send for 1 second.
+        while time.time() < t:
+            sender.send(str(n))
+            n += 1
+        for r in receivers: r.join();
+        print "FIXME", [len(r.messages) for r in receivers] # FIXME aconway 2011-05-17:
+        for r in receivers: assert len(r.messages) # At least one message to each
+        messages = [int(m.content) for r in receivers for m in r.messages ]
+        messages.sort()
+        self.assertEqual(range(n), messages)
diff --git a/qpid/cpp/src/tests/qpid-test-cluster b/qpid/cpp/src/tests/qpid-test-cluster
index 9887406ef9..7522a7fdfd 100755
--- a/qpid/cpp/src/tests/qpid-test-cluster
+++ b/qpid/cpp/src/tests/qpid-test-cluster
@@ -28,7 +28,7 @@ Options:
              Default is $DEFAULT_ENV.
  -c CONFIG   Use CONFIG as qpidd config file. Copies CONFIG to each host.
              Default is $DEFAULT_CONF
- -d          Delete data-dir and log file before starting broker.	     
+ -d          Delete data-dir and log file before starting broker.
 "
     exit 1
 }
@@ -82,6 +82,7 @@ do_start() {
 }
 
 do_stop() {
+
     for h in $HOSTS; do
 	ssh $h "$SOURCE_ENV qpidd -q --no-module-dir --no-data-dir $QPIDD_ARGS"
     done
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index e0cd647894..aac764ee62 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -8,9 +8,9 @@
 - to you under the Apache License, Version 2.0 (the
 - "License"); you may not use this file except in compliance
 - with the License.  You may obtain a copy of the License at
-- 
+-
 -   http://www.apache.org/licenses/LICENSE-2.0
-- 
+-
 - Unless required by applicable law or agreed to in writing,
 - software distributed under the License is distributed on an
 - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -81,7 +81,7 @@
     
       
     
-    
+
     
       
 	
@@ -89,7 +89,7 @@
 	
       
     
-	
+
     
     
       
@@ -149,7 +149,7 @@
 
     
     
-    
+
     
-     
+    
       
       
       
@@ -204,7 +204,7 @@
     
     
       
-    
+
     
     
       
@@ -294,6 +294,7 @@
       
     
 
+    
     
       
       
@@ -303,10 +304,22 @@
       
     
 
-    
+    
+    
       
       
     
+
+    
+      
+      
+    
+
+    
+      
+      
+    
+
   
 
   
@@ -341,4 +354,26 @@
     
 
   
+
+  
+
+  
+    
+    
+      
+    
+    
+    
+      
+    
+    
+    
+      
+    
+  
+
 
-- 
cgit v1.2.1