diff options
author | Alan Conway <aconway@apache.org> | 2011-09-16 20:17:25 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-09-16 20:17:25 +0000 |
commit | e253587dd57ffd1788d8adcb2133a6901bab995d (patch) | |
tree | efa828138ae5c6b23f947c7ca91d9f2de3446f6a | |
parent | 358260bab45abbfea24f686f978b8dcaba10438c (diff) | |
download | qpid-python-e253587dd57ffd1788d8adcb2133a6901bab995d.tar.gz |
QPID-2920: Fixing QueueContext state transtions for timed ownership.
- Renamed release to requeue for Cluster interface.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1171757 13f79535-47bb-0310-9956-ffa450edef68
21 files changed, 188 insertions, 105 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index ab6e90baec..3e22ab696c 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -115,6 +115,7 @@ cluster2_la_SOURCES = \ qpid/cluster/exp/BrokerContext.h \ qpid/cluster/exp/BufferFactory.h \ qpid/cluster/exp/Cluster2Plugin.cpp \ + qpid/cluster/exp/CountdownTimer.h \ qpid/cluster/exp/Core.cpp \ qpid/cluster/exp/Core.h \ qpid/cluster/exp/EventHandler.cpp \ diff --git a/qpid/cpp/src/qpid/broker/Cluster.h b/qpid/cpp/src/qpid/broker/Cluster.h index 0e8b3822a5..83c5361f3b 100644 --- a/qpid/cpp/src/qpid/broker/Cluster.h +++ b/qpid/cpp/src/qpid/broker/Cluster.h @@ -57,7 +57,7 @@ class Cluster /** A message is delivered to a queue. * Called before actually pushing the message to the queue. - *@return If true the message should be enqueued now, false for delayed enqueue. + *@return If true the message should be enqueued now, false if it will be enqueued later. */ virtual bool enqueue(Queue& queue, const boost::intrusive_ptr<Message>&) = 0; @@ -68,10 +68,11 @@ class Cluster virtual void acquire(const QueuedMessage&) = 0; /** A locally-acquired message is released by the consumer and re-queued. */ - virtual void release(const QueuedMessage&) = 0; + virtual void requeue(const QueuedMessage&) = 0; /** A message is removed from the queue. - *@return true if the message should be dequeued, false for delayed dequeue. + *@return true if the message should be dequeued now, false if it + * will be dequeued later. */ virtual bool dequeue(const QueuedMessage&) = 0; diff --git a/qpid/cpp/src/qpid/broker/NullCluster.h b/qpid/cpp/src/qpid/broker/NullCluster.h index 16a62beace..e3ac6b7594 100644 --- a/qpid/cpp/src/qpid/broker/NullCluster.h +++ b/qpid/cpp/src/qpid/broker/NullCluster.h @@ -41,7 +41,7 @@ class NullCluster : public Cluster virtual bool enqueue(Queue&, const boost::intrusive_ptr<Message>&) { return true; } virtual void routed(const boost::intrusive_ptr<Message>&) {} virtual void acquire(const QueuedMessage&) {} - virtual void release(const QueuedMessage&) {} + virtual void requeue(const QueuedMessage&) {} virtual bool dequeue(const QueuedMessage&) { return false; } // Consumers diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 6b632ed737..1eec0c0b0a 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -227,7 +227,7 @@ void Queue::requeue(const QueuedMessage& msg){ } } } - if (broker) broker->getCluster().release(msg); // FIXME aconway 2011-09-12: review. rename requeue? + if (broker) broker->getCluster().requeue(msg); // FIXME aconway 2011-09-12: review. rename requeue? copy.notify(); } diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp index 4014b0ce37..e06068fd38 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp @@ -28,7 +28,7 @@ #include "qpid/framing/ClusterMessageEnqueueBody.h" #include "qpid/framing/ClusterMessageAcquireBody.h" #include "qpid/framing/ClusterMessageDequeueBody.h" -#include "qpid/framing/ClusterMessageReleaseBody.h" +#include "qpid/framing/ClusterMessageRequeueBody.h" #include "qpid/framing/ClusterWiringCreateQueueBody.h" #include "qpid/framing/ClusterWiringCreateExchangeBody.h" #include "qpid/framing/ClusterWiringDestroyQueueBody.h" @@ -93,9 +93,7 @@ bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& m core.getRoutingMap().put(tssRoutingId, msg); } core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), tssRoutingId, queue.getName())); - // TODO aconway 2010-10-21: review delivery options: strict (wait - // for CPG delivery vs loose (local deliver immediately). - return false; // Strict delivery, cluster will call Queue deliver. + return false; // Strict order, wait for CPG self-delivery to enqueue. } void BrokerContext::routed(const boost::intrusive_ptr<Message>&) { @@ -113,22 +111,27 @@ void BrokerContext::acquire(const broker::QueuedMessage& qm) { } bool BrokerContext::dequeue(const broker::QueuedMessage& qm) { + // FIXME aconway 2011-09-15: should dequeue locally immediately + // instead of waiting for redeliver. No need for CPG order on + // dequeues. if (!tssNoReplicate) core.mcast(ClusterMessageDequeueBody( ProtocolVersion(), qm.queue->getName(), qm.position)); return false; // FIXME aconway 2011-09-14: needed? } -// FIXME aconway 2011-09-14: rename requeue? -void BrokerContext::release(const broker::QueuedMessage& qm) { +void BrokerContext::requeue(const broker::QueuedMessage& qm) { if (!tssNoReplicate) - core.mcast(ClusterMessageReleaseBody( - ProtocolVersion(), qm.queue->getName(), qm.position, qm.payload->getRedelivered())); + core.mcast(ClusterMessageRequeueBody( + ProtocolVersion(), + qm.queue->getName(), + qm.position, + qm.payload->getRedelivered())); } // FIXME aconway 2011-06-08: should be be using shared_ptr to q here? void BrokerContext::create(broker::Queue& q) { - q.stopConsumers(); // FIXME aconway 2011-09-14: Stop queue initially. + q.stopConsumers(); // Stop queue initially. if (tssNoReplicate) return; assert(!QueueContext::get(q)); boost::intrusive_ptr<QueueContext> context( @@ -192,8 +195,8 @@ void BrokerContext::empty(broker::Queue& ) { void BrokerContext::stopped(broker::Queue& q) { boost::intrusive_ptr<QueueContext> qc = QueueContext::get(q); - // Don't forward the stopped call if the queue does not yet have a cluster context - // this when the queue is first created locally. + // Don't forward the stopped call if the queue does not yet have a + // cluster context this when the queue is first created locally. if (qc) qc->stopped(); } diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h index 6172296823..0583b7edc7 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h @@ -58,7 +58,7 @@ class BrokerContext : public broker::Cluster void routed(const boost::intrusive_ptr<broker::Message>&); void acquire(const broker::QueuedMessage&); bool dequeue(const broker::QueuedMessage&); - void release(const broker::QueuedMessage&); + void requeue(const broker::QueuedMessage&); // Consumers diff --git a/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h b/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h new file mode 100644 index 0000000000..5d16ce6e10 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h @@ -0,0 +1,95 @@ +#ifndef QPID_CLUSTER_EXP_COUNTDOWNTIMER_H +#define QPID_CLUSTER_EXP_COUNTDOWNTIMER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright countdown. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Timer.h" +#include <boost/function.hpp> + +namespace qpid { +namespace cluster { + +/** Manage the CountdownTimeout */ +class CountdownTimer { + public: + /** + * Resettable count-down timer for a fixed interval. + *@param cb callback when countdown expires. + *@param t Timer to use for countdown. + *@param d duration of countdown. + */ + CountdownTimer(boost::function<void()> cb, sys::Timer& t, sys::Duration d) + : task(new Task(*this, d)), timerRunning(false), callback(cb), timer(t) {} + + ~CountdownTimer() { stop(); } + + /** Start the countdown if not already started. */ + void start() { + sys::Mutex::ScopedLock l(lock); + if (!timerRunning) { + timerRunning = true; + task->restart(); + timer.add(task); + } + } + + /** Stop the countdown if not already stopped. */ + void stop() { + sys::Mutex::ScopedLock l(lock); + if (timerRunning) { + timerRunning = false; + task->cancel(); + } + } + + private: + + class Task : public sys::TimerTask { + CountdownTimer& parent; + public: + Task(CountdownTimer& ct, const sys::Duration& d) : + TimerTask(d, "CountdownTimer::Task"), parent(ct) {} + void fire() { parent.fire(); } + }; + + // Called when countdown expires. + void fire() { + bool doCallback = false; + { + sys::Mutex::ScopedLock l(lock); + doCallback = timerRunning; + timerRunning = false; + } + if (doCallback) callback(); + } + + sys::Mutex lock; + boost::intrusive_ptr<Task> task; + bool timerRunning; + boost::function<void()> callback; + sys::Timer& timer; +}; + + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_EXP_COUNTDOWNTIMER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp index 0dbbaca83b..14a39e1e61 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp @@ -95,14 +95,15 @@ void MessageHandler::acquire(const std::string& q, uint32_t position) { boost::shared_ptr<Queue> queue = findQueue(q, "Cluster acquire failed"); QueuedMessage qm; BrokerContext::ScopedSuppressReplication ssr; - bool ok = queue->acquireMessageAt(position, qm); - (void)ok; // Avoid unused variable warnings. - assert(ok); // FIXME aconway 2011-09-14: error handling + if (!queue->acquireMessageAt(position, qm)) + throw Exception(QPID_MSG("Cluster acquire: message not found: " + << q << "[" << position << "]")); assert(qm.position.getValue() == position); assert(qm.payload); - // Save for possible requeue. + // Save on context for possible requeue if released/rejected. QueueContext::get(*queue)->acquire(qm); } + // FIXME aconway 2011-09-15: systematic logging across cluster module. QPID_LOG(trace, "cluster message " << q << "[" << position << "] acquired by " << PrettyId(sender(), self())); } @@ -124,11 +125,9 @@ void MessageHandler::dequeue(const std::string& q, uint32_t position) { } } -// FIXME aconway 2011-09-14: rename as requeue? -void MessageHandler::release(const std::string& q, uint32_t position, bool redelivered) { - // FIXME aconway 2011-09-15: review release/requeue logic. +void MessageHandler::requeue(const std::string& q, uint32_t position, bool redelivered) { if (sender() != self()) { - boost::shared_ptr<Queue> queue = findQueue(q, "Cluster release failed"); + boost::shared_ptr<Queue> queue = findQueue(q, "Cluster requeue failed"); QueueContext::get(*queue)->requeue(position, redelivered); } } diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h index dba5b784ad..40e004d89a 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h @@ -60,7 +60,7 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler void routed(uint32_t routingId); void acquire(const std::string& queue, uint32_t position); void dequeue(const std::string& queue, uint32_t position); - void release(const std::string& queue, uint32_t position, bool redelivered); + void requeue(const std::string& queue, uint32_t position, bool redelivered); private: struct Member { diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp index 55006911a6..3d0ba40bce 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp @@ -31,63 +31,54 @@ #include "qpid/broker/Queue.h" #include "qpid/broker/QueuedMessage.h" #include "qpid/log/Statement.h" -#include "qpid/sys/Timer.h" namespace qpid { namespace cluster { - -class OwnershipTimeout : public sys::TimerTask { - QueueContext& queueContext; - - public: - OwnershipTimeout(QueueContext& qc, const sys::Duration& interval) : - TimerTask(interval, "QueueContext::OwnershipTimeout"), queueContext(qc) {} - - void fire() { queueContext.timeout(); } -}; - +// FIXME aconway 2011-09-16: configurable timeout. QueueContext::QueueContext(broker::Queue& q, Multicaster& m) - : timer(q.getBroker()->getTimer()), queue(q), mcast(m), consumers(0) + : ownership(UNSUBSCRIBED), + timer(boost::bind(&QueueContext::timeout, this), + q.getBroker()->getTimer(), + 100*sys::TIME_MSEC), + queue(q), mcast(m), consumers(0) { q.setClusterContext(boost::intrusive_ptr<QueueContext>(this)); } -QueueContext::~QueueContext() { - if (timerTask) timerTask->cancel(); -} +QueueContext::~QueueContext() {} -void QueueContext::cancelTimer(const sys::Mutex::ScopedLock&) { - if (timerTask) { // no need for timeout, sole owner. - timerTask->cancel(); - timerTask = 0; - } +// Invariant for ownership: +// UNSUBSCRIBED, SUBSCRIBED => timer stopped, queue stopped +// SOLE_OWNER => timer stopped, queue started +// SHARED_OWNER => timer started, queue started + +namespace { +bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; } } // Called by QueueReplica in CPG deliver thread when state changes. -void QueueContext::replicaState(QueueOwnership state) { +void QueueContext::replicaState(QueueOwnership newOwnership) { sys::Mutex::ScopedLock l(lock); - switch (state) { - case UNSUBSCRIBED: - case SUBSCRIBED: - cancelTimer(l); - queue.stopConsumers(); - break; - case SOLE_OWNER: - cancelTimer(l); // Sole owner, no need for timer. - queue.startConsumers(); - break; - case SHARED_OWNER: - cancelTimer(l); + QueueOwnership before = ownership; + QueueOwnership after = newOwnership; + ownership = after; + if (!isOwner(before) && !isOwner(after)) + ; // Nothing to do, now ownership change on this transition. + else if (isOwner(before) && !isOwner(after)) // Lost ownership + ; // Nothing to do, queue and timer were stopped before + // sending unsubscribe/resubscribe. + else if (!isOwner(before) && isOwner(after)) { // Took ownership queue.startConsumers(); - // FIXME aconway 2011-07-28: configurable interval. - timerTask = new OwnershipTimeout(*this, 100*sys::TIME_MSEC); - timer.add(timerTask); - break; + if (after == SHARED_OWNER) timer.start(); + } + else if (isOwner(before) && isOwner(after) && before != after) { + if (after == SOLE_OWNER) timer.stop(); + else timer.start(); } } -// FIXME aconway 2011-07-27: Dont spin token on an empty queue. +// FIXME aconway 2011-07-27: Dont spin the token on an empty or idle queue. // Called in connection threads when a consumer is added void QueueContext::consume(size_t n) { @@ -102,14 +93,16 @@ void QueueContext::cancel(size_t n) { sys::Mutex::ScopedLock l(lock); consumers = n; // When consuming threads are stopped, this->stopped will be called. - if (n == 0) queue.stopConsumers(); // FIXME aconway 2011-07-28: Ok inside lock? + if (n == 0) { + timer.stop(); + queue.stopConsumers(); // FIXME aconway 2011-07-28: Ok inside lock? + } } // Called in timer thread. void QueueContext::timeout() { - // FIXME aconway 2011-09-14: need to deal with stray timeouts. - queue.stopConsumers(); // When all threads have stopped, queue will call stopped() + queue.stopConsumers(); } // Callback set up by queue.stopConsumers() called in connection thread. @@ -117,18 +110,16 @@ void QueueContext::timeout() { void QueueContext::stopped() { sys::Mutex::ScopedLock l(lock); // FIXME aconway 2011-07-28: review thread safety of state. - // Deffered call to stopped doesn't sit well. - // queueActive is invalid while stop is in progress? if (consumers == 0) mcast.mcast(framing::ClusterQueueUnsubscribeBody( framing::ProtocolVersion(), queue.getName())); - else // FIXME aconway 2011-09-13: check if we're owner? + else // FIXME aconway 2011-09-13: check if we're owner? mcast.mcast(framing::ClusterQueueResubscribeBody( framing::ProtocolVersion(), queue.getName())); } void QueueContext::requeue(uint32_t position, bool redelivered) { - // FIXME aconway 2011-09-15: no lock, unacked has its own lock. + // No lock, unacked has its own lock. broker::QueuedMessage qm; if (unacked.get(position, qm)) { unacked.erase(position); diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h index 4571c6744a..54bc81b175 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h @@ -23,9 +23,10 @@ */ #include "LockedMap.h" -#include <qpid/RefCounted.h> +#include "CountdownTimer.h" +#include "qpid/RefCounted.h" #include "qpid/sys/Time.h" -#include <qpid/sys/Mutex.h> +#include "qpid/sys/Mutex.h" #include "qpid/cluster/types.h" #include <boost/intrusive_ptr.hpp> @@ -37,10 +38,6 @@ namespace broker { class Queue; class QueuedMessage; } -namespace sys { -class Timer; -class TimerTask; -} namespace cluster { @@ -91,18 +88,15 @@ class QueueContext : public RefCounted { void dequeue(uint32_t position); private: - sys::Timer& timer; - sys::Mutex lock; + QueueOwnership ownership; + CountdownTimer timer; broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr? Multicaster& mcast; - boost::intrusive_ptr<sys::TimerTask> timerTask; size_t consumers; typedef LockedMap<uint32_t, broker::QueuedMessage> UnackedMap; // FIXME aconway 2011-09-15: don't need read/write map? Rename UnackedMap unacked; - - void cancelTimer(const sys::Mutex::ScopedLock& l); }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp index 4c2b16e001..37079a17a1 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp @@ -52,7 +52,6 @@ void QueueHandler::resubscribe(const std::string& queue) { void QueueHandler::left(const MemberId& member) { // Unsubscribe for members that leave. - // FIXME aconway 2011-06-28: also need to re-queue acquired messages. for (QueueMap::iterator i = queues.begin(); i != queues.end(); ++i) i->second->unsubscribe(member); } @@ -66,6 +65,7 @@ void QueueHandler::add(boost::shared_ptr<broker::Queue> q) { // Local queues already have a context, remote queues need one. if (!QueueContext::get(*q)) new QueueContext(*q, multicaster); // Context attaches itself to the Queue + // FIXME aconway 2011-09-15: thread safety: called from wiring handler.. queues[q->getName()] = boost::intrusive_ptr<QueueReplica>( new QueueReplica(q, self())); } diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp index 8b451a3eaf..0938498fa3 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp @@ -83,8 +83,7 @@ void QueueReplica::resubscribe(const MemberId& member) { void QueueReplica::update(QueueOwnership before) { QPID_LOG(trace, "cluster: queue replica " << *this << " (was " << before << ")"); QueueOwnership after = getState(); - if (before == after) return; - context->replicaState(after); + if (before != after) context->replicaState(after); } QueueOwnership QueueReplica::getState() const { diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h index a1dca2e33d..20aef058fc 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h @@ -56,7 +56,7 @@ class QueueReplica : public RefCounted void resubscribe(const MemberId&); MemberId getSelf() const { return self; } - + private: typedef std::deque<MemberId> MemberQueue; diff --git a/qpid/cpp/src/qpid/cluster/exp/README.txt b/qpid/cpp/src/qpid/cluster/exp/README.txt index 97f2a10d84..189c755f09 100644 --- a/qpid/cpp/src/qpid/cluster/exp/README.txt +++ b/qpid/cpp/src/qpid/cluster/exp/README.txt @@ -1,2 +1,4 @@ Experimental code to test ideas about a new cluster design. + +See overview.h diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp index ef4df3cf97..92f7183a08 100644 --- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp @@ -56,22 +56,23 @@ bool WiringHandler::invoke(const framing::AMQBody& body) { void WiringHandler::createQueue(const std::string& data) { // FIXME aconway 2011-05-25: Needs async completion. std::string name; - if (sender() != self()) { // Created by another member, need to create locally. + if (sender() != self()) { // Created by another member, need to create locally. BrokerContext::ScopedSuppressReplication ssr; framing::Buffer buf(const_cast<char*>(&data[0]), data.size()); // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*() RecoverableQueue::shared_ptr rq = recovery.recoverQueue(buf); name = rq->getName(); } - else { // Created locally, Queue and QueueContext already exist. + else { // Created locally, Queue and QueueContext already exist. framing::Buffer buffer(const_cast<char*>(&data[0]), data.size()); // FIXME aconway 2011-05-10: implicit knowledge of queue encoding. buffer.getShortString(name); } boost::shared_ptr<broker::Queue> q = broker.getQueues().find(name); assert(q); // FIXME aconway 2011-05-10: error handling. - // TODO aconway 2011-05-10: if we implement multi-group for queues then - // this call is a problem: comes from wiring delivery thread, not queues. + // TODO aconway 2011-05-10: if we implement multi-group for queues + // then this call is a potential problem: comes from wiring + // delivery thread, not queues. queueHandler->add(q); QPID_LOG(debug, "cluster: create queue " << q->getName()); } diff --git a/qpid/cpp/src/qpid/cluster/exp/overview.h b/qpid/cpp/src/qpid/cluster/exp/overview.h index 3a0189d750..586a711827 100644 --- a/qpid/cpp/src/qpid/cluster/exp/overview.h +++ b/qpid/cpp/src/qpid/cluster/exp/overview.h @@ -3,10 +3,14 @@ <h1>New cluster implementation overview</h> -There are 3 areas indicated by a suffix on class names: +The code is broken down into 3 areas indicated by a suffix on class names: -- Replica: State that is replicated to the entire cluster. Only called by Handlers in the deliver thread. -- Context: State that is private to this member. Called by both Replia and broker objects in deliver and connection threads. +- Replica: State that is replicated to the entire cluster. + Only called by Handlers in the deliver thread. May call on Contexts. + +- Context: State private to this member and associated with a local entity + such as the Broker or a Queue. Called in deliver and connection threads. + - Handler: Dispatch CPG messages by calling Replica objects in the deliver thread. diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h index 667d9b89fa..0c234b01c0 100644 --- a/qpid/cpp/src/qpid/cluster/types.h +++ b/qpid/cpp/src/qpid/cluster/types.h @@ -82,8 +82,6 @@ std::ostream& operator<<(std::ostream&, EventType); /** Number to identify a message being routed. */ typedef uint32_t RoutingId; -// FIXME aconway 2011-07-28: can we put these 2 back in the -// QueueReplica & QueueContext? /** State of a queue with respect to a cluster member. */ enum QueueOwnership { UNSUBSCRIBED, diff --git a/qpid/cpp/src/qpid/sys/Stoppable.h b/qpid/cpp/src/qpid/sys/Stoppable.h index 113a676503..6ddf926280 100644 --- a/qpid/cpp/src/qpid/sys/Stoppable.h +++ b/qpid/cpp/src/qpid/sys/Stoppable.h @@ -27,8 +27,6 @@ namespace qpid { namespace sys { -// FIXME aconway 2011-05-25: needs better name - /** * An activity that may be executed by multiple threads, and can be stopped. * @@ -72,7 +70,7 @@ class Stoppable { sys::Monitor::ScopedLock l(lock); if (stopped) return; stopped = true; - check(); + check(l); } /** Set the state to "started", allow threads to enter. @@ -97,10 +95,10 @@ class Stoppable { sys::Monitor::ScopedLock l(lock); assert(busy > 0); --busy; - check(); + check(l); } - void check() { + void check(const sys::Monitor::ScopedLock&) { // Called with lock held. if (stopped && busy == 0 && notify) notify(); } diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index ae4f341efa..81f697dec0 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -198,7 +198,6 @@ int main(int argc, char ** argv) std::map<std::string,Sender> replyTo; while (!done && receiver.fetch(msg, timeout)) { - cerr << "FIXME " << msg.getProperties()[SN] << endl; if (!started) { // Start the time on receipt of the first message to avoid counting // idle time at process startup. @@ -208,7 +207,6 @@ int main(int argc, char ** argv) reporter.message(msg); if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) { if (msg.getContent() == EOS) { - cerr << "FIXME eos" << endl; done = true; } else { ++count; @@ -227,7 +225,6 @@ int main(int argc, char ** argv) if (opts.printContent) std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages if (opts.messages && count >= opts.messages) { - cerr << "FIXME "<< count << " >= " << opts.messages << endl; done = true; } } diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index c84d8e3ef5..dfcf30ecdf 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -358,7 +358,7 @@ <field name="position" type="uint32"/> </control> - <control name="release" code="0x6"> + <control name="requeue" code="0x6"> <field name="queue" type="queue.name"/> <field name="position" type="uint32"/> <field name="redelivered" type="bit"/> |