diff options
author | Alan Conway <aconway@apache.org> | 2010-10-18 19:36:13 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-10-18 19:36:13 +0000 |
commit | a08d54e27d4e91b52c5979cc566ab3e933878983 (patch) | |
tree | 7f57ad88051e4a02f52d4bdf395968549e24f57a /cpp/src | |
parent | 8e53bc375ef2bfb4b05cc32b4a8c0042d95b9ec2 (diff) | |
download | qpid-python-a08d54e27d4e91b52c5979cc566ab3e933878983.tar.gz |
Introduce broker::Cluster interface.
See cpp/src/qpid/cluster/new-cluster-design.txt and new-cluster-plan.txt.
qpid/cpp/src/tests/BrokerClusterCalls.cpp is a unit test that verifies
the broker makes the expected calls on broker::Cluster in various situations.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1023966 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Cluster.h | 103 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 16 | ||||
-rw-r--r-- | cpp/src/qpid/broker/NullCluster.h | 66 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 126 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueuedMessage.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/new-cluster-design.txt | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/new-cluster-plan.txt | 439 | ||||
-rw-r--r-- | cpp/src/tests/BrokerClusterCalls.cpp | 435 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 3 | ||||
-rw-r--r-- | cpp/src/tests/cluster.mk | 2 |
16 files changed, 1185 insertions, 53 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 2b6a6fcf5d..0ce1480825 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -501,6 +501,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Broker.cpp \ qpid/broker/Broker.h \ qpid/broker/BrokerImportExport.h \ + qpid/broker/Cluster.h \ qpid/broker/Connection.cpp \ qpid/broker/Connection.h \ qpid/broker/ConnectionFactory.cpp \ @@ -559,6 +560,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/MessageStoreModule.h \ qpid/broker/NameGenerator.cpp \ qpid/broker/NameGenerator.h \ + qpid/broker/NullCluster.h \ qpid/broker/NullMessageStore.cpp \ qpid/broker/NullMessageStore.h \ qpid/broker/OwnershipToken.h \ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 33364e48df..a288da00c7 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -24,6 +24,7 @@ #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" #include "qpid/broker/MessageStoreModule.h" +#include "qpid/broker/NullCluster.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/RecoveryManagerImpl.h" #include "qpid/broker/SaslAuthenticator.h" @@ -146,6 +147,7 @@ Broker::Broker(const Broker::Options& conf) : conf.qmf2Support) : 0), store(new NullMessageStore), + cluster(new NullCluster), acl(0), dataDir(conf.noDataDir ? std::string() : conf.dataDir), queues(this), diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 6636b5d912..dcb20c4fe3 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -70,6 +70,7 @@ namespace broker { class ExpiryPolicy; class Message; +class Cluster; static const uint16_t DEFAULT_PORT=5672; @@ -153,6 +154,7 @@ public: std::auto_ptr<management::ManagementAgent> managementAgent; ProtocolFactoryMap protocolFactories; std::auto_ptr<MessageStore> store; + std::auto_ptr<Cluster> cluster; AclModule* acl; DataDir dataDir; @@ -273,6 +275,9 @@ public: void setClusterUpdatee(bool set) { clusterUpdatee = set; } bool isClusterUpdatee() const { return clusterUpdatee; } + QPID_BROKER_EXTERN void setCluster(std::auto_ptr<Cluster> c) { cluster = c; } + QPID_BROKER_EXTERN Cluster& getCluster() { return *cluster; } + management::ManagementAgent* getManagementAgent() { return managementAgent.get(); } ConnectionCounter& getConnectionCounter() {return connectionCounter;} diff --git a/cpp/src/qpid/broker/Cluster.h b/cpp/src/qpid/broker/Cluster.h new file mode 100644 index 0000000000..91b52e8af1 --- /dev/null +++ b/cpp/src/qpid/broker/Cluster.h @@ -0,0 +1,103 @@ +#ifndef QPID_BROKER_CLUSTER_H +#define QPID_BROKER_CLUSTER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/intrusive_ptr.hpp> + +namespace qpid { + +namespace framing { +class FieldTable; +} + +namespace broker { + +class Message; +struct QueuedMessage; +class Queue; +class Exchange; + +/** + * NOTE: this is part of an experimental cluster implementation that is not + * yet fully functional. The original cluster implementation remains in place. + * See ../cluster/new-cluster-design.txt + * + * Interface for cluster implementations. Functions on this interface are + * called at relevant points in the Broker's processing. + */ +class Cluster +{ + public: + virtual ~Cluster() {} + + // Messages + + /** In Exchange::route, before the message is enqueued. */ + virtual void routing(const boost::intrusive_ptr<Message>&) = 0; + /** A message is delivered to a queue. */ + virtual void enqueue(QueuedMessage&) = 0; + /** In Exchange::route, after all enqueues for the message. */ + virtual void routed(const boost::intrusive_ptr<Message>&) = 0; + + /** A message is acquired by a local consumer, it is unavailable to replicas. */ + virtual void acquire(const QueuedMessage&) = 0; + /** A locally-acquired message is accepted, it is removed from all replicas. */ + virtual void accept(const QueuedMessage&) = 0; + + /** A locally-acquired message is rejected, and may be re-routed. */ + virtual void reject(const QueuedMessage&) = 0; + /** Re-routing (if any) is complete for a rejected message. */ + virtual void rejected(const QueuedMessage&) = 0; + + /** A locally-acquired message is released by the consumer and re-queued. */ + virtual void release(const QueuedMessage&) = 0; + /** A message is dropped from the queue, e.g. expired or replaced on an LVQ. + * This function does only local book-keeping, it does not multicast. + * It is reasonable to call with a queue lock held. + */ + virtual void dequeue(const QueuedMessage&) = 0; + + // Consumers + + /** A new consumer subscribes to a queue. */ + virtual void consume(const Queue&, size_t consumerCount) = 0; + /** A consumer cancels its subscription to a queue */ + virtual void cancel(const Queue&, size_t consumerCount) = 0; + + // Wiring + + /** A queue is created */ + virtual void create(const Queue&) = 0; + /** A queue is destroyed */ + virtual void destroy(const Queue&) = 0; + /** An exchange is created */ + virtual void create(const Exchange&) = 0; + /** An exchange is destroyed */ + virtual void destroy(const Exchange&) = 0; + /** A binding is created */ + virtual void bind(const Queue&, const Exchange&, const std::string& key, const framing::FieldTable& args) = 0; +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_CLUSTER_H*/ diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 9443eb6ea5..315b1af2a8 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -112,7 +112,7 @@ void DeliveryRecord::complete() { bool DeliveryRecord::accept(TransactionContext* ctxt) { if (acquired && !ended) { - queue->dequeue(ctxt, msg); + queue->accept(ctxt, msg); setEnded(); QPID_LOG(debug, "Accepted " << id); } @@ -130,19 +130,8 @@ void DeliveryRecord::committed() const{ } void DeliveryRecord::reject() -{ - Exchange::shared_ptr alternate = queue->getAlternateExchange(); - if (alternate) { - DeliverableMessage delivery(msg.payload); - alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders()); - QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to " - << alternate->getName()); - } else { - //just drop it - QPID_LOG(info, "Dropping rejected message from " << queue->getName()); - } - - dequeue(); +{ + queue->reject(msg); } uint32_t DeliveryRecord::getCredit() const @@ -156,7 +145,7 @@ void DeliveryRecord::acquire(DeliveryIds& results) { results.push_back(id); if (!acceptExpected) { if (ended) { QPID_LOG(error, "Can't dequeue ended message"); } - else { queue->dequeue(0, msg); setEnded(); } + else { queue->accept(0, msg); setEnded(); } } } else { QPID_LOG(info, "Message already acquired " << id.getValue()); diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 98980e0360..aaf0805543 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -22,6 +22,7 @@ #include "qpid/broker/Exchange.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Cluster.h" #include "qpid/management/ManagementAgent.h" #include "qpid/broker/Queue.h" #include "qpid/log/Statement.h" @@ -78,10 +79,23 @@ Exchange::PreRoute::~PreRoute(){ } } +// Bracket a scope with calls to Cluster::routing and Cluster::routed +struct ScopedClusterRouting { + Broker* broker; + boost::intrusive_ptr<Message> message; + ScopedClusterRouting(Broker* b, boost::intrusive_ptr<Message> m) + : broker(b), message(m) { + if (broker) broker->getCluster().routing(message); + } + ~ScopedClusterRouting() { + if (broker) broker->getCluster().routed(message); + } +}; + void Exchange::doRoute(Deliverable& msg, ConstBindingList b) { + ScopedClusterRouting scr(broker, &msg.getMessage()); int count = 0; - if (b.get()) { // Block the content release if the message is transient AND there is more than one binding if (!msg.getMessage().isPersistent() && b->size() > 1) { diff --git a/cpp/src/qpid/broker/NullCluster.h b/cpp/src/qpid/broker/NullCluster.h new file mode 100644 index 0000000000..4f3485eb40 --- /dev/null +++ b/cpp/src/qpid/broker/NullCluster.h @@ -0,0 +1,66 @@ +#ifndef QPID_BROKER_NULLCLUSTER_H +#define QPID_BROKER_NULLCLUSTER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/broker/Cluster.h> + +namespace qpid { +namespace broker { + +/** + * No-op implementation of Cluster interface, installed by broker when + * no cluster plug-in is present or clustering is disabled. + */ +class NullCluster : public Cluster +{ + public: + + // Messages + + virtual void routing(const boost::intrusive_ptr<Message>&) {} + virtual void enqueue(QueuedMessage&) {} + virtual void routed(const boost::intrusive_ptr<Message>&) {} + virtual void acquire(const QueuedMessage&) {} + virtual void accept(const QueuedMessage&) {} + virtual void reject(const QueuedMessage&) {} + virtual void rejected(const QueuedMessage&) {} + virtual void release(const QueuedMessage&) {} + virtual void dequeue(const QueuedMessage&) {} + + // Consumers + + virtual void consume(const Queue&, size_t) {} + virtual void cancel(const Queue&, size_t) {} + + // Wiring + + virtual void create(const Queue&) {} + virtual void destroy(const Queue&) {} + virtual void create(const Exchange&) {} + virtual void destroy(const Exchange&) {} + virtual void bind(const Queue&, const Exchange&, const std::string&, const framing::FieldTable&) {} +}; + +}} // namespace qpid::broker + +#endif diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index e59857462c..b05172f984 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -20,6 +20,7 @@ */ #include "qpid/broker/Broker.h" +#include "qpid/broker/Cluster.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueEvents.h" #include "qpid/broker/Exchange.h" @@ -224,6 +225,7 @@ void Queue::requeue(const QueuedMessage& msg){ } } } + if (broker) broker->getCluster().release(msg); copy.notify(); } @@ -236,8 +238,22 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){ } } +// Inform the cluster of an acquired message on exit from a function +// that does the acquiring. The calling function should set qmsg +// to the acquired message. +struct ClusterAcquireOnExit { + Broker* broker; + QueuedMessage qmsg; + ClusterAcquireOnExit(Broker* b) : broker(b) {} + ~ClusterAcquireOnExit() { + if (broker && qmsg.queue) broker->getCluster().acquire(qmsg); + } +}; + bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { + ClusterAcquireOnExit willAcquire(broker); + Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); @@ -248,16 +264,18 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess if (lastValueQueue) { clearLVQIndex(*i); } - QPID_LOG(debug, - "Acquired message at " << i->position << " from " << name); + QPID_LOG(debug, "Acquired message at " << i->position << " from " << name); + willAcquire.qmsg = *i; messages.erase(i); return true; - } + } QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); return false; } bool Queue::acquire(const QueuedMessage& msg) { + ClusterAcquireOnExit acquire(broker); + Mutex::ScopedLock locker(messageLock); assertClusterSafe(); @@ -265,16 +283,17 @@ bool Queue::acquire(const QueuedMessage& msg) { Messages::iterator i = findAt(msg.position); if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set (!lastValueQueue || - (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0 - ) { + (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0 + ) { clearLVQIndex(msg); QPID_LOG(debug, "Match found, acquire succeeded: " << i->position << " == " << msg.position); + acquire.qmsg = *i; messages.erase(i); return true; - } + } QPID_LOG(debug, "Acquire failed for " << msg.position); return false; @@ -314,6 +333,8 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { while (true) { + ClusterAcquireOnExit willAcquire(broker); // Outside the lock + Mutex::ScopedLock locker(messageLock); if (messages.empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); @@ -330,6 +351,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ if (c->filter(msg.payload)) { if (c->accept(msg.payload)) { m = msg; + willAcquire.qmsg = msg; popMsg(msg); return CONSUMED; } else { @@ -451,40 +473,51 @@ QueuedMessage Queue::find(SequenceNumber pos) const { return QueuedMessage(); } -void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ +void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) { assertClusterSafe(); - Mutex::ScopedLock locker(consumerLock); - if(exclusive) { - throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); - } else if(requestExclusive) { - if(consumerCount) { + size_t consumers; + { + Mutex::ScopedLock locker(consumerLock); + if(exclusive) { throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); - } else { - exclusive = c->getSession(); + QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); + } else if(requestExclusive) { + if(consumerCount) { + throw ResourceLockedException( + QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); + } else { + exclusive = c->getSession(); + } } + consumers = ++consumerCount; + if (mgmtObject != 0) + mgmtObject->inc_consumerCount (); } - consumerCount++; - if (mgmtObject != 0) - mgmtObject->inc_consumerCount (); + if (broker) broker->getCluster().consume(*this, consumers); } void Queue::cancel(Consumer::shared_ptr c){ removeListener(c); - Mutex::ScopedLock locker(consumerLock); - consumerCount--; - if(exclusive) exclusive = 0; - if (mgmtObject != 0) - mgmtObject->dec_consumerCount (); + size_t consumers; + { + Mutex::ScopedLock locker(consumerLock); + consumers = --consumerCount; + if(exclusive) exclusive = 0; + if (mgmtObject != 0) + mgmtObject->dec_consumerCount (); + } + if (broker) broker->getCluster().cancel(*this, consumers); } QueuedMessage Queue::get(){ + ClusterAcquireOnExit acquire(broker); // Outside lock + Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); if(!messages.empty()){ msg = getFront(); + acquire.qmsg = msg; popMsg(msg); } return msg; @@ -609,10 +642,11 @@ void Queue::popMsg(QueuedMessage& qmsg) void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ assertClusterSafe(); + QueuedMessage qm; QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); - QueuedMessage qm(this, msg, ++sequence); + qm = QueuedMessage(this, msg, ++sequence); if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); LVQ::iterator i; @@ -629,12 +663,14 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this); if (!old) old = i->second; i->second->setReplacementMessage(msg,this); + // FIXME aconway 2010-10-15: it is incorrect to use qm.position below + // should be using the position of the message being replaced. if (isRecovery) { //can't issue new requests for the store until //recovery is complete pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position)); } else { - Mutex::ScopedUnlock u(messageLock); + Mutex::ScopedUnlock u(messageLock); dequeue(0, QueuedMessage(qm.queue, old, qm.position)); } } @@ -651,6 +687,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ } } copy.notify(); + if (broker) broker->getCluster().enqueue(qm); } QueuedMessage Queue::getFront() @@ -792,12 +829,42 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) if (policy.get()) policy->enqueueAborted(msg); } +void Queue::accept(TransactionContext* ctxt, const QueuedMessage& msg) { + if (broker) broker->getCluster().accept(msg); + dequeue(ctxt, msg); +} + +struct ScopedClusterReject { + Broker* broker; + const QueuedMessage& qmsg; + ScopedClusterReject(Broker* b, const QueuedMessage& m) : broker(b), qmsg(m) { + if (broker) broker->getCluster().reject(qmsg); + } + ~ScopedClusterReject() { + if (broker) broker->getCluster().rejected(qmsg); + } +}; + +void Queue::reject(const QueuedMessage &msg) { + ScopedClusterReject scr(broker, msg); + Exchange::shared_ptr alternate = getAlternateExchange(); + if (alternate) { + DeliverableMessage delivery(msg.payload); + alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders()); + QPID_LOG(info, "Routed rejected message from " << getName() << " to " + << alternate->getName()); + } else { + //just drop it + QPID_LOG(info, "Dropping rejected message from " << getName()); + } + dequeue(0, msg); +} + // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { ScopedUse u(barrier); if (!u.acquired) return false; - { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return false; @@ -846,6 +913,9 @@ void Queue::popAndDequeue() */ void Queue::dequeued(const QueuedMessage& msg) { + // Note: Cluster::dequeued does only local book-keeping, no multicast + // So OK to call here with lock held. + if (broker) broker->getCluster().dequeue(msg); if (policy.get()) policy->dequeued(msg); mgntDeqStats(msg.payload); if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) { @@ -861,6 +931,7 @@ void Queue::create(const FieldTable& _settings) store->create(*this, _settings); } configure(_settings); + if (broker) broker->getCluster().create(*this); } void Queue::configure(const FieldTable& _settings, bool recovering) @@ -934,6 +1005,7 @@ void Queue::destroy() store->destroy(*this); store = 0;//ensure we make no more calls to the store for this queue } + if (broker) broker->getCluster().destroy(*this); } void Queue::notifyDeleted() diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 96c79d1b92..572f3dc0e2 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -259,6 +259,13 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false); void enqueueAborted(boost::intrusive_ptr<Message> msg); + + /** Message acknowledged, dequeue it. */ + QPID_BROKER_EXTERN void accept(TransactionContext* ctxt, const QueuedMessage &msg); + + /** Message rejected, dequeue it and re-route to alternate exchange if necessary. */ + QPID_BROKER_EXTERN void reject(const QueuedMessage &msg); + /** * dequeue from store (only done once messages is acknowledged) */ diff --git a/cpp/src/qpid/broker/QueuedMessage.h b/cpp/src/qpid/broker/QueuedMessage.h index 35e48b11f3..8cf73bda52 100644 --- a/cpp/src/qpid/broker/QueuedMessage.h +++ b/cpp/src/qpid/broker/QueuedMessage.h @@ -34,10 +34,9 @@ struct QueuedMessage framing::SequenceNumber position; Queue* queue; - QueuedMessage() : queue(0) {} + QueuedMessage(Queue* q=0) : position(0), queue(q) {} QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) : payload(msg), position(sn), queue(q) {} - QueuedMessage(Queue* q) : queue(q) {} }; inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index c91cfba2f8..f393879c16 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -333,7 +333,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) parent->record(record); } if (acquire && !ackExpected) { - queue->dequeue(0, msg); + queue->accept(0, msg); } if (mgmtObject) { mgmtObject->inc_delivered(); } return true; @@ -347,11 +347,6 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>) bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) { assertClusterSafe(); - // FIXME aconway 2009-06-08: if we have byte & message credit but - // checkCredit fails because the message is to big, we should - // remain on queue's listener list for possible smaller messages - // in future. - // blocked = !(filter(msg) && checkCredit(msg)); return !blocked; } diff --git a/cpp/src/qpid/cluster/new-cluster-design.txt b/cpp/src/qpid/cluster/new-cluster-design.txt index 392de890c3..8ee740372d 100644 --- a/cpp/src/qpid/cluster/new-cluster-design.txt +++ b/cpp/src/qpid/cluster/new-cluster-design.txt @@ -75,6 +75,8 @@ Use a moving queue ownership protocol to agree order of dequeues, rather than relying on identical state and lock-step behavior to cause identical dequeues on each broker. +Clearly defined interface between broker code and cluster plug-in. + *** Requirements The cluster must provide these delivery guarantees: @@ -365,3 +367,4 @@ there a better term? Clustering and scalability: new design may give us the flexibility to address scalability as part of cluster design. Think about relationship to federation and "fragmented queues" idea. + diff --git a/cpp/src/qpid/cluster/new-cluster-plan.txt b/cpp/src/qpid/cluster/new-cluster-plan.txt new file mode 100644 index 0000000000..57c1241607 --- /dev/null +++ b/cpp/src/qpid/cluster/new-cluster-plan.txt @@ -0,0 +1,439 @@ +-*-org-*- +Notes on new cluster implementation. See also: new-cluster-design.txt + +* Implementation plan. + +Co-existence with old cluster code and tests: +- Separate plugin cluster2, options --cluster2-*. Eventually renamed to replace cluster. +- Double up tests with old version/new version as the new code develops. + +Minimal POC for message delivery & perf test. +- no wiring replication, no updates, no failover, no persistence, no async completion. +- just implement publish and acquire/dequeue locking protocol. +- measure performance. + +Full implementation of transient cluster +- Update (based on existing update), async completion etc. +- Passing all existing transient cluster tests. + +Persistent cluster +- Make sure async completion works correctly. +- InitialStatus protoocl etc. to support persistent start-up (existing code) +- cluster restart from store: stores not identical. Load one, update the rest. + - assign cluster ID's to messages recovered from store, don't replicate. + +Improved update protocol +- per-queue, less stalling, bounded catch-up. + +* Task list + +** TODO [#A] Minimal POC: publish/acquire/dequeue protocol. + +NOTE: as implementation questions arise, take the easiest option and make +a note for later optimization/improvement. + +*** Tests +- python test: 4 senders, numbered messages, 4 receivers, verify message set. +- acquire then release messages: verify can be dequeued on any member +- acquire then kill broker: verify can be dequeued other members. +- acquire then reject: verify goes on alt-exchange once only. + +*** TODO broker::Cluster interface and call points. + +Initial draft is commited. + +Issues to review: + +queue API: internal classes like RingQueuePolicy use Queue::acuqire/dequeue +when messages are pushed. How to reconcile with queue ownership? + +rejecting messages: if there's an alternate exchange where do we do the +re-routing? On origin broker or on all brokers? + +Intercept points: on Queue vs. on DeliveryRecord, SemanticState etc. +Intercepting client actions on the queue vs. internal actions +(e.g. ring policy) + +*** Main classes + +BrokerHandler: +- implements broker::Cluster intercept points. +- sends mcast events to inform cluster of local actions. +- thread safe, called in connection threads. + +LocalMessageMap: +- Holds local messages while they are being enqueued. +- thread safe: called by both BrokerHandler and DeliverHandler + +MessageHandler: +- handles delivered mcast messages related to messages. +- initiates local actions in response to mcast events. +- thread unsafe, only called in deliver thread. +- maintains view of cluster state regarding messages. + +QueueOwnerHandler: +- handles delivered mcast messages related to queue consumer ownership. +- thread safe, called in deliver, connection and timer threads. +- maintains view of cluster state regarding queue ownership. + +cluster::Core: class to hold new cluster together (replaces cluster::Cluster) +- thread safe: manage state used by both DeliverHandler and BrokerHandler + +The following code sketch illustrates only the "happy path" error handling +is omitted. + +*** BrokerHandler +Types: +- struct QueuedMessage { Message msg; QueueName q; Position pos; } +- SequenceNumber 64 bit sequence number to identify messages. +- NodeId 64 bit CPG node-id, identifies member of the cluster. +- struct MessageId { NodeId node; SequenceNumber seq; } + +Members: +- atomic<SequenceNumber> sequence // sequence number for message IDs. +- thread_local bool noReplicate // suppress replication. +- thread_local bool isRouting // suppress operations while routing +- QueuedMessage localMessage[SequenceNumber] // local messages being enqueued. + +NOTE: localMessage is also modified by DeliverHandler. + +broker::Cluster intercept functions: + +routing(msg) + if noReplicate: return + # Supress everything except enqueues while we are routing. + # We don't want to replicate acquires & dequeues caused by an enqueu, + # e.g. removal of messages from ring/LV queues. + isRouting = true + +enqueue(qmsg): + if noReplicate: return + if !qmsg.msg.id: + seq = sequence++ + qmsg.msg.id = (self,seq) + localMessage[seq] = qmsg + mcast create(encode(qmsg.msg),seq) + mcast enqueue(qmsg.q,qmsg.msg.id.seq) + +routed(msg): + if noReplicate: return + if msg.id: mcast routed(msg.id.seq) + isRouting = false + +acquire(qmsg): + if noReplicate: return + if isRouting: return # Ignore while we are routing a message. + if msg.id: mcast acquire(msg.id, q) + +release(QueuedMessage) + if noReplicate: return + if isRouting: return # Ignore while we are routing a message. + if msg.id: mcast release(id, q) + +accept(QueuedMessage): + if noReplicate: return + if isRouting: return # Ignore while we are routing a message. + if msg.id: mcast dequeue(msg.id, msg.q) + +reject(QueuedMessage): + isRejecting = true + if msg.id: mcast reject(msg.id, msg.q) + +rejected(QueuedMessage): + isRejecting = false + mcast dequeue(msg.id, msg.q) + +dequeue(QueuedMessage) + # No mcast in dequeue, only used for local cleanup of resources. + # E.g. messages that are replaced on an LVQ are dequeued without being + # accepted or rejected. dequeue is called with the queue lock held + # FIXME revisit - move it out of the queue lock. + cleanup(msg) + +*** DeliverHandler and mcast messages +Types: +- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; } +- struct QueueKey { MessageId id; QueueName q; } +- typedef map<QueueKey, QueueEntry> Queue +- struct Node { Message routing[SequenceNumber]; list<QueueKey> acquired; } + +Members: +- QueueEntry enqueued[QueueKey] +- Node node[NodeId] + +Mcast messages in Message class: + +create(msg,seq) + if sender != self: node[sender].routing[seq] = decode(msg) + +enqueue(q,seq): + id = (sender,seq) + if sender == self: + enqueued[id,q] = (localMessage[seq], acquired=None) + else: + msg = sender.routing[seq] + enqueued[id,q] = (qmsg, acquired=None) + with noReplicate=true: qmsg = broker.getQueue(q).push(msg) + +routed(seq): + if sender == self: localMessage.erase(msg.id.seq) + else: sender.routing.erase(seq) + +acquire(id,q): + enqueued[id,q].acquired = sender + node[sender].acquired.push_back((id,q)) + if sender != self: + with noReplicate=true: broker.getQueue(q).acquire(enqueued[id,q]) + +release(id,q) + enqueued[id,q].acquired = None + node[sender].acquired.erase((id,q)) + if sender != self + with noReplicate=true: broker.getQueue(q).requeue(enqueued[id,q]) + +reject(id,q): + sender.routing[id] = enqueued[id,q] # prepare for re-queueing + +rejected(id,q) + sender.routing.erase[id] + +dequeue(id,q) + entry = enqueued[id,q] + enqueued.erase[id,q] + node[entry.acquired].acquired.erase(id,q) + if sender != self: + with noReplicate=true: broker.getQueue(q).dequeue(entry.qmsg) + +member m leaves cluster: + for key in node[m].acquired: + release(key.id, key.q) + node.erase(m) + +*** Queue consumer locking + +When a queue is locked it does not deliver messages to its consumers. + +New broker::Queue functions: +- stopConsumers(): set consumersStopped flag, wait for currently busy consumers to exit. +- startConsumers(): reset consumersStopped flag + +Implementation sketch, locking omitted: + +void Queue::stopConsumers() { + consumersStopped = true; + while (consumersBusy) consumersBusyMonitor.wait(); +} + +void Queue::startConsumers() { + consumersStopped = false; + listeners.notify(); +} + +bool Queue::dispatch(consumer) { + if (consumersStopped) return false; + ++consumersBusy; + do_regular_dispatch_body() + if (--consumersBusy == 0) consumersBusyMonitor.notify(); +} + +*** QueueOwnerHandler + +Invariants: +- Each queue is owned by at most one node at any time. +- Each node is interested in a set of queues at any given time. +- A queue is un-owned if no node is interested. + +The queue owner releases the queue when +- it loses interest i.e. queue has no consumers with credit. +- a configured time delay expires and there are other interested nodes. + +The owner mcasts release(q). On delivery the new queue owner is the +next node in node-id order (treating nodes as a circular list) +starting from the old owner that is interested in the queue. + +Queue consumers initially are stopped, only started when we get +ownership from the cluster. + +Thread safety: called by deliver, connection and timer threads, needs locking. + +Thread safe object per queue holding queue ownership status. +Called by deliver, connection and timer threads. + +class QueueOwnership { + bool owned; + Timer timer; + BrokerQueue q; + + drop(): # locked + if owned: + owned = false + q.stopConsumers() + mcast release(q.name, false) + timer.stop() + + take(): # locked + if not owned: + owned = true + q.startConsumers() + timer.start(timeout) + + timer.fire(): drop() +} + +Data Members, only modified/examined in deliver thread: +- typedef set<NodeId> ConsumerSet +- map<QueueName, ConsumerSet> consumers +- map<QueueName, NodeId> owner + +Thread safe data members, accessed in connection threads (via BrokerHandler): +- map<QueueName, QueueOwnership> ownership + +Multicast messages in QueueOwner class: + +consume(q): + if sender==self and consumers[q].empty(): ownership[q].take() + consumers[q].insert(sender) + +release(q): + asssert(owner[q] == sender and owner[q] in consumers[q]) + owner[q] = circular search from sender in consumers[q] + if owner==self: ownership[q].take() + +cancel(q): + assert(queue[q].owner != sender) # sender must release() before cancel() + consumers[q].erase(sender) + +member-leaves: + for q in queue: if owner[q] = left: left.release(q) + +Need 2 more intercept points in broker::Cluster: + +consume(q,consumer,consumerCount) - Queue::consume() + if consumerCount == 1: mcast consume(q) + +cancel(q,consumer,consumerCount) - Queue::cancel() + if consumerCount == 0: + ownership[q].drop() + mcast cancel(q) + +#TODO: lifecycle, updating cluster data structures when queues are destroyed + +*** Re-use of existing cluster code +- re-use Event +- re-use Multicaster +- re-use same PollableQueueSetup (may experiment later) +- new Core class to replace Cluster. +- keep design modular, keep threading rules clear. + +** TODO [#B] Large message replication. +Need to be able to multicast large messages in fragments + +** TODO [#B] Batch CPG multicast messages +The new cluster design involves a lot of small multicast messages, +they need to be batched into larger CPG messages for efficiency. +** TODO [#B] Genuine async completion +Replace current synchronous waiting implementation with genuine async completion. + +Test: enhance test_store.cpp to defer enqueueComplete till special message received. + +Async callback uses *requestIOProcessing* to queue action on IO thread. + +** TODO [#B] Async completion of accept when dequeue completes. +Interface is already there on broker::Message, just need to ensure +that store and cluster implementations call it appropriately. + +** TODO [#B] Replicate wiring. +From messageStore create/destroy/bind, replicate encoded declare/destroy/bind command. + +** TODO [#B] New members joining - first pass + +Re-use update code from old cluster but don't replicate sessions & +connections. + +Need to extend it to send cluster IDs with messages. + +Need to replicate the queue ownership data as part of the update. + +** TODO [#B] Persistence support. +InitialStatus protoocl etc. to support persistent start-up (existing code) + +Only one broker recovers from store, update to others. + +Assign cluster IDs to messages recovered from store, don't replicate. See Queue::recover. + +** TODO [#B] Handle other ways that messages can leave a queue. + +Other ways (other than via a consumer) that messages are take off a queue. + +NOTE: Not controlled by queue lock, how to make them consistent? + +Target broker may not have all messages on other brokers for purge/destroy. +- Queue::move() - need to wait for lock? Replicate? +- Queue::get() - ??? +- Queue::purge() - replicate purge? or just delete what's on broker ? +- Queue::destroy() - messages to alternate exchange on all brokers.? + +Need to add callpoints & mcast messages to replicate these? + +** TODO [#B] Flow control for internal queues. + +Need to bound the size of the internal queues holding cluster events & frames. +- stop polling when we reach bound. +- start polling when we get back under it. +** TODO [#B] Integration with transactions. +Do we want to replicate during transaction & replicate commit/rollback +or replicate only on commit? +No integration with DTX transactions. +** TODO [#B] Make new cluster work with replication exchange. +Possibly re-use some common logic. Replication exchange is like clustering +except over TCP. +** TODO [#C] Async completion for declare, bind, destroy queues and exchanges. +Cluster needs to complete these asynchronously to guarantee resources +exist across the cluster when the command completes. + +** TODO [#C] Allow non-replicated exchanges, queues. + +Set qpid.replicated=false in declare arguments, set flag on Exchange, Queue objects. +- save replicated status to store. +- support in management tools. +Replicated exchange: replicate binds to replicated queues. +Replicated queue: replicate all messages. + +** TODO [#C] New members joining - improved. + +Replicate wiring like old cluster, stall for wiring but not for +messages. Update messages on a per-queue basis from back to front. + +Updater: +- stall & push wiring: declare exchanges, queues, bindings. +- start update iterator thread on each queue. +- unstall and process normally while iterator threads run. + +Update iterator thread: +- starts at back of updater queue, message m. +- send update_front(q,m) to updatee and advance towards front +- at front: send update_done(q) + +Updatee: +- stall, receive wiring, lock all queues, mark queues "updating", unstall +- update_front(q,m): push m to *front* of q +- update_done(q): mark queue "ready" + +Updatee cannot take the queue consume lock for a queue that is updating. +Updatee *can* push messages onto a queue that is updating. + +TODO: Is there any way to eliminate the stall for wiring? + +** TODO [#C] Refactoring of common concerns. + +There are a bunch of things that act as "Queue observers" with intercept +points in similar places. +- QueuePolicy +- QueuedEvents (async replication) +- MessageStore +- Cluster + +Look for ways to capitalize on the similarity & simplify the code. + +In particular QueuedEvents (async replication) strongly resembles +cluster replication, but over TCP rather than multicast. diff --git a/cpp/src/tests/BrokerClusterCalls.cpp b/cpp/src/tests/BrokerClusterCalls.cpp new file mode 100644 index 0000000000..2b06d9ce34 --- /dev/null +++ b/cpp/src/tests/BrokerClusterCalls.cpp @@ -0,0 +1,435 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +///@file +// Tests using a dummy broker::Cluster implementation to verify the expected +// Cluster functions are called for various actions on the broker. +// + +#include "unit_test.h" +#include "test_tools.h" +#include "qpid/broker/Cluster.h" +#include "qpid/broker/Queue.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Session.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Duration.h" +#include "BrokerFixture.h" +#include <boost/assign.hpp> +#include <boost/format.hpp> +#include <boost/regex.hpp> + +using namespace std; +using namespace boost; +using namespace boost::assign; +using namespace qpid::messaging; +using boost::format; +using boost::regex; + +namespace qpid { +namespace tests { + +class DummyCluster : public broker::Cluster +{ + private: + /** Flag used to ignore events other than enqueues while routing, + * e.g. acquires and accepts generated in a ring queue to replace an element.. + * In real impl would be a thread-local variable. + */ + bool isRouting; + + void recordQm(const string& op, const broker::QueuedMessage& qm) { + history += (format("%s(%s, %d, %s)") % op % qm.queue->getName() + % qm.position % qm.payload->getFrames().getContent()).str(); + } + void recordStr(const string& op, const string& name) { + history += (format("%s(%s)") % op % name).str(); + } + public: + // Messages + + virtual void routing(const boost::intrusive_ptr<broker::Message>& m) { + isRouting = true; + history += (format("routing(%s)") % m->getFrames().getContent()).str(); + } + + virtual void enqueue(broker::QueuedMessage& qm) { recordQm("enqueue", qm); } + + virtual void routed(const boost::intrusive_ptr<broker::Message>& m) { + history += (format("routed(%s)") % m->getFrames().getContent()).str(); + isRouting = false; + } + virtual void acquire(const broker::QueuedMessage& qm) { + if (!isRouting) recordQm("acquire", qm); + } + virtual void accept(const broker::QueuedMessage& qm) { + if (!isRouting) recordQm("accept", qm); + } + virtual void reject(const broker::QueuedMessage& qm) { + if (!isRouting) recordQm("reject", qm); + } + virtual void rejected(const broker::QueuedMessage& qm) { + if (!isRouting) recordQm("rejected", qm); + } + virtual void release(const broker::QueuedMessage& qm) { + if (!isRouting) recordQm("release", qm); + } + virtual void dequeue(const broker::QueuedMessage& qm) { + // Never ignore dequeue, used to avoid resource leaks. + recordQm("dequeue", qm); + } + + // Consumers + + virtual void consume(const broker::Queue& q, size_t n) { + history += (format("consume(%s, %d)") % q.getName() % n).str(); + } + virtual void cancel(const broker::Queue& q, size_t n) { + history += (format("cancel(%s, %d)") % q.getName() % n).str(); + } + + // Wiring + + virtual void create(const broker::Queue& q) { recordStr("createq", q.getName()); } + virtual void destroy(const broker::Queue& q) { recordStr("destroyq", q.getName()); } + virtual void create(const broker::Exchange& ex) { recordStr("createex", ex.getName()); } + virtual void destroy(const broker::Exchange& ex) { recordStr("destroyex", ex.getName()); } + virtual void bind(const broker::Queue& q, const broker::Exchange& ex, const std::string& key, const framing::FieldTable& /*args*/) { + history += (format("bind(%s, %s, %s)") % q.getName() % ex.getName() % key).str(); + } + vector<string> history; +}; + +QPID_AUTO_TEST_SUITE(BrokerClusterCallsTestSuite) + +// Broker fixture with DummyCluster set up and some new API client bits. +struct DummyClusterFixture: public BrokerFixture { + Connection c; + Session s; + DummyCluster*dc; + DummyClusterFixture() { + broker->setCluster(auto_ptr<broker::Cluster>(new DummyCluster)); + dc = &static_cast<DummyCluster&>(broker->getCluster()); + c = Connection("localhost:"+lexical_cast<string>(getPort())); + c.open(); + s = c.createSession(); + } + ~DummyClusterFixture() { + c.close(); + } +}; + +QPID_AUTO_TEST_CASE(testSimplePubSub) { + DummyClusterFixture f; + vector<string>& h = f.dc->history; + + // Queue creation + Sender sender = f.s.createSender("q;{create:always,delete:always}"); + int i = 0; + BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking. + BOOST_CHECK_EQUAL(h.size(), i); + + // Consumer + Receiver receiver = f.s.createReceiver("q"); + f.s.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Send message + sender.send(Message("a")); + f.s.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); + // Don't check size here as it is uncertain whether acquire has happened yet. + + // Acquire message + Message m = receiver.fetch(Duration::SECOND); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Acknowledge message + f.s.acknowledge(true); + f.s.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Close a consumer + receiver.close(); + BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 0)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Destroy the queue + f.c.close(); + BOOST_CHECK_EQUAL(h.at(i++), "destroyq(q)"); + BOOST_CHECK_EQUAL(h.size(), i); +} + +QPID_AUTO_TEST_CASE(testReleaseReject) { + DummyClusterFixture f; + vector<string>& h = f.dc->history; + + Sender sender = f.s.createSender("q;{create:always,delete:always,node:{x-declare:{alternate-exchange:amq.fanout}}}"); + sender.send(Message("a")); + Receiver receiver = f.s.createReceiver("q"); + Receiver altReceiver = f.s.createReceiver("amq.fanout;{link:{name:altq}}"); + Message m = receiver.fetch(Duration::SECOND); + h.clear(); + + // Explicit release + f.s.release(m); + f.s.sync(); + int i = 0; + BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Implicit release on closing connection. + Connection c("localhost:"+lexical_cast<string>(f.getPort())); + c.open(); + Session s = c.createSession(); + Receiver r = s.createReceiver("q"); + m = r.fetch(Duration::SECOND); + h.clear(); + i = 0; + c.close(); + BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 1)"); + BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Reject message, goes to alternate exchange. + m = receiver.fetch(Duration::SECOND); + h.clear(); + i = 0; + f.s.reject(m); + BOOST_CHECK_EQUAL(h.at(i++), "reject(q, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); // Routing to alt exchange + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "rejected(q, 1, a)"); + BOOST_CHECK_EQUAL(h.size(), i); + m = altReceiver.fetch(Duration::SECOND); + BOOST_CHECK_EQUAL(m.getContent(), "a"); + + // Timed out message + h.clear(); + i = 0; + m = Message("t"); + m.setTtl(Duration(1)); // Timeout 1ms + sender.send(m); + usleep(2000); // Sleep 2ms + bool received = receiver.fetch(m, Duration::IMMEDIATE); + BOOST_CHECK(!received); // Timed out + BOOST_CHECK_EQUAL(h.at(i++), "routing(t)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 2, t)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(t)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Message replaced on LVQ + sender = f.s.createSender("lvq;{create:always,delete:always,node:{x-declare:{arguments:{qpid.last_value_queue:1}}}}"); + m = Message("a"); + m.getProperties()["qpid.LVQ_key"] = "foo"; + sender.send(m); + f.s.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "createq(lvq)"); + BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); + BOOST_CHECK_EQUAL(h.size(), i); + + m = Message("b"); + m.getProperties()["qpid.LVQ_key"] = "foo"; + sender.send(m); + f.s.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); + // FIXME: bug in Queue.cpp gives the incorrect position when + // dequeueing a replaced LVQ message. + // BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 2, a)"); // Should be 1 + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); + BOOST_CHECK_EQUAL(h.size(), i); + + receiver = f.s.createReceiver("lvq"); + BOOST_CHECK_EQUAL(receiver.fetch(Duration::SECOND).getContent(), "b"); + f.s.acknowledge(true); + BOOST_CHECK_EQUAL(h.at(i++), "consume(lvq, 1)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(lvq, 1, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "accept(lvq, 1, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, b)"); + BOOST_CHECK_EQUAL(h.size(), i); +} + +QPID_AUTO_TEST_CASE(testFanout) { + DummyClusterFixture f; + vector<string>& h = f.dc->history; + + Receiver r1 = f.s.createReceiver("amq.fanout;{link:{name:r1}}"); + Receiver r2 = f.s.createReceiver("amq.fanout;{link:{name:r2}}"); + Sender sender = f.s.createSender("amq.fanout"); + r1.setCapacity(0); // Don't receive immediately. + r2.setCapacity(0); + h.clear(); + int i = 0; + + // Send message + sender.send(Message("a")); + f.s.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); + BOOST_CHECK_REGEX("enqueue\\(amq.fanout_r[12], 1, a\\)", h.at(i++)); + BOOST_CHECK_REGEX("enqueue\\(amq.fanout_r[12], 1, a\\)", h.at(i++)); + BOOST_CHECK(h.at(i-1) != h.at(i-2)); + BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Receive messages + Message m1 = r1.fetch(Duration::SECOND); + f.s.acknowledge(m1, true); + Message m2 = r2.fetch(Duration::SECOND); + f.s.acknowledge(m2, true); + + BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r1, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "accept(amq.fanout_r1, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r1, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r2, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "accept(amq.fanout_r2, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r2, 1, a)"); + BOOST_CHECK_EQUAL(h.size(), i); +} + +QPID_AUTO_TEST_CASE(testRingQueue) { + DummyClusterFixture f; + vector<string>& h = f.dc->history; + + // FIXME aconway 2010-10-15: QPID-2908 ring queue address string is not working, + // so we can't do this: + // Sender sender = f.s.createSender("ring;{create:always,node:{x-declare:{arguments:{qpid.max_size:3,qpid.policy_type:ring}}}}"); + // Must use old API to declare ring queue: + qpid::client::Connection c; + f.open(c); + qpid::client::Session s = c.newSession(); + qpid::framing::FieldTable args; + args.setInt("qpid.max_size", 3); + args.setString("qpid.policy_type","ring"); + s.queueDeclare(qpid::client::arg::queue="ring", qpid::client::arg::arguments=args); + c.close(); + Sender sender = f.s.createSender("ring"); + + int i = 0; + // Send message + sender.send(Message("a")); + sender.send(Message("b")); + sender.send(Message("c")); + sender.send(Message("d")); + f.s.sync(); + + BOOST_CHECK_EQUAL(h.at(i++), "createq(ring)"); + + BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); + + BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); + + BOOST_CHECK_EQUAL(h.at(i++), "routing(c)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 3, c)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(c)"); + + BOOST_CHECK_EQUAL(h.at(i++), "routing(d)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 4, d)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(d)"); + + Receiver receiver = f.s.createReceiver("ring"); + BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b"); + BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "c"); + BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "d"); + f.s.acknowledge(true); + + BOOST_CHECK_EQUAL(h.at(i++), "consume(ring, 1)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 3, c)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 4, d)"); + BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 3, c)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 3, c)"); + BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 4, d)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 4, d)"); + + BOOST_CHECK_EQUAL(h.size(), i); +} + +QPID_AUTO_TEST_CASE(testTransactions) { + DummyClusterFixture f; + vector<string>& h = f.dc->history; + Session ts = f.c.createTransactionalSession(); + Sender sender = ts.createSender("q;{create:always,delete:always}"); + int i = 0; + BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking. + BOOST_CHECK_EQUAL(h.size(), i); + + sender.send(Message("a")); + sender.send(Message("b")); + ts.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); + BOOST_CHECK_EQUAL(h.size(), i); // Not replicated till commit + ts.commit(); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 2, b)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // FIXME aconway 2010-10-18: As things stand the cluster is not + // compatible with transactions + // - enqueues occur after routing is complete. + // - no transaction context associated with messages in the Cluster interface. + // - no call to Cluster::accept in Queue::dequeueCommitted + + Receiver receiver = ts.createReceiver("q"); + BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "a"); + BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b"); + ts.acknowledge(); + ts.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 2, b)"); + BOOST_CHECK_EQUAL(h.size(), i); + ts.commit(); + ts.sync(); + // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); + // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, b)"); + BOOST_CHECK_EQUAL(h.size(), i); +} + +QPID_AUTO_TEST_SUITE_END() + +}} // namespace qpid::tests + diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 02454971cb..5e1438d813 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -123,7 +123,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ Variant.cpp \ Address.cpp \ ClientMessage.cpp \ - Qmf2.cpp + Qmf2.cpp \ + BrokerClusterCalls.cpp if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk index da191e8682..a033025226 100644 --- a/cpp/src/tests/cluster.mk +++ b/cpp/src/tests/cluster.mk @@ -77,7 +77,7 @@ cluster_test_SOURCES = \ PartialFailure.cpp \ ClusterFailover.cpp -cluster_test_LDADD=$(lib_client) $(lib_broker) ../cluster.la -lboost_unit_test_framework +cluster_test_LDADD=$(lib_client) $(lib_broker) $(lib_messaging) ../cluster.la -lboost_unit_test_framework -lboost_regex qpidtest_SCRIPTS += run_cluster_tests cluster_tests.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail |