diff options
author | Alan Conway <aconway@apache.org> | 2011-09-16 20:16:53 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-09-16 20:16:53 +0000 |
commit | 358260bab45abbfea24f686f978b8dcaba10438c (patch) | |
tree | cb8abf8ef189f1fe32e47a7d68604b1d8b3b0bec | |
parent | 277889b4d238fb70e274891384fe56096a3dbc16 (diff) | |
download | qpid-python-358260bab45abbfea24f686f978b8dcaba10438c.tar.gz |
QPID-2920: New cluster release/requeue.
Almost functional, seeing sporadic hangs in qpid-cpp-benchmark with two brokers:
qpid-cpp-benchmark -b localhost:5556,localhost:5555 -r2 -m10000
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1171756 13f79535-47bb-0310-9956-ffa450edef68
26 files changed, 267 insertions, 157 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index 1809c87ca8..ab6e90baec 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -83,6 +83,7 @@ cluster_la_SOURCES = \ qpid/cluster/OutputInterceptor.h \ qpid/cluster/PollerDispatch.cpp \ qpid/cluster/PollerDispatch.h \ + qpid/cluster/PrettyId.h \ qpid/cluster/ProxyInputHandler.h \ qpid/cluster/Quorum.h \ qpid/cluster/InitialStatusMap.h \ diff --git a/qpid/cpp/src/qpid/broker/Cluster.h b/qpid/cpp/src/qpid/broker/Cluster.h index 193332692b..0e8b3822a5 100644 --- a/qpid/cpp/src/qpid/broker/Cluster.h +++ b/qpid/cpp/src/qpid/broker/Cluster.h @@ -57,8 +57,7 @@ class Cluster /** A message is delivered to a queue. * Called before actually pushing the message to the queue. - *@return If true the message should be pushed to the queue now. - * otherwise the cluster code will push the message when it is replicated. + *@return If true the message should be enqueued now, false for delayed enqueue. */ virtual bool enqueue(Queue& queue, const boost::intrusive_ptr<Message>&) = 0; @@ -71,8 +70,10 @@ class Cluster /** A locally-acquired message is released by the consumer and re-queued. */ virtual void release(const QueuedMessage&) = 0; - /** A message is removed from the queue. */ - virtual void dequeue(const QueuedMessage&) = 0; + /** A message is removed from the queue. + *@return true if the message should be dequeued, false for delayed dequeue. + */ + virtual bool dequeue(const QueuedMessage&) = 0; // Consumers diff --git a/qpid/cpp/src/qpid/broker/NullCluster.h b/qpid/cpp/src/qpid/broker/NullCluster.h index 399e2a3ca6..16a62beace 100644 --- a/qpid/cpp/src/qpid/broker/NullCluster.h +++ b/qpid/cpp/src/qpid/broker/NullCluster.h @@ -42,7 +42,7 @@ class NullCluster : public Cluster virtual void routed(const boost::intrusive_ptr<Message>&) {} virtual void acquire(const QueuedMessage&) {} virtual void release(const QueuedMessage&) {} - virtual void dequeue(const QueuedMessage&) {} + virtual bool dequeue(const QueuedMessage&) { return false; } // Consumers diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 32b037bb21..6b632ed737 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -113,7 +113,7 @@ Queue::Queue(const string& _name, bool _autodelete, deleted(false), barrier(*this), autoDeleteTimeout(0), - dispatching(boost::bind(&Queue::acquireStopped,this)) + consuming(boost::bind(&Queue::consumingStopped,this)) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -154,7 +154,7 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ // Check for deferred delivery in a cluster. if (broker && broker->deferDelivery(name, msg)) return; - // Same thing but for the new cluster interface. + // Check for deferred delivery with new cluster interface. if (broker && !broker->getCluster().enqueue(*this, msg)) return; @@ -227,39 +227,32 @@ void Queue::requeue(const QueuedMessage& msg){ } } } - - if (broker) broker->getCluster().release(msg); + if (broker) broker->getCluster().release(msg); // FIXME aconway 2011-09-12: review. rename requeue? copy.notify(); } /** Mark a scope that acquires a message. * - * ClusterAcquireScope is declared before are taken. The calling - * function sets qmsg with the lock held, but the call to - * Cluster::acquire() will happen after the lock is released. + * ClusterAcquireScope is declared before locks are taken. The + * calling function sets qmsg with the lock held, but the call to + * Cluster::acquire() will happen after the lock is released in + * ~ClusterAcquireScope(). * * Also marks a Stoppable as busy for the duration of the scope. **/ struct ClusterAcquireScope { - Broker* broker; - Queue& queue; QueuedMessage qmsg; - ClusterAcquireScope(Queue& q) : broker(q.getBroker()), queue(q) {} + ClusterAcquireScope() {} ~ClusterAcquireScope() { - if (broker) { - // FIXME aconway 2011-06-27: Move to QueueContext. - // Avoid the indirection via queuename. - if (qmsg.queue) broker->getCluster().acquire(qmsg); - else broker->getCluster().empty(queue); - } + if (qmsg.queue) qmsg.queue->getBroker()->getCluster().acquire(qmsg); } }; bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { - ClusterAcquireScope acquireScope(*this); // Outside lock + ClusterAcquireScope acquireScope; // Outside lock Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); @@ -312,13 +305,13 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { while (true) { - Stoppable::Scope stopper(dispatching); // FIXME aconway 2011-06-28: rename consuming - if (!stopper) { + Stoppable::Scope consumeScope(consuming); + if (!consumeScope) { QPID_LOG(trace, "Queue is stopped: " << name); listeners.addListener(c); return NO_MESSAGES; } - ClusterAcquireScope acquireScope(*this); // Outside the lock + ClusterAcquireScope acquireScope; // Outside the lock Mutex::ScopedLock locker(messageLock); if (messages->empty()) { // FIXME aconway 2011-06-07: ugly QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); @@ -461,7 +454,7 @@ void Queue::cancel(Consumer::shared_ptr c){ } QueuedMessage Queue::get(){ - ClusterAcquireScope acquireScope(*this); // Outside lock + ClusterAcquireScope acquireScope; // Outside lock Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); if (messages->pop(msg)) acquireScope.qmsg = msg; @@ -709,6 +702,10 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { + // FIXME aconway 2011-09-13: new cluster needs tx/dtx support. + if (!ctxt && broker) + if (!broker->getCluster().dequeue(msg)) return false; + ScopedUse u(barrier); if (!u.acquired) return false; { @@ -719,8 +716,6 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) } } - if (!ctxt && broker) broker->getCluster().dequeue(msg); // Outside lock - // This check prevents messages which have been forced persistent on one queue from dequeuing // from another on which no forcing has taken place and thus causing a store error. bool fp = msg.payload->isForcedPersistent(); @@ -737,7 +732,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::dequeueCommitted(const QueuedMessage& msg) { - if (broker) broker->getCluster().dequeue(msg); // Outside lock + // FIXME aconway 2011-09-13: new cluster needs TX support. Mutex::ScopedLock locker(messageLock); dequeued(msg); if (mgmtObject != 0) { @@ -919,7 +914,7 @@ void Queue::notifyDeleted() set.notifyAll(); } -void Queue::acquireStopped() { +void Queue::consumingStopped() { if (broker) broker->getCluster().stopped(*this); } @@ -1291,15 +1286,13 @@ void Queue::UsageBarrier::destroy() while (count) parent.messageLock.wait(); } -// FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()? -void Queue::stop() { +void Queue::stopConsumers() { QPID_LOG(trace, "Queue stopped: " << getName()); - // FIXME aconway 2011-05-25: rename dispatching - acquiring? - dispatching.stop(); + consuming.stop(); } -void Queue::start() { +void Queue::startConsumers() { QPID_LOG(trace, "Queue started: " << getName()); - dispatching.start(); + consuming.start(); notifyListener(); } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 9435750b4e..6c9c111dbb 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -132,9 +132,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, UsageBarrier barrier; int autoDeleteTimeout; boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask; - // Allow dispatching consumer threads to be stopped. Used by cluster - sys::Stoppable dispatching; // FIXME aconway 2011-06-07: name: acquiring? - boost::intrusive_ptr<RefCounted> clusterContext; + sys::Stoppable consuming; // Allow consumer threads to be stopped, used by cluster + boost::intrusive_ptr<RefCounted> clusterContext; // Used by cluster void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); @@ -182,7 +181,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, void checkNotDeleted(); void notifyDeleted(); - void acquireStopped(); + void consumingStopped(); public: @@ -396,10 +395,10 @@ class Queue : public boost::enable_shared_from_this<Queue>, /** Stop consumers. Return when all consumer threads are stopped. *@pre Queue is active and not already stopping. */ - void stop(); + void stopConsumers(); /** Start consumers. */ - void start(); + void startConsumers(); /** Context information used in a cluster. */ boost::intrusive_ptr<RefCounted> getClusterContext() { return clusterContext; } diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp index f30a790547..4014b0ce37 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp @@ -93,10 +93,9 @@ bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& m core.getRoutingMap().put(tssRoutingId, msg); } core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), tssRoutingId, queue.getName())); - // TODO aconway 2010-10-21: configable option for strict (wait - // for CPG deliver to do local deliver) vs. loose (local deliver - // immediately). - return false; + // TODO aconway 2010-10-21: review delivery options: strict (wait + // for CPG delivery vs loose (local deliver immediately). + return false; // Strict delivery, cluster will call Queue deliver. } void BrokerContext::routed(const boost::intrusive_ptr<Message>&) { @@ -113,25 +112,27 @@ void BrokerContext::acquire(const broker::QueuedMessage& qm) { ProtocolVersion(), qm.queue->getName(), qm.position)); } -// FIXME aconway 2011-05-24: need to handle acquire and release. -// Dequeue in the wrong place? -void BrokerContext::dequeue(const broker::QueuedMessage& qm) { - if (tssNoReplicate) return; - core.mcast(ClusterMessageDequeueBody( - ProtocolVersion(), qm.queue->getName(), qm.position)); +bool BrokerContext::dequeue(const broker::QueuedMessage& qm) { + if (!tssNoReplicate) + core.mcast(ClusterMessageDequeueBody( + ProtocolVersion(), qm.queue->getName(), qm.position)); + return false; // FIXME aconway 2011-09-14: needed? } -void BrokerContext::release(const broker::QueuedMessage& ) { - // FIXME aconway 2011-05-24: TODO +// FIXME aconway 2011-09-14: rename requeue? +void BrokerContext::release(const broker::QueuedMessage& qm) { + if (!tssNoReplicate) + core.mcast(ClusterMessageReleaseBody( + ProtocolVersion(), qm.queue->getName(), qm.position, qm.payload->getRedelivered())); } // FIXME aconway 2011-06-08: should be be using shared_ptr to q here? void BrokerContext::create(broker::Queue& q) { - if (tssNoReplicate) return; // FIXME aconway 2011-06-08: revisit - // FIXME aconway 2011-06-08: error handling- if already set... - // Create local context immediately, queue will be stopped until replicated. + q.stopConsumers(); // FIXME aconway 2011-09-14: Stop queue initially. + if (tssNoReplicate) return; + assert(!QueueContext::get(q)); boost::intrusive_ptr<QueueContext> context( - new QueueContext(q,core.getMulticaster())); + new QueueContext(q, core.getMulticaster())); std::string data(q.encodedSize(), '\0'); framing::Buffer buf(&data[0], data.size()); q.encode(buf); @@ -174,11 +175,12 @@ void BrokerContext::unbind(broker::Queue& q, broker::Exchange& ex, } // n is the number of consumers including the one just added. -// FIXME aconway 2011-06-27: rename, conflicting terms. +// FIXME aconway 2011-06-27: rename, conflicting terms. subscribe? void BrokerContext::consume(broker::Queue& q, size_t n) { QueueContext::get(q)->consume(n); } +// FIXME aconway 2011-09-13: rename unsubscribe? // n is the number of consumers after the cancel. void BrokerContext::cancel(broker::Queue& q, size_t n) { QueueContext::get(q)->cancel(n); diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h index fc19d6487b..6172296823 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h @@ -57,7 +57,7 @@ class BrokerContext : public broker::Cluster bool enqueue(broker::Queue&, const boost::intrusive_ptr<broker::Message>&); void routed(const boost::intrusive_ptr<broker::Message>&); void acquire(const broker::QueuedMessage&); - void dequeue(const broker::QueuedMessage&); + bool dequeue(const broker::QueuedMessage&); void release(const broker::QueuedMessage&); // Consumers diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp index 7bcc068120..5241b9e414 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Core.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp @@ -68,7 +68,6 @@ void Core::fatal() { } void Core::mcast(const framing::AMQBody& body) { - QPID_LOG(trace, "cluster multicast: " << body); multicaster.mcast(body); } diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp index beebe9fc16..4653cbf1ca 100644 --- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp @@ -22,6 +22,7 @@ #include "Core.h" #include "EventHandler.h" #include "HandlerBase.h" +#include "PrettyId.h" #include "qpid/broker/Broker.h" #include "qpid/cluster/types.h" #include "qpid/framing/AMQFrame.h" @@ -49,17 +50,6 @@ void EventHandler::start() { dispatcher.start(); } -// Print member ID or "self" if member is self -struct PrettyId { - MemberId id, self; - PrettyId(const MemberId& id_, const MemberId& self_) : id(id_), self(self_) {} -}; - -std::ostream& operator<<(std::ostream& o, const PrettyId& id) { - if (id.id == id.self) return o << "self"; - else return o << id.id; -} - // Deliver CPG message. void EventHandler::deliver( cpg_handle_t /*handle*/, diff --git a/qpid/cpp/src/qpid/cluster/exp/LockedMap.h b/qpid/cpp/src/qpid/cluster/exp/LockedMap.h index c0afe740f8..7294ff767e 100644 --- a/qpid/cpp/src/qpid/cluster/exp/LockedMap.h +++ b/qpid/cpp/src/qpid/cluster/exp/LockedMap.h @@ -39,8 +39,20 @@ class LockedMap Value get(const Key& key) const { sys::RWlock::ScopedRlock r(lock); typename Map::const_iterator i = map.find(key); - if (i == map.end()) return Value(); - else return i->second; + return (i == map.end()) ? Value() : i->second; + } + + /** Update value with the value for key. + *@return true if key was found. + */ + bool get(const Key& key, Value& value) const { + sys::RWlock::ScopedRlock r(lock); + typename Map::const_iterator i = map.find(key); + if (i != map.end()) { + value = i->second; + return true; + } + return false; } /** Associate value with key, overwriting any previous value for key. */ diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp index 7e9a1219ae..0dbbaca83b 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp @@ -22,7 +22,9 @@ #include "Core.h" #include "MessageHandler.h" #include "BrokerContext.h" +#include "QueueContext.h" #include "EventHandler.h" +#include "PrettyId.h" #include "qpid/broker/Message.h" #include "qpid/broker/Broker.h" #include "qpid/broker/QueueRegistry.h" @@ -72,7 +74,7 @@ void MessageHandler::enqueue(RoutingId routingId, const std::string& q) { else msg = memberMap[sender()].routingMap[routingId]; if (!msg) throw Exception(QPID_MSG("Cluster enqueue on " << q - << " failed: unknown message")); + << " failed: unknown message")); BrokerContext::ScopedSuppressReplication ssr; queue->deliver(msg); } @@ -84,40 +86,51 @@ void MessageHandler::routed(RoutingId routingId) { memberMap[sender()].routingMap.erase(routingId); } +// FIXME aconway 2011-09-14: performance: pack acquires into a SequenceSet +// and scan queue once. void MessageHandler::acquire(const std::string& q, uint32_t position) { // Note acquires from other members. My own acquires were executed in // the connection thread if (sender() != self()) { - // FIXME aconway 2010-10-28: need to store acquired messages on QueueContext - // by broker for possible re-queuing if a broker leaves. - boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed"); + boost::shared_ptr<Queue> queue = findQueue(q, "Cluster acquire failed"); QueuedMessage qm; BrokerContext::ScopedSuppressReplication ssr; bool ok = queue->acquireMessageAt(position, qm); - (void)ok; // Avoid unused variable warnings. - assert(ok); // FIXME aconway 2011-08-04: failing this assertion. + (void)ok; // Avoid unused variable warnings. + assert(ok); // FIXME aconway 2011-09-14: error handling assert(qm.position.getValue() == position); assert(qm.payload); + // Save for possible requeue. + QueueContext::get(*queue)->acquire(qm); } -} + QPID_LOG(trace, "cluster message " << q << "[" << position + << "] acquired by " << PrettyId(sender(), self())); + } -void MessageHandler::dequeue(const std::string& q, uint32_t /*position*/) { +void MessageHandler::dequeue(const std::string& q, uint32_t position) { if (sender() == self()) { // FIXME aconway 2010-10-28: we should complete the ack that initiated // the dequeue at this point, see BrokerContext::dequeue - return; } - boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed"); - BrokerContext::ScopedSuppressReplication ssr; - // FIXME aconway 2011-05-12: Remove the acquired message from QueueContext. - // Do we need to call this? Review with gsim. - // QueuedMessage qm; - // Get qm from QueueContext? - // queue->dequeue(0, qm); + else { + // FIXME aconway 2011-09-15: new cluster, inefficient looks up + // message by position multiple times? + boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed"); + // Remove fom the unacked list + QueueContext::get(*queue)->dequeue(position); + BrokerContext::ScopedSuppressReplication ssr; + QueuedMessage qm = queue->find(position); + if (qm.queue) queue->dequeue(0, qm); + } } -void MessageHandler::release(const std::string& /*queue*/ , uint32_t /*position*/) { - // FIXME aconway 2011-05-24: +// FIXME aconway 2011-09-14: rename as requeue? +void MessageHandler::release(const std::string& q, uint32_t position, bool redelivered) { + // FIXME aconway 2011-09-15: review release/requeue logic. + if (sender() != self()) { + boost::shared_ptr<Queue> queue = findQueue(q, "Cluster release failed"); + QueueContext::get(*queue)->requeue(position, redelivered); + } } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h index 0a010a8ecf..dba5b784ad 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h @@ -60,7 +60,7 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler void routed(uint32_t routingId); void acquire(const std::string& queue, uint32_t position); void dequeue(const std::string& queue, uint32_t position); - void release(const std::string& queue, uint32_t position); + void release(const std::string& queue, uint32_t position, bool redelivered); private: struct Member { diff --git a/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp index 427c25093a..9d8a00e217 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp @@ -51,7 +51,8 @@ Multicaster::Multicaster(Cpg& cpg_, queue.start(); } -void Multicaster::mcast(const framing::AMQDataBlock& data) { +void Multicaster::mcast(const framing::AMQFrame& data) { + QPID_LOG(trace, "cluster multicast: " << data); BufferRef bufRef = buffers.get(data.encodedSize()); framing::Buffer buf(bufRef.begin(), bufRef.size()); data.encode(buf); diff --git a/qpid/cpp/src/qpid/cluster/exp/Multicaster.h b/qpid/cpp/src/qpid/cluster/exp/Multicaster.h index 6953d2bfbd..c28f29f1a3 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Multicaster.h +++ b/qpid/cpp/src/qpid/cluster/exp/Multicaster.h @@ -30,7 +30,7 @@ namespace qpid { namespace framing { -class AMQDataBlock; +class AMQFrame; class AMQBody; } @@ -54,7 +54,7 @@ class Multicaster ); /** Multicast an event */ - void mcast(const framing::AMQDataBlock&); + void mcast(const framing::AMQFrame&); void mcast(const framing::AMQBody&); private: diff --git a/qpid/cpp/src/qpid/cluster/exp/PrettyId.h b/qpid/cpp/src/qpid/cluster/exp/PrettyId.h new file mode 100644 index 0000000000..0f7651151b --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/PrettyId.h @@ -0,0 +1,46 @@ +#ifndef QPID_CLUSTER_EXP_PRETTYID_H +#define QPID_CLUSTER_EXP_PRETTYID_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/cluster/types.h" + +namespace qpid { +namespace cluster { + +/** + * Wrapper for a MemberId that prints as the member ID or the string + * "self" if the member is self. + */ +struct PrettyId { + MemberId id, self; + PrettyId(const MemberId& id_, const MemberId& self_) : id(id_), self(self_) {} +}; + +inline std::ostream& operator<<(std::ostream& o, const PrettyId& id) { + if (id.id == id.self) return o << "self"; + else return o << id.id; +} + + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_EXP_PRETTYID_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp index 60b218da14..55006911a6 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp @@ -20,13 +20,16 @@ */ #include "QueueContext.h" + #include "Multicaster.h" +#include "BrokerContext.h" // for ScopedSuppressReplication #include "qpid/framing/ProtocolVersion.h" #include "qpid/framing/ClusterQueueResubscribeBody.h" #include "qpid/framing/ClusterQueueSubscribeBody.h" #include "qpid/framing/ClusterQueueUnsubscribeBody.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/QueuedMessage.h" #include "qpid/log/Statement.h" #include "qpid/sys/Timer.h" @@ -41,7 +44,6 @@ class OwnershipTimeout : public sys::TimerTask { OwnershipTimeout(QueueContext& qc, const sys::Duration& interval) : TimerTask(interval, "QueueContext::OwnershipTimeout"), queueContext(qc) {} - // FIXME aconway 2011-07-27: thread safety on deletion? void fire() { queueContext.timeout(); } }; @@ -49,32 +51,35 @@ QueueContext::QueueContext(broker::Queue& q, Multicaster& m) : timer(q.getBroker()->getTimer()), queue(q), mcast(m), consumers(0) { q.setClusterContext(boost::intrusive_ptr<QueueContext>(this)); - q.stop(); // Initially stopped. } QueueContext::~QueueContext() { - // FIXME aconway 2011-07-27: revisit shutdown logic. - // timeout() could be called concurrently with destructor. - sys::Mutex::ScopedLock l(lock); if (timerTask) timerTask->cancel(); } +void QueueContext::cancelTimer(const sys::Mutex::ScopedLock&) { + if (timerTask) { // no need for timeout, sole owner. + timerTask->cancel(); + timerTask = 0; + } +} + +// Called by QueueReplica in CPG deliver thread when state changes. void QueueContext::replicaState(QueueOwnership state) { sys::Mutex::ScopedLock l(lock); switch (state) { case UNSUBSCRIBED: case SUBSCRIBED: + cancelTimer(l); + queue.stopConsumers(); break; case SOLE_OWNER: - queue.start(); - if (timerTask) { // no need for timeout. - timerTask->cancel(); - timerTask = 0; - } + cancelTimer(l); // Sole owner, no need for timer. + queue.startConsumers(); break; case SHARED_OWNER: - queue.start(); - if (timerTask) timerTask->cancel(); + cancelTimer(l); + queue.startConsumers(); // FIXME aconway 2011-07-28: configurable interval. timerTask = new OwnershipTimeout(*this, 100*sys::TIME_MSEC); timer.add(timerTask); @@ -82,7 +87,7 @@ void QueueContext::replicaState(QueueOwnership state) { } } -// FIXME aconway 2011-07-27: Dont spin token on an empty queue. Cancel timer. +// FIXME aconway 2011-07-27: Dont spin token on an empty queue. // Called in connection threads when a consumer is added void QueueContext::consume(size_t n) { @@ -96,18 +101,19 @@ void QueueContext::consume(size_t n) { void QueueContext::cancel(size_t n) { sys::Mutex::ScopedLock l(lock); consumers = n; - if (n == 0) queue.stop(); // FIXME aconway 2011-07-28: Ok inside lock? + // When consuming threads are stopped, this->stopped will be called. + if (n == 0) queue.stopConsumers(); // FIXME aconway 2011-07-28: Ok inside lock? } +// Called in timer thread. void QueueContext::timeout() { - QPID_LOG(critical, "FIXME Ownership timeout on queue " << queue.getName()); - queue.stop(); + // FIXME aconway 2011-09-14: need to deal with stray timeouts. + queue.stopConsumers(); // When all threads have stopped, queue will call stopped() } - -// Callback set up by queue.stop(), called when no threads are dispatching from the queue. -// Release the queue. +// Callback set up by queue.stopConsumers() called in connection thread. +// Called when no threads are dispatching from the queue. void QueueContext::stopped() { sys::Mutex::ScopedLock l(lock); // FIXME aconway 2011-07-28: review thread safety of state. @@ -116,16 +122,33 @@ void QueueContext::stopped() { if (consumers == 0) mcast.mcast(framing::ClusterQueueUnsubscribeBody( framing::ProtocolVersion(), queue.getName())); - else + else // FIXME aconway 2011-09-13: check if we're owner? mcast.mcast(framing::ClusterQueueResubscribeBody( framing::ProtocolVersion(), queue.getName())); } +void QueueContext::requeue(uint32_t position, bool redelivered) { + // FIXME aconway 2011-09-15: no lock, unacked has its own lock. + broker::QueuedMessage qm; + if (unacked.get(position, qm)) { + unacked.erase(position); + if (redelivered) qm.payload->redeliver(); + BrokerContext::ScopedSuppressReplication ssr; + queue.requeue(qm); + } +} + +void QueueContext::acquire(const broker::QueuedMessage& qm) { + unacked.put(qm.position, qm); +} + +void QueueContext::dequeue(uint32_t position) { + unacked.erase(position); +} + boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) { return boost::intrusive_ptr<QueueContext>( static_cast<QueueContext*>(q.getClusterContext().get())); } - - }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h index c244b57a2e..4571c6744a 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h @@ -22,7 +22,7 @@ * */ - +#include "LockedMap.h" #include <qpid/RefCounted.h> #include "qpid/sys/Time.h" #include <qpid/sys/Mutex.h> @@ -35,6 +35,7 @@ namespace qpid { namespace broker { class Queue; +class QueuedMessage; } namespace sys { class Timer; @@ -60,16 +61,16 @@ class QueueContext : public RefCounted { void replicaState(QueueOwnership); /** Called when queue is stopped, no threads are dispatching. - * Connection or deliver thread. + * May be called in connection or deliver thread. */ void stopped(); - /** Called when a consumer is added to the queue. + /** Called in connection thread when a consumer is added. *@param n: nubmer of consumers after new one is added. */ void consume(size_t n); - /** Called when a consumer is cancelled on the queue. + /** Called in connection thread when a consumer is cancelled on the queue. *@param n: nubmer of consumers after the cancel. */ void cancel(size_t n); @@ -77,9 +78,18 @@ class QueueContext : public RefCounted { /** Get the context for a broker queue. */ static boost::intrusive_ptr<QueueContext> get(broker::Queue&); - /** Called when the timer runs out: stop the queue. */ + /** Called in timer thread when the timer runs out. */ void timeout(); + /** Called by MessageHandler to requeue a message. */ + void requeue(uint32_t position, bool redelivered); + + /** Called by MessageHandler when a mesages is acquired. */ + void acquire(const broker::QueuedMessage& qm); + + /** Called by MesageHandler when a message is dequeued. */ + void dequeue(uint32_t position); + private: sys::Timer& timer; @@ -89,7 +99,10 @@ class QueueContext : public RefCounted { boost::intrusive_ptr<sys::TimerTask> timerTask; size_t consumers; - // FIXME aconway 2011-06-28: need to store acquired messages for possible re-queueing. + typedef LockedMap<uint32_t, broker::QueuedMessage> UnackedMap; // FIXME aconway 2011-09-15: don't need read/write map? Rename + UnackedMap unacked; + + void cancelTimer(const sys::Mutex::ScopedLock& l); }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp index 7d56025fb8..4c2b16e001 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp @@ -58,6 +58,7 @@ void QueueHandler::left(const MemberId& member) { } // FIXME aconway 2011-06-08: do we need to hold on to the shared pointer for lifecycle? +// FIXME aconway 2011-09-13: called from wiring handler, need to consider for multi-cpg. void QueueHandler::add(boost::shared_ptr<broker::Queue> q) { // FIXME aconway 2011-06-08: move create operation from Wiring to Queue handler. // FIXME aconway 2011-05-10: assert not already in map. diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp index 7bbd6e1422..8b451a3eaf 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp @@ -20,6 +20,7 @@ */ #include "QueueReplica.h" #include "QueueContext.h" +#include "PrettyId.h" #include "qpid/broker/Queue.h" #include "qpid/log/Statement.h" #include <algorithm> @@ -30,17 +31,17 @@ namespace cluster { QueueReplica::QueueReplica(boost::shared_ptr<broker::Queue> q, const MemberId& self_) : queue(q), self(self_), context(QueueContext::get(*q)) -{ - // q is initially stopped. -} +{} struct PrintSubscribers { const QueueReplica::MemberQueue& mq; - PrintSubscribers(const QueueReplica::MemberQueue& m) : mq(m) {} + MemberId self; + PrintSubscribers(const QueueReplica::MemberQueue& m, const MemberId& s) : mq(m), self(s) {} }; std::ostream& operator<<(std::ostream& o, const PrintSubscribers& ps) { - copy(ps.mq.begin(), ps.mq.end(), std::ostream_iterator<MemberId>(o, " ")); + for (QueueReplica::MemberQueue::const_iterator i = ps.mq.begin(); i != ps.mq.end(); ++i) + o << PrettyId(*i, ps.self) << " "; return o; } @@ -51,12 +52,10 @@ std::ostream& operator<<(std::ostream& o, QueueOwnership s) { std::ostream& operator<<(std::ostream& o, const QueueReplica& qr) { o << qr.queue->getName() << "(" << qr.getState() << "): " - << PrintSubscribers(qr.subscribers); + << PrintSubscribers(qr.subscribers, qr.getSelf()); return o; } -// FIXME aconway 2011-05-17: error handling for asserts. - void QueueReplica::subscribe(const MemberId& member) { QueueOwnership before = getState(); subscribers.push_back(member); @@ -73,15 +72,16 @@ void QueueReplica::unsubscribe(const MemberId& member) { } void QueueReplica::resubscribe(const MemberId& member) { - assert (member == subscribers.front()); // FIXME aconway 2011-06-27: error handling - QueueOwnership before = getState(); - subscribers.pop_front(); - subscribers.push_back(member); - update(before); + if (member == subscribers.front()) { // FIXME aconway 2011-09-13: should be assert? + QueueOwnership before = getState(); + subscribers.pop_front(); + subscribers.push_back(member); + update(before); + } } void QueueReplica::update(QueueOwnership before) { - QPID_LOG(trace, "QueueReplica " << *this << " (was " << before << ")"); + QPID_LOG(trace, "cluster: queue replica " << *this << " (was " << before << ")"); QueueOwnership after = getState(); if (before == after) return; context->replicaState(after); diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h index 4ebbc84ef0..a1dca2e33d 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h @@ -36,6 +36,7 @@ class Queue; namespace cluster { class QueueContext; +struct PrintSubscribers; /** * Queue state that is replicated among all cluster members. @@ -54,12 +55,9 @@ class QueueReplica : public RefCounted void unsubscribe(const MemberId&); void resubscribe(const MemberId&); + MemberId getSelf() const { return self; } + private: - - friend class PrintSubscribers; - friend std::ostream& operator<<(std::ostream&, QueueOwnership); - friend std::ostream& operator<<(std::ostream&, const QueueReplica&); - typedef std::deque<MemberId> MemberQueue; boost::shared_ptr<broker::Queue> queue; @@ -71,6 +69,11 @@ class QueueReplica : public RefCounted bool isOwner() const; bool isSubscriber(const MemberId&) const; void update(QueueOwnership before); + + friend struct PrintSubscribers; + friend std::ostream& operator<<(std::ostream&, QueueOwnership); + friend std::ostream& operator<<(std::ostream&, const QueueReplica&); + friend std::ostream& operator<<(std::ostream& o, const PrintSubscribers& ps); }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/sys/Stoppable.h b/qpid/cpp/src/qpid/sys/Stoppable.h index ac0f03d3a1..113a676503 100644 --- a/qpid/cpp/src/qpid/sys/Stoppable.h +++ b/qpid/cpp/src/qpid/sys/Stoppable.h @@ -33,11 +33,12 @@ namespace sys { * An activity that may be executed by multiple threads, and can be stopped. * * Stopping prevents new threads from entering and calls a callback - * when all busy threads leave. + * when all busy threads have left. */ class Stoppable { public: /** + * Initially not stopped. *@param stoppedCallback: called when all threads have stopped. */ Stoppable(boost::function<void()> stoppedCallback) @@ -55,7 +56,7 @@ class Stoppable { Stoppable& state; bool entered; public: - Scope(Stoppable& s) : state(s) { entered = s.enter(); } + Scope(Stoppable& s) : state(s) { entered = state.enter(); } ~Scope() { if (entered) state.exit(); } operator bool() const { return entered; } }; @@ -69,6 +70,7 @@ class Stoppable { */ void stop() { sys::Monitor::ScopedLock l(lock); + if (stopped) return; stopped = true; check(); } @@ -81,6 +83,8 @@ class Stoppable { stopped = false; } + private: + // Busy thread enters scope bool enter() { sys::Monitor::ScopedLock l(lock); @@ -96,8 +100,8 @@ class Stoppable { check(); } - private: void check() { + // Called with lock held. if (stopped && busy == 0 && notify) notify(); } diff --git a/qpid/cpp/src/tests/BrokerClusterCalls.cpp b/qpid/cpp/src/tests/BrokerClusterCalls.cpp index 01c0639bf0..7975210e4e 100644 --- a/qpid/cpp/src/tests/BrokerClusterCalls.cpp +++ b/qpid/cpp/src/tests/BrokerClusterCalls.cpp @@ -94,8 +94,9 @@ class DummyCluster : public broker::Cluster virtual void release(const broker::QueuedMessage& qm) { if (!isRouting) recordQm("release", qm); } - virtual void dequeue(const broker::QueuedMessage& qm) { + virtual bool dequeue(const broker::QueuedMessage& qm) { if (!isRouting) recordQm("dequeue", qm); + return false; } // Consumers diff --git a/qpid/cpp/src/tests/cluster2_tests.py b/qpid/cpp/src/tests/cluster2_tests.py index ad13986ad3..81bc71d22f 100755 --- a/qpid/cpp/src/tests/cluster2_tests.py +++ b/qpid/cpp/src/tests/cluster2_tests.py @@ -159,7 +159,7 @@ class Cluster2Tests(BrokerTest): self.session.acknowledge() except Empty: pass - cluster = self.cluster(3, cluster2=True, args=["-t"]) # FIXME aconway 2011-05-13: -t + cluster = self.cluster(3, cluster2=True) connections = [ b.connect() for b in cluster] sessions = [ c.session() for c in connections ] sender = sessions[0].sender("q;{create:always}") diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark index fcc76f6cf3..6da0c11944 100755 --- a/qpid/cpp/src/tests/qpid-cpp-benchmark +++ b/qpid/cpp/src/tests/qpid-cpp-benchmark @@ -115,7 +115,9 @@ def start_receive(queue, index, opts, ready_queue, broker, host): if opts.connection_options: command += ["--connection-options",opts.connection_options] if host: command = ssh_command(host, command) - return clients.add(Popen(command, stdout=PIPE, stderr=PIPE)) + # FIXME aconway 2011-09-15: + # return clients.add(Popen(command, stdout=PIPE, stderr=PIPE)) + return clients.add(Popen(command, stdout=PIPE)) def start_send(queue, opts, broker, host): address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"])) @@ -128,7 +130,9 @@ def start_send(queue, opts, broker, host): "--report-total", "--report-header=no", "--timestamp=%s"%(opts.timestamp and "yes" or "no"), - "--sequence=no", + # FIXME aconway 2011-09-15: + # "--sequence=no", + "--sequence=yes", "--flow-control", str(opts.flow_control), "--durable", str(opts.durable) ] @@ -166,12 +170,12 @@ def recreate_queues(queues, brokers): for q in queues: try: s.sender("%s;{delete:always}"%(q)).close() except qpid.messaging.exceptions.NotFound: pass - # FIXME aconway 2011-05-04: async wiring, wait for changes to propagate. + # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate for b in brokers: while queue_exists(q,b): time.sleep(0.1); - for q in queues: - s.sender("%s;{create:always}"%q) - # FIXME aconway 2011-05-04: async wiring, wait for changes to propagate. + for q in queues: + s.sender("%s;{create:always}"%q) + # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate for b in brokers: while not queue_exists(q,b): time.sleep(0.1); c.close() @@ -182,8 +186,6 @@ def print_header(timestamp): print "send-tp\t\trecv-tp%s"%latency_header def parse(parser, lines): # Parse sender/receiver output - for l in lines: - fn_val = zip(parser, l) return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines] def parse_senders(senders): diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index fc33685407..ae4f341efa 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -198,6 +198,7 @@ int main(int argc, char ** argv) std::map<std::string,Sender> replyTo; while (!done && receiver.fetch(msg, timeout)) { + cerr << "FIXME " << msg.getProperties()[SN] << endl; if (!started) { // Start the time on receipt of the first message to avoid counting // idle time at process startup. @@ -207,6 +208,7 @@ int main(int argc, char ** argv) reporter.message(msg); if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) { if (msg.getContent() == EOS) { + cerr << "FIXME eos" << endl; done = true; } else { ++count; @@ -224,7 +226,10 @@ int main(int argc, char ** argv) } if (opts.printContent) std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages - if (opts.messages && count >= opts.messages) done = true; + if (opts.messages && count >= opts.messages) { + cerr << "FIXME "<< count << " >= " << opts.messages << endl; + done = true; + } } } else if (opts.checkRedelivered && !msg.getRedelivered()) { throw qpid::Exception("duplicate sequence number received, message not marked as redelivered!"); diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 1fed9e7de1..c84d8e3ef5 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -361,6 +361,7 @@ <control name="release" code="0x6"> <field name="queue" type="queue.name"/> <field name="position" type="uint32"/> + <field name="redelivered" type="bit"/> </control> </class> |