diff options
author | Alan Conway <aconway@apache.org> | 2011-09-06 21:45:43 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-09-06 21:45:43 +0000 |
commit | 4acf969531264568693cfaf86492203a1ecf3634 (patch) | |
tree | c96ae7c251985e579527d462152061e94fa4a1fe | |
parent | 1c52d0b8deda196e257de7436a247921d9482046 (diff) | |
download | qpid-python-4acf969531264568693cfaf86492203a1ecf3634.tar.gz |
QPID-2920: First cut experimental prototype for new cluster.
Experimental code to investigate & measure performance of new cluster design ideas.
Experimental classes are in src/qpid/cluster/exp.
New broker::Cluster interface provides call points for cluster. Similar to
store but has more operations, may be merged at a future point.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-1@1165874 13f79535-47bb-0310-9956-ffa450edef68
40 files changed, 2199 insertions, 87 deletions
diff --git a/qpid/cpp/design_docs/new-cluster-plan.txt b/qpid/cpp/design_docs/new-cluster-plan.txt index 781876e55a..571c3d865c 100644 --- a/qpid/cpp/design_docs/new-cluster-plan.txt +++ b/qpid/cpp/design_docs/new-cluster-plan.txt @@ -146,10 +146,6 @@ reject(QueuedMessage): isRejecting = true mcast reject(qmsg) -# FIXME no longer needed? -drop(QueuedMessage) - cleanup(qmsg) - *** MessageHandler and mcast messages Types: - struct QueueEntry { QueuedMessage qmsg; NodeId acquired; } @@ -348,6 +344,9 @@ For 0-10 can use channel numbers & send whole frames packed into larger buffer. Extend broker::Cluster interface to capture transaction context and completion. Sequence number to generate per-node tx IDs. Replicate transaction completion. +** TODO [#B] Management support +- Replicate management methods that modify queues - e.g. move, purge. +- Report connections - local only or cluster-wide? ** TODO [#B] Batch CPG multicast messages The new cluster design involves a lot of small multicast messages, they need to be batched into larger CPG messages for efficiency. diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 2663987f75..c6fd021609 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -520,6 +520,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Broker.cpp \ qpid/broker/Broker.h \ qpid/broker/BrokerImportExport.h \ + qpid/broker/Cluster.h \ qpid/broker/Connection.cpp \ qpid/broker/Connection.h \ qpid/broker/ConnectionFactory.cpp \ @@ -589,6 +590,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/PriorityQueue.cpp \ qpid/broker/NameGenerator.cpp \ qpid/broker/NameGenerator.h \ + qpid/broker/NullCluster.h \ qpid/broker/NullMessageStore.cpp \ qpid/broker/NullMessageStore.h \ qpid/broker/OwnershipToken.h \ diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index 3ce4ce25b3..9543da12b7 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -101,6 +101,30 @@ cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la cluster_la_CXXFLAGS = $(AM_CXXFLAGS) -fno-strict-aliasing cluster_la_LDFLAGS = $(PLUGINLDFLAGS) +# Experimental new cluster plugin +dmodule_LTLIBRARIES += cluster2.la +cluster2_la_LIBADD = -lcpg libqpidbroker.la +cluster2_la_LDFLAGS = $(PLUGINLDFLAGS) +cluster2_la_SOURCES = \ + qpid/cluster/Cpg.cpp \ + qpid/cluster/Cpg.h \ + qpid/cluster/PollerDispatch.cpp \ + qpid/cluster/PollerDispatch.h \ + qpid/cluster/exp/BrokerHandler.cpp \ + qpid/cluster/exp/BrokerHandler.h \ + qpid/cluster/exp/Cluster2Plugin.cpp \ + qpid/cluster/exp/Core.cpp \ + qpid/cluster/exp/Core.h \ + qpid/cluster/exp/EventHandler.cpp \ + qpid/cluster/exp/EventHandler.h \ + qpid/cluster/exp/HandlerBase.cpp \ + qpid/cluster/exp/HandlerBase.h \ + qpid/cluster/exp/MessageHandler.cpp \ + qpid/cluster/exp/MessageHandler.h \ + qpid/cluster/exp/WiringHandler.cpp \ + qpid/cluster/exp/WiringHandler.h + + # The watchdog plugin and helper executable dmoduleexec_LTLIBRARIES += watchdog.la watchdog_la_SOURCES = qpid/cluster/WatchDogPlugin.cpp diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 598c43b1d8..1004fa2bcd 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -25,6 +25,7 @@ #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" #include "qpid/broker/MessageStoreModule.h" +#include "qpid/broker/NullCluster.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/RecoveryManagerImpl.h" #include "qpid/broker/SaslAuthenticator.h" @@ -176,6 +177,7 @@ Broker::Broker(const Broker::Options& conf) : conf.qmf2Support) : 0), store(new NullMessageStore), + cluster(new NullCluster), acl(0), dataDir(conf.noDataDir ? std::string() : conf.dataDir), queues(this), @@ -757,7 +759,6 @@ void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) { const std::string Broker::TCP_TRANSPORT("tcp"); - std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( const std::string& name, bool durable, @@ -811,10 +812,11 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( void Broker::deleteQueue(const std::string& name, const std::string& userId, const std::string& connectionId, QueueFunctor check) { - if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) { - throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId)); + if ((userId.size() || connectionId.size()) && // Skip ACL check if ID is empty. + acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) { + throw framing::UnauthorizedAccessException( + QPID_MSG("ACL denied queue delete request from " << userId)); } - Queue::shared_ptr queue = queues.find(name); if (queue) { if (check) check(queue); @@ -878,6 +880,7 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange( ManagementAgent::toMap(arguments), "created")); } + getCluster().create(*result.first); } return result; } @@ -885,8 +888,8 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange( void Broker::deleteExchange(const std::string& name, const std::string& userId, const std::string& connectionId) { - if (acl) { - if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) ) + if ((userId.size() || connectionId.size()) && // Skip ACL check if ID is empty. + acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL)) { throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId)); } @@ -902,7 +905,7 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId, if (managementAgent.get()) managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name)); - + getCluster().destroy(*exchange); } void Broker::bind(const std::string& queueName, @@ -937,6 +940,7 @@ void Broker::bind(const std::string& queueName, queueName, key, ManagementAgent::toMap(arguments))); } } + getCluster().bind(*queue, *exchange, key, arguments); } } @@ -965,14 +969,19 @@ void Broker::unbind(const std::string& queueName, } else { if (exchange->unbind(queue, key, 0)) { if (exchange->isDurable() && queue->isDurable()) { - store->unbind(*exchange, *queue, key, qpid::framing::FieldTable()); + store->unbind(*exchange, *queue, key, framing::FieldTable()); } if (managementAgent.get()) { managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key)); } + getCluster().unbind(*queue, *exchange, key, framing::FieldTable()); } } } +void Broker::setCluster(std::auto_ptr<Cluster> c) { cluster = c; } + +Cluster& Broker::getCluster() { return *cluster; } + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 40f7b6273f..76d049df75 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -72,6 +72,7 @@ namespace broker { class ConnectionState; class ExpiryPolicy; class Message; +class Cluster; static const uint16_t DEFAULT_PORT=5672; @@ -165,6 +166,7 @@ public: std::auto_ptr<management::ManagementAgent> managementAgent; ProtocolFactoryMap protocolFactories; std::auto_ptr<MessageStore> store; + std::auto_ptr<Cluster> cluster; AclModule* acl; DataDir dataDir; @@ -294,6 +296,9 @@ public: bool isClusterUpdatee() const { return clusterUpdatee; } void setClusterUpdatee(bool set) { clusterUpdatee = set; } + QPID_BROKER_EXTERN void setCluster(std::auto_ptr<Cluster> c); + QPID_BROKER_EXTERN Cluster& getCluster(); + management::ManagementAgent* getManagementAgent() { return managementAgent.get(); } ConnectionCounter& getConnectionCounter() {return connectionCounter;} diff --git a/qpid/cpp/src/qpid/broker/Cluster.h b/qpid/cpp/src/qpid/broker/Cluster.h new file mode 100644 index 0000000000..9bbf245498 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Cluster.h @@ -0,0 +1,104 @@ +#ifndef QPID_BROKER_CLUSTER_H +#define QPID_BROKER_CLUSTER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/intrusive_ptr.hpp> + +namespace qpid { + +namespace framing { +class FieldTable; +} + +namespace broker { + +class Message; +struct QueuedMessage; +class Queue; +class Exchange; + +/** + * NOTE: this is part of an experimental cluster implementation that is not + * yet fully functional. The original cluster implementation remains in place. + * See ../cluster/new-cluster-design.txt + * + * Interface for cluster implementations. Functions on this interface are + * called at relevant points in the Broker's processing. + */ +class Cluster +{ + public: + virtual ~Cluster() {} + + // Messages + + /** In Exchange::route, before the message is enqueued. */ + virtual void routing(const boost::intrusive_ptr<Message>&) = 0; + + /** A message is delivered to a queue. + * Called before actually pushing the message to the queue. + *@return If true the message should be pushed to the queue now. + * otherwise the cluster code will push the message when it is replicated. + */ + virtual bool enqueue(Queue& queue, const boost::intrusive_ptr<Message>&) = 0; + + /** In Exchange::route, after all enqueues for the message. */ + virtual void routed(const boost::intrusive_ptr<Message>&) = 0; + + /** A message is acquired by a local consumer, it is unavailable to replicas. */ + virtual void acquire(const QueuedMessage&) = 0; + + /** A locally-acquired message is released by the consumer and re-queued. */ + virtual void release(const QueuedMessage&) = 0; + + /** A message is removed from the queue. */ + virtual void dequeue(const QueuedMessage&) = 0; + + // Consumers + + /** A new consumer subscribes to a queue. */ + virtual void consume(const Queue&, size_t consumerCount) = 0; + /** A consumer cancels its subscription to a queue */ + virtual void cancel(const Queue&, size_t consumerCount) = 0; + + // Wiring + + /** A queue is created */ + virtual void create(const Queue&) = 0; + /** A queue is destroyed */ + virtual void destroy(const Queue&) = 0; + /** An exchange is created */ + virtual void create(const Exchange&) = 0; + /** An exchange is destroyed */ + virtual void destroy(const Exchange&) = 0; + /** A binding is created */ + virtual void bind(const Queue&, const Exchange&, + const std::string& key, const framing::FieldTable& args) = 0; + /** A binding is removed */ + virtual void unbind(const Queue&, const Exchange&, + const std::string& key, const framing::FieldTable& args) = 0; +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_CLUSTER_H*/ diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index d68845062d..6ec68fbf47 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -24,6 +24,9 @@ #include "qpid/broker/Exchange.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/FedOps.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Cluster.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/broker/Queue.h" #include "qpid/framing/MessageProperties.h" #include "qpid/framing/reply_exceptions.h" @@ -102,10 +105,23 @@ class ExInfo { }; } +// Bracket a scope with calls to Cluster::routing and Cluster::routed +struct ScopedClusterRouting { + Broker* broker; + boost::intrusive_ptr<Message> message; + ScopedClusterRouting(Broker* b, boost::intrusive_ptr<Message> m) + : broker(b), message(m) { + if (broker) broker->getCluster().routing(message); + } + ~ScopedClusterRouting() { + if (broker) broker->getCluster().routed(message); + } +}; + void Exchange::doRoute(Deliverable& msg, ConstBindingList b) { + ScopedClusterRouting scr(broker, &msg.getMessage()); int count = 0; - if (b.get()) { // Block the content release if the message is transient AND there is more than one binding if (!msg.getMessage().isPersistent() && b->size() > 1) { diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp index 1c8d26c4f7..54e6d5302c 100644 --- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -24,6 +24,9 @@ #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" #include "qpid/broker/TopicExchange.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Cluster.h" +#include "qpid/log/Statement.h" #include "qpid/management/ManagementDirectExchange.h" #include "qpid/management/ManagementTopicExchange.h" #include "qpid/framing/reply_exceptions.h" @@ -88,11 +91,15 @@ void ExchangeRegistry::destroy(const string& name){ } Exchange::shared_ptr ExchangeRegistry::get(const string& name){ + Exchange::shared_ptr ex = find(name); + if (!ex) throw framing::NotFoundException(QPID_MSG("Exchange not found: " << name)); + return ex; +} + +Exchange::shared_ptr ExchangeRegistry::find(const string& name){ RWlock::ScopedRlock locker(lock); ExchangeMap::iterator i = exchanges.find(name); - if (i == exchanges.end()) - throw framing::NotFoundException(QPID_MSG("Exchange not found: " << name)); - return i->second; + return (i == exchanges.end()) ? Exchange::shared_ptr() : i->second; } bool ExchangeRegistry::registerExchange(const Exchange::shared_ptr& ex) { diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h index 2b75a8f3cf..5f15ad22e6 100644 --- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h @@ -55,6 +55,7 @@ class ExchangeRegistry{ const qpid::framing::FieldTable& args = framing::FieldTable()); QPID_BROKER_EXTERN void destroy(const std::string& name); QPID_BROKER_EXTERN Exchange::shared_ptr get(const std::string& name); + QPID_BROKER_EXTERN Exchange::shared_ptr find(const std::string& name); Exchange::shared_ptr getDefault(); /** diff --git a/qpid/cpp/src/qpid/broker/NullCluster.h b/qpid/cpp/src/qpid/broker/NullCluster.h new file mode 100644 index 0000000000..995ec57058 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/NullCluster.h @@ -0,0 +1,66 @@ +#ifndef QPID_BROKER_NULLCLUSTER_H +#define QPID_BROKER_NULLCLUSTER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/broker/Cluster.h> + +namespace qpid { +namespace broker { + +/** + * No-op implementation of Cluster interface, installed by broker when + * no cluster plug-in is present or clustering is disabled. + */ +class NullCluster : public Cluster +{ + public: + + // Messages + + virtual void routing(const boost::intrusive_ptr<Message>&) {} + virtual bool enqueue(Queue&, const boost::intrusive_ptr<Message>&) { return true; } + virtual void routed(const boost::intrusive_ptr<Message>&) {} + virtual void acquire(const QueuedMessage&) {} + virtual void release(const QueuedMessage&) {} + virtual void dequeue(const QueuedMessage&) {} + + // Consumers + + virtual void consume(const Queue&, size_t) {} + virtual void cancel(const Queue&, size_t) {} + + // Wiring + + virtual void create(const Queue&) {} + virtual void destroy(const Queue&) {} + virtual void create(const Exchange&) {} + virtual void destroy(const Exchange&) {} + virtual void bind(const Queue&, const Exchange&, + const std::string&, const framing::FieldTable&) {} + virtual void unbind(const Queue&, const Exchange&, + const std::string&, const framing::FieldTable&) {} +}; + +}} // namespace qpid::broker + +#endif diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index dd3f982699..20d9361909 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -20,6 +20,7 @@ */ #include "qpid/broker/Broker.h" +#include "qpid/broker/Cluster.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueEvents.h" #include "qpid/broker/Exchange.h" @@ -152,6 +153,10 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ // Check for deferred delivery in a cluster. if (broker && broker->deferDelivery(name, msg)) return; + // Same thing but for the new cluster interface. + if (broker && !broker->getCluster().enqueue(*this, msg)) + return; + if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); @@ -221,16 +226,33 @@ void Queue::requeue(const QueuedMessage& msg){ } } } + if (broker) broker->getCluster().release(msg); copy.notify(); } +// Inform the cluster of an acquired message on exit from a function +// that does the acquiring. ClusterAcquireOnExit is declared *before* +// any locks are taken. The calling function sets qmsg to the acquired +// message with a lock held, but the call to Cluster::acquire() will +// be outside the lock. +struct ClusterAcquireOnExit { + Broker* broker; + QueuedMessage qmsg; + ClusterAcquireOnExit(Broker* b) : broker(b) {} + ~ClusterAcquireOnExit() { + if (broker && qmsg.queue) broker->getCluster().acquire(qmsg); + } +}; + bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { + ClusterAcquireOnExit willAcquire(broker); // Outside lock Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); if (messages->remove(position, message)) { QPID_LOG(debug, "Acquired message at " << position << " from " << name); + willAcquire.qmsg = message; return true; } else { QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); @@ -277,6 +299,7 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { while (true) { + ClusterAcquireOnExit willAcquire(broker); // Outside the lock Mutex::ScopedLock locker(messageLock); if (messages->empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); @@ -293,6 +316,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ if (c->filter(msg.payload)) { if (c->accept(msg.payload)) { m = msg; + willAcquire.qmsg = msg; pop(); return CONSUMED; } else { @@ -377,42 +401,51 @@ QueuedMessage Queue::find(SequenceNumber pos) const { return msg; } -void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ +void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) { assertClusterSafe(); - Mutex::ScopedLock locker(consumerLock); - if(exclusive) { - throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); - } else if(requestExclusive) { - if(consumerCount) { + size_t consumers; + { + Mutex::ScopedLock locker(consumerLock); + if(exclusive) { throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); - } else { - exclusive = c->getSession(); + QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); + } else if(requestExclusive) { + if(consumerCount) { + throw ResourceLockedException( + QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); + } else { + exclusive = c->getSession(); + } } + consumers = ++consumerCount; } - consumerCount++; if (mgmtObject != 0) mgmtObject->inc_consumerCount (); //reset auto deletion timer if necessary if (autoDeleteTimeout && autoDeleteTask) { autoDeleteTask->cancel(); } + if (broker) broker->getCluster().consume(*this, consumers); } void Queue::cancel(Consumer::shared_ptr c){ removeListener(c); - Mutex::ScopedLock locker(consumerLock); - consumerCount--; - if(exclusive) exclusive = 0; - if (mgmtObject != 0) - mgmtObject->dec_consumerCount (); + size_t consumers; + { + Mutex::ScopedLock locker(consumerLock); + consumers = --consumerCount; + if(exclusive) exclusive = 0; + if (mgmtObject != 0) + mgmtObject->dec_consumerCount (); + } + if (broker) broker->getCluster().cancel(*this, consumers); } QueuedMessage Queue::get(){ + ClusterAcquireOnExit willAcquire(broker); // Outside lock Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); - messages->pop(msg); + if (messages->pop(msg)) willAcquire.qmsg = msg; return msg; } @@ -519,6 +552,7 @@ void Queue::pop() void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ assertClusterSafe(); + QueuedMessage qm; QueueListeners::NotificationSet copy; QueuedMessage removed; bool dequeueRequired = false; @@ -658,7 +692,6 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { ScopedUse u(barrier); if (!u.acquired) return false; - { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return false; @@ -666,6 +699,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) dequeued(msg); } } + if (!ctxt && broker) broker->getCluster().dequeue(msg); // Outside lock // This check prevents messages which have been forced persistent on one queue from dequeuing // from another on which no forcing has taken place and thus causing a store error. bool fp = msg.payload->isForcedPersistent(); @@ -682,6 +716,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::dequeueCommitted(const QueuedMessage& msg) { + if (broker) broker->getCluster().dequeue(msg); // Outside lock Mutex::ScopedLock locker(messageLock); dequeued(msg); if (mgmtObject != 0) { @@ -726,6 +761,7 @@ void Queue::create(const FieldTable& _settings) store->create(*this, _settings); } configureImpl(_settings); + if (broker) broker->getCluster().create(*this); } @@ -848,6 +884,7 @@ void Queue::destroyed() } if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>(); notifyDeleted(); + if (broker) broker->getCluster().destroy(*this); } void Queue::notifyDeleted() diff --git a/qpid/cpp/src/qpid/broker/QueuedMessage.h b/qpid/cpp/src/qpid/broker/QueuedMessage.h index 35e48b11f3..d1b0c1b41c 100644 --- a/qpid/cpp/src/qpid/broker/QueuedMessage.h +++ b/qpid/cpp/src/qpid/broker/QueuedMessage.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,13 +34,12 @@ struct QueuedMessage framing::SequenceNumber position; Queue* queue; - QueuedMessage() : queue(0) {} - QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) : + QueuedMessage(Queue* q=0) : position(0), queue(q) {} + QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) : payload(msg), position(sn), queue(q) {} - QueuedMessage(Queue* q) : queue(q) {} - + }; - inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; } + inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; } }} diff --git a/qpid/cpp/src/qpid/broker/RecoverableExchange.h b/qpid/cpp/src/qpid/broker/RecoverableExchange.h index ca6cc1541e..ee0848ebed 100644 --- a/qpid/cpp/src/qpid/broker/RecoverableExchange.h +++ b/qpid/cpp/src/qpid/broker/RecoverableExchange.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -44,6 +44,8 @@ public: const std::string& routingKey, qpid::framing::FieldTable& args) = 0; virtual ~RecoverableExchange() {}; + + virtual const std::string& getName() const = 0; }; }} diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index d08409695e..9db366fd20 100644 --- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -62,7 +62,7 @@ class RecoverableQueueImpl : public RecoverableQueue public: RecoverableQueueImpl(const boost::shared_ptr<Queue>& _queue) : queue(_queue) {} ~RecoverableQueueImpl() {}; - void setPersistenceId(uint64_t id); + void setPersistenceId(uint64_t id); uint64_t getPersistenceId() const; const std::string& getName() const; void setExternalQueueStore(ExternalQueueStore* inst); @@ -80,6 +80,7 @@ public: RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {} void setPersistenceId(uint64_t id); void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args); + const std::string& getName() const; }; class RecoverableConfigImpl : public RecoverableConfig @@ -133,7 +134,7 @@ RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buff return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message)); } -RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, +RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn) { DtxBuffer::shared_ptr buffer(new DtxBuffer()); @@ -202,7 +203,7 @@ void RecoverableQueueImpl::setPersistenceId(uint64_t id) { queue->setPersistenceId(id); } - + uint64_t RecoverableQueueImpl::getPersistenceId() const { return queue->getPersistenceId(); @@ -212,7 +213,7 @@ const std::string& RecoverableQueueImpl::getName() const { return queue->getName(); } - + void RecoverableQueueImpl::setExternalQueueStore(ExternalQueueStore* inst) { queue->setExternalQueueStore(inst); @@ -245,6 +246,11 @@ void RecoverableExchangeImpl::bind(const string& queueName, queue->bound(exchange->getName(), key, args); } +const std::string& RecoverableExchangeImpl::getName() const +{ + return exchange->getName(); +} + void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue) { buffer->enlist(TxOp::shared_ptr(new RecoveredDequeue(queue, msg))); diff --git a/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp b/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp index b8d94b95a5..43c171efe8 100644 --- a/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp +++ b/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp @@ -37,9 +37,11 @@ PollerDispatch::PollerDispatch(Cpg& c, boost::shared_ptr<sys::Poller> p, started(false) {} -PollerDispatch::~PollerDispatch() { - if (started) - dispatchHandle.stopWatch(); +PollerDispatch::~PollerDispatch() { stop(); } + +void PollerDispatch::stop() { + if (started) dispatchHandle.stopWatch(); + started = false; } void PollerDispatch::start() { @@ -54,6 +56,7 @@ void PollerDispatch::dispatch(sys::DispatchHandle& h) { h.rewatch(); } catch (const std::exception& e) { QPID_LOG(critical, "Error in cluster dispatch: " << e.what()); + stop(); onError(); } } diff --git a/qpid/cpp/src/qpid/cluster/PollerDispatch.h b/qpid/cpp/src/qpid/cluster/PollerDispatch.h index 63801e0de9..f16d5ece95 100644 --- a/qpid/cpp/src/qpid/cluster/PollerDispatch.h +++ b/qpid/cpp/src/qpid/cluster/PollerDispatch.h @@ -41,6 +41,7 @@ class PollerDispatch { ~PollerDispatch(); void start(); + void stop(); private: // Poller callbacks diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp new file mode 100644 index 0000000000..a6eb12ed57 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp @@ -0,0 +1,154 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Core.h" +#include "BrokerHandler.h" +#include "qpid/framing/ClusterMessageRoutingBody.h" +#include "qpid/framing/ClusterMessageRoutedBody.h" +#include "qpid/framing/ClusterMessageEnqueueBody.h" +#include "qpid/framing/ClusterMessageDequeueBody.h" +#include "qpid/framing/ClusterWiringCreateQueueBody.h" +#include "qpid/framing/ClusterWiringCreateExchangeBody.h" +#include "qpid/framing/ClusterWiringDestroyQueueBody.h" +#include "qpid/framing/ClusterWiringDestroyExchangeBody.h" +#include "qpid/framing/ClusterWiringBindBody.h" +#include "qpid/framing/ClusterWiringUnbindBody.h" +#include "qpid/sys/Thread.h" +#include "qpid/broker/QueuedMessage.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/Exchange.h" +#include "qpid/framing/Buffer.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace cluster { + +using namespace framing; +using namespace broker; + +namespace { +// noReplicate means the current thread is handling a message +// received from the cluster so it should not be replciated. +QPID_TSS bool tssNoReplicate = false; + +// Routing ID of the message being routed in the current thread. +// 0 if we are not currently routing a message. +QPID_TSS RoutingId tssRoutingId = 0; +} + +BrokerHandler::ScopedSuppressReplication::ScopedSuppressReplication() { + assert(!tssNoReplicate); + tssNoReplicate = true; +} + +BrokerHandler::ScopedSuppressReplication::~ScopedSuppressReplication() { + assert(tssNoReplicate); + tssNoReplicate = false; +} + +BrokerHandler::BrokerHandler(Core& c) : core(c) {} + +RoutingId BrokerHandler::nextRoutingId() { + RoutingId id = ++routingId; + if (id == 0) id = ++routingId; // Avoid 0 on wrap-around. + return id; +} + +void BrokerHandler::routing(const boost::intrusive_ptr<Message>&) { } + +bool BrokerHandler::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg) +{ + if (tssNoReplicate) return true; + if (!tssRoutingId) { // This is the first enqueue, so send the message + tssRoutingId = nextRoutingId(); + // FIXME aconway 2010-10-20: replicate message in fixed size buffers. + std::string data(msg->encodedSize(),char()); + framing::Buffer buf(&data[0], data.size()); + msg->encode(buf); + core.mcast(ClusterMessageRoutingBody(ProtocolVersion(), tssRoutingId, data)); + core.getRoutingMap().put(tssRoutingId, msg); + } + core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), tssRoutingId, queue.getName())); + // TODO aconway 2010-10-21: configable option for strict (wait + // for CPG deliver to do local deliver) vs. loose (local deliver + // immediately). + return false; +} + +void BrokerHandler::routed(const boost::intrusive_ptr<Message>&) { + if (tssRoutingId) { // we enqueued at least one message. + core.mcast(ClusterMessageRoutedBody(ProtocolVersion(), tssRoutingId)); + // Note: routingMap is cleaned up on CPG delivery in MessageHandler. + tssRoutingId = 0; + } +} + +void BrokerHandler::dequeue(const broker::QueuedMessage& qm) { + if (tssNoReplicate) return; + // FIXME aconway 2010-10-28: we also need to delay completion of the + // ack that caused this dequeue until self-delivery of the mcast below. + core.mcast(ClusterMessageDequeueBody( + ProtocolVersion(), qm.queue->getName(), qm.position)); +} + +void BrokerHandler::create(const broker::Queue& q) { + if (tssNoReplicate) return; + std::string data(q.encodedSize(), '\0'); + framing::Buffer buf(&data[0], data.size()); + q.encode(buf); + core.mcast(ClusterWiringCreateQueueBody(ProtocolVersion(), data)); +} + +void BrokerHandler::destroy(const broker::Queue& q) { + if (tssNoReplicate) return; + core.mcast(ClusterWiringDestroyQueueBody(ProtocolVersion(), q.getName())); +} + +void BrokerHandler::create(const broker::Exchange& ex) { + if (tssNoReplicate) return; + std::string data(ex.encodedSize(), '\0'); + framing::Buffer buf(&data[0], data.size()); + ex.encode(buf); + core.mcast(ClusterWiringCreateExchangeBody(ProtocolVersion(), data)); +} + +void BrokerHandler::destroy(const broker::Exchange& ex) { + if (tssNoReplicate) return; + core.mcast(ClusterWiringDestroyExchangeBody(ProtocolVersion(), ex.getName())); +} + +void BrokerHandler::bind(const broker::Queue& q, const broker::Exchange& ex, + const std::string& key, const framing::FieldTable& args) +{ + if (tssNoReplicate) return; + core.mcast(ClusterWiringBindBody( + ProtocolVersion(), q.getName(), ex.getName(), key, args)); +} + +void BrokerHandler::unbind(const broker::Queue& q, const broker::Exchange& ex, + const std::string& key, const framing::FieldTable& args) +{ + if (tssNoReplicate) return; + core.mcast(ClusterWiringUnbindBody( + ProtocolVersion(), q.getName(), ex.getName(), key, args)); +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h b/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h new file mode 100644 index 0000000000..c53688125a --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h @@ -0,0 +1,86 @@ +#ifndef QPID_CLUSTER_BROKERHANDLER_H +#define QPID_CLUSTER_BROKERHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Cluster.h" +#include "qpid/sys/AtomicValue.h" + +namespace qpid { +namespace cluster { +class Core; + +// TODO aconway 2010-10-19: experimental cluster code. + +/** + * Implements broker::Cluster interface, handles events in broker code. + */ +class BrokerHandler : public broker::Cluster +{ + public: + /** Suppress replication while in scope. + * Used to prevent re-replication of messages received from the cluster. + */ + struct ScopedSuppressReplication { + ScopedSuppressReplication(); + ~ScopedSuppressReplication(); + }; + + BrokerHandler(Core&); + + // FIXME aconway 2010-10-20: implement all points. + + // Messages + + void routing(const boost::intrusive_ptr<broker::Message>&); + bool enqueue(broker::Queue&, const boost::intrusive_ptr<broker::Message>&); + void routed(const boost::intrusive_ptr<broker::Message>&); + void acquire(const broker::QueuedMessage&) {} + void release(const broker::QueuedMessage&) {} + void dequeue(const broker::QueuedMessage&); + + // Consumers + + void consume(const broker::Queue&, size_t) {} + void cancel(const broker::Queue&, size_t) {} + + // Wiring + + void create(const broker::Queue&); + void destroy(const broker::Queue&); + void create(const broker::Exchange&); + void destroy(const broker::Exchange&); + void bind(const broker::Queue&, const broker::Exchange&, + const std::string&, const framing::FieldTable&); + void unbind(const broker::Queue&, const broker::Exchange&, + const std::string&, const framing::FieldTable&); + + + private: + uint32_t nextRoutingId(); + + Core& core; + sys::AtomicValue<uint32_t> routingId; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_BROKERHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp new file mode 100644 index 0000000000..28b7dcec2e --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp @@ -0,0 +1,65 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <qpid/Options.h> +#include <qpid/broker/Broker.h> +#include "Core.h" + +namespace qpid { +namespace cluster { +using broker::Broker; + +// TODO aconway 2010-10-19: experimental new cluster code. + +/** + * Plugin for the cluster. + */ +struct Cluster2Plugin : public Plugin { + struct Opts : public Options { + Core::Settings& settings; + Opts(Core::Settings& s) : Options("Cluster Options"), settings(s) { + addOptions() + ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join"); + // TODO aconway 2010-10-19: copy across other options from ClusterPlugin.h + } + }; + + Core::Settings settings; + Opts options; + Core* core; // Core deletes itself on shutdown. + + Cluster2Plugin() : options(settings), core(0) {} + + Options* getOptions() { return &options; } + + void earlyInitialize(Plugin::Target& target) { + if (settings.name.empty()) return; + Broker* broker = dynamic_cast<Broker*>(&target); + if (!broker) return; + core = new Core(settings, *broker); + } + + void initialize(Plugin::Target& target) { + Broker* broker = dynamic_cast<Broker*>(&target); + if (broker && core) core->initialize(); + } +}; + +static Cluster2Plugin instance; // Static initialization. + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp new file mode 100644 index 0000000000..93ed96b9d8 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Core.h" +#include "EventHandler.h" +#include "BrokerHandler.h" +#include "WiringHandler.h" +#include "MessageHandler.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/SignalHandler.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/Buffer.h" +#include "qpid/log/Statement.h" +#include <sys/uio.h> // For iovec + +namespace qpid { +namespace cluster { + +Core::Core(const Settings& s, broker::Broker& b) : + broker(b), + eventHandler(new EventHandler(*this)) +{ + eventHandler->add(boost::shared_ptr<HandlerBase>(new WiringHandler(*eventHandler))); + eventHandler->add(boost::shared_ptr<HandlerBase>(new MessageHandler(*eventHandler))); + + std::auto_ptr<BrokerHandler> bh(new BrokerHandler(*this)); + brokerHandler = bh.get(); + // BrokerHandler belongs to Broker + broker.setCluster(std::auto_ptr<broker::Cluster>(bh)); + eventHandler->start(); + eventHandler->getCpg().join(s.name); + // TODO aconway 2010-11-18: logging standards + QPID_LOG(notice, "cluster: joined " << s.name << ", member-id="<< eventHandler->getSelf()); +} + +void Core::initialize() {} + +void Core::fatal() { + // FIXME aconway 2010-10-20: error handling + assert(0); + broker::SignalHandler::shutdown(); +} + +void Core::mcast(const framing::AMQBody& body) { + QPID_LOG(trace, "cluster multicast: " << body); + // FIXME aconway 2010-10-20: use Multicaster, or bring in its features. + // here we multicast Frames rather than Events. + framing::AMQFrame f(body); + std::string data(f.encodedSize(), char()); + framing::Buffer buf(&data[0], data.size()); + f.encode(buf); + iovec iov = { buf.getPointer(), buf.getSize() }; + while (!eventHandler->getCpg().mcast(&iov, 1)) + ::usleep(1000); // FIXME aconway 2010-10-20: flow control +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.h b/qpid/cpp/src/qpid/cluster/exp/Core.h new file mode 100644 index 0000000000..3e53e0a65b --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/Core.h @@ -0,0 +1,93 @@ +#ifndef QPID_CLUSTER_CORE_H +#define QPID_CLUSTER_CORE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include <memory> +#include "LockedMap.h" +#include "qpid/cluster/types.h" +#include "qpid/cluster/Cpg.h" +#include "qpid/broker/QueuedMessage.h" + +// TODO aconway 2010-10-19: experimental cluster code. + +namespace qpid { + +namespace framing{ +class AMQBody; +} + +namespace broker { +class Broker; +} + +namespace cluster { +class EventHandler; +class BrokerHandler; + +/** + * Cluster core state machine. + * Holds together the various objects that implement cluster behavior, + * and holds state that is shared by multiple components. + * + * Thread safe: called from broker connection threads and CPG dispatch threads. + */ +class Core +{ + public: + /** Configuration settings */ + struct Settings { + std::string name; + }; + + typedef LockedMap<RoutingId, boost::intrusive_ptr<broker::Message> > RoutingMap; + + /** Constructed during Plugin::earlyInitialize() */ + Core(const Settings&, broker::Broker&); + + /** Called during Plugin::initialize() */ + void initialize(); + + /** Shut down broker due to fatal error. Caller should log a critical message */ + void fatal(); + + /** Multicast an event */ + void mcast(const framing::AMQBody&); + + broker::Broker& getBroker() { return broker; } + EventHandler& getEventHandler() { return *eventHandler; } + BrokerHandler& getBrokerHandler() { return *brokerHandler; } + + /** Map of messages that are currently being routed. + * Used to pass messages being routed from BrokerHandler to MessageHandler + */ + RoutingMap& getRoutingMap() { return routingMap; } + private: + broker::Broker& broker; + std::auto_ptr<EventHandler> eventHandler; // Handles CPG events. + BrokerHandler* brokerHandler; // Handles broker events. + RoutingMap routingMap; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_CORE_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp new file mode 100644 index 0000000000..c0e3e5fc42 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp @@ -0,0 +1,123 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Core.h" +#include "EventHandler.h" +#include "HandlerBase.h" +#include "qpid/broker/Broker.h" +#include "qpid/cluster/types.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AllInvoker.h" +#include "qpid/framing/Buffer.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace cluster { + +EventHandler::EventHandler(Core& c) : + core(c), + cpg(*this), // FIXME aconway 2010-10-20: belongs on Core. + dispatcher(cpg, core.getBroker().getPoller(), boost::bind(&Core::fatal, &core)), + self(cpg.self()) +{} + +EventHandler::~EventHandler() {} + +void EventHandler::add(const boost::shared_ptr<HandlerBase>& handler) { + handlers.push_back(handler); +} + +void EventHandler::start() { + dispatcher.start(); +} + +// Print member ID or "self" if member is self +struct PrettyId { + MemberId id, self; + PrettyId(const MemberId& id_, const MemberId& self_) : id(id_), self(self_) {} +}; + +std::ostream& operator<<(std::ostream& o, const PrettyId& id) { + if (id.id == id.self) return o << "self"; + else return o << id.id; +} + +// Deliver CPG message. +void EventHandler::deliver( + cpg_handle_t /*handle*/, + const cpg_name* /*group*/, + uint32_t nodeid, + uint32_t pid, + void* msg, + int msg_len) +{ + sender = MemberId(nodeid, pid); + framing::Buffer buf(static_cast<char*>(msg), msg_len); + framing::AMQFrame frame; + while (buf.available()) { + frame.decode(buf); + assert(frame.getBody()); + QPID_LOG(trace, "cluster deliver: " << PrettyId(sender, self) << " " + << *frame.getBody()); + try { + invoke(*frame.getBody()); + } catch (const std::exception& e) { + // Note: exceptions are assumed to be survivable, + // fatal errors should log a message and call Core::fatal. + QPID_LOG(error, e.what()); + } + } +} + +void EventHandler::invoke(const framing::AMQBody& body) { + for (Handlers::iterator i = handlers.begin(); i != handlers.end(); ++i) + if ((*i)->invoke(body)) return; + QPID_LOG(error, "Cluster received unknown control: " << body ); + assert(0); // Error handling +} + +struct PrintAddrs { + PrintAddrs(const cpg_address* a, int n ) : addrs(a), count(n) {} + const cpg_address* addrs; + int count; +}; + +std::ostream& operator<<(std::ostream& o, const PrintAddrs& pa) { + for (const cpg_address* a = pa.addrs; a != pa.addrs+pa.count; ++a) + o << MemberId(*a) << " "; + return o; +} + +// CPG config-change callback. +void EventHandler::configChange ( + cpg_handle_t /*handle*/, + const cpg_name */*group*/, + const cpg_address *members, int nMembers, + const cpg_address *left, int nLeft, + const cpg_address *joined, int nJoined) +{ + // FIXME aconway 2010-10-20: TODO + QPID_LOG(notice, "cluster: new membership: " << PrintAddrs(members, nMembers)); + QPID_LOG_IF(notice, nLeft, "cluster: members left: " << PrintAddrs(left, nLeft)); + QPID_LOG_IF(notice, nJoined, "cluster: members joined: " << PrintAddrs(joined, nJoined)); +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h new file mode 100644 index 0000000000..b946c27084 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h @@ -0,0 +1,95 @@ +#ifndef QPID_CLUSTER_EVENTHANDLER_H +#define QPID_CLUSTER_EVENTHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// TODO aconway 2010-10-19: experimental cluster code. + +#include "qpid/cluster/Cpg.h" +#include "qpid/cluster/PollerDispatch.h" +#include "qpid/cluster/types.h" +#include <boost/shared_ptr.hpp> +#include <vector> + +namespace qpid { + +namespace framing { +class AMQBody; +} + +namespace cluster { +class Core; +class HandlerBase; + +/** + * Dispatch events received from a CPG group. + * A container for Handler objects that handle specific cluster.xml classes. + * Thread unsafe: only called in its own CPG deliver thread context. + */ +class EventHandler : public Cpg::Handler +{ + public: + EventHandler(Core&); + ~EventHandler(); + + /** Add a handler */ + void add(const boost::shared_ptr<HandlerBase>&); + + /** Start polling */ + void start(); + + void deliver( // CPG deliver callback. + cpg_handle_t /*handle*/, + const struct cpg_name *group, + uint32_t /*nodeid*/, + uint32_t /*pid*/, + void* /*msg*/, + int /*msg_len*/); + + void configChange( // CPG config change callback. + cpg_handle_t /*handle*/, + const struct cpg_name */*group*/, + const struct cpg_address */*members*/, int /*nMembers*/, + const struct cpg_address */*left*/, int /*nLeft*/, + const struct cpg_address */*joined*/, int /*nJoined*/ + ); + + MemberId getSender() { return sender; } + MemberId getSelf() { return self; } + Core& getCore() { return core; } + Cpg& getCpg() { return cpg; } + + private: + void invoke(const framing::AMQBody& body); + + Core& core; + Cpg cpg; + PollerDispatch dispatcher; + MemberId sender; // sender of current event. + MemberId self; + + typedef std::vector<boost::shared_ptr<HandlerBase> > Handlers; + Handlers handlers; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_EVENTHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/HandlerBase.cpp b/qpid/cpp/src/qpid/cluster/exp/HandlerBase.cpp new file mode 100644 index 0000000000..c738fb2993 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/HandlerBase.cpp @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "HandlerBase.h" +#include "EventHandler.h" + +namespace qpid { +namespace cluster { + +HandlerBase::HandlerBase(EventHandler& eh) : eventHandler(eh) {} + +HandlerBase::~HandlerBase() {} + +MemberId HandlerBase::sender() { return eventHandler.getSender(); } + +MemberId HandlerBase::self() { return eventHandler.getSelf(); } + + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h b/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h new file mode 100644 index 0000000000..455375be5b --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h @@ -0,0 +1,54 @@ +#ifndef QPID_CLUSTER_HANDLERBASE_H +#define QPID_CLUSTER_HANDLERBASE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/cluster/types.h" + +namespace qpid { + +namespace framing { +class AMQBody; +} + +namespace cluster { +class EventHandler; + +/** + * Base class for handlers of events, children of the EventHandler. + */ +class HandlerBase +{ + public: + HandlerBase(EventHandler&); + virtual ~HandlerBase(); + + virtual bool invoke(const framing::AMQBody& body) = 0; + + protected: + EventHandler& eventHandler; + MemberId sender(); + MemberId self(); +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_HANDLERBASE_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/LockedMap.h b/qpid/cpp/src/qpid/cluster/exp/LockedMap.h new file mode 100644 index 0000000000..0736e7ac35 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/LockedMap.h @@ -0,0 +1,73 @@ +#ifndef QPID_CLUSTER_LOCKEDMAP_H +#define QPID_CLUSTER_LOCKEDMAP_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Mutex.h" +#include <map> + +namespace qpid { +namespace cluster { + +/** + * A reader-writer locked thread safe map. + */ +template <class Key, class Value> +class LockedMap +{ + public: + /** Get value associated with key, returns Value() if none. */ + Value get(const Key& key) const { + sys::RWlock::ScopedRlock r(lock); + typename Map::const_iterator i = map.find(key); + if (i == map.end()) return Value(); + else return i->second; + } + + /** Associate value with key, overwriting any previous value for key. */ + void put(const Key& key, const Value& value) { + sys::RWlock::ScopedWlock w(lock); + map[key] = value; + } + + /** Associate value with key if there is not already a value associated with key. + * Returns true if the value was added. + */ + bool add(const Key& key, const Value& value) { + sys::RWlock::ScopedWlock w(lock); + return map.insert(key, value).second; + } + + /** Erase the value associated with key if any. Return true if a value was erased. */ + bool erase(const Key& key) { + sys::RWlock::ScopedWlock w(lock); + return map.erase(key); + } + + private: + typedef std::map<Key, Value> Map; + Map map; + mutable sys::RWlock lock; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_LOCKEDMAP_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp new file mode 100644 index 0000000000..d4095e5bc1 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Core.h" +#include "MessageHandler.h" +#include "BrokerHandler.h" +#include "EventHandler.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/Queue.h" +#include "qpid/framing/AllInvoker.h" +#include "qpid/framing/Buffer.h" +#include "qpid/sys/Thread.h" +#include "qpid/log/Statement.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace cluster { +using namespace broker; + +MessageHandler::MessageHandler(EventHandler& e) : + HandlerBase(e), + broker(e.getCore().getBroker()) +{} + +bool MessageHandler::invoke(const framing::AMQBody& body) { + return framing::invoke(*this, body).wasHandled(); +} + +void MessageHandler::routing(RoutingId routingId, const std::string& message) { + if (sender() == self()) return; // Already in getCore().getRoutingMap() + boost::intrusive_ptr<Message> msg = new Message; + // FIXME aconway 2010-10-28: decode message in bounded-size buffers. + framing::Buffer buf(const_cast<char*>(&message[0]), message.size()); + msg->decodeHeader(buf); + msg->decodeContent(buf); + memberMap[sender()].routingMap[routingId] = msg; +} + +boost::shared_ptr<broker::Queue> MessageHandler::findQueue( + const std::string& q, const char* msg) +{ + boost::shared_ptr<Queue> queue = broker.getQueues().find(q); + if (!queue) throw Exception(QPID_MSG(msg << ": unknown queue " << q)); + return queue; +} + +void MessageHandler::enqueue(RoutingId routingId, const std::string& q) { + boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed"); + boost::intrusive_ptr<Message> msg; + if (sender() == self()) + msg = eventHandler.getCore().getRoutingMap().get(routingId); + else + msg = memberMap[sender()].routingMap[routingId]; + if (!msg) throw Exception(QPID_MSG("Cluster enqueue on " << q + << " failed: unknown message")); + BrokerHandler::ScopedSuppressReplication ssr; + queue->deliver(msg); +} + +void MessageHandler::routed(RoutingId routingId) { + if (sender() == self()) + eventHandler.getCore().getRoutingMap().erase(routingId); + else + memberMap[sender()].routingMap.erase(routingId); +} + +void MessageHandler::dequeue(const std::string& q, uint32_t position) { + if (sender() == self()) { + // FIXME aconway 2010-10-28: we should complete the ack that initiated + // the dequeue at this point, see BrokerHandler::dequeue + return; + } + boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed"); + BrokerHandler::ScopedSuppressReplication ssr; + QueuedMessage qm; + // FIXME aconway 2010-10-28: when we replicate acquires, the acquired + // messages will be stored by MessageHandler::acquire. + if (queue->acquireMessageAt(position, qm)) { + assert(qm.position.getValue() == position); + assert(qm.payload); + queue->dequeue(0, qm); + } +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h new file mode 100644 index 0000000000..f87f22a1ec --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h @@ -0,0 +1,73 @@ +#ifndef QPID_CLUSTER_MESSAGEHANDLER_H +#define QPID_CLUSTER_MESSAGEHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// TODO aconway 2010-10-19: experimental cluster code. + +#include "HandlerBase.h" +#include "qpid/framing/AMQP_AllOperations.h" +#include <boost/intrusive_ptr.hpp> +#include <map> + +namespace qpid { + +namespace broker { +class Message; +class Broker; +class Queue; +} + +namespace cluster { +class EventHandler; +class BrokerHandler; + +/** + * Handler for message disposition events. + */ +class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler, + public HandlerBase +{ + public: + MessageHandler(EventHandler&); + + bool invoke(const framing::AMQBody& body); + + void routing(uint32_t routingId, const std::string& message); + void enqueue(uint32_t routingId, const std::string& queue); + void routed(uint32_t routingId); + void dequeue(const std::string& queue, uint32_t position); + private: + struct Member { + typedef std::map<uint32_t, boost::intrusive_ptr<broker::Message> > RoutingMap; + RoutingMap routingMap; + }; + typedef std::map<MemberId, Member> MemberMap; + + boost::shared_ptr<broker::Queue> findQueue(const std::string& q, const char* msg); + + broker::Broker& broker; + MemberMap memberMap; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_MESSAGEHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/README.txt b/qpid/cpp/src/qpid/cluster/exp/README.txt new file mode 100644 index 0000000000..97f2a10d84 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/README.txt @@ -0,0 +1,2 @@ + +Experimental code to test ideas about a new cluster design. diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp new file mode 100644 index 0000000000..04a76b9758 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Core.h" +#include "WiringHandler.h" +#include "EventHandler.h" +#include "BrokerHandler.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/ExchangeRegistry.h" +#include "qpid/broker/QueueRegistry.h" +#include "qpid/framing/AllInvoker.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace cluster { +using namespace broker; +using framing::FieldTable; + +WiringHandler::WiringHandler(EventHandler& e) : + HandlerBase(e), + broker(e.getCore().getBroker()), + recovery(broker.getQueues(), broker.getExchanges(), + broker.getLinks(), broker.getDtxManager()) +{} + +bool WiringHandler::invoke(const framing::AMQBody& body) { + return framing::invoke(*this, body).wasHandled(); +} + +void WiringHandler::createQueue(const std::string& data) { + if (sender() == self()) return; + BrokerHandler::ScopedSuppressReplication ssr; + framing::Buffer buf(const_cast<char*>(&data[0]), data.size()); + // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*() + RecoverableQueue::shared_ptr queue = recovery.recoverQueue(buf); + QPID_LOG(debug, "cluster: create queue " << queue->getName()); +} + +void WiringHandler::destroyQueue(const std::string& name) { + if (sender() == self()) return; + QPID_LOG(debug, "cluster: destroy queue " << name); + BrokerHandler::ScopedSuppressReplication ssr; + broker.deleteQueue(name, std::string(), std::string()); +} + +void WiringHandler::createExchange(const std::string& data) { + if (sender() == self()) return; + BrokerHandler::ScopedSuppressReplication ssr; + framing::Buffer buf(const_cast<char*>(&data[0]), data.size()); + // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*() + RecoverableExchange::shared_ptr exchange = recovery.recoverExchange(buf); + QPID_LOG(debug, "cluster: create exchange " << exchange->getName()); +} + +void WiringHandler::destroyExchange(const std::string& name) { + if (sender() == self()) return; + QPID_LOG(debug, "cluster: destroy exchange " << name); + BrokerHandler::ScopedSuppressReplication ssr; + broker.getExchanges().destroy(name); +} + +void WiringHandler::bind( + const std::string& queueName, const std::string& exchangeName, + const std::string& routingKey, const FieldTable& arguments) +{ + if (sender() == self()) return; + QPID_LOG(debug, "cluster: bind queue=" << queueName + << " exchange=" << exchangeName + << " key=" << routingKey + << " arguments=" << arguments); + BrokerHandler::ScopedSuppressReplication ssr; + broker.bind(queueName, exchangeName, routingKey, arguments, std::string(), std::string()); +} + +void WiringHandler::unbind( + const std::string& queueName, const std::string& exchangeName, + const std::string& routingKey, const FieldTable& arguments) +{ + if (sender() == self()) return; + QPID_LOG(debug, "cluster: unbind queue=" << queueName + << " exchange=" << exchangeName + << " key=" << routingKey + << " arguments=" << arguments); + BrokerHandler::ScopedSuppressReplication ssr; + broker.unbind(queueName, exchangeName, routingKey, std::string(), std::string()); +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h new file mode 100644 index 0000000000..e375cf6a95 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h @@ -0,0 +1,75 @@ +#ifndef QPID_CLUSTER_WIRINGHANDLER_H +#define QPID_CLUSTER_WIRINGHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// TODO aconway 2010-10-19: experimental cluster code. + +#include "HandlerBase.h" +#include "qpid/broker/RecoveryManagerImpl.h" +#include "qpid/framing/AMQP_AllOperations.h" +#include <boost/intrusive_ptr.hpp> +#include <map> + +namespace qpid { + +namespace framing { +class FieldTable; +} + +namespace broker { +class Broker; +} + +namespace cluster { +class EventHandler; + + +/** + * Handler for wiring disposition events. + */ +class WiringHandler : public framing::AMQP_AllOperations::ClusterWiringHandler, + public HandlerBase +{ + public: + WiringHandler(EventHandler&); + + bool invoke(const framing::AMQBody& body); + + void createQueue(const std::string& data); + void destroyQueue(const std::string& name); + void createExchange(const std::string& data); + void destroyExchange(const std::string& name); + void bind(const std::string& queue, const std::string& exchange, + const std::string& routingKey, const framing::FieldTable& arguments); + void unbind(const std::string& queue, const std::string& exchange, + const std::string& routingKey, const framing::FieldTable& arguments); + + + private: + broker::Broker& broker; + broker::RecoveryManagerImpl recovery; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_WIRINGHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h index bfb4fd5b9e..dec377b173 100644 --- a/qpid/cpp/src/qpid/cluster/types.h +++ b/qpid/cpp/src/qpid/cluster/types.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -38,7 +38,7 @@ extern "C" { # include <corosync/cpg.h> #else # error "No cpg.h header file available" -#endif +#endif } namespace qpid { @@ -79,6 +79,9 @@ std::ostream& operator<<(std::ostream&, const ConnectionId&); std::ostream& operator<<(std::ostream&, EventType); +/** Number to identify a message being routed. */ +typedef uint32_t RoutingId; + }} // namespace qpid::cluster #endif /*!QPID_CLUSTER_TYPES_H*/ diff --git a/qpid/cpp/src/tests/BrokerClusterCalls.cpp b/qpid/cpp/src/tests/BrokerClusterCalls.cpp new file mode 100644 index 0000000000..aa02d22267 --- /dev/null +++ b/qpid/cpp/src/tests/BrokerClusterCalls.cpp @@ -0,0 +1,419 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +///@file +// Tests using a dummy broker::Cluster implementation to verify the expected +// Cluster functions are called for various actions on the broker. +// + +#include "unit_test.h" +#include "test_tools.h" +#include "qpid/broker/Cluster.h" +#include "qpid/broker/Queue.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Session.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Duration.h" +#include "BrokerFixture.h" +#include <boost/assign.hpp> +#include <boost/format.hpp> + +using namespace std; +using namespace boost; +using namespace boost::assign; +using namespace qpid::messaging; +using boost::format; +using boost::intrusive_ptr; + +namespace qpid { +namespace tests { + +class DummyCluster : public broker::Cluster +{ + private: + /** Flag used to ignore events other than enqueues while routing, + * e.g. acquires and accepts generated in a ring queue to replace an element.. + * In real impl would be a thread-local variable. + */ + bool isRouting; + + void recordQm(const string& op, const broker::QueuedMessage& qm) { + history += (format("%s(%s, %d, %s)") % op % qm.queue->getName() + % qm.position % qm.payload->getFrames().getContent()).str(); + } + void recordMsg(const string& op, broker::Queue& q, intrusive_ptr<broker::Message> msg) { + history += (format("%s(%s, %s)") % op % q.getName() % msg->getFrames().getContent()).str(); + } + void recordStr(const string& op, const string& name) { + history += (format("%s(%s)") % op % name).str(); + } + public: + // Messages + + virtual void routing(const boost::intrusive_ptr<broker::Message>& m) { + isRouting = true; + history += (format("routing(%s)") % m->getFrames().getContent()).str(); + } + + virtual bool enqueue(broker::Queue& q, const intrusive_ptr<broker::Message>&msg) { + recordMsg("enqueue", q, msg); + return true; + } + + virtual void routed(const boost::intrusive_ptr<broker::Message>& m) { + history += (format("routed(%s)") % m->getFrames().getContent()).str(); + isRouting = false; + } + virtual void acquire(const broker::QueuedMessage& qm) { + if (!isRouting) recordQm("acquire", qm); + } + virtual void release(const broker::QueuedMessage& qm) { + if (!isRouting) recordQm("release", qm); + } + virtual void dequeue(const broker::QueuedMessage& qm) { + if (!isRouting) recordQm("dequeue", qm); + } + + // Consumers + + virtual void consume(const broker::Queue& q, size_t n) { + history += (format("consume(%s, %d)") % q.getName() % n).str(); + } + virtual void cancel(const broker::Queue& q, size_t n) { + history += (format("cancel(%s, %d)") % q.getName() % n).str(); + } + + // Wiring + + virtual void create(const broker::Queue& q) { recordStr("createq", q.getName()); } + virtual void destroy(const broker::Queue& q) { recordStr("destroyq", q.getName()); } + virtual void create(const broker::Exchange& ex) { recordStr("createex", ex.getName()); } + virtual void destroy(const broker::Exchange& ex) { recordStr("destroyex", ex.getName()); } + virtual void bind(const broker::Queue& q, const broker::Exchange& ex, const std::string& key, const framing::FieldTable& /*args*/) { + history += (format("bind(%s, %s, %s)") % q.getName() % ex.getName() % key).str(); + } + virtual void unbind(const broker::Queue& q, const broker::Exchange& ex, const std::string& key, const framing::FieldTable& /*args*/) { + history += (format("unbind(%s, %s, %s)")% q.getName()%ex.getName()%key).str(); + } + vector<string> history; +}; + +QPID_AUTO_TEST_SUITE(BrokerClusterCallsTestSuite) + +// Broker fixture with DummyCluster set up and some new API client bits. +struct DummyClusterFixture: public BrokerFixture { + Connection c; + Session s; + DummyCluster*dc; + DummyClusterFixture() { + broker->setCluster(auto_ptr<broker::Cluster>(new DummyCluster)); + dc = &static_cast<DummyCluster&>(broker->getCluster()); + c = Connection("localhost:"+lexical_cast<string>(getPort())); + c.open(); + s = c.createSession(); + } + ~DummyClusterFixture() { + c.close(); + } +}; + +QPID_AUTO_TEST_CASE(testSimplePubSub) { + DummyClusterFixture f; + vector<string>& h = f.dc->history; + + // Queue creation + Sender sender = f.s.createSender("q;{create:always,delete:always}"); + size_t i = 0; + BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking. + BOOST_CHECK_EQUAL(h.size(), i); + + // Consumer + Receiver receiver = f.s.createReceiver("q"); + f.s.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Send message + sender.send(Message("a")); + f.s.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); + // Don't check size here as it is uncertain whether acquire has happened yet. + + // Acquire message + Message m = receiver.fetch(Duration::SECOND); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Acknowledge message + f.s.acknowledge(true); + f.s.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Close a consumer + receiver.close(); + BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 0)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Destroy the queue + f.c.close(); + BOOST_CHECK_EQUAL(h.at(i++), "destroyq(q)"); + BOOST_CHECK_EQUAL(h.size(), i); +} + +QPID_AUTO_TEST_CASE(testReleaseReject) { + DummyClusterFixture f; + vector<string>& h = f.dc->history; + + Sender sender = f.s.createSender("q;{create:always,delete:always,node:{x-declare:{alternate-exchange:amq.fanout}}}"); + sender.send(Message("a")); + Receiver receiver = f.s.createReceiver("q"); + Receiver altReceiver = f.s.createReceiver("amq.fanout;{link:{name:altq}}"); + Message m = receiver.fetch(Duration::SECOND); + h.clear(); + + // Explicit release + f.s.release(m); + f.s.sync(); + size_t i = 0; + BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Implicit release on closing connection. + Connection c("localhost:"+lexical_cast<string>(f.getPort())); + c.open(); + Session s = c.createSession(); + Receiver r = s.createReceiver("q"); + m = r.fetch(Duration::SECOND); + h.clear(); + i = 0; + c.close(); + BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 1)"); + BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Reject message, goes to alternate exchange. + m = receiver.fetch(Duration::SECOND); + h.clear(); + i = 0; + f.s.reject(m); + BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); // Routing to alt exchange + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); + BOOST_CHECK_EQUAL(h.size(), i); + m = altReceiver.fetch(Duration::SECOND); + BOOST_CHECK_EQUAL(m.getContent(), "a"); + + // Timed out message + h.clear(); + i = 0; + m = Message("t"); + m.setTtl(Duration(1)); // Timeout 1ms + sender.send(m); + usleep(2000); // Sleep 2ms + bool received = receiver.fetch(m, Duration::IMMEDIATE); + BOOST_CHECK(!received); // Timed out + BOOST_CHECK_EQUAL(h.at(i++), "routing(t)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(t)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Message replaced on LVQ + sender = f.s.createSender("lvq;{create:always,delete:always,node:{x-declare:{arguments:{qpid.last_value_queue:1}}}}"); + m = Message("a"); + m.getProperties()["qpid.LVQ_key"] = "foo"; + sender.send(m); + f.s.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "createq(lvq)"); + BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); + BOOST_CHECK_EQUAL(h.size(), i); + + m = Message("b"); + m.getProperties()["qpid.LVQ_key"] = "foo"; + sender.send(m); + f.s.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); + BOOST_CHECK_EQUAL(h.size(), i); + + receiver = f.s.createReceiver("lvq"); + BOOST_CHECK_EQUAL(receiver.fetch(Duration::SECOND).getContent(), "b"); + f.s.acknowledge(true); + BOOST_CHECK_EQUAL(h.at(i++), "consume(lvq, 1)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(lvq, 1, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, b)"); + BOOST_CHECK_EQUAL(h.size(), i); +} + +QPID_AUTO_TEST_CASE(testFanout) { + DummyClusterFixture f; + vector<string>& h = f.dc->history; + + Receiver r1 = f.s.createReceiver("amq.fanout;{link:{name:r1}}"); + Receiver r2 = f.s.createReceiver("amq.fanout;{link:{name:r2}}"); + Sender sender = f.s.createSender("amq.fanout"); + r1.setCapacity(0); // Don't receive immediately. + r2.setCapacity(0); + h.clear(); + size_t i = 0; + + // Send message + sender.send(Message("a")); + f.s.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); + BOOST_CHECK_EQUAL(0u, h.at(i++).find("enqueue(amq.fanout_r")); + BOOST_CHECK_EQUAL(0u, h.at(i++).find("enqueue(amq.fanout_r")); + BOOST_CHECK(h.at(i-1) != h.at(i-2)); + BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Receive messages + Message m1 = r1.fetch(Duration::SECOND); + f.s.acknowledge(m1, true); + Message m2 = r2.fetch(Duration::SECOND); + f.s.acknowledge(m2, true); + + BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r1, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r1, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r2, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r2, 1, a)"); + BOOST_CHECK_EQUAL(h.size(), i); +} + +QPID_AUTO_TEST_CASE(testRingQueue) { + DummyClusterFixture f; + vector<string>& h = f.dc->history; + + // FIXME aconway 2010-10-15: QPID-2908 ring queue address string is not working, + // so we can't do this: + // Sender sender = f.s.createSender("ring;{create:always,node:{x-declare:{arguments:{qpid.max_size:3,qpid.policy_type:ring}}}}"); + // Must use old API to declare ring queue: + qpid::client::Connection c; + f.open(c); + qpid::client::Session s = c.newSession(); + qpid::framing::FieldTable args; + args.setInt("qpid.max_size", 3); + args.setString("qpid.policy_type","ring"); + s.queueDeclare(qpid::client::arg::queue="ring", qpid::client::arg::arguments=args); + c.close(); + Sender sender = f.s.createSender("ring"); + + size_t i = 0; + // Send message + sender.send(Message("a")); + sender.send(Message("b")); + sender.send(Message("c")); + sender.send(Message("d")); + f.s.sync(); + + BOOST_CHECK_EQUAL(h.at(i++), "createq(ring)"); + + BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); + + BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); + + BOOST_CHECK_EQUAL(h.at(i++), "routing(c)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, c)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(c)"); + + BOOST_CHECK_EQUAL(h.at(i++), "routing(d)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, d)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(d)"); + + Receiver receiver = f.s.createReceiver("ring"); + BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b"); + BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "c"); + BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "d"); + f.s.acknowledge(true); + + BOOST_CHECK_EQUAL(h.at(i++), "consume(ring, 1)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 3, c)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 4, d)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 3, c)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 4, d)"); + + BOOST_CHECK_EQUAL(h.size(), i); +} + +QPID_AUTO_TEST_CASE(testTransactions) { + DummyClusterFixture f; + vector<string>& h = f.dc->history; + Session ts = f.c.createTransactionalSession(); + Sender sender = ts.createSender("q;{create:always,delete:always}"); + size_t i = 0; + BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking. + BOOST_CHECK_EQUAL(h.size(), i); + + sender.send(Message("a")); + sender.send(Message("b")); + ts.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); + BOOST_CHECK_EQUAL(h.size(), i); // Not replicated till commit + ts.commit(); + // FIXME aconway 2010-10-18: As things stand the cluster is not + // compatible with transactions + // - enqueues occur after routing is complete + // - no call to Cluster::enqueue, should be in Queue::process? + // - no transaction context associated with messages in the Cluster interface. + // - no call to Cluster::accept in Queue::dequeueCommitted + // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)"); + // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, b)"); + BOOST_CHECK_EQUAL(h.size(), i); + + + Receiver receiver = ts.createReceiver("q"); + BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "a"); + BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b"); + ts.acknowledge(); + ts.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 2, b)"); + BOOST_CHECK_EQUAL(h.size(), i); + ts.commit(); + ts.sync(); + // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); + // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, b)"); + BOOST_CHECK_EQUAL(h.size(), i); +} + +QPID_AUTO_TEST_SUITE_END() + +}} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index cd569e901c..bd75432d57 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -25,7 +25,7 @@ QMF_GEN=$(top_srcdir)/managementgen/qmf-gen abs_builddir=@abs_builddir@ abs_srcdir=@abs_srcdir@ -extra_libs = +extra_libs = lib_client = $(abs_builddir)/../libqpidclient.la lib_messaging = $(abs_builddir)/../libqpidmessaging.la lib_common = $(abs_builddir)/../libqpidcommon.la @@ -36,7 +36,7 @@ lib_qmf2 = $(abs_builddir)/../libqmf2.la # # Initialize variables that are incremented with += -# +# check_PROGRAMS= check_LTLIBRARIES= TESTS= @@ -61,9 +61,9 @@ tmodule_LTLIBRARIES= # Unit test program # # Unit tests are built as a single program to reduce valgrind overhead -# when running the tests. If you want to build a subset of the tests do +# when running the tests. If you want to build a subset of the tests do # rm -f unit_test; make unit_test unit_test_OBJECTS="unit_test.o SelectedTest.o" -# +# TESTS+=unit_test check_PROGRAMS+=unit_test @@ -124,7 +124,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ Variant.cpp \ Address.cpp \ ClientMessage.cpp \ - Qmf2.cpp + Qmf2.cpp \ + BrokerClusterCalls.cpp if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp @@ -188,32 +189,32 @@ qpid_send_LDADD = $(lib_messaging) qpidtest_PROGRAMS+=qpid-perftest qpid_perftest_SOURCES=qpid-perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h qpid_perftest_INCLUDES=$(PUBLIC_INCLUDES) -qpid_perftest_LDADD=$(lib_client) +qpid_perftest_LDADD=$(lib_client) qpidtest_PROGRAMS+=qpid-txtest qpid_txtest_INCLUDES=$(PUBLIC_INCLUDES) qpid_txtest_SOURCES=qpid-txtest.cpp TestOptions.h ConnectionOptions.h -qpid_txtest_LDADD=$(lib_client) +qpid_txtest_LDADD=$(lib_client) qpidtest_PROGRAMS+=qpid-latency-test qpid_latency_test_INCLUDES=$(PUBLIC_INCLUDES) qpid_latency_test_SOURCES=qpid-latency-test.cpp TestOptions.h ConnectionOptions.h -qpid_latency_test_LDADD=$(lib_client) +qpid_latency_test_LDADD=$(lib_client) qpidtest_PROGRAMS+=qpid-client-test qpid_client_test_INCLUDES=$(PUBLIC_INCLUDES) qpid_client_test_SOURCES=qpid-client-test.cpp TestOptions.h ConnectionOptions.h -qpid_client_test_LDADD=$(lib_client) +qpid_client_test_LDADD=$(lib_client) qpidtest_PROGRAMS+=qpid-topic-listener qpid_topic_listener_INCLUDES=$(PUBLIC_INCLUDES) qpid_topic_listener_SOURCES=qpid-topic-listener.cpp TestOptions.h ConnectionOptions.h -qpid_topic_listener_LDADD=$(lib_client) +qpid_topic_listener_LDADD=$(lib_client) qpidtest_PROGRAMS+=qpid-topic-publisher qpid_topic_publisher_INCLUDES=$(PUBLIC_INCLUDES) qpid_topic_publisher_SOURCES=qpid-topic-publisher.cpp TestOptions.h ConnectionOptions.h -qpid_topic_publisher_LDADD=$(lib_client) +qpid_topic_publisher_LDADD=$(lib_client) qpidtest_PROGRAMS+=qpid-ping qpid_ping_INCLUDES=$(PUBLIC_INCLUDES) @@ -232,17 +233,17 @@ echotest_LDADD=$(lib_client) check_PROGRAMS+=publish publish_INCLUDES=$(PUBLIC_INCLUDES) publish_SOURCES=publish.cpp TestOptions.h ConnectionOptions.h -publish_LDADD=$(lib_client) +publish_LDADD=$(lib_client) check_PROGRAMS+=consume consume_INCLUDES=$(PUBLIC_INCLUDES) consume_SOURCES=consume.cpp TestOptions.h ConnectionOptions.h -consume_LDADD=$(lib_client) +consume_LDADD=$(lib_client) check_PROGRAMS+=header_test header_test_INCLUDES=$(PUBLIC_INCLUDES) header_test_SOURCES=header_test.cpp TestOptions.h ConnectionOptions.h -header_test_LDADD=$(lib_client) +header_test_LDADD=$(lib_client) check_PROGRAMS+=failover_soak failover_soak_INCLUDES=$(PUBLIC_INCLUDES) @@ -251,28 +252,28 @@ failover_soak_LDADD=$(lib_client) $(lib_broker) check_PROGRAMS+=declare_queues declare_queues_INCLUDES=$(PUBLIC_INCLUDES) -declare_queues_SOURCES=declare_queues.cpp -declare_queues_LDADD=$(lib_client) +declare_queues_SOURCES=declare_queues.cpp +declare_queues_LDADD=$(lib_client) check_PROGRAMS+=replaying_sender replaying_sender_INCLUDES=$(PUBLIC_INCLUDES) -replaying_sender_SOURCES=replaying_sender.cpp -replaying_sender_LDADD=$(lib_client) +replaying_sender_SOURCES=replaying_sender.cpp +replaying_sender_LDADD=$(lib_client) check_PROGRAMS+=resuming_receiver resuming_receiver_INCLUDES=$(PUBLIC_INCLUDES) -resuming_receiver_SOURCES=resuming_receiver.cpp -resuming_receiver_LDADD=$(lib_client) +resuming_receiver_SOURCES=resuming_receiver.cpp +resuming_receiver_LDADD=$(lib_client) check_PROGRAMS+=txshift txshift_INCLUDES=$(PUBLIC_INCLUDES) txshift_SOURCES=txshift.cpp TestOptions.h ConnectionOptions.h -txshift_LDADD=$(lib_client) +txshift_LDADD=$(lib_client) check_PROGRAMS+=txjob txjob_INCLUDES=$(PUBLIC_INCLUDES) txjob_SOURCES=txjob.cpp TestOptions.h ConnectionOptions.h -txjob_LDADD=$(lib_client) +txjob_LDADD=$(lib_client) check_PROGRAMS+=PollerTest PollerTest_SOURCES=PollerTest.cpp @@ -295,7 +296,7 @@ TESTS_ENVIRONMENT = \ VALGRIND=$(VALGRIND) \ LIBTOOL="$(LIBTOOL)" \ QPID_DATA_DIR= \ - $(srcdir)/run_test + $(srcdir)/run_test system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_federation_sys_tests \ @@ -342,7 +343,8 @@ EXTRA_DIST += \ start_broker.ps1 \ stop_broker.ps1 \ topictest.ps1 \ - run_queue_flow_limit_tests + run_queue_flow_limit_tests \ + run_cluster_authentication_test check_LTLIBRARIES += libdlclose_noop.la libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir) @@ -366,6 +368,7 @@ EXTRA_DIST+= \ run_failover_soak \ reliable_replication_test \ federated_cluster_test_with_node_failure \ + run_cluster_authentication_soak \ sasl_test_setup.sh check-long: diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 16d7fb0b78..3e96adc8bf 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -401,17 +401,25 @@ class Cluster: _cluster_count = 0 - def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False): + def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, + cluster2=False, show_cmd=False): + if cluster2: + cluster_name = "--cluster2-name" + cluster_lib = BrokerTest.cluster2_lib + else: + cluster_name = "--cluster-name" + cluster_lib = BrokerTest.cluster_lib self.test = test self._brokers=[] self.name = "cluster%d" % Cluster._cluster_count Cluster._cluster_count += 1 # Use unique cluster name self.args = copy(args) - self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ] + self.args += [ cluster_name, + "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ] self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"] - assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in" - self.args += [ "--load-module", BrokerTest.cluster_lib ] + assert cluster_lib, "Cannot locate cluster plug-in" + self.args += [ "--load-module", cluster_lib ] self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd) def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0, show_cmd=False): @@ -440,6 +448,7 @@ class BrokerTest(TestCase): # Environment settings. qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC")) cluster_lib = os.getenv("CLUSTER_LIB") + cluster2_lib = os.getenv("CLUSTER2_LIB") xml_lib = os.getenv("XML_LIB") qpid_config_exec = os.getenv("QPID_CONFIG_EXEC") qpid_route_exec = os.getenv("QPID_ROUTE_EXEC") @@ -490,9 +499,11 @@ class BrokerTest(TestCase): raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e)) return b - def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False): + def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, cluster2=False, + show_cmd=False): """Create and return a cluster ready for use""" - cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd) + cluster = Cluster(self, count, args, expect=expect, wait=wait, cluster2=cluster2, + show_cmd=show_cmd) return cluster def browse(self, session, queue, timeout=0): diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk index 7d17dd7bde..bf5064e74c 100644 --- a/qpid/cpp/src/tests/cluster.mk +++ b/qpid/cpp/src/tests/cluster.mk @@ -92,7 +92,7 @@ cluster_test_SOURCES = \ PartialFailure.cpp \ ClusterFailover.cpp -cluster_test_LDADD=$(lib_client) $(lib_broker) ../cluster.la -lboost_unit_test_framework +cluster_test_LDADD=$(lib_client) $(lib_broker) $(lib_messaging) ../cluster.la -lboost_unit_test_framework qpidtest_SCRIPTS += run_cluster_tests brokertest.py cluster_tests.py cluster_test_logs.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail qpidtest_SCRIPTS += $(CLUSTER_TEST_SCRIPTS_LIST) diff --git a/qpid/cpp/src/tests/cluster2_tests.py b/qpid/cpp/src/tests/cluster2_tests.py new file mode 100755 index 0000000000..f17dfe2961 --- /dev/null +++ b/qpid/cpp/src/tests/cluster2_tests.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os, signal, sys, time, imp, re, subprocess +from qpid import datatypes, messaging +from brokertest import * +from qpid.harness import Skipped +from qpid.messaging import Message +from qpid.messaging.exceptions import * +from threading import Thread, Lock +from logging import getLogger +from itertools import chain + +log = getLogger("qpid.cluster_tests") + +class Cluster2Tests(BrokerTest): + """Tests for new cluster code.""" + + def verify_content(self, content, receiver): + for c in content: self.assertEqual(c, receiver.fetch(1).content) + self.assertRaises(Empty, receiver.fetch, 0) + + def test_message_enqueue(self): + """Test basic replication of enqueued messages. + Verify that fanout messages are replicated correctly. + """ + + cluster = self.cluster(2, cluster2=True) + + sn0 = cluster[0].connect().session() + r0p = sn0.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}"); + r0q = sn0.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}"); + s0 = sn0.sender("amq.fanout"); + + sn1 = cluster[1].connect().session() + r1p = sn1.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}"); + r1q = sn1.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}"); + + + # Send messages on member 0 + content = ["a","b","c"] + for m in content: s0.send(Message(m)) + + # Browse on both members. + self.verify_content(content, r0p) + self.verify_content(content, r0q) + self.verify_content(content, r1p) + self.verify_content(content, r1q) + + sn1.connection.close() + sn0.connection.close() + + def test_message_dequeue(self): + """Test replication of dequeues""" + cluster = self.cluster(2, cluster2=True) + sn0 = cluster[0].connect().session() + s0 = sn0.sender("q;{create:always,delete:always}") + r0 = sn0.receiver("q") + sn1 = cluster[1].connect().session() + r1 = sn1.receiver("q;{create:always}") # Not yet replicating wiring. + + content = ["a","b","c"] + for m in content: s0.send(Message(m)) + # Verify enqueued on cluster[1] + self.verify_content(content, sn1.receiver("q;{mode:browse}")) + # Dequeue on cluster[0] + self.assertEqual(r0.fetch(1).content, "a") + sn0.acknowledge(sync=True) + + # Verify dequeued on cluster[0] and cluster[1] + self.verify_content(["b", "c"], sn0.receiver("q;{mode:browse}")) + self.verify_content(["b", "c"], sn1.receiver("q;{mode:browse}")) + + def test_wiring(self): + """Test replication of wiring""" + cluster = self.cluster(2, cluster2=True) + sn0 = cluster[0].connect().session() + sn1 = cluster[1].connect().session() + + # Test creation of queue, exchange, binding + r0ex = sn0.receiver("ex; {create:always, delete:always, node:{type:topic, x-declare:{name:ex, type:'direct'}}}") + r0q = sn0.receiver("q; {create:always, delete:always, link:{x-bindings:[{exchange:ex,queue:q,key:k}]}}") + + # Verify objects were created on member 1 + r1 = sn1.receiver("q") # Queue + s1ex = sn1.sender("ex/k; {node:{type:topic}}"); # Exchange + s1ex.send(Message("x")) # Binding with key k + self.assertEqual(r1.fetch(1).content, "x") + + # Test destroy. + r0q.close() # Delete queue q + self.assertRaises(NotFound, sn1.receiver, "q") + r0ex.close() # Delete exchange ex + # FIXME aconway 2010-11-05: this does not raise NotFound, sn1 is caching "ex" + # self.assertRaises(NotFound, sn1.sender, "ex") + # Have to create a new session. + self.assertRaises(NotFound, cluster[1].connect().session().receiver, "ex") + + # FIXME aconway 2010-10-29: test unbind, may need to use old API. diff --git a/qpid/cpp/src/tests/run_cluster_tests b/qpid/cpp/src/tests/run_cluster_tests index e136d3810a..3971a39144 100755 --- a/qpid/cpp/src/tests/run_cluster_tests +++ b/qpid/cpp/src/tests/run_cluster_tests @@ -33,5 +33,5 @@ mkdir -p $OUTDIR CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:--i cluster_tests.StoreTests.* -I $srcdir/cluster_tests.fail} CLUSTER_TESTS=${CLUSTER_TESTS:-$*} -with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1 +with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests -m cluster2_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1 rm -rf $OUTDIR diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in index 842d7729cb..67b4df14bf 100644 --- a/qpid/cpp/src/tests/test_env.sh.in +++ b/qpid/cpp/src/tests/test_env.sh.in @@ -63,6 +63,7 @@ export TEST_STORE_LIB=$testmoduledir/test_store.so exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; } exportmodule ACL_LIB acl.so exportmodule CLUSTER_LIB cluster.so +exportmodule CLUSTER2_LIB cluster2.so exportmodule REPLICATING_LISTENER_LIB replicating_listener.so exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so exportmodule SSLCONNECTOR_LIB sslconnector.so diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 899625f5ec..b782a6d606 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -326,4 +326,62 @@ </class> + + <!-- TODO aconway 2010-10-20: Experimental classes for new cluster. --> + + <!-- Message delivery and disposition --> + <class name="cluster-message" code="0x82"> + <!-- FIXME aconway 2010-10-19: create message in fragments --> + <control name="routing" code="0x1"> + <field name="routing-id" type="uint32"/> + <field name="message" type="str32"/> + </control> + + <control name="enqueue" code="0x2"> + <field name="routing-id" type="uint32"/> + <field name="queue" type="queue.name"/> + </control> + + <control name="routed" code="0x3"> + <field name="routing-id" type="uint32"/> + </control> + + <control name="dequeue" code="0x4"> + <field name="queue" type="queue.name"/> + <field name="position" type="uint32"/> + </control> + </class> + + <class name="cluster-wiring" code="0x83"> + <control name="create-queue" code="0x1"> + <field name="data" type="str32"/> + </control> + + <control name="destroy-queue" code="0x2"> + <field name="name" type="queue.name"/> + </control> + + <control name="create-exchange" code="0x3"> + <field name="data" type="str32"/> + </control> + + <control name="destroy-exchange" code="0x4"> + <field name="name" type="exchange.name"/> + </control> + + <control name="bind" code="0x5"> + <field name="queue" type="queue.name"/> + <field name="exchange" type="exchange.name"/> + <field name="binding-key" type="str8"/> + <field name="arguments" type="map"/> + </control> + + <control name="unbind" code="0x6"> + <field name="queue" type="queue.name"/> + <field name="exchange" type="exchange.name"/> + <field name="binding-key" type="str8"/> + <field name="arguments" type="map"/> + </control> + + </class> </amqp> |