summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-09-16 20:17:25 +0000
committerAlan Conway <aconway@apache.org>2011-09-16 20:17:25 +0000
commite253587dd57ffd1788d8adcb2133a6901bab995d (patch)
treeefa828138ae5c6b23f947c7ca91d9f2de3446f6a
parent358260bab45abbfea24f686f978b8dcaba10438c (diff)
downloadqpid-python-e253587dd57ffd1788d8adcb2133a6901bab995d.tar.gz
QPID-2920: Fixing QueueContext state transtions for timed ownership.
- Renamed release to requeue for Cluster interface. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1171757 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/cluster.mk1
-rw-r--r--qpid/cpp/src/qpid/broker/Cluster.h7
-rw-r--r--qpid/cpp/src/qpid/broker/NullCluster.h2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp25
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h95
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp15
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp83
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.h16
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp3
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/README.txt2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp9
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/overview.h10
-rw-r--r--qpid/cpp/src/qpid/cluster/types.h2
-rw-r--r--qpid/cpp/src/qpid/sys/Stoppable.h8
-rw-r--r--qpid/cpp/src/tests/qpid-receive.cpp3
-rw-r--r--qpid/cpp/xml/cluster.xml2
21 files changed, 188 insertions, 105 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index ab6e90baec..3e22ab696c 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -115,6 +115,7 @@ cluster2_la_SOURCES = \
qpid/cluster/exp/BrokerContext.h \
qpid/cluster/exp/BufferFactory.h \
qpid/cluster/exp/Cluster2Plugin.cpp \
+ qpid/cluster/exp/CountdownTimer.h \
qpid/cluster/exp/Core.cpp \
qpid/cluster/exp/Core.h \
qpid/cluster/exp/EventHandler.cpp \
diff --git a/qpid/cpp/src/qpid/broker/Cluster.h b/qpid/cpp/src/qpid/broker/Cluster.h
index 0e8b3822a5..83c5361f3b 100644
--- a/qpid/cpp/src/qpid/broker/Cluster.h
+++ b/qpid/cpp/src/qpid/broker/Cluster.h
@@ -57,7 +57,7 @@ class Cluster
/** A message is delivered to a queue.
* Called before actually pushing the message to the queue.
- *@return If true the message should be enqueued now, false for delayed enqueue.
+ *@return If true the message should be enqueued now, false if it will be enqueued later.
*/
virtual bool enqueue(Queue& queue, const boost::intrusive_ptr<Message>&) = 0;
@@ -68,10 +68,11 @@ class Cluster
virtual void acquire(const QueuedMessage&) = 0;
/** A locally-acquired message is released by the consumer and re-queued. */
- virtual void release(const QueuedMessage&) = 0;
+ virtual void requeue(const QueuedMessage&) = 0;
/** A message is removed from the queue.
- *@return true if the message should be dequeued, false for delayed dequeue.
+ *@return true if the message should be dequeued now, false if it
+ * will be dequeued later.
*/
virtual bool dequeue(const QueuedMessage&) = 0;
diff --git a/qpid/cpp/src/qpid/broker/NullCluster.h b/qpid/cpp/src/qpid/broker/NullCluster.h
index 16a62beace..e3ac6b7594 100644
--- a/qpid/cpp/src/qpid/broker/NullCluster.h
+++ b/qpid/cpp/src/qpid/broker/NullCluster.h
@@ -41,7 +41,7 @@ class NullCluster : public Cluster
virtual bool enqueue(Queue&, const boost::intrusive_ptr<Message>&) { return true; }
virtual void routed(const boost::intrusive_ptr<Message>&) {}
virtual void acquire(const QueuedMessage&) {}
- virtual void release(const QueuedMessage&) {}
+ virtual void requeue(const QueuedMessage&) {}
virtual bool dequeue(const QueuedMessage&) { return false; }
// Consumers
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 6b632ed737..1eec0c0b0a 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -227,7 +227,7 @@ void Queue::requeue(const QueuedMessage& msg){
}
}
}
- if (broker) broker->getCluster().release(msg); // FIXME aconway 2011-09-12: review. rename requeue?
+ if (broker) broker->getCluster().requeue(msg); // FIXME aconway 2011-09-12: review. rename requeue?
copy.notify();
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
index 4014b0ce37..e06068fd38 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
@@ -28,7 +28,7 @@
#include "qpid/framing/ClusterMessageEnqueueBody.h"
#include "qpid/framing/ClusterMessageAcquireBody.h"
#include "qpid/framing/ClusterMessageDequeueBody.h"
-#include "qpid/framing/ClusterMessageReleaseBody.h"
+#include "qpid/framing/ClusterMessageRequeueBody.h"
#include "qpid/framing/ClusterWiringCreateQueueBody.h"
#include "qpid/framing/ClusterWiringCreateExchangeBody.h"
#include "qpid/framing/ClusterWiringDestroyQueueBody.h"
@@ -93,9 +93,7 @@ bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& m
core.getRoutingMap().put(tssRoutingId, msg);
}
core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), tssRoutingId, queue.getName()));
- // TODO aconway 2010-10-21: review delivery options: strict (wait
- // for CPG delivery vs loose (local deliver immediately).
- return false; // Strict delivery, cluster will call Queue deliver.
+ return false; // Strict order, wait for CPG self-delivery to enqueue.
}
void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {
@@ -113,22 +111,27 @@ void BrokerContext::acquire(const broker::QueuedMessage& qm) {
}
bool BrokerContext::dequeue(const broker::QueuedMessage& qm) {
+ // FIXME aconway 2011-09-15: should dequeue locally immediately
+ // instead of waiting for redeliver. No need for CPG order on
+ // dequeues.
if (!tssNoReplicate)
core.mcast(ClusterMessageDequeueBody(
ProtocolVersion(), qm.queue->getName(), qm.position));
return false; // FIXME aconway 2011-09-14: needed?
}
-// FIXME aconway 2011-09-14: rename requeue?
-void BrokerContext::release(const broker::QueuedMessage& qm) {
+void BrokerContext::requeue(const broker::QueuedMessage& qm) {
if (!tssNoReplicate)
- core.mcast(ClusterMessageReleaseBody(
- ProtocolVersion(), qm.queue->getName(), qm.position, qm.payload->getRedelivered()));
+ core.mcast(ClusterMessageRequeueBody(
+ ProtocolVersion(),
+ qm.queue->getName(),
+ qm.position,
+ qm.payload->getRedelivered()));
}
// FIXME aconway 2011-06-08: should be be using shared_ptr to q here?
void BrokerContext::create(broker::Queue& q) {
- q.stopConsumers(); // FIXME aconway 2011-09-14: Stop queue initially.
+ q.stopConsumers(); // Stop queue initially.
if (tssNoReplicate) return;
assert(!QueueContext::get(q));
boost::intrusive_ptr<QueueContext> context(
@@ -192,8 +195,8 @@ void BrokerContext::empty(broker::Queue& ) {
void BrokerContext::stopped(broker::Queue& q) {
boost::intrusive_ptr<QueueContext> qc = QueueContext::get(q);
- // Don't forward the stopped call if the queue does not yet have a cluster context
- // this when the queue is first created locally.
+ // Don't forward the stopped call if the queue does not yet have a
+ // cluster context this when the queue is first created locally.
if (qc) qc->stopped();
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
index 6172296823..0583b7edc7 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
@@ -58,7 +58,7 @@ class BrokerContext : public broker::Cluster
void routed(const boost::intrusive_ptr<broker::Message>&);
void acquire(const broker::QueuedMessage&);
bool dequeue(const broker::QueuedMessage&);
- void release(const broker::QueuedMessage&);
+ void requeue(const broker::QueuedMessage&);
// Consumers
diff --git a/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h b/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h
new file mode 100644
index 0000000000..5d16ce6e10
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h
@@ -0,0 +1,95 @@
+#ifndef QPID_CLUSTER_EXP_COUNTDOWNTIMER_H
+#define QPID_CLUSTER_EXP_COUNTDOWNTIMER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright countdown. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Timer.h"
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace cluster {
+
+/** Manage the CountdownTimeout */
+class CountdownTimer {
+ public:
+ /**
+ * Resettable count-down timer for a fixed interval.
+ *@param cb callback when countdown expires.
+ *@param t Timer to use for countdown.
+ *@param d duration of countdown.
+ */
+ CountdownTimer(boost::function<void()> cb, sys::Timer& t, sys::Duration d)
+ : task(new Task(*this, d)), timerRunning(false), callback(cb), timer(t) {}
+
+ ~CountdownTimer() { stop(); }
+
+ /** Start the countdown if not already started. */
+ void start() {
+ sys::Mutex::ScopedLock l(lock);
+ if (!timerRunning) {
+ timerRunning = true;
+ task->restart();
+ timer.add(task);
+ }
+ }
+
+ /** Stop the countdown if not already stopped. */
+ void stop() {
+ sys::Mutex::ScopedLock l(lock);
+ if (timerRunning) {
+ timerRunning = false;
+ task->cancel();
+ }
+ }
+
+ private:
+
+ class Task : public sys::TimerTask {
+ CountdownTimer& parent;
+ public:
+ Task(CountdownTimer& ct, const sys::Duration& d) :
+ TimerTask(d, "CountdownTimer::Task"), parent(ct) {}
+ void fire() { parent.fire(); }
+ };
+
+ // Called when countdown expires.
+ void fire() {
+ bool doCallback = false;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ doCallback = timerRunning;
+ timerRunning = false;
+ }
+ if (doCallback) callback();
+ }
+
+ sys::Mutex lock;
+ boost::intrusive_ptr<Task> task;
+ bool timerRunning;
+ boost::function<void()> callback;
+ sys::Timer& timer;
+};
+
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_EXP_COUNTDOWNTIMER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
index 0dbbaca83b..14a39e1e61 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
@@ -95,14 +95,15 @@ void MessageHandler::acquire(const std::string& q, uint32_t position) {
boost::shared_ptr<Queue> queue = findQueue(q, "Cluster acquire failed");
QueuedMessage qm;
BrokerContext::ScopedSuppressReplication ssr;
- bool ok = queue->acquireMessageAt(position, qm);
- (void)ok; // Avoid unused variable warnings.
- assert(ok); // FIXME aconway 2011-09-14: error handling
+ if (!queue->acquireMessageAt(position, qm))
+ throw Exception(QPID_MSG("Cluster acquire: message not found: "
+ << q << "[" << position << "]"));
assert(qm.position.getValue() == position);
assert(qm.payload);
- // Save for possible requeue.
+ // Save on context for possible requeue if released/rejected.
QueueContext::get(*queue)->acquire(qm);
}
+ // FIXME aconway 2011-09-15: systematic logging across cluster module.
QPID_LOG(trace, "cluster message " << q << "[" << position
<< "] acquired by " << PrettyId(sender(), self()));
}
@@ -124,11 +125,9 @@ void MessageHandler::dequeue(const std::string& q, uint32_t position) {
}
}
-// FIXME aconway 2011-09-14: rename as requeue?
-void MessageHandler::release(const std::string& q, uint32_t position, bool redelivered) {
- // FIXME aconway 2011-09-15: review release/requeue logic.
+void MessageHandler::requeue(const std::string& q, uint32_t position, bool redelivered) {
if (sender() != self()) {
- boost::shared_ptr<Queue> queue = findQueue(q, "Cluster release failed");
+ boost::shared_ptr<Queue> queue = findQueue(q, "Cluster requeue failed");
QueueContext::get(*queue)->requeue(position, redelivered);
}
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
index dba5b784ad..40e004d89a 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
@@ -60,7 +60,7 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler
void routed(uint32_t routingId);
void acquire(const std::string& queue, uint32_t position);
void dequeue(const std::string& queue, uint32_t position);
- void release(const std::string& queue, uint32_t position, bool redelivered);
+ void requeue(const std::string& queue, uint32_t position, bool redelivered);
private:
struct Member {
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index 55006911a6..3d0ba40bce 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -31,63 +31,54 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/Timer.h"
namespace qpid {
namespace cluster {
-
-class OwnershipTimeout : public sys::TimerTask {
- QueueContext& queueContext;
-
- public:
- OwnershipTimeout(QueueContext& qc, const sys::Duration& interval) :
- TimerTask(interval, "QueueContext::OwnershipTimeout"), queueContext(qc) {}
-
- void fire() { queueContext.timeout(); }
-};
-
+// FIXME aconway 2011-09-16: configurable timeout.
QueueContext::QueueContext(broker::Queue& q, Multicaster& m)
- : timer(q.getBroker()->getTimer()), queue(q), mcast(m), consumers(0)
+ : ownership(UNSUBSCRIBED),
+ timer(boost::bind(&QueueContext::timeout, this),
+ q.getBroker()->getTimer(),
+ 100*sys::TIME_MSEC),
+ queue(q), mcast(m), consumers(0)
{
q.setClusterContext(boost::intrusive_ptr<QueueContext>(this));
}
-QueueContext::~QueueContext() {
- if (timerTask) timerTask->cancel();
-}
+QueueContext::~QueueContext() {}
-void QueueContext::cancelTimer(const sys::Mutex::ScopedLock&) {
- if (timerTask) { // no need for timeout, sole owner.
- timerTask->cancel();
- timerTask = 0;
- }
+// Invariant for ownership:
+// UNSUBSCRIBED, SUBSCRIBED => timer stopped, queue stopped
+// SOLE_OWNER => timer stopped, queue started
+// SHARED_OWNER => timer started, queue started
+
+namespace {
+bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; }
}
// Called by QueueReplica in CPG deliver thread when state changes.
-void QueueContext::replicaState(QueueOwnership state) {
+void QueueContext::replicaState(QueueOwnership newOwnership) {
sys::Mutex::ScopedLock l(lock);
- switch (state) {
- case UNSUBSCRIBED:
- case SUBSCRIBED:
- cancelTimer(l);
- queue.stopConsumers();
- break;
- case SOLE_OWNER:
- cancelTimer(l); // Sole owner, no need for timer.
- queue.startConsumers();
- break;
- case SHARED_OWNER:
- cancelTimer(l);
+ QueueOwnership before = ownership;
+ QueueOwnership after = newOwnership;
+ ownership = after;
+ if (!isOwner(before) && !isOwner(after))
+ ; // Nothing to do, now ownership change on this transition.
+ else if (isOwner(before) && !isOwner(after)) // Lost ownership
+ ; // Nothing to do, queue and timer were stopped before
+ // sending unsubscribe/resubscribe.
+ else if (!isOwner(before) && isOwner(after)) { // Took ownership
queue.startConsumers();
- // FIXME aconway 2011-07-28: configurable interval.
- timerTask = new OwnershipTimeout(*this, 100*sys::TIME_MSEC);
- timer.add(timerTask);
- break;
+ if (after == SHARED_OWNER) timer.start();
+ }
+ else if (isOwner(before) && isOwner(after) && before != after) {
+ if (after == SOLE_OWNER) timer.stop();
+ else timer.start();
}
}
-// FIXME aconway 2011-07-27: Dont spin token on an empty queue.
+// FIXME aconway 2011-07-27: Dont spin the token on an empty or idle queue.
// Called in connection threads when a consumer is added
void QueueContext::consume(size_t n) {
@@ -102,14 +93,16 @@ void QueueContext::cancel(size_t n) {
sys::Mutex::ScopedLock l(lock);
consumers = n;
// When consuming threads are stopped, this->stopped will be called.
- if (n == 0) queue.stopConsumers(); // FIXME aconway 2011-07-28: Ok inside lock?
+ if (n == 0) {
+ timer.stop();
+ queue.stopConsumers(); // FIXME aconway 2011-07-28: Ok inside lock?
+ }
}
// Called in timer thread.
void QueueContext::timeout() {
- // FIXME aconway 2011-09-14: need to deal with stray timeouts.
- queue.stopConsumers();
// When all threads have stopped, queue will call stopped()
+ queue.stopConsumers();
}
// Callback set up by queue.stopConsumers() called in connection thread.
@@ -117,18 +110,16 @@ void QueueContext::timeout() {
void QueueContext::stopped() {
sys::Mutex::ScopedLock l(lock);
// FIXME aconway 2011-07-28: review thread safety of state.
- // Deffered call to stopped doesn't sit well.
- // queueActive is invalid while stop is in progress?
if (consumers == 0)
mcast.mcast(framing::ClusterQueueUnsubscribeBody(
framing::ProtocolVersion(), queue.getName()));
- else // FIXME aconway 2011-09-13: check if we're owner?
+ else // FIXME aconway 2011-09-13: check if we're owner?
mcast.mcast(framing::ClusterQueueResubscribeBody(
framing::ProtocolVersion(), queue.getName()));
}
void QueueContext::requeue(uint32_t position, bool redelivered) {
- // FIXME aconway 2011-09-15: no lock, unacked has its own lock.
+ // No lock, unacked has its own lock.
broker::QueuedMessage qm;
if (unacked.get(position, qm)) {
unacked.erase(position);
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
index 4571c6744a..54bc81b175 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
@@ -23,9 +23,10 @@
*/
#include "LockedMap.h"
-#include <qpid/RefCounted.h>
+#include "CountdownTimer.h"
+#include "qpid/RefCounted.h"
#include "qpid/sys/Time.h"
-#include <qpid/sys/Mutex.h>
+#include "qpid/sys/Mutex.h"
#include "qpid/cluster/types.h"
#include <boost/intrusive_ptr.hpp>
@@ -37,10 +38,6 @@ namespace broker {
class Queue;
class QueuedMessage;
}
-namespace sys {
-class Timer;
-class TimerTask;
-}
namespace cluster {
@@ -91,18 +88,15 @@ class QueueContext : public RefCounted {
void dequeue(uint32_t position);
private:
- sys::Timer& timer;
-
sys::Mutex lock;
+ QueueOwnership ownership;
+ CountdownTimer timer;
broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr?
Multicaster& mcast;
- boost::intrusive_ptr<sys::TimerTask> timerTask;
size_t consumers;
typedef LockedMap<uint32_t, broker::QueuedMessage> UnackedMap; // FIXME aconway 2011-09-15: don't need read/write map? Rename
UnackedMap unacked;
-
- void cancelTimer(const sys::Mutex::ScopedLock& l);
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
index 4c2b16e001..37079a17a1 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
@@ -52,7 +52,6 @@ void QueueHandler::resubscribe(const std::string& queue) {
void QueueHandler::left(const MemberId& member) {
// Unsubscribe for members that leave.
- // FIXME aconway 2011-06-28: also need to re-queue acquired messages.
for (QueueMap::iterator i = queues.begin(); i != queues.end(); ++i)
i->second->unsubscribe(member);
}
@@ -66,6 +65,7 @@ void QueueHandler::add(boost::shared_ptr<broker::Queue> q) {
// Local queues already have a context, remote queues need one.
if (!QueueContext::get(*q))
new QueueContext(*q, multicaster); // Context attaches itself to the Queue
+ // FIXME aconway 2011-09-15: thread safety: called from wiring handler..
queues[q->getName()] = boost::intrusive_ptr<QueueReplica>(
new QueueReplica(q, self()));
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
index 8b451a3eaf..0938498fa3 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
@@ -83,8 +83,7 @@ void QueueReplica::resubscribe(const MemberId& member) {
void QueueReplica::update(QueueOwnership before) {
QPID_LOG(trace, "cluster: queue replica " << *this << " (was " << before << ")");
QueueOwnership after = getState();
- if (before == after) return;
- context->replicaState(after);
+ if (before != after) context->replicaState(after);
}
QueueOwnership QueueReplica::getState() const {
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
index a1dca2e33d..20aef058fc 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
@@ -56,7 +56,7 @@ class QueueReplica : public RefCounted
void resubscribe(const MemberId&);
MemberId getSelf() const { return self; }
-
+
private:
typedef std::deque<MemberId> MemberQueue;
diff --git a/qpid/cpp/src/qpid/cluster/exp/README.txt b/qpid/cpp/src/qpid/cluster/exp/README.txt
index 97f2a10d84..189c755f09 100644
--- a/qpid/cpp/src/qpid/cluster/exp/README.txt
+++ b/qpid/cpp/src/qpid/cluster/exp/README.txt
@@ -1,2 +1,4 @@
Experimental code to test ideas about a new cluster design.
+
+See overview.h
diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
index ef4df3cf97..92f7183a08 100644
--- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
@@ -56,22 +56,23 @@ bool WiringHandler::invoke(const framing::AMQBody& body) {
void WiringHandler::createQueue(const std::string& data) {
// FIXME aconway 2011-05-25: Needs async completion.
std::string name;
- if (sender() != self()) { // Created by another member, need to create locally.
+ if (sender() != self()) { // Created by another member, need to create locally.
BrokerContext::ScopedSuppressReplication ssr;
framing::Buffer buf(const_cast<char*>(&data[0]), data.size());
// TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*()
RecoverableQueue::shared_ptr rq = recovery.recoverQueue(buf);
name = rq->getName();
}
- else { // Created locally, Queue and QueueContext already exist.
+ else { // Created locally, Queue and QueueContext already exist.
framing::Buffer buffer(const_cast<char*>(&data[0]), data.size());
// FIXME aconway 2011-05-10: implicit knowledge of queue encoding.
buffer.getShortString(name);
}
boost::shared_ptr<broker::Queue> q = broker.getQueues().find(name);
assert(q); // FIXME aconway 2011-05-10: error handling.
- // TODO aconway 2011-05-10: if we implement multi-group for queues then
- // this call is a problem: comes from wiring delivery thread, not queues.
+ // TODO aconway 2011-05-10: if we implement multi-group for queues
+ // then this call is a potential problem: comes from wiring
+ // delivery thread, not queues.
queueHandler->add(q);
QPID_LOG(debug, "cluster: create queue " << q->getName());
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/overview.h b/qpid/cpp/src/qpid/cluster/exp/overview.h
index 3a0189d750..586a711827 100644
--- a/qpid/cpp/src/qpid/cluster/exp/overview.h
+++ b/qpid/cpp/src/qpid/cluster/exp/overview.h
@@ -3,10 +3,14 @@
<h1>New cluster implementation overview</h>
-There are 3 areas indicated by a suffix on class names:
+The code is broken down into 3 areas indicated by a suffix on class names:
-- Replica: State that is replicated to the entire cluster. Only called by Handlers in the deliver thread.
-- Context: State that is private to this member. Called by both Replia and broker objects in deliver and connection threads.
+- Replica: State that is replicated to the entire cluster.
+ Only called by Handlers in the deliver thread. May call on Contexts.
+
+- Context: State private to this member and associated with a local entity
+ such as the Broker or a Queue. Called in deliver and connection threads.
+
- Handler: Dispatch CPG messages by calling Replica objects in the deliver thread.
diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h
index 667d9b89fa..0c234b01c0 100644
--- a/qpid/cpp/src/qpid/cluster/types.h
+++ b/qpid/cpp/src/qpid/cluster/types.h
@@ -82,8 +82,6 @@ std::ostream& operator<<(std::ostream&, EventType);
/** Number to identify a message being routed. */
typedef uint32_t RoutingId;
-// FIXME aconway 2011-07-28: can we put these 2 back in the
-// QueueReplica & QueueContext?
/** State of a queue with respect to a cluster member. */
enum QueueOwnership {
UNSUBSCRIBED,
diff --git a/qpid/cpp/src/qpid/sys/Stoppable.h b/qpid/cpp/src/qpid/sys/Stoppable.h
index 113a676503..6ddf926280 100644
--- a/qpid/cpp/src/qpid/sys/Stoppable.h
+++ b/qpid/cpp/src/qpid/sys/Stoppable.h
@@ -27,8 +27,6 @@
namespace qpid {
namespace sys {
-// FIXME aconway 2011-05-25: needs better name
-
/**
* An activity that may be executed by multiple threads, and can be stopped.
*
@@ -72,7 +70,7 @@ class Stoppable {
sys::Monitor::ScopedLock l(lock);
if (stopped) return;
stopped = true;
- check();
+ check(l);
}
/** Set the state to "started", allow threads to enter.
@@ -97,10 +95,10 @@ class Stoppable {
sys::Monitor::ScopedLock l(lock);
assert(busy > 0);
--busy;
- check();
+ check(l);
}
- void check() {
+ void check(const sys::Monitor::ScopedLock&) {
// Called with lock held.
if (stopped && busy == 0 && notify) notify();
}
diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp
index ae4f341efa..81f697dec0 100644
--- a/qpid/cpp/src/tests/qpid-receive.cpp
+++ b/qpid/cpp/src/tests/qpid-receive.cpp
@@ -198,7 +198,6 @@ int main(int argc, char ** argv)
std::map<std::string,Sender> replyTo;
while (!done && receiver.fetch(msg, timeout)) {
- cerr << "FIXME " << msg.getProperties()[SN] << endl;
if (!started) {
// Start the time on receipt of the first message to avoid counting
// idle time at process startup.
@@ -208,7 +207,6 @@ int main(int argc, char ** argv)
reporter.message(msg);
if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {
if (msg.getContent() == EOS) {
- cerr << "FIXME eos" << endl;
done = true;
} else {
++count;
@@ -227,7 +225,6 @@ int main(int argc, char ** argv)
if (opts.printContent)
std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
if (opts.messages && count >= opts.messages) {
- cerr << "FIXME "<< count << " >= " << opts.messages << endl;
done = true;
}
}
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index c84d8e3ef5..dfcf30ecdf 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -358,7 +358,7 @@
<field name="position" type="uint32"/>
</control>
- <control name="release" code="0x6">
+ <control name="requeue" code="0x6">
<field name="queue" type="queue.name"/>
<field name="position" type="uint32"/>
<field name="redelivered" type="bit"/>