summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-09-16 20:16:53 +0000
committerAlan Conway <aconway@apache.org>2011-09-16 20:16:53 +0000
commit358260bab45abbfea24f686f978b8dcaba10438c (patch)
treecb8abf8ef189f1fe32e47a7d68604b1d8b3b0bec
parent277889b4d238fb70e274891384fe56096a3dbc16 (diff)
downloadqpid-python-358260bab45abbfea24f686f978b8dcaba10438c.tar.gz
QPID-2920: New cluster release/requeue.
Almost functional, seeing sporadic hangs in qpid-cpp-benchmark with two brokers: qpid-cpp-benchmark -b localhost:5556,localhost:5555 -r2 -m10000 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1171756 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/cluster.mk1
-rw-r--r--qpid/cpp/src/qpid/broker/Cluster.h9
-rw-r--r--qpid/cpp/src/qpid/broker/NullCluster.h2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp55
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h11
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp36
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Core.cpp1
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp12
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/LockedMap.h16
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp49
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp3
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Multicaster.h4
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/PrettyId.h46
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp67
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.h25
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp1
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp28
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.h13
-rw-r--r--qpid/cpp/src/qpid/sys/Stoppable.h10
-rw-r--r--qpid/cpp/src/tests/BrokerClusterCalls.cpp3
-rwxr-xr-xqpid/cpp/src/tests/cluster2_tests.py2
-rwxr-xr-xqpid/cpp/src/tests/qpid-cpp-benchmark18
-rw-r--r--qpid/cpp/src/tests/qpid-receive.cpp7
-rw-r--r--qpid/cpp/xml/cluster.xml1
26 files changed, 267 insertions, 157 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index 1809c87ca8..ab6e90baec 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -83,6 +83,7 @@ cluster_la_SOURCES = \
qpid/cluster/OutputInterceptor.h \
qpid/cluster/PollerDispatch.cpp \
qpid/cluster/PollerDispatch.h \
+ qpid/cluster/PrettyId.h \
qpid/cluster/ProxyInputHandler.h \
qpid/cluster/Quorum.h \
qpid/cluster/InitialStatusMap.h \
diff --git a/qpid/cpp/src/qpid/broker/Cluster.h b/qpid/cpp/src/qpid/broker/Cluster.h
index 193332692b..0e8b3822a5 100644
--- a/qpid/cpp/src/qpid/broker/Cluster.h
+++ b/qpid/cpp/src/qpid/broker/Cluster.h
@@ -57,8 +57,7 @@ class Cluster
/** A message is delivered to a queue.
* Called before actually pushing the message to the queue.
- *@return If true the message should be pushed to the queue now.
- * otherwise the cluster code will push the message when it is replicated.
+ *@return If true the message should be enqueued now, false for delayed enqueue.
*/
virtual bool enqueue(Queue& queue, const boost::intrusive_ptr<Message>&) = 0;
@@ -71,8 +70,10 @@ class Cluster
/** A locally-acquired message is released by the consumer and re-queued. */
virtual void release(const QueuedMessage&) = 0;
- /** A message is removed from the queue. */
- virtual void dequeue(const QueuedMessage&) = 0;
+ /** A message is removed from the queue.
+ *@return true if the message should be dequeued, false for delayed dequeue.
+ */
+ virtual bool dequeue(const QueuedMessage&) = 0;
// Consumers
diff --git a/qpid/cpp/src/qpid/broker/NullCluster.h b/qpid/cpp/src/qpid/broker/NullCluster.h
index 399e2a3ca6..16a62beace 100644
--- a/qpid/cpp/src/qpid/broker/NullCluster.h
+++ b/qpid/cpp/src/qpid/broker/NullCluster.h
@@ -42,7 +42,7 @@ class NullCluster : public Cluster
virtual void routed(const boost::intrusive_ptr<Message>&) {}
virtual void acquire(const QueuedMessage&) {}
virtual void release(const QueuedMessage&) {}
- virtual void dequeue(const QueuedMessage&) {}
+ virtual bool dequeue(const QueuedMessage&) { return false; }
// Consumers
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 32b037bb21..6b632ed737 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -113,7 +113,7 @@ Queue::Queue(const string& _name, bool _autodelete,
deleted(false),
barrier(*this),
autoDeleteTimeout(0),
- dispatching(boost::bind(&Queue::acquireStopped,this))
+ consuming(boost::bind(&Queue::consumingStopped,this))
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -154,7 +154,7 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
// Check for deferred delivery in a cluster.
if (broker && broker->deferDelivery(name, msg))
return;
- // Same thing but for the new cluster interface.
+ // Check for deferred delivery with new cluster interface.
if (broker && !broker->getCluster().enqueue(*this, msg))
return;
@@ -227,39 +227,32 @@ void Queue::requeue(const QueuedMessage& msg){
}
}
}
-
- if (broker) broker->getCluster().release(msg);
+ if (broker) broker->getCluster().release(msg); // FIXME aconway 2011-09-12: review. rename requeue?
copy.notify();
}
/** Mark a scope that acquires a message.
*
- * ClusterAcquireScope is declared before are taken. The calling
- * function sets qmsg with the lock held, but the call to
- * Cluster::acquire() will happen after the lock is released.
+ * ClusterAcquireScope is declared before locks are taken. The
+ * calling function sets qmsg with the lock held, but the call to
+ * Cluster::acquire() will happen after the lock is released in
+ * ~ClusterAcquireScope().
*
* Also marks a Stoppable as busy for the duration of the scope.
**/
struct ClusterAcquireScope {
- Broker* broker;
- Queue& queue;
QueuedMessage qmsg;
- ClusterAcquireScope(Queue& q) : broker(q.getBroker()), queue(q) {}
+ ClusterAcquireScope() {}
~ClusterAcquireScope() {
- if (broker) {
- // FIXME aconway 2011-06-27: Move to QueueContext.
- // Avoid the indirection via queuename.
- if (qmsg.queue) broker->getCluster().acquire(qmsg);
- else broker->getCluster().empty(queue);
- }
+ if (qmsg.queue) qmsg.queue->getBroker()->getCluster().acquire(qmsg);
}
};
bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
- ClusterAcquireScope acquireScope(*this); // Outside lock
+ ClusterAcquireScope acquireScope; // Outside lock
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
@@ -312,13 +305,13 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
while (true) {
- Stoppable::Scope stopper(dispatching); // FIXME aconway 2011-06-28: rename consuming
- if (!stopper) {
+ Stoppable::Scope consumeScope(consuming);
+ if (!consumeScope) {
QPID_LOG(trace, "Queue is stopped: " << name);
listeners.addListener(c);
return NO_MESSAGES;
}
- ClusterAcquireScope acquireScope(*this); // Outside the lock
+ ClusterAcquireScope acquireScope; // Outside the lock
Mutex::ScopedLock locker(messageLock);
if (messages->empty()) { // FIXME aconway 2011-06-07: ugly
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
@@ -461,7 +454,7 @@ void Queue::cancel(Consumer::shared_ptr c){
}
QueuedMessage Queue::get(){
- ClusterAcquireScope acquireScope(*this); // Outside lock
+ ClusterAcquireScope acquireScope; // Outside lock
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
if (messages->pop(msg)) acquireScope.qmsg = msg;
@@ -709,6 +702,10 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
+ // FIXME aconway 2011-09-13: new cluster needs tx/dtx support.
+ if (!ctxt && broker)
+ if (!broker->getCluster().dequeue(msg)) return false;
+
ScopedUse u(barrier);
if (!u.acquired) return false;
{
@@ -719,8 +716,6 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
}
}
- if (!ctxt && broker) broker->getCluster().dequeue(msg); // Outside lock
-
// This check prevents messages which have been forced persistent on one queue from dequeuing
// from another on which no forcing has taken place and thus causing a store error.
bool fp = msg.payload->isForcedPersistent();
@@ -737,7 +732,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
void Queue::dequeueCommitted(const QueuedMessage& msg)
{
- if (broker) broker->getCluster().dequeue(msg); // Outside lock
+ // FIXME aconway 2011-09-13: new cluster needs TX support.
Mutex::ScopedLock locker(messageLock);
dequeued(msg);
if (mgmtObject != 0) {
@@ -919,7 +914,7 @@ void Queue::notifyDeleted()
set.notifyAll();
}
-void Queue::acquireStopped() {
+void Queue::consumingStopped() {
if (broker) broker->getCluster().stopped(*this);
}
@@ -1291,15 +1286,13 @@ void Queue::UsageBarrier::destroy()
while (count) parent.messageLock.wait();
}
-// FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()?
-void Queue::stop() {
+void Queue::stopConsumers() {
QPID_LOG(trace, "Queue stopped: " << getName());
- // FIXME aconway 2011-05-25: rename dispatching - acquiring?
- dispatching.stop();
+ consuming.stop();
}
-void Queue::start() {
+void Queue::startConsumers() {
QPID_LOG(trace, "Queue started: " << getName());
- dispatching.start();
+ consuming.start();
notifyListener();
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 9435750b4e..6c9c111dbb 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -132,9 +132,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
UsageBarrier barrier;
int autoDeleteTimeout;
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
- // Allow dispatching consumer threads to be stopped. Used by cluster
- sys::Stoppable dispatching; // FIXME aconway 2011-06-07: name: acquiring?
- boost::intrusive_ptr<RefCounted> clusterContext;
+ sys::Stoppable consuming; // Allow consumer threads to be stopped, used by cluster
+ boost::intrusive_ptr<RefCounted> clusterContext; // Used by cluster
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -182,7 +181,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void checkNotDeleted();
void notifyDeleted();
- void acquireStopped();
+ void consumingStopped();
public:
@@ -396,10 +395,10 @@ class Queue : public boost::enable_shared_from_this<Queue>,
/** Stop consumers. Return when all consumer threads are stopped.
*@pre Queue is active and not already stopping.
*/
- void stop();
+ void stopConsumers();
/** Start consumers. */
- void start();
+ void startConsumers();
/** Context information used in a cluster. */
boost::intrusive_ptr<RefCounted> getClusterContext() { return clusterContext; }
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
index f30a790547..4014b0ce37 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
@@ -93,10 +93,9 @@ bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& m
core.getRoutingMap().put(tssRoutingId, msg);
}
core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), tssRoutingId, queue.getName()));
- // TODO aconway 2010-10-21: configable option for strict (wait
- // for CPG deliver to do local deliver) vs. loose (local deliver
- // immediately).
- return false;
+ // TODO aconway 2010-10-21: review delivery options: strict (wait
+ // for CPG delivery vs loose (local deliver immediately).
+ return false; // Strict delivery, cluster will call Queue deliver.
}
void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {
@@ -113,25 +112,27 @@ void BrokerContext::acquire(const broker::QueuedMessage& qm) {
ProtocolVersion(), qm.queue->getName(), qm.position));
}
-// FIXME aconway 2011-05-24: need to handle acquire and release.
-// Dequeue in the wrong place?
-void BrokerContext::dequeue(const broker::QueuedMessage& qm) {
- if (tssNoReplicate) return;
- core.mcast(ClusterMessageDequeueBody(
- ProtocolVersion(), qm.queue->getName(), qm.position));
+bool BrokerContext::dequeue(const broker::QueuedMessage& qm) {
+ if (!tssNoReplicate)
+ core.mcast(ClusterMessageDequeueBody(
+ ProtocolVersion(), qm.queue->getName(), qm.position));
+ return false; // FIXME aconway 2011-09-14: needed?
}
-void BrokerContext::release(const broker::QueuedMessage& ) {
- // FIXME aconway 2011-05-24: TODO
+// FIXME aconway 2011-09-14: rename requeue?
+void BrokerContext::release(const broker::QueuedMessage& qm) {
+ if (!tssNoReplicate)
+ core.mcast(ClusterMessageReleaseBody(
+ ProtocolVersion(), qm.queue->getName(), qm.position, qm.payload->getRedelivered()));
}
// FIXME aconway 2011-06-08: should be be using shared_ptr to q here?
void BrokerContext::create(broker::Queue& q) {
- if (tssNoReplicate) return; // FIXME aconway 2011-06-08: revisit
- // FIXME aconway 2011-06-08: error handling- if already set...
- // Create local context immediately, queue will be stopped until replicated.
+ q.stopConsumers(); // FIXME aconway 2011-09-14: Stop queue initially.
+ if (tssNoReplicate) return;
+ assert(!QueueContext::get(q));
boost::intrusive_ptr<QueueContext> context(
- new QueueContext(q,core.getMulticaster()));
+ new QueueContext(q, core.getMulticaster()));
std::string data(q.encodedSize(), '\0');
framing::Buffer buf(&data[0], data.size());
q.encode(buf);
@@ -174,11 +175,12 @@ void BrokerContext::unbind(broker::Queue& q, broker::Exchange& ex,
}
// n is the number of consumers including the one just added.
-// FIXME aconway 2011-06-27: rename, conflicting terms.
+// FIXME aconway 2011-06-27: rename, conflicting terms. subscribe?
void BrokerContext::consume(broker::Queue& q, size_t n) {
QueueContext::get(q)->consume(n);
}
+// FIXME aconway 2011-09-13: rename unsubscribe?
// n is the number of consumers after the cancel.
void BrokerContext::cancel(broker::Queue& q, size_t n) {
QueueContext::get(q)->cancel(n);
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
index fc19d6487b..6172296823 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
@@ -57,7 +57,7 @@ class BrokerContext : public broker::Cluster
bool enqueue(broker::Queue&, const boost::intrusive_ptr<broker::Message>&);
void routed(const boost::intrusive_ptr<broker::Message>&);
void acquire(const broker::QueuedMessage&);
- void dequeue(const broker::QueuedMessage&);
+ bool dequeue(const broker::QueuedMessage&);
void release(const broker::QueuedMessage&);
// Consumers
diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
index 7bcc068120..5241b9e414 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Core.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
@@ -68,7 +68,6 @@ void Core::fatal() {
}
void Core::mcast(const framing::AMQBody& body) {
- QPID_LOG(trace, "cluster multicast: " << body);
multicaster.mcast(body);
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
index beebe9fc16..4653cbf1ca 100644
--- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
@@ -22,6 +22,7 @@
#include "Core.h"
#include "EventHandler.h"
#include "HandlerBase.h"
+#include "PrettyId.h"
#include "qpid/broker/Broker.h"
#include "qpid/cluster/types.h"
#include "qpid/framing/AMQFrame.h"
@@ -49,17 +50,6 @@ void EventHandler::start() {
dispatcher.start();
}
-// Print member ID or "self" if member is self
-struct PrettyId {
- MemberId id, self;
- PrettyId(const MemberId& id_, const MemberId& self_) : id(id_), self(self_) {}
-};
-
-std::ostream& operator<<(std::ostream& o, const PrettyId& id) {
- if (id.id == id.self) return o << "self";
- else return o << id.id;
-}
-
// Deliver CPG message.
void EventHandler::deliver(
cpg_handle_t /*handle*/,
diff --git a/qpid/cpp/src/qpid/cluster/exp/LockedMap.h b/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
index c0afe740f8..7294ff767e 100644
--- a/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
+++ b/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
@@ -39,8 +39,20 @@ class LockedMap
Value get(const Key& key) const {
sys::RWlock::ScopedRlock r(lock);
typename Map::const_iterator i = map.find(key);
- if (i == map.end()) return Value();
- else return i->second;
+ return (i == map.end()) ? Value() : i->second;
+ }
+
+ /** Update value with the value for key.
+ *@return true if key was found.
+ */
+ bool get(const Key& key, Value& value) const {
+ sys::RWlock::ScopedRlock r(lock);
+ typename Map::const_iterator i = map.find(key);
+ if (i != map.end()) {
+ value = i->second;
+ return true;
+ }
+ return false;
}
/** Associate value with key, overwriting any previous value for key. */
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
index 7e9a1219ae..0dbbaca83b 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
@@ -22,7 +22,9 @@
#include "Core.h"
#include "MessageHandler.h"
#include "BrokerContext.h"
+#include "QueueContext.h"
#include "EventHandler.h"
+#include "PrettyId.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/QueueRegistry.h"
@@ -72,7 +74,7 @@ void MessageHandler::enqueue(RoutingId routingId, const std::string& q) {
else
msg = memberMap[sender()].routingMap[routingId];
if (!msg) throw Exception(QPID_MSG("Cluster enqueue on " << q
- << " failed: unknown message"));
+ << " failed: unknown message"));
BrokerContext::ScopedSuppressReplication ssr;
queue->deliver(msg);
}
@@ -84,40 +86,51 @@ void MessageHandler::routed(RoutingId routingId) {
memberMap[sender()].routingMap.erase(routingId);
}
+// FIXME aconway 2011-09-14: performance: pack acquires into a SequenceSet
+// and scan queue once.
void MessageHandler::acquire(const std::string& q, uint32_t position) {
// Note acquires from other members. My own acquires were executed in
// the connection thread
if (sender() != self()) {
- // FIXME aconway 2010-10-28: need to store acquired messages on QueueContext
- // by broker for possible re-queuing if a broker leaves.
- boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed");
+ boost::shared_ptr<Queue> queue = findQueue(q, "Cluster acquire failed");
QueuedMessage qm;
BrokerContext::ScopedSuppressReplication ssr;
bool ok = queue->acquireMessageAt(position, qm);
- (void)ok; // Avoid unused variable warnings.
- assert(ok); // FIXME aconway 2011-08-04: failing this assertion.
+ (void)ok; // Avoid unused variable warnings.
+ assert(ok); // FIXME aconway 2011-09-14: error handling
assert(qm.position.getValue() == position);
assert(qm.payload);
+ // Save for possible requeue.
+ QueueContext::get(*queue)->acquire(qm);
}
-}
+ QPID_LOG(trace, "cluster message " << q << "[" << position
+ << "] acquired by " << PrettyId(sender(), self()));
+ }
-void MessageHandler::dequeue(const std::string& q, uint32_t /*position*/) {
+void MessageHandler::dequeue(const std::string& q, uint32_t position) {
if (sender() == self()) {
// FIXME aconway 2010-10-28: we should complete the ack that initiated
// the dequeue at this point, see BrokerContext::dequeue
- return;
}
- boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed");
- BrokerContext::ScopedSuppressReplication ssr;
- // FIXME aconway 2011-05-12: Remove the acquired message from QueueContext.
- // Do we need to call this? Review with gsim.
- // QueuedMessage qm;
- // Get qm from QueueContext?
- // queue->dequeue(0, qm);
+ else {
+ // FIXME aconway 2011-09-15: new cluster, inefficient looks up
+ // message by position multiple times?
+ boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed");
+ // Remove fom the unacked list
+ QueueContext::get(*queue)->dequeue(position);
+ BrokerContext::ScopedSuppressReplication ssr;
+ QueuedMessage qm = queue->find(position);
+ if (qm.queue) queue->dequeue(0, qm);
+ }
}
-void MessageHandler::release(const std::string& /*queue*/ , uint32_t /*position*/) {
- // FIXME aconway 2011-05-24:
+// FIXME aconway 2011-09-14: rename as requeue?
+void MessageHandler::release(const std::string& q, uint32_t position, bool redelivered) {
+ // FIXME aconway 2011-09-15: review release/requeue logic.
+ if (sender() != self()) {
+ boost::shared_ptr<Queue> queue = findQueue(q, "Cluster release failed");
+ QueueContext::get(*queue)->requeue(position, redelivered);
+ }
}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
index 0a010a8ecf..dba5b784ad 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
@@ -60,7 +60,7 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler
void routed(uint32_t routingId);
void acquire(const std::string& queue, uint32_t position);
void dequeue(const std::string& queue, uint32_t position);
- void release(const std::string& queue, uint32_t position);
+ void release(const std::string& queue, uint32_t position, bool redelivered);
private:
struct Member {
diff --git a/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp
index 427c25093a..9d8a00e217 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp
@@ -51,7 +51,8 @@ Multicaster::Multicaster(Cpg& cpg_,
queue.start();
}
-void Multicaster::mcast(const framing::AMQDataBlock& data) {
+void Multicaster::mcast(const framing::AMQFrame& data) {
+ QPID_LOG(trace, "cluster multicast: " << data);
BufferRef bufRef = buffers.get(data.encodedSize());
framing::Buffer buf(bufRef.begin(), bufRef.size());
data.encode(buf);
diff --git a/qpid/cpp/src/qpid/cluster/exp/Multicaster.h b/qpid/cpp/src/qpid/cluster/exp/Multicaster.h
index 6953d2bfbd..c28f29f1a3 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Multicaster.h
+++ b/qpid/cpp/src/qpid/cluster/exp/Multicaster.h
@@ -30,7 +30,7 @@
namespace qpid {
namespace framing {
-class AMQDataBlock;
+class AMQFrame;
class AMQBody;
}
@@ -54,7 +54,7 @@ class Multicaster
);
/** Multicast an event */
- void mcast(const framing::AMQDataBlock&);
+ void mcast(const framing::AMQFrame&);
void mcast(const framing::AMQBody&);
private:
diff --git a/qpid/cpp/src/qpid/cluster/exp/PrettyId.h b/qpid/cpp/src/qpid/cluster/exp/PrettyId.h
new file mode 100644
index 0000000000..0f7651151b
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/PrettyId.h
@@ -0,0 +1,46 @@
+#ifndef QPID_CLUSTER_EXP_PRETTYID_H
+#define QPID_CLUSTER_EXP_PRETTYID_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/cluster/types.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Wrapper for a MemberId that prints as the member ID or the string
+ * "self" if the member is self.
+ */
+struct PrettyId {
+ MemberId id, self;
+ PrettyId(const MemberId& id_, const MemberId& self_) : id(id_), self(self_) {}
+};
+
+inline std::ostream& operator<<(std::ostream& o, const PrettyId& id) {
+ if (id.id == id.self) return o << "self";
+ else return o << id.id;
+}
+
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_EXP_PRETTYID_H*/
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index 60b218da14..55006911a6 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -20,13 +20,16 @@
*/
#include "QueueContext.h"
+
#include "Multicaster.h"
+#include "BrokerContext.h" // for ScopedSuppressReplication
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/framing/ClusterQueueResubscribeBody.h"
#include "qpid/framing/ClusterQueueSubscribeBody.h"
#include "qpid/framing/ClusterQueueUnsubscribeBody.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueuedMessage.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Timer.h"
@@ -41,7 +44,6 @@ class OwnershipTimeout : public sys::TimerTask {
OwnershipTimeout(QueueContext& qc, const sys::Duration& interval) :
TimerTask(interval, "QueueContext::OwnershipTimeout"), queueContext(qc) {}
- // FIXME aconway 2011-07-27: thread safety on deletion?
void fire() { queueContext.timeout(); }
};
@@ -49,32 +51,35 @@ QueueContext::QueueContext(broker::Queue& q, Multicaster& m)
: timer(q.getBroker()->getTimer()), queue(q), mcast(m), consumers(0)
{
q.setClusterContext(boost::intrusive_ptr<QueueContext>(this));
- q.stop(); // Initially stopped.
}
QueueContext::~QueueContext() {
- // FIXME aconway 2011-07-27: revisit shutdown logic.
- // timeout() could be called concurrently with destructor.
- sys::Mutex::ScopedLock l(lock);
if (timerTask) timerTask->cancel();
}
+void QueueContext::cancelTimer(const sys::Mutex::ScopedLock&) {
+ if (timerTask) { // no need for timeout, sole owner.
+ timerTask->cancel();
+ timerTask = 0;
+ }
+}
+
+// Called by QueueReplica in CPG deliver thread when state changes.
void QueueContext::replicaState(QueueOwnership state) {
sys::Mutex::ScopedLock l(lock);
switch (state) {
case UNSUBSCRIBED:
case SUBSCRIBED:
+ cancelTimer(l);
+ queue.stopConsumers();
break;
case SOLE_OWNER:
- queue.start();
- if (timerTask) { // no need for timeout.
- timerTask->cancel();
- timerTask = 0;
- }
+ cancelTimer(l); // Sole owner, no need for timer.
+ queue.startConsumers();
break;
case SHARED_OWNER:
- queue.start();
- if (timerTask) timerTask->cancel();
+ cancelTimer(l);
+ queue.startConsumers();
// FIXME aconway 2011-07-28: configurable interval.
timerTask = new OwnershipTimeout(*this, 100*sys::TIME_MSEC);
timer.add(timerTask);
@@ -82,7 +87,7 @@ void QueueContext::replicaState(QueueOwnership state) {
}
}
-// FIXME aconway 2011-07-27: Dont spin token on an empty queue. Cancel timer.
+// FIXME aconway 2011-07-27: Dont spin token on an empty queue.
// Called in connection threads when a consumer is added
void QueueContext::consume(size_t n) {
@@ -96,18 +101,19 @@ void QueueContext::consume(size_t n) {
void QueueContext::cancel(size_t n) {
sys::Mutex::ScopedLock l(lock);
consumers = n;
- if (n == 0) queue.stop(); // FIXME aconway 2011-07-28: Ok inside lock?
+ // When consuming threads are stopped, this->stopped will be called.
+ if (n == 0) queue.stopConsumers(); // FIXME aconway 2011-07-28: Ok inside lock?
}
+// Called in timer thread.
void QueueContext::timeout() {
- QPID_LOG(critical, "FIXME Ownership timeout on queue " << queue.getName());
- queue.stop();
+ // FIXME aconway 2011-09-14: need to deal with stray timeouts.
+ queue.stopConsumers();
// When all threads have stopped, queue will call stopped()
}
-
-// Callback set up by queue.stop(), called when no threads are dispatching from the queue.
-// Release the queue.
+// Callback set up by queue.stopConsumers() called in connection thread.
+// Called when no threads are dispatching from the queue.
void QueueContext::stopped() {
sys::Mutex::ScopedLock l(lock);
// FIXME aconway 2011-07-28: review thread safety of state.
@@ -116,16 +122,33 @@ void QueueContext::stopped() {
if (consumers == 0)
mcast.mcast(framing::ClusterQueueUnsubscribeBody(
framing::ProtocolVersion(), queue.getName()));
- else
+ else // FIXME aconway 2011-09-13: check if we're owner?
mcast.mcast(framing::ClusterQueueResubscribeBody(
framing::ProtocolVersion(), queue.getName()));
}
+void QueueContext::requeue(uint32_t position, bool redelivered) {
+ // FIXME aconway 2011-09-15: no lock, unacked has its own lock.
+ broker::QueuedMessage qm;
+ if (unacked.get(position, qm)) {
+ unacked.erase(position);
+ if (redelivered) qm.payload->redeliver();
+ BrokerContext::ScopedSuppressReplication ssr;
+ queue.requeue(qm);
+ }
+}
+
+void QueueContext::acquire(const broker::QueuedMessage& qm) {
+ unacked.put(qm.position, qm);
+}
+
+void QueueContext::dequeue(uint32_t position) {
+ unacked.erase(position);
+}
+
boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) {
return boost::intrusive_ptr<QueueContext>(
static_cast<QueueContext*>(q.getClusterContext().get()));
}
-
-
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
index c244b57a2e..4571c6744a 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
@@ -22,7 +22,7 @@
*
*/
-
+#include "LockedMap.h"
#include <qpid/RefCounted.h>
#include "qpid/sys/Time.h"
#include <qpid/sys/Mutex.h>
@@ -35,6 +35,7 @@
namespace qpid {
namespace broker {
class Queue;
+class QueuedMessage;
}
namespace sys {
class Timer;
@@ -60,16 +61,16 @@ class QueueContext : public RefCounted {
void replicaState(QueueOwnership);
/** Called when queue is stopped, no threads are dispatching.
- * Connection or deliver thread.
+ * May be called in connection or deliver thread.
*/
void stopped();
- /** Called when a consumer is added to the queue.
+ /** Called in connection thread when a consumer is added.
*@param n: nubmer of consumers after new one is added.
*/
void consume(size_t n);
- /** Called when a consumer is cancelled on the queue.
+ /** Called in connection thread when a consumer is cancelled on the queue.
*@param n: nubmer of consumers after the cancel.
*/
void cancel(size_t n);
@@ -77,9 +78,18 @@ class QueueContext : public RefCounted {
/** Get the context for a broker queue. */
static boost::intrusive_ptr<QueueContext> get(broker::Queue&);
- /** Called when the timer runs out: stop the queue. */
+ /** Called in timer thread when the timer runs out. */
void timeout();
+ /** Called by MessageHandler to requeue a message. */
+ void requeue(uint32_t position, bool redelivered);
+
+ /** Called by MessageHandler when a mesages is acquired. */
+ void acquire(const broker::QueuedMessage& qm);
+
+ /** Called by MesageHandler when a message is dequeued. */
+ void dequeue(uint32_t position);
+
private:
sys::Timer& timer;
@@ -89,7 +99,10 @@ class QueueContext : public RefCounted {
boost::intrusive_ptr<sys::TimerTask> timerTask;
size_t consumers;
- // FIXME aconway 2011-06-28: need to store acquired messages for possible re-queueing.
+ typedef LockedMap<uint32_t, broker::QueuedMessage> UnackedMap; // FIXME aconway 2011-09-15: don't need read/write map? Rename
+ UnackedMap unacked;
+
+ void cancelTimer(const sys::Mutex::ScopedLock& l);
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
index 7d56025fb8..4c2b16e001 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
@@ -58,6 +58,7 @@ void QueueHandler::left(const MemberId& member) {
}
// FIXME aconway 2011-06-08: do we need to hold on to the shared pointer for lifecycle?
+// FIXME aconway 2011-09-13: called from wiring handler, need to consider for multi-cpg.
void QueueHandler::add(boost::shared_ptr<broker::Queue> q) {
// FIXME aconway 2011-06-08: move create operation from Wiring to Queue handler.
// FIXME aconway 2011-05-10: assert not already in map.
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
index 7bbd6e1422..8b451a3eaf 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
@@ -20,6 +20,7 @@
*/
#include "QueueReplica.h"
#include "QueueContext.h"
+#include "PrettyId.h"
#include "qpid/broker/Queue.h"
#include "qpid/log/Statement.h"
#include <algorithm>
@@ -30,17 +31,17 @@ namespace cluster {
QueueReplica::QueueReplica(boost::shared_ptr<broker::Queue> q,
const MemberId& self_)
: queue(q), self(self_), context(QueueContext::get(*q))
-{
- // q is initially stopped.
-}
+{}
struct PrintSubscribers {
const QueueReplica::MemberQueue& mq;
- PrintSubscribers(const QueueReplica::MemberQueue& m) : mq(m) {}
+ MemberId self;
+ PrintSubscribers(const QueueReplica::MemberQueue& m, const MemberId& s) : mq(m), self(s) {}
};
std::ostream& operator<<(std::ostream& o, const PrintSubscribers& ps) {
- copy(ps.mq.begin(), ps.mq.end(), std::ostream_iterator<MemberId>(o, " "));
+ for (QueueReplica::MemberQueue::const_iterator i = ps.mq.begin(); i != ps.mq.end(); ++i)
+ o << PrettyId(*i, ps.self) << " ";
return o;
}
@@ -51,12 +52,10 @@ std::ostream& operator<<(std::ostream& o, QueueOwnership s) {
std::ostream& operator<<(std::ostream& o, const QueueReplica& qr) {
o << qr.queue->getName() << "(" << qr.getState() << "): "
- << PrintSubscribers(qr.subscribers);
+ << PrintSubscribers(qr.subscribers, qr.getSelf());
return o;
}
-// FIXME aconway 2011-05-17: error handling for asserts.
-
void QueueReplica::subscribe(const MemberId& member) {
QueueOwnership before = getState();
subscribers.push_back(member);
@@ -73,15 +72,16 @@ void QueueReplica::unsubscribe(const MemberId& member) {
}
void QueueReplica::resubscribe(const MemberId& member) {
- assert (member == subscribers.front()); // FIXME aconway 2011-06-27: error handling
- QueueOwnership before = getState();
- subscribers.pop_front();
- subscribers.push_back(member);
- update(before);
+ if (member == subscribers.front()) { // FIXME aconway 2011-09-13: should be assert?
+ QueueOwnership before = getState();
+ subscribers.pop_front();
+ subscribers.push_back(member);
+ update(before);
+ }
}
void QueueReplica::update(QueueOwnership before) {
- QPID_LOG(trace, "QueueReplica " << *this << " (was " << before << ")");
+ QPID_LOG(trace, "cluster: queue replica " << *this << " (was " << before << ")");
QueueOwnership after = getState();
if (before == after) return;
context->replicaState(after);
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
index 4ebbc84ef0..a1dca2e33d 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
@@ -36,6 +36,7 @@ class Queue;
namespace cluster {
class QueueContext;
+struct PrintSubscribers;
/**
* Queue state that is replicated among all cluster members.
@@ -54,12 +55,9 @@ class QueueReplica : public RefCounted
void unsubscribe(const MemberId&);
void resubscribe(const MemberId&);
+ MemberId getSelf() const { return self; }
+
private:
-
- friend class PrintSubscribers;
- friend std::ostream& operator<<(std::ostream&, QueueOwnership);
- friend std::ostream& operator<<(std::ostream&, const QueueReplica&);
-
typedef std::deque<MemberId> MemberQueue;
boost::shared_ptr<broker::Queue> queue;
@@ -71,6 +69,11 @@ class QueueReplica : public RefCounted
bool isOwner() const;
bool isSubscriber(const MemberId&) const;
void update(QueueOwnership before);
+
+ friend struct PrintSubscribers;
+ friend std::ostream& operator<<(std::ostream&, QueueOwnership);
+ friend std::ostream& operator<<(std::ostream&, const QueueReplica&);
+ friend std::ostream& operator<<(std::ostream& o, const PrintSubscribers& ps);
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/sys/Stoppable.h b/qpid/cpp/src/qpid/sys/Stoppable.h
index ac0f03d3a1..113a676503 100644
--- a/qpid/cpp/src/qpid/sys/Stoppable.h
+++ b/qpid/cpp/src/qpid/sys/Stoppable.h
@@ -33,11 +33,12 @@ namespace sys {
* An activity that may be executed by multiple threads, and can be stopped.
*
* Stopping prevents new threads from entering and calls a callback
- * when all busy threads leave.
+ * when all busy threads have left.
*/
class Stoppable {
public:
/**
+ * Initially not stopped.
*@param stoppedCallback: called when all threads have stopped.
*/
Stoppable(boost::function<void()> stoppedCallback)
@@ -55,7 +56,7 @@ class Stoppable {
Stoppable& state;
bool entered;
public:
- Scope(Stoppable& s) : state(s) { entered = s.enter(); }
+ Scope(Stoppable& s) : state(s) { entered = state.enter(); }
~Scope() { if (entered) state.exit(); }
operator bool() const { return entered; }
};
@@ -69,6 +70,7 @@ class Stoppable {
*/
void stop() {
sys::Monitor::ScopedLock l(lock);
+ if (stopped) return;
stopped = true;
check();
}
@@ -81,6 +83,8 @@ class Stoppable {
stopped = false;
}
+ private:
+
// Busy thread enters scope
bool enter() {
sys::Monitor::ScopedLock l(lock);
@@ -96,8 +100,8 @@ class Stoppable {
check();
}
- private:
void check() {
+ // Called with lock held.
if (stopped && busy == 0 && notify) notify();
}
diff --git a/qpid/cpp/src/tests/BrokerClusterCalls.cpp b/qpid/cpp/src/tests/BrokerClusterCalls.cpp
index 01c0639bf0..7975210e4e 100644
--- a/qpid/cpp/src/tests/BrokerClusterCalls.cpp
+++ b/qpid/cpp/src/tests/BrokerClusterCalls.cpp
@@ -94,8 +94,9 @@ class DummyCluster : public broker::Cluster
virtual void release(const broker::QueuedMessage& qm) {
if (!isRouting) recordQm("release", qm);
}
- virtual void dequeue(const broker::QueuedMessage& qm) {
+ virtual bool dequeue(const broker::QueuedMessage& qm) {
if (!isRouting) recordQm("dequeue", qm);
+ return false;
}
// Consumers
diff --git a/qpid/cpp/src/tests/cluster2_tests.py b/qpid/cpp/src/tests/cluster2_tests.py
index ad13986ad3..81bc71d22f 100755
--- a/qpid/cpp/src/tests/cluster2_tests.py
+++ b/qpid/cpp/src/tests/cluster2_tests.py
@@ -159,7 +159,7 @@ class Cluster2Tests(BrokerTest):
self.session.acknowledge()
except Empty: pass
- cluster = self.cluster(3, cluster2=True, args=["-t"]) # FIXME aconway 2011-05-13: -t
+ cluster = self.cluster(3, cluster2=True)
connections = [ b.connect() for b in cluster]
sessions = [ c.session() for c in connections ]
sender = sessions[0].sender("q;{create:always}")
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark
index fcc76f6cf3..6da0c11944 100755
--- a/qpid/cpp/src/tests/qpid-cpp-benchmark
+++ b/qpid/cpp/src/tests/qpid-cpp-benchmark
@@ -115,7 +115,9 @@ def start_receive(queue, index, opts, ready_queue, broker, host):
if opts.connection_options:
command += ["--connection-options",opts.connection_options]
if host: command = ssh_command(host, command)
- return clients.add(Popen(command, stdout=PIPE, stderr=PIPE))
+ # FIXME aconway 2011-09-15:
+ # return clients.add(Popen(command, stdout=PIPE, stderr=PIPE))
+ return clients.add(Popen(command, stdout=PIPE))
def start_send(queue, opts, broker, host):
address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"]))
@@ -128,7 +130,9 @@ def start_send(queue, opts, broker, host):
"--report-total",
"--report-header=no",
"--timestamp=%s"%(opts.timestamp and "yes" or "no"),
- "--sequence=no",
+ # FIXME aconway 2011-09-15:
+ # "--sequence=no",
+ "--sequence=yes",
"--flow-control", str(opts.flow_control),
"--durable", str(opts.durable)
]
@@ -166,12 +170,12 @@ def recreate_queues(queues, brokers):
for q in queues:
try: s.sender("%s;{delete:always}"%(q)).close()
except qpid.messaging.exceptions.NotFound: pass
- # FIXME aconway 2011-05-04: async wiring, wait for changes to propagate.
+ # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate
for b in brokers:
while queue_exists(q,b): time.sleep(0.1);
- for q in queues:
- s.sender("%s;{create:always}"%q)
- # FIXME aconway 2011-05-04: async wiring, wait for changes to propagate.
+ for q in queues:
+ s.sender("%s;{create:always}"%q)
+ # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate
for b in brokers:
while not queue_exists(q,b): time.sleep(0.1);
c.close()
@@ -182,8 +186,6 @@ def print_header(timestamp):
print "send-tp\t\trecv-tp%s"%latency_header
def parse(parser, lines): # Parse sender/receiver output
- for l in lines:
- fn_val = zip(parser, l)
return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines]
def parse_senders(senders):
diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp
index fc33685407..ae4f341efa 100644
--- a/qpid/cpp/src/tests/qpid-receive.cpp
+++ b/qpid/cpp/src/tests/qpid-receive.cpp
@@ -198,6 +198,7 @@ int main(int argc, char ** argv)
std::map<std::string,Sender> replyTo;
while (!done && receiver.fetch(msg, timeout)) {
+ cerr << "FIXME " << msg.getProperties()[SN] << endl;
if (!started) {
// Start the time on receipt of the first message to avoid counting
// idle time at process startup.
@@ -207,6 +208,7 @@ int main(int argc, char ** argv)
reporter.message(msg);
if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {
if (msg.getContent() == EOS) {
+ cerr << "FIXME eos" << endl;
done = true;
} else {
++count;
@@ -224,7 +226,10 @@ int main(int argc, char ** argv)
}
if (opts.printContent)
std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
- if (opts.messages && count >= opts.messages) done = true;
+ if (opts.messages && count >= opts.messages) {
+ cerr << "FIXME "<< count << " >= " << opts.messages << endl;
+ done = true;
+ }
}
} else if (opts.checkRedelivered && !msg.getRedelivered()) {
throw qpid::Exception("duplicate sequence number received, message not marked as redelivered!");
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 1fed9e7de1..c84d8e3ef5 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -361,6 +361,7 @@
<control name="release" code="0x6">
<field name="queue" type="queue.name"/>
<field name="position" type="uint32"/>
+ <field name="redelivered" type="bit"/>
</control>
</class>