diff options
author | Alan Conway <aconway@apache.org> | 2010-10-27 18:01:27 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-10-27 18:01:27 +0000 |
commit | 326dddd0d0d48401d14ca93044b3fc0e35ad87d9 (patch) | |
tree | 019a45480d8cdf832f62d7176b7a10a5d0971535 /cpp/src | |
parent | aae11121cfcf891b2365241141f9ab9cb47d3024 (diff) | |
download | qpid-python-326dddd0d0d48401d14ca93044b3fc0e35ad87d9.tar.gz |
Revert experimental cluster code, too close to 0.8 release.
Reverts revisions:
r1023966 "Introduce broker::Cluster interface."
r1024275 "Fix compile error: outline set/getCluster fucntions on Broker."
r1027210 "New cluster: core framework and initial implementation of enqueue logic."
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1028055 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
32 files changed, 63 insertions, 1752 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index ea1672e1e1..d8e604c41a 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -504,7 +504,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Broker.cpp \ qpid/broker/Broker.h \ qpid/broker/BrokerImportExport.h \ - qpid/broker/Cluster.h \ qpid/broker/Connection.cpp \ qpid/broker/Connection.h \ qpid/broker/ConnectionFactory.cpp \ @@ -564,7 +563,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/MessageStoreModule.h \ qpid/broker/NameGenerator.cpp \ qpid/broker/NameGenerator.h \ - qpid/broker/NullCluster.h \ qpid/broker/NullMessageStore.cpp \ qpid/broker/NullMessageStore.h \ qpid/broker/OwnershipToken.h \ diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 7cd4a18c9e..2a648e968c 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -35,6 +35,7 @@ endif if HAVE_LIBCPG dmodule_LTLIBRARIES += cluster.la + cluster_la_SOURCES = \ $(CMAN_SOURCES) \ qpid/cluster/Cluster.cpp \ @@ -98,27 +99,6 @@ cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la cluster_la_CXXFLAGS = $(AM_CXXFLAGS) -fno-strict-aliasing cluster_la_LDFLAGS = $(PLUGINLDFLAGS) -# Experimental new cluster plugin -dmodule_LTLIBRARIES += cluster2.la -cluster2_la_LIBADD = -lcpg libqpidbroker.la -cluster2_la_LDFLAGS = $(PLUGINLDFLAGS) -cluster2_la_SOURCES = \ - qpid/cluster/BrokerHandler.cpp \ - qpid/cluster/BrokerHandler.h \ - qpid/cluster/Cluster2Plugin.cpp \ - qpid/cluster/Core.cpp \ - qpid/cluster/Core.h \ - qpid/cluster/Cpg.cpp \ - qpid/cluster/Cpg.h \ - qpid/cluster/EventHandler.cpp \ - qpid/cluster/EventHandler.h \ - qpid/cluster/MessageHandler.cpp \ - qpid/cluster/MessageHandler.h \ - qpid/cluster/MessageId.cpp \ - qpid/cluster/MessageId.h \ - qpid/cluster/PollerDispatch.cpp \ - qpid/cluster/PollerDispatch.h - # The watchdog plugin and helper executable dmodule_LTLIBRARIES += watchdog.la watchdog_la_SOURCES = qpid/cluster/WatchDogPlugin.cpp diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index c93949e33f..33364e48df 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -24,7 +24,6 @@ #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" #include "qpid/broker/MessageStoreModule.h" -#include "qpid/broker/NullCluster.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/RecoveryManagerImpl.h" #include "qpid/broker/SaslAuthenticator.h" @@ -147,7 +146,6 @@ Broker::Broker(const Broker::Options& conf) : conf.qmf2Support) : 0), store(new NullMessageStore), - cluster(new NullCluster), acl(0), dataDir(conf.noDataDir ? std::string() : conf.dataDir), queues(this), @@ -512,9 +510,5 @@ void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) { const std::string Broker::TCP_TRANSPORT("tcp"); -void Broker::setCluster(std::auto_ptr<Cluster> c) { cluster = c; } - -Cluster& Broker::getCluster() { return *cluster; } - }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index d589b15f19..6636b5d912 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -70,7 +70,6 @@ namespace broker { class ExpiryPolicy; class Message; -class Cluster; static const uint16_t DEFAULT_PORT=5672; @@ -154,7 +153,6 @@ public: std::auto_ptr<management::ManagementAgent> managementAgent; ProtocolFactoryMap protocolFactories; std::auto_ptr<MessageStore> store; - std::auto_ptr<Cluster> cluster; AclModule* acl; DataDir dataDir; @@ -275,9 +273,6 @@ public: void setClusterUpdatee(bool set) { clusterUpdatee = set; } bool isClusterUpdatee() const { return clusterUpdatee; } - QPID_BROKER_EXTERN void setCluster(std::auto_ptr<Cluster> c); - QPID_BROKER_EXTERN Cluster& getCluster(); - management::ManagementAgent* getManagementAgent() { return managementAgent.get(); } ConnectionCounter& getConnectionCounter() {return connectionCounter;} diff --git a/cpp/src/qpid/broker/Cluster.h b/cpp/src/qpid/broker/Cluster.h deleted file mode 100644 index 4dabd98eab..0000000000 --- a/cpp/src/qpid/broker/Cluster.h +++ /dev/null @@ -1,110 +0,0 @@ -#ifndef QPID_BROKER_CLUSTER_H -#define QPID_BROKER_CLUSTER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <boost/intrusive_ptr.hpp> - -namespace qpid { - -namespace framing { -class FieldTable; -} - -namespace broker { - -class Message; -struct QueuedMessage; -class Queue; -class Exchange; - -/** - * NOTE: this is part of an experimental cluster implementation that is not - * yet fully functional. The original cluster implementation remains in place. - * See ../cluster/new-cluster-design.txt - * - * Interface for cluster implementations. Functions on this interface are - * called at relevant points in the Broker's processing. - */ -class Cluster -{ - public: - virtual ~Cluster() {} - - // Messages - - /** In Exchange::route, before the message is enqueued. */ - virtual void routing(const boost::intrusive_ptr<Message>&) = 0; - - /** A message is delivered to a queue. - * Called before actually pushing the message to the queue. - *@return If true the message should be pushed to the queue now. - * otherwise the cluster code will push the message when it is replicated. - */ - virtual bool enqueue(Queue& queue, const boost::intrusive_ptr<Message>&) = 0; - - /** In Exchange::route, after all enqueues for the message. */ - virtual void routed(const boost::intrusive_ptr<Message>&) = 0; - - /** A message is acquired by a local consumer, it is unavailable to replicas. */ - virtual void acquire(const QueuedMessage&) = 0; - /** A locally-acquired message is accepted, it is removed from all replicas. */ - virtual void accept(const QueuedMessage&) = 0; - - /** A locally-acquired message is rejected, and may be re-routed. */ - virtual void reject(const QueuedMessage&) = 0; - /** Re-routing (if any) is complete for a rejected message. */ - virtual void rejected(const QueuedMessage&) = 0; - - /** A locally-acquired message is released by the consumer and re-queued. */ - virtual void release(const QueuedMessage&) = 0; - - /** A message is removed from the queue. It could have been - * accepted, rejected or dropped for other reasons e.g. expired or - * replaced on an LVQ. - */ - virtual void drop(const QueuedMessage&) = 0; - - // Consumers - - /** A new consumer subscribes to a queue. */ - virtual void consume(const Queue&, size_t consumerCount) = 0; - /** A consumer cancels its subscription to a queue */ - virtual void cancel(const Queue&, size_t consumerCount) = 0; - - // Wiring - - /** A queue is created */ - virtual void create(const Queue&) = 0; - /** A queue is destroyed */ - virtual void destroy(const Queue&) = 0; - /** An exchange is created */ - virtual void create(const Exchange&) = 0; - /** An exchange is destroyed */ - virtual void destroy(const Exchange&) = 0; - /** A binding is created */ - virtual void bind(const Queue&, const Exchange&, const std::string& key, const framing::FieldTable& args) = 0; -}; - -}} // namespace qpid::broker - -#endif /*!QPID_BROKER_CLUSTER_H*/ diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 315b1af2a8..9443eb6ea5 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -112,7 +112,7 @@ void DeliveryRecord::complete() { bool DeliveryRecord::accept(TransactionContext* ctxt) { if (acquired && !ended) { - queue->accept(ctxt, msg); + queue->dequeue(ctxt, msg); setEnded(); QPID_LOG(debug, "Accepted " << id); } @@ -130,8 +130,19 @@ void DeliveryRecord::committed() const{ } void DeliveryRecord::reject() -{ - queue->reject(msg); +{ + Exchange::shared_ptr alternate = queue->getAlternateExchange(); + if (alternate) { + DeliverableMessage delivery(msg.payload); + alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders()); + QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to " + << alternate->getName()); + } else { + //just drop it + QPID_LOG(info, "Dropping rejected message from " << queue->getName()); + } + + dequeue(); } uint32_t DeliveryRecord::getCredit() const @@ -145,7 +156,7 @@ void DeliveryRecord::acquire(DeliveryIds& results) { results.push_back(id); if (!acceptExpected) { if (ended) { QPID_LOG(error, "Can't dequeue ended message"); } - else { queue->accept(0, msg); setEnded(); } + else { queue->dequeue(0, msg); setEnded(); } } } else { QPID_LOG(info, "Message already acquired " << id.getValue()); diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index b499171418..d143471559 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -23,7 +23,6 @@ #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/FedOps.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/Cluster.h" #include "qpid/management/ManagementAgent.h" #include "qpid/broker/Queue.h" #include "qpid/log/Statement.h" @@ -71,23 +70,10 @@ Exchange::PreRoute::~PreRoute(){ } } -// Bracket a scope with calls to Cluster::routing and Cluster::routed -struct ScopedClusterRouting { - Broker* broker; - boost::intrusive_ptr<Message> message; - ScopedClusterRouting(Broker* b, boost::intrusive_ptr<Message> m) - : broker(b), message(m) { - if (broker) broker->getCluster().routing(message); - } - ~ScopedClusterRouting() { - if (broker) broker->getCluster().routed(message); - } -}; - void Exchange::doRoute(Deliverable& msg, ConstBindingList b) { - ScopedClusterRouting scr(broker, &msg.getMessage()); int count = 0; + if (b.get()) { // Block the content release if the message is transient AND there is more than one binding if (!msg.getMessage().isPersistent() && b->size() > 1) { diff --git a/cpp/src/qpid/broker/NullCluster.h b/cpp/src/qpid/broker/NullCluster.h deleted file mode 100644 index 0e11ceef27..0000000000 --- a/cpp/src/qpid/broker/NullCluster.h +++ /dev/null @@ -1,66 +0,0 @@ -#ifndef QPID_BROKER_NULLCLUSTER_H -#define QPID_BROKER_NULLCLUSTER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <qpid/broker/Cluster.h> - -namespace qpid { -namespace broker { - -/** - * No-op implementation of Cluster interface, installed by broker when - * no cluster plug-in is present or clustering is disabled. - */ -class NullCluster : public Cluster -{ - public: - - // Messages - - virtual void routing(const boost::intrusive_ptr<Message>&) {} - virtual bool enqueue(Queue&, const boost::intrusive_ptr<Message>&) { return true; } - virtual void routed(const boost::intrusive_ptr<Message>&) {} - virtual void acquire(const QueuedMessage&) {} - virtual void accept(const QueuedMessage&) {} - virtual void reject(const QueuedMessage&) {} - virtual void rejected(const QueuedMessage&) {} - virtual void release(const QueuedMessage&) {} - virtual void drop(const QueuedMessage&) {} - - // Consumers - - virtual void consume(const Queue&, size_t) {} - virtual void cancel(const Queue&, size_t) {} - - // Wiring - - virtual void create(const Queue&) {} - virtual void destroy(const Queue&) {} - virtual void create(const Exchange&) {} - virtual void destroy(const Exchange&) {} - virtual void bind(const Queue&, const Exchange&, const std::string&, const framing::FieldTable&) {} -}; - -}} // namespace qpid::broker - -#endif diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index c530e9cd51..e59857462c 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -20,7 +20,6 @@ */ #include "qpid/broker/Broker.h" -#include "qpid/broker/Cluster.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueEvents.h" #include "qpid/broker/Exchange.h" @@ -146,10 +145,6 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ // Check for deferred delivery in a cluster. if (broker && broker->deferDelivery(name, msg)) return; - // Same thing but for the new cluster interface. - if (broker && !broker->getCluster().enqueue(*this, msg)) - return; - if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); @@ -169,6 +164,7 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ }else { push(msg); } + mgntEnqStats(msg); QPID_LOG(debug, "Message " << msg << " enqueued on " << name); } } @@ -202,6 +198,7 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); + mgntEnqStats(msg); if (mgmtObject != 0){ mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); @@ -227,7 +224,6 @@ void Queue::requeue(const QueuedMessage& msg){ } } } - if (broker) broker->getCluster().release(msg); copy.notify(); } @@ -240,22 +236,8 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){ } } -// Inform the cluster of an acquired message on exit from a function -// that does the acquiring. The calling function should set qmsg -// to the acquired message. -struct ClusterAcquireOnExit { - Broker* broker; - QueuedMessage qmsg; - ClusterAcquireOnExit(Broker* b) : broker(b) {} - ~ClusterAcquireOnExit() { - if (broker && qmsg.queue) broker->getCluster().acquire(qmsg); - } -}; - bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { - ClusterAcquireOnExit willAcquire(broker); - Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); @@ -266,18 +248,16 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess if (lastValueQueue) { clearLVQIndex(*i); } - QPID_LOG(debug, "Acquired message at " << i->position << " from " << name); - willAcquire.qmsg = *i; + QPID_LOG(debug, + "Acquired message at " << i->position << " from " << name); messages.erase(i); return true; - } + } QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); return false; } bool Queue::acquire(const QueuedMessage& msg) { - ClusterAcquireOnExit acquire(broker); - Mutex::ScopedLock locker(messageLock); assertClusterSafe(); @@ -285,17 +265,16 @@ bool Queue::acquire(const QueuedMessage& msg) { Messages::iterator i = findAt(msg.position); if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set (!lastValueQueue || - (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0 - ) { + (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0 + ) { clearLVQIndex(msg); QPID_LOG(debug, "Match found, acquire succeeded: " << i->position << " == " << msg.position); - acquire.qmsg = *i; messages.erase(i); return true; - } + } QPID_LOG(debug, "Acquire failed for " << msg.position); return false; @@ -335,8 +314,6 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { while (true) { - ClusterAcquireOnExit willAcquire(broker); // Outside the lock - Mutex::ScopedLock locker(messageLock); if (messages.empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); @@ -353,7 +330,6 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ if (c->filter(msg.payload)) { if (c->accept(msg.payload)) { m = msg; - willAcquire.qmsg = msg; popMsg(msg); return CONSUMED; } else { @@ -475,51 +451,40 @@ QueuedMessage Queue::find(SequenceNumber pos) const { return QueuedMessage(); } -void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) { +void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ assertClusterSafe(); - size_t consumers; - { - Mutex::ScopedLock locker(consumerLock); - if(exclusive) { + Mutex::ScopedLock locker(consumerLock); + if(exclusive) { + throw ResourceLockedException( + QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); + } else if(requestExclusive) { + if(consumerCount) { throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); - } else if(requestExclusive) { - if(consumerCount) { - throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); - } else { - exclusive = c->getSession(); - } + QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); + } else { + exclusive = c->getSession(); } - consumers = ++consumerCount; - if (mgmtObject != 0) - mgmtObject->inc_consumerCount (); } - if (broker) broker->getCluster().consume(*this, consumers); + consumerCount++; + if (mgmtObject != 0) + mgmtObject->inc_consumerCount (); } void Queue::cancel(Consumer::shared_ptr c){ removeListener(c); - size_t consumers; - { - Mutex::ScopedLock locker(consumerLock); - consumers = --consumerCount; - if(exclusive) exclusive = 0; - if (mgmtObject != 0) - mgmtObject->dec_consumerCount (); - } - if (broker) broker->getCluster().cancel(*this, consumers); + Mutex::ScopedLock locker(consumerLock); + consumerCount--; + if(exclusive) exclusive = 0; + if (mgmtObject != 0) + mgmtObject->dec_consumerCount (); } QueuedMessage Queue::get(){ - ClusterAcquireOnExit acquire(broker); // Outside lock - Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); if(!messages.empty()){ msg = getFront(); - acquire.qmsg = msg; popMsg(msg); } return msg; @@ -644,12 +609,10 @@ void Queue::popMsg(QueuedMessage& qmsg) void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ assertClusterSafe(); - if (!isRecovery) mgntEnqStats(msg); - QueuedMessage qm; QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); - qm = QueuedMessage(this, msg, ++sequence); + QueuedMessage qm(this, msg, ++sequence); if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); LVQ::iterator i; @@ -666,14 +629,12 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this); if (!old) old = i->second; i->second->setReplacementMessage(msg,this); - // FIXME aconway 2010-10-15: it is incorrect to use qm.position below - // should be using the position of the message being replaced. if (isRecovery) { //can't issue new requests for the store until //recovery is complete pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position)); } else { - Mutex::ScopedUnlock u(messageLock); + Mutex::ScopedUnlock u(messageLock); dequeue(0, QueuedMessage(qm.queue, old, qm.position)); } } @@ -831,48 +792,19 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) if (policy.get()) policy->enqueueAborted(msg); } -void Queue::accept(TransactionContext* ctxt, const QueuedMessage& msg) { - if (broker) broker->getCluster().accept(msg); - dequeue(ctxt, msg); -} - -struct ScopedClusterReject { - Broker* broker; - const QueuedMessage& qmsg; - ScopedClusterReject(Broker* b, const QueuedMessage& m) : broker(b), qmsg(m) { - if (broker) broker->getCluster().reject(qmsg); - } - ~ScopedClusterReject() { - if (broker) broker->getCluster().rejected(qmsg); - } -}; - -void Queue::reject(const QueuedMessage &msg) { - ScopedClusterReject scr(broker, msg); - Exchange::shared_ptr alternate = getAlternateExchange(); - if (alternate) { - DeliverableMessage delivery(msg.payload); - alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders()); - QPID_LOG(info, "Routed rejected message from " << getName() << " to " - << alternate->getName()); - } else { - //just drop it - QPID_LOG(info, "Dropping rejected message from " << getName()); - } - dequeue(0, msg); -} - // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { ScopedUse u(barrier); if (!u.acquired) return false; + { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return false; - if (!ctxt) dequeued(msg); + if (!ctxt) { + dequeued(msg); + } } - if (!ctxt && broker) broker->getCluster().drop(msg); // Outside lock // This check prevents messages which have been forced persistent on one queue from dequeuing // from another on which no forcing has taken place and thus causing a store error. bool fp = msg.payload->isForcedPersistent(); @@ -889,7 +821,6 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::dequeueCommitted(const QueuedMessage& msg) { - if (broker) broker->getCluster().drop(msg); // Outside lock Mutex::ScopedLock locker(messageLock); dequeued(msg); if (mgmtObject != 0) { @@ -915,8 +846,6 @@ void Queue::popAndDequeue() */ void Queue::dequeued(const QueuedMessage& msg) { - // Note: Cluster::drop does only local book-keeping, no multicast - // So OK to call here with lock held. if (policy.get()) policy->dequeued(msg); mgntDeqStats(msg.payload); if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) { @@ -932,7 +861,6 @@ void Queue::create(const FieldTable& _settings) store->create(*this, _settings); } configure(_settings); - if (broker) broker->getCluster().create(*this); } void Queue::configure(const FieldTable& _settings, bool recovering) @@ -1006,7 +934,6 @@ void Queue::destroy() store->destroy(*this); store = 0;//ensure we make no more calls to the store for this queue } - if (broker) broker->getCluster().destroy(*this); } void Queue::notifyDeleted() diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 572f3dc0e2..96c79d1b92 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -259,13 +259,6 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false); void enqueueAborted(boost::intrusive_ptr<Message> msg); - - /** Message acknowledged, dequeue it. */ - QPID_BROKER_EXTERN void accept(TransactionContext* ctxt, const QueuedMessage &msg); - - /** Message rejected, dequeue it and re-route to alternate exchange if necessary. */ - QPID_BROKER_EXTERN void reject(const QueuedMessage &msg); - /** * dequeue from store (only done once messages is acknowledged) */ diff --git a/cpp/src/qpid/broker/QueuedMessage.h b/cpp/src/qpid/broker/QueuedMessage.h index 8cf73bda52..35e48b11f3 100644 --- a/cpp/src/qpid/broker/QueuedMessage.h +++ b/cpp/src/qpid/broker/QueuedMessage.h @@ -34,9 +34,10 @@ struct QueuedMessage framing::SequenceNumber position; Queue* queue; - QueuedMessage(Queue* q=0) : position(0), queue(q) {} + QueuedMessage() : queue(0) {} QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) : payload(msg), position(sn), queue(q) {} + QueuedMessage(Queue* q) : queue(q) {} }; inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index f393879c16..c91cfba2f8 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -333,7 +333,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) parent->record(record); } if (acquire && !ackExpected) { - queue->accept(0, msg); + queue->dequeue(0, msg); } if (mgmtObject) { mgmtObject->inc_delivered(); } return true; @@ -347,6 +347,11 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>) bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) { assertClusterSafe(); + // FIXME aconway 2009-06-08: if we have byte & message credit but + // checkCredit fails because the message is to big, we should + // remain on queue's listener list for possible smaller messages + // in future. + // blocked = !(filter(msg) && checkCredit(msg)); return !blocked; } diff --git a/cpp/src/qpid/cluster/BrokerHandler.cpp b/cpp/src/qpid/cluster/BrokerHandler.cpp deleted file mode 100644 index f0b930a221..0000000000 --- a/cpp/src/qpid/cluster/BrokerHandler.cpp +++ /dev/null @@ -1,96 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Core.h" -#include "BrokerHandler.h" -#include "qpid/framing/ClusterMessageRoutingBody.h" -#include "qpid/framing/ClusterMessageRoutedBody.h" -#include "qpid/framing/ClusterMessageEnqueueBody.h" -#include "qpid/sys/Thread.h" -#include "qpid/broker/QueuedMessage.h" -#include "qpid/broker/Queue.h" -#include "qpid/framing/Buffer.h" -#include "qpid/log/Statement.h" - -namespace qpid { -namespace cluster { - -using namespace framing; -using namespace broker; - -namespace { -// noReplicate means the current thread is handling a message -// received from the cluster so it should not be replciated. -QPID_TSS bool noReplicate = false; - -// Sequence number of the message currently being routed. -// 0 if we are not currently routing a message. -QPID_TSS SequenceNumber routeSeq = 0; -} - -BrokerHandler::ScopedSuppressReplication::ScopedSuppressReplication() { - assert(!noReplicate); - noReplicate = true; -} - -BrokerHandler::ScopedSuppressReplication::~ScopedSuppressReplication() { - assert(noReplicate); - noReplicate = false; -} - -BrokerHandler::BrokerHandler(Core& c) : core(c) {} - -SequenceNumber BrokerHandler::nextSequenceNumber() { - SequenceNumber s = ++sequence; - if (!s) s = ++sequence; // Avoid 0 on wrap-around. - return s; -} - -void BrokerHandler::routing(const boost::intrusive_ptr<Message>&) { } - -bool BrokerHandler::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg) -{ - if (noReplicate) return true; - if (!routeSeq) { // This is the first enqueue, so send the message - routeSeq = nextSequenceNumber(); - // FIXME aconway 2010-10-20: replicate message in fixed size buffers. - std::string data(msg->encodedSize(),char()); - framing::Buffer buf(&data[0], data.size()); - msg->encode(buf); - core.mcast(ClusterMessageRoutingBody(ProtocolVersion(), routeSeq, data)); - core.getRoutingMap().put(routeSeq, msg); - } - core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), routeSeq, queue.getName())); - // TODO aconway 2010-10-21: configable option for strict (wait - // for CPG deliver to do local deliver) vs. loose (local deliver - // immediately). - return false; -} - -void BrokerHandler::routed(const boost::intrusive_ptr<Message>&) { - if (routeSeq) { // we enqueued at least one message. - core.mcast(ClusterMessageRoutedBody(ProtocolVersion(), routeSeq)); - // Note: routingMap is cleaned up on CPG delivery in MessageHandler. - routeSeq = 0; - } -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/BrokerHandler.h b/cpp/src/qpid/cluster/BrokerHandler.h deleted file mode 100644 index 1a61d1fc11..0000000000 --- a/cpp/src/qpid/cluster/BrokerHandler.h +++ /dev/null @@ -1,86 +0,0 @@ -#ifndef QPID_CLUSTER_BROKERHANDLER_H -#define QPID_CLUSTER_BROKERHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/broker/Cluster.h" -#include "qpid/sys/AtomicValue.h" - -namespace qpid { -namespace cluster { -class Core; - -// TODO aconway 2010-10-19: experimental cluster code. - -/** - * Implements broker::Cluster interface, handles events in broker code. - */ -class BrokerHandler : public broker::Cluster -{ - public: - /** Suppress replication while in scope. - * Used to prevent re-replication of messages received from the cluster. - */ - struct ScopedSuppressReplication { - ScopedSuppressReplication(); - ~ScopedSuppressReplication(); - }; - - BrokerHandler(Core&); - - // FIXME aconway 2010-10-20: implement all points. - - // Messages - - void routing(const boost::intrusive_ptr<broker::Message>&); - bool enqueue(broker::Queue&, const boost::intrusive_ptr<broker::Message>&); - void routed(const boost::intrusive_ptr<broker::Message>&); - void acquire(const broker::QueuedMessage&) {} - void accept(const broker::QueuedMessage&) {} - void reject(const broker::QueuedMessage&) {} - void rejected(const broker::QueuedMessage&) {} - void release(const broker::QueuedMessage&) {} - void drop(const broker::QueuedMessage&) {} - - // Consumers - - void consume(const broker::Queue&, size_t) {} - void cancel(const broker::Queue&, size_t) {} - - // Wiring - - void create(const broker::Queue&) {} - void destroy(const broker::Queue&) {} - void create(const broker::Exchange&) {} - void destroy(const broker::Exchange&) {} - void bind(const broker::Queue&, const broker::Exchange&, - const std::string&, const framing::FieldTable&) {} - - private: - SequenceNumber nextSequenceNumber(); - - Core& core; - sys::AtomicValue<SequenceNumber> sequence; -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_BROKERHANDLER_H*/ diff --git a/cpp/src/qpid/cluster/Cluster2Plugin.cpp b/cpp/src/qpid/cluster/Cluster2Plugin.cpp deleted file mode 100644 index 28b7dcec2e..0000000000 --- a/cpp/src/qpid/cluster/Cluster2Plugin.cpp +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <qpid/Options.h> -#include <qpid/broker/Broker.h> -#include "Core.h" - -namespace qpid { -namespace cluster { -using broker::Broker; - -// TODO aconway 2010-10-19: experimental new cluster code. - -/** - * Plugin for the cluster. - */ -struct Cluster2Plugin : public Plugin { - struct Opts : public Options { - Core::Settings& settings; - Opts(Core::Settings& s) : Options("Cluster Options"), settings(s) { - addOptions() - ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join"); - // TODO aconway 2010-10-19: copy across other options from ClusterPlugin.h - } - }; - - Core::Settings settings; - Opts options; - Core* core; // Core deletes itself on shutdown. - - Cluster2Plugin() : options(settings), core(0) {} - - Options* getOptions() { return &options; } - - void earlyInitialize(Plugin::Target& target) { - if (settings.name.empty()) return; - Broker* broker = dynamic_cast<Broker*>(&target); - if (!broker) return; - core = new Core(settings, *broker); - } - - void initialize(Plugin::Target& target) { - Broker* broker = dynamic_cast<Broker*>(&target); - if (broker && core) core->initialize(); - } -}; - -static Cluster2Plugin instance; // Static initialization. - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Core.cpp b/cpp/src/qpid/cluster/Core.cpp deleted file mode 100644 index e4127fa443..0000000000 --- a/cpp/src/qpid/cluster/Core.cpp +++ /dev/null @@ -1,68 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Core.h" -#include "EventHandler.h" -#include "BrokerHandler.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/SignalHandler.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/Buffer.h" -#include "qpid/log/Statement.h" -#include <sys/uio.h> // For iovec - -namespace qpid { -namespace cluster { - -Core::Core(const Settings& s, broker::Broker& b) : - broker(b), - eventHandler(new EventHandler(*this)) -{ - std::auto_ptr<BrokerHandler> bh(new BrokerHandler(*this)); - brokerHandler = bh.get(); - // BrokerHandler belongs to Broker - broker.setCluster(std::auto_ptr<broker::Cluster>(bh)); - // FIXME aconway 2010-10-20: ownership of BrokerHandler, shutdown issues. - eventHandler->getCpg().join(s.name); -} - -void Core::initialize() {} - -void Core::fatal() { - // FIXME aconway 2010-10-20: error handling - assert(0); - broker::SignalHandler::shutdown(); -} - -void Core::mcast(const framing::AMQBody& body) { - QPID_LOG(trace, "multicast: " << body); - // FIXME aconway 2010-10-20: use Multicaster, or bring in its features. - // here we multicast Frames rather than Events. - framing::AMQFrame f(body); - std::string data(f.encodedSize(), char()); - framing::Buffer buf(&data[0], data.size()); - f.encode(buf); - iovec iov = { buf.getPointer(), buf.getSize() }; - while (!eventHandler->getCpg().mcast(&iov, 1)) - ::usleep(1000); // FIXME aconway 2010-10-20: flow control -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Core.h b/cpp/src/qpid/cluster/Core.h deleted file mode 100644 index 9976c1c906..0000000000 --- a/cpp/src/qpid/cluster/Core.h +++ /dev/null @@ -1,95 +0,0 @@ -#ifndef QPID_CLUSTER_CORE_H -#define QPID_CLUSTER_CORE_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <string> -#include <memory> - -#include "Cpg.h" -#include "MessageId.h" -#include "LockedMap.h" -#include <qpid/broker/QueuedMessage.h> - -// TODO aconway 2010-10-19: experimental cluster code. - -namespace qpid { - -namespace framing{ -class AMQBody; -} - -namespace broker { -class Broker; -} - -namespace cluster { -class EventHandler; -class BrokerHandler; - -/** - * Cluster core state machine. - * Holds together the various objects that implement cluster behavior, - * and holds state that is shared by multiple components. - * - * Thread safe: called from broker connection threads and CPG dispatch threads. - */ -class Core -{ - public: - /** Configuration settings */ - struct Settings { - std::string name; - }; - - typedef LockedMap<SequenceNumber, boost::intrusive_ptr<broker::Message> > - SequenceMessageMap; - - /** Constructed during Plugin::earlyInitialize() */ - Core(const Settings&, broker::Broker&); - - /** Called during Plugin::initialize() */ - void initialize(); - - /** Shut down broker due to fatal error. Caller should log a critical message */ - void fatal(); - - /** Multicast an event */ - void mcast(const framing::AMQBody&); - - broker::Broker& getBroker() { return broker; } - EventHandler& getEventHandler() { return *eventHandler; } - BrokerHandler& getBrokerHandler() { return *brokerHandler; } - - /** Map of messages that are currently being routed. - * Used to pass messages being routed from BrokerHandler to MessageHandler - */ - SequenceMessageMap& getRoutingMap() { return routingMap; } - private: - broker::Broker& broker; - std::auto_ptr<EventHandler> eventHandler; // Handles CPG events. - BrokerHandler* brokerHandler; // Handles broker events. - SequenceMessageMap routingMap; -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_CORE_H*/ diff --git a/cpp/src/qpid/cluster/EventHandler.cpp b/cpp/src/qpid/cluster/EventHandler.cpp deleted file mode 100644 index 95ae285b06..0000000000 --- a/cpp/src/qpid/cluster/EventHandler.cpp +++ /dev/null @@ -1,89 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "MessageHandler.h" -#include "EventHandler.h" -#include "Core.h" -#include "types.h" -#include "qpid/framing/Buffer.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/AllInvoker.h" -#include "qpid/broker/Broker.h" -#include "qpid/log/Statement.h" - -namespace qpid { -namespace cluster { - -EventHandler::EventHandler(Core& c) : - core(c), - cpg(*this), // FIXME aconway 2010-10-20: belongs on Core. - dispatcher(cpg, core.getBroker().getPoller(), boost::bind(&Core::fatal, &core)), - self(cpg.self()), - messageHandler(new MessageHandler(*this)) -{ - dispatcher.start(); // FIXME aconway 2010-10-20: later in initialization? -} - -EventHandler::~EventHandler() {} - -// Deliver CPG message. -void EventHandler::deliver( - cpg_handle_t /*handle*/, - const cpg_name* /*group*/, - uint32_t nodeid, - uint32_t pid, - void* msg, - int msg_len) -{ - sender = MemberId(nodeid, pid); - framing::Buffer buf(static_cast<char*>(msg), msg_len); - framing::AMQFrame frame; - while (buf.available()) { - frame.decode(buf); - assert(frame.getBody()); - QPID_LOG(trace, "cluster deliver: " << *frame.getBody()); - try { - invoke(*frame.getBody()); - } - catch (const std::exception& e) { - // Note: exceptions are assumed to be survivable, - // fatal errors should log a message and call Core::fatal. - QPID_LOG(error, e.what()); - } - } -} - -void EventHandler::invoke(const framing::AMQBody& body) { - if (framing::invoke(*messageHandler, body).wasHandled()) return; -} - -// CPG config-change callback. -void EventHandler::configChange ( - cpg_handle_t /*handle*/, - const cpg_name */*group*/, - const cpg_address */*members*/, int /*nMembers*/, - const cpg_address */*left*/, int /*nLeft*/, - const cpg_address */*joined*/, int /*nJoined*/) -{ - // FIXME aconway 2010-10-20: TODO -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/EventHandler.h b/cpp/src/qpid/cluster/EventHandler.h deleted file mode 100644 index 5645c3980b..0000000000 --- a/cpp/src/qpid/cluster/EventHandler.h +++ /dev/null @@ -1,85 +0,0 @@ -#ifndef QPID_CLUSTER_EVENTHANDLER_H -#define QPID_CLUSTER_EVENTHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -// TODO aconway 2010-10-19: experimental cluster code. - -#include "types.h" -#include "Cpg.h" -#include "PollerDispatch.h" - -namespace qpid { - -namespace framing { -class AMQBody; -} - -namespace cluster { -class Core; -class MessageHandler; - -/** - * Dispatch events received from CPG. - * Thread unsafe: only called in CPG deliver thread context. - */ -class EventHandler : public Cpg::Handler -{ - public: - EventHandler(Core&); - ~EventHandler(); - - void deliver( // CPG deliver callback. - cpg_handle_t /*handle*/, - const struct cpg_name *group, - uint32_t /*nodeid*/, - uint32_t /*pid*/, - void* /*msg*/, - int /*msg_len*/); - - void configChange( // CPG config change callback. - cpg_handle_t /*handle*/, - const struct cpg_name */*group*/, - const struct cpg_address */*members*/, int /*nMembers*/, - const struct cpg_address */*left*/, int /*nLeft*/, - const struct cpg_address */*joined*/, int /*nJoined*/ - ); - - - MemberId getSender() { return sender; } - MemberId getSelf() { return self; } - Core& getCore() { return core; } - Cpg& getCpg() { return cpg; } - - private: - void invoke(const framing::AMQBody& body); - - Core& core; - Cpg cpg; - PollerDispatch dispatcher; - MemberId sender; // sender of current event. - MemberId self; - std::auto_ptr<MessageHandler> messageHandler; -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_EVENTHANDLER_H*/ diff --git a/cpp/src/qpid/cluster/LockedMap.h b/cpp/src/qpid/cluster/LockedMap.h deleted file mode 100644 index 0736e7ac35..0000000000 --- a/cpp/src/qpid/cluster/LockedMap.h +++ /dev/null @@ -1,73 +0,0 @@ -#ifndef QPID_CLUSTER_LOCKEDMAP_H -#define QPID_CLUSTER_LOCKEDMAP_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/Mutex.h" -#include <map> - -namespace qpid { -namespace cluster { - -/** - * A reader-writer locked thread safe map. - */ -template <class Key, class Value> -class LockedMap -{ - public: - /** Get value associated with key, returns Value() if none. */ - Value get(const Key& key) const { - sys::RWlock::ScopedRlock r(lock); - typename Map::const_iterator i = map.find(key); - if (i == map.end()) return Value(); - else return i->second; - } - - /** Associate value with key, overwriting any previous value for key. */ - void put(const Key& key, const Value& value) { - sys::RWlock::ScopedWlock w(lock); - map[key] = value; - } - - /** Associate value with key if there is not already a value associated with key. - * Returns true if the value was added. - */ - bool add(const Key& key, const Value& value) { - sys::RWlock::ScopedWlock w(lock); - return map.insert(key, value).second; - } - - /** Erase the value associated with key if any. Return true if a value was erased. */ - bool erase(const Key& key) { - sys::RWlock::ScopedWlock w(lock); - return map.erase(key); - } - - private: - typedef std::map<Key, Value> Map; - Map map; - mutable sys::RWlock lock; -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_LOCKEDMAP_H*/ diff --git a/cpp/src/qpid/cluster/MessageHandler.cpp b/cpp/src/qpid/cluster/MessageHandler.cpp deleted file mode 100644 index fbbdad38a3..0000000000 --- a/cpp/src/qpid/cluster/MessageHandler.cpp +++ /dev/null @@ -1,82 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Core.h" -#include "MessageHandler.h" -#include "BrokerHandler.h" -#include "EventHandler.h" -#include "qpid/broker/Message.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/QueueRegistry.h" -#include "qpid/broker/Queue.h" -#include "qpid/framing/Buffer.h" -#include "qpid/sys/Thread.h" -#include <boost/shared_ptr.hpp> - -namespace qpid { -namespace cluster { -using namespace broker; - -MessageHandler::MessageHandler(EventHandler& e) : - broker(e.getCore().getBroker()), - eventHandler(e), - brokerHandler(e.getCore().getBrokerHandler()) -{} - -MessageHandler::~MessageHandler() {} - -MemberId MessageHandler::sender() { return eventHandler.getSender(); } -MemberId MessageHandler::self() { return eventHandler.getSelf(); } - -void MessageHandler::routing(uint64_t sequence, const std::string& message) { - MessageId id(sender(), sequence); - boost::intrusive_ptr<Message> msg; - if (sender() == self()) - msg = eventHandler.getCore().getRoutingMap().get(sequence); - if (!msg) { - framing::Buffer buf(const_cast<char*>(&message[0]), message.size()); - msg = new Message; - msg->decodeHeader(buf); - msg->decodeContent(buf); - } - routingMap[id] = msg; -} - -void MessageHandler::enqueue(uint64_t sequence, const std::string& q) { - MessageId id(sender(), sequence); - boost::shared_ptr<Queue> queue = broker.getQueues().find(q); - if (!queue) throw Exception(QPID_MSG("Cluster message for unknown queue " << q)); - boost::intrusive_ptr<Message> msg = routingMap[id]; - if (!msg) throw Exception(QPID_MSG("Unknown cluster message for queue " << q)); - BrokerHandler::ScopedSuppressReplication ssr; - // TODO aconway 2010-10-21: configable option for strict (wait - // for CPG deliver to do local deliver) vs. loose (local deliver - // immediately). - queue->deliver(msg); -} - -void MessageHandler::routed(uint64_t sequence) { - MessageId id(sender(), sequence); - routingMap.erase(id); - eventHandler.getCore().getRoutingMap().erase(sequence); -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/MessageHandler.h b/cpp/src/qpid/cluster/MessageHandler.h deleted file mode 100644 index 5c32bf474e..0000000000 --- a/cpp/src/qpid/cluster/MessageHandler.h +++ /dev/null @@ -1,70 +0,0 @@ -#ifndef QPID_CLUSTER_MESSAGEHANDLER_H -#define QPID_CLUSTER_MESSAGEHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -// TODO aconway 2010-10-19: experimental cluster code. - -#include "qpid/framing/AMQP_AllOperations.h" -#include "MessageId.h" -#include <boost/intrusive_ptr.hpp> -#include <map> - -namespace qpid { - -namespace broker { -class Message; -class Broker; -} - -namespace cluster { -class EventHandler; -class BrokerHandler; - -/** - * Handler for message disposition events. - */ -class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler -{ - public: - MessageHandler(EventHandler&); - ~MessageHandler(); - - void routing(uint64_t sequence, const std::string& message); - void enqueue(uint64_t sequence, const std::string& queue); - void routed(uint64_t sequence); - - private: - typedef std::map<MessageId, boost::intrusive_ptr<broker::Message> > RoutingMap; - - MemberId sender(); - MemberId self(); - - broker::Broker& broker; - EventHandler& eventHandler; - BrokerHandler& brokerHandler; - RoutingMap routingMap; - -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_MESSAGEHANDLER_H*/ diff --git a/cpp/src/qpid/cluster/MessageId.cpp b/cpp/src/qpid/cluster/MessageId.cpp deleted file mode 100644 index fbd248ed69..0000000000 --- a/cpp/src/qpid/cluster/MessageId.cpp +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "MessageId.h" -#include <ostream> - -namespace qpid { -namespace cluster { - -bool operator<(const MessageId& a, const MessageId& b) { - return a.member < b.member || ((a.member == b.member) && a.sequence < b.sequence); -} - -std::ostream& operator<<(std::ostream& o, const MessageId& m) { - return o << m.member << ":" << m.sequence; -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/MessageId.h b/cpp/src/qpid/cluster/MessageId.h deleted file mode 100644 index 16bf7ddd6d..0000000000 --- a/cpp/src/qpid/cluster/MessageId.h +++ /dev/null @@ -1,52 +0,0 @@ -#ifndef QPID_CLUSTER_MESSAGEID_H -#define QPID_CLUSTER_MESSAGEID_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "types.h" -#include <iosfwd> - -namespace qpid { -namespace cluster { - -// TODO aconway 2010-10-20: experimental new cluster code. - -/** Sequence number used in message identifiers */ -typedef uint64_t SequenceNumber; - -/** - * Message identifier - */ -struct MessageId { - MemberId member; /// Member that created the message - SequenceNumber sequence; /// Sequence number assiged by member. - MessageId(MemberId m=MemberId(), SequenceNumber s=0) : member(m), sequence(s) {} -}; - -bool operator<(const MessageId&, const MessageId&); - -std::ostream& operator<<(std::ostream&, const MessageId&); - - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_MESSAGEID_H*/ diff --git a/cpp/src/qpid/cluster/PollerDispatch.cpp b/cpp/src/qpid/cluster/PollerDispatch.cpp index 43c171efe8..b8d94b95a5 100644 --- a/cpp/src/qpid/cluster/PollerDispatch.cpp +++ b/cpp/src/qpid/cluster/PollerDispatch.cpp @@ -37,11 +37,9 @@ PollerDispatch::PollerDispatch(Cpg& c, boost::shared_ptr<sys::Poller> p, started(false) {} -PollerDispatch::~PollerDispatch() { stop(); } - -void PollerDispatch::stop() { - if (started) dispatchHandle.stopWatch(); - started = false; +PollerDispatch::~PollerDispatch() { + if (started) + dispatchHandle.stopWatch(); } void PollerDispatch::start() { @@ -56,7 +54,6 @@ void PollerDispatch::dispatch(sys::DispatchHandle& h) { h.rewatch(); } catch (const std::exception& e) { QPID_LOG(critical, "Error in cluster dispatch: " << e.what()); - stop(); onError(); } } diff --git a/cpp/src/qpid/cluster/PollerDispatch.h b/cpp/src/qpid/cluster/PollerDispatch.h index f16d5ece95..63801e0de9 100644 --- a/cpp/src/qpid/cluster/PollerDispatch.h +++ b/cpp/src/qpid/cluster/PollerDispatch.h @@ -41,7 +41,6 @@ class PollerDispatch { ~PollerDispatch(); void start(); - void stop(); private: // Poller callbacks diff --git a/cpp/src/tests/BrokerClusterCalls.cpp b/cpp/src/tests/BrokerClusterCalls.cpp deleted file mode 100644 index f659702387..0000000000 --- a/cpp/src/tests/BrokerClusterCalls.cpp +++ /dev/null @@ -1,435 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -///@file -// Tests using a dummy broker::Cluster implementation to verify the expected -// Cluster functions are called for various actions on the broker. -// - -#include "unit_test.h" -#include "test_tools.h" -#include "qpid/broker/Cluster.h" -#include "qpid/broker/Queue.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Session.h" -#include "qpid/messaging/Connection.h" -#include "qpid/messaging/Session.h" -#include "qpid/messaging/Sender.h" -#include "qpid/messaging/Receiver.h" -#include "qpid/messaging/Message.h" -#include "qpid/messaging/Duration.h" -#include "BrokerFixture.h" -#include <boost/assign.hpp> -#include <boost/format.hpp> - -using namespace std; -using namespace boost; -using namespace boost::assign; -using namespace qpid::messaging; -using boost::format; -using boost::intrusive_ptr; - -namespace qpid { -namespace tests { - -class DummyCluster : public broker::Cluster -{ - private: - /** Flag used to ignore events other than enqueues while routing, - * e.g. acquires and accepts generated in a ring queue to replace an element.. - * In real impl would be a thread-local variable. - */ - bool isRouting; - - void recordQm(const string& op, const broker::QueuedMessage& qm) { - history += (format("%s(%s, %d, %s)") % op % qm.queue->getName() - % qm.position % qm.payload->getFrames().getContent()).str(); - } - void recordMsg(const string& op, broker::Queue& q, intrusive_ptr<broker::Message> msg) { - history += (format("%s(%s, %s)") % op % q.getName() % msg->getFrames().getContent()).str(); - } - void recordStr(const string& op, const string& name) { - history += (format("%s(%s)") % op % name).str(); - } - public: - // Messages - - virtual void routing(const boost::intrusive_ptr<broker::Message>& m) { - isRouting = true; - history += (format("routing(%s)") % m->getFrames().getContent()).str(); - } - - virtual bool enqueue(broker::Queue& q, const intrusive_ptr<broker::Message>&msg) { - recordMsg("enqueue", q, msg); - return true; - } - - virtual void routed(const boost::intrusive_ptr<broker::Message>& m) { - history += (format("routed(%s)") % m->getFrames().getContent()).str(); - isRouting = false; - } - virtual void acquire(const broker::QueuedMessage& qm) { - if (!isRouting) recordQm("acquire", qm); - } - virtual void accept(const broker::QueuedMessage& qm) { - if (!isRouting) recordQm("accept", qm); - } - virtual void reject(const broker::QueuedMessage& qm) { - if (!isRouting) recordQm("reject", qm); - } - virtual void rejected(const broker::QueuedMessage& qm) { - if (!isRouting) recordQm("rejected", qm); - } - virtual void release(const broker::QueuedMessage& qm) { - if (!isRouting) recordQm("release", qm); - } - virtual void drop(const broker::QueuedMessage& qm) { - if (!isRouting) recordQm("dequeue", qm); - } - - // Consumers - - virtual void consume(const broker::Queue& q, size_t n) { - history += (format("consume(%s, %d)") % q.getName() % n).str(); - } - virtual void cancel(const broker::Queue& q, size_t n) { - history += (format("cancel(%s, %d)") % q.getName() % n).str(); - } - - // Wiring - - virtual void create(const broker::Queue& q) { recordStr("createq", q.getName()); } - virtual void destroy(const broker::Queue& q) { recordStr("destroyq", q.getName()); } - virtual void create(const broker::Exchange& ex) { recordStr("createex", ex.getName()); } - virtual void destroy(const broker::Exchange& ex) { recordStr("destroyex", ex.getName()); } - virtual void bind(const broker::Queue& q, const broker::Exchange& ex, const std::string& key, const framing::FieldTable& /*args*/) { - history += (format("bind(%s, %s, %s)") % q.getName() % ex.getName() % key).str(); - } - vector<string> history; -}; - -QPID_AUTO_TEST_SUITE(BrokerClusterCallsTestSuite) - -// Broker fixture with DummyCluster set up and some new API client bits. -struct DummyClusterFixture: public BrokerFixture { - Connection c; - Session s; - DummyCluster*dc; - DummyClusterFixture() { - broker->setCluster(auto_ptr<broker::Cluster>(new DummyCluster)); - dc = &static_cast<DummyCluster&>(broker->getCluster()); - c = Connection("localhost:"+lexical_cast<string>(getPort())); - c.open(); - s = c.createSession(); - } - ~DummyClusterFixture() { - c.close(); - } -}; - -QPID_AUTO_TEST_CASE(testSimplePubSub) { - DummyClusterFixture f; - vector<string>& h = f.dc->history; - - // Queue creation - Sender sender = f.s.createSender("q;{create:always,delete:always}"); - size_t i = 0; - BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking. - BOOST_CHECK_EQUAL(h.size(), i); - - // Consumer - Receiver receiver = f.s.createReceiver("q"); - f.s.sync(); - BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)"); - BOOST_CHECK_EQUAL(h.size(), i); - - // Send message - sender.send(Message("a")); - f.s.sync(); - BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); - // Don't check size here as it is uncertain whether acquire has happened yet. - - // Acquire message - Message m = receiver.fetch(Duration::SECOND); - BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)"); - BOOST_CHECK_EQUAL(h.size(), i); - - // Acknowledge message - f.s.acknowledge(true); - f.s.sync(); - BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); - BOOST_CHECK_EQUAL(h.size(), i); - - // Close a consumer - receiver.close(); - BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 0)"); - BOOST_CHECK_EQUAL(h.size(), i); - - // Destroy the queue - f.c.close(); - BOOST_CHECK_EQUAL(h.at(i++), "destroyq(q)"); - BOOST_CHECK_EQUAL(h.size(), i); -} - -QPID_AUTO_TEST_CASE(testReleaseReject) { - DummyClusterFixture f; - vector<string>& h = f.dc->history; - - Sender sender = f.s.createSender("q;{create:always,delete:always,node:{x-declare:{alternate-exchange:amq.fanout}}}"); - sender.send(Message("a")); - Receiver receiver = f.s.createReceiver("q"); - Receiver altReceiver = f.s.createReceiver("amq.fanout;{link:{name:altq}}"); - Message m = receiver.fetch(Duration::SECOND); - h.clear(); - - // Explicit release - f.s.release(m); - f.s.sync(); - size_t i = 0; - BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)"); - BOOST_CHECK_EQUAL(h.size(), i); - - // Implicit release on closing connection. - Connection c("localhost:"+lexical_cast<string>(f.getPort())); - c.open(); - Session s = c.createSession(); - Receiver r = s.createReceiver("q"); - m = r.fetch(Duration::SECOND); - h.clear(); - i = 0; - c.close(); - BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 1)"); - BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)"); - BOOST_CHECK_EQUAL(h.size(), i); - - // Reject message, goes to alternate exchange. - m = receiver.fetch(Duration::SECOND); - h.clear(); - i = 0; - f.s.reject(m); - BOOST_CHECK_EQUAL(h.at(i++), "reject(q, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); // Routing to alt exchange - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "rejected(q, 1, a)"); - BOOST_CHECK_EQUAL(h.size(), i); - m = altReceiver.fetch(Duration::SECOND); - BOOST_CHECK_EQUAL(m.getContent(), "a"); - - // Timed out message - h.clear(); - i = 0; - m = Message("t"); - m.setTtl(Duration(1)); // Timeout 1ms - sender.send(m); - usleep(2000); // Sleep 2ms - bool received = receiver.fetch(m, Duration::IMMEDIATE); - BOOST_CHECK(!received); // Timed out - BOOST_CHECK_EQUAL(h.at(i++), "routing(t)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(t)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)"); - BOOST_CHECK_EQUAL(h.size(), i); - - // Message replaced on LVQ - sender = f.s.createSender("lvq;{create:always,delete:always,node:{x-declare:{arguments:{qpid.last_value_queue:1}}}}"); - m = Message("a"); - m.getProperties()["qpid.LVQ_key"] = "foo"; - sender.send(m); - f.s.sync(); - BOOST_CHECK_EQUAL(h.at(i++), "createq(lvq)"); - BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); - BOOST_CHECK_EQUAL(h.size(), i); - - m = Message("b"); - m.getProperties()["qpid.LVQ_key"] = "foo"; - sender.send(m); - f.s.sync(); - BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, b)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); - BOOST_CHECK_EQUAL(h.size(), i); - - receiver = f.s.createReceiver("lvq"); - BOOST_CHECK_EQUAL(receiver.fetch(Duration::SECOND).getContent(), "b"); - f.s.acknowledge(true); - BOOST_CHECK_EQUAL(h.at(i++), "consume(lvq, 1)"); - BOOST_CHECK_EQUAL(h.at(i++), "acquire(lvq, 1, b)"); - BOOST_CHECK_EQUAL(h.at(i++), "accept(lvq, 1, b)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, b)"); - BOOST_CHECK_EQUAL(h.size(), i); -} - -QPID_AUTO_TEST_CASE(testFanout) { - DummyClusterFixture f; - vector<string>& h = f.dc->history; - - Receiver r1 = f.s.createReceiver("amq.fanout;{link:{name:r1}}"); - Receiver r2 = f.s.createReceiver("amq.fanout;{link:{name:r2}}"); - Sender sender = f.s.createSender("amq.fanout"); - r1.setCapacity(0); // Don't receive immediately. - r2.setCapacity(0); - h.clear(); - size_t i = 0; - - // Send message - sender.send(Message("a")); - f.s.sync(); - BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); - BOOST_CHECK_EQUAL(0u, h.at(i++).find("enqueue(amq.fanout_r")); - BOOST_CHECK_EQUAL(0u, h.at(i++).find("enqueue(amq.fanout_r")); - BOOST_CHECK(h.at(i-1) != h.at(i-2)); - BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); - BOOST_CHECK_EQUAL(h.size(), i); - - // Receive messages - Message m1 = r1.fetch(Duration::SECOND); - f.s.acknowledge(m1, true); - Message m2 = r2.fetch(Duration::SECOND); - f.s.acknowledge(m2, true); - - BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r1, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "accept(amq.fanout_r1, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r1, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r2, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "accept(amq.fanout_r2, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r2, 1, a)"); - BOOST_CHECK_EQUAL(h.size(), i); -} - -QPID_AUTO_TEST_CASE(testRingQueue) { - DummyClusterFixture f; - vector<string>& h = f.dc->history; - - // FIXME aconway 2010-10-15: QPID-2908 ring queue address string is not working, - // so we can't do this: - // Sender sender = f.s.createSender("ring;{create:always,node:{x-declare:{arguments:{qpid.max_size:3,qpid.policy_type:ring}}}}"); - // Must use old API to declare ring queue: - qpid::client::Connection c; - f.open(c); - qpid::client::Session s = c.newSession(); - qpid::framing::FieldTable args; - args.setInt("qpid.max_size", 3); - args.setString("qpid.policy_type","ring"); - s.queueDeclare(qpid::client::arg::queue="ring", qpid::client::arg::arguments=args); - c.close(); - Sender sender = f.s.createSender("ring"); - - size_t i = 0; - // Send message - sender.send(Message("a")); - sender.send(Message("b")); - sender.send(Message("c")); - sender.send(Message("d")); - f.s.sync(); - - BOOST_CHECK_EQUAL(h.at(i++), "createq(ring)"); - - BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); - - BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, b)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); - - BOOST_CHECK_EQUAL(h.at(i++), "routing(c)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, c)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(c)"); - - BOOST_CHECK_EQUAL(h.at(i++), "routing(d)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, d)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(d)"); - - Receiver receiver = f.s.createReceiver("ring"); - BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b"); - BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "c"); - BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "d"); - f.s.acknowledge(true); - - BOOST_CHECK_EQUAL(h.at(i++), "consume(ring, 1)"); - BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 2, b)"); - BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 3, c)"); - BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 4, d)"); - BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 2, b)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 2, b)"); - BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 3, c)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 3, c)"); - BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 4, d)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 4, d)"); - - BOOST_CHECK_EQUAL(h.size(), i); -} - -QPID_AUTO_TEST_CASE(testTransactions) { - DummyClusterFixture f; - vector<string>& h = f.dc->history; - Session ts = f.c.createTransactionalSession(); - Sender sender = ts.createSender("q;{create:always,delete:always}"); - size_t i = 0; - BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking. - BOOST_CHECK_EQUAL(h.size(), i); - - sender.send(Message("a")); - sender.send(Message("b")); - ts.sync(); - BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); - BOOST_CHECK_EQUAL(h.size(), i); // Not replicated till commit - ts.commit(); - // FIXME aconway 2010-10-18: As things stand the cluster is not - // compatible with transactions - // - enqueues occur after routing is complete - // - no call to Cluster::enqueue, should be in Queue::process? - // - no transaction context associated with messages in the Cluster interface. - // - no call to Cluster::accept in Queue::dequeueCommitted - // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)"); - // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, b)"); - BOOST_CHECK_EQUAL(h.size(), i); - - - Receiver receiver = ts.createReceiver("q"); - BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "a"); - BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b"); - ts.acknowledge(); - ts.sync(); - BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)"); - BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 2, b)"); - BOOST_CHECK_EQUAL(h.size(), i); - ts.commit(); - ts.sync(); - // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); - // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 2, b)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, b)"); - BOOST_CHECK_EQUAL(h.size(), i); -} - -QPID_AUTO_TEST_SUITE_END() - -}} // namespace qpid::tests - diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 2a7430b8ca..241ee0fbb1 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -123,8 +123,7 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ Variant.cpp \ Address.cpp \ ClientMessage.cpp \ - Qmf2.cpp \ - BrokerClusterCalls.cpp + Qmf2.cpp if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk index bb0f5d150b..da191e8682 100644 --- a/cpp/src/tests/cluster.mk +++ b/cpp/src/tests/cluster.mk @@ -77,7 +77,7 @@ cluster_test_SOURCES = \ PartialFailure.cpp \ ClusterFailover.cpp -cluster_test_LDADD=$(lib_client) $(lib_broker) $(lib_messaging) ../cluster.la -lboost_unit_test_framework +cluster_test_LDADD=$(lib_client) $(lib_broker) ../cluster.la -lboost_unit_test_framework qpidtest_SCRIPTS += run_cluster_tests cluster_tests.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail diff --git a/cpp/src/tests/cluster2_tests.py b/cpp/src/tests/cluster2_tests.py deleted file mode 100755 index e3a19ae2a0..0000000000 --- a/cpp/src/tests/cluster2_tests.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os, signal, sys, time, imp, re, subprocess -from qpid import datatypes, messaging -from qpid.brokertest import * -from qpid.harness import Skipped -from qpid.messaging import Message -from qpid.messaging.exceptions import Empty -from threading import Thread, Lock -from logging import getLogger -from itertools import chain - -log = getLogger("qpid.cluster_tests") - -class Cluster2Tests(BrokerTest): - """Tests for new cluster code.""" - - def test_message_enqueue(self): - """Test basic replication of enqueued messages.""" - - cluster = self.cluster(2, cluster2=True, args=["--log-enable=trace+:cluster"]) - - sn0 = cluster[0].connect().session() - r0p = sn0.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}"); - r0q = sn0.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}"); - s0 = sn0.sender("amq.fanout"); - - sn1 = cluster[1].connect().session() - r1p = sn1.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}"); - r1q = sn1.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}"); - - - # Send messages on member 0 - content = ["a","b","c"] - for m in content: s0.send(Message(m)) - - # Browse on both members. - def check(content, receiver): - for c in content: self.assertEqual(c, receiver.fetch(1).content) - self.assertRaises(Empty, receiver.fetch, 0) - - check(content, r0p) - check(content, r0q) - check(content, r1p) - check(content, r1q) - - sn1.connection.close() - sn0.connection.close() diff --git a/cpp/src/tests/run_cluster_tests b/cpp/src/tests/run_cluster_tests index 3971a39144..e136d3810a 100755 --- a/cpp/src/tests/run_cluster_tests +++ b/cpp/src/tests/run_cluster_tests @@ -33,5 +33,5 @@ mkdir -p $OUTDIR CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:--i cluster_tests.StoreTests.* -I $srcdir/cluster_tests.fail} CLUSTER_TESTS=${CLUSTER_TESTS:-$*} -with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests -m cluster2_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1 +with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1 rm -rf $OUTDIR diff --git a/cpp/src/tests/test_env.sh.in b/cpp/src/tests/test_env.sh.in index 96fe6b64f4..b5c3b0fa3d 100644 --- a/cpp/src/tests/test_env.sh.in +++ b/cpp/src/tests/test_env.sh.in @@ -63,7 +63,6 @@ export TEST_STORE_LIB=$testmoduledir/test_store.so exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; } exportmodule ACL_LIB acl.so exportmodule CLUSTER_LIB cluster.so -exportmodule CLUSTER2_LIB cluster2.so exportmodule REPLICATING_LISTENER_LIB replicating_listener.so exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so exportmodule SSLCONNECTOR_LIB sslconnector.so |