summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-10-27 18:01:27 +0000
committerAlan Conway <aconway@apache.org>2010-10-27 18:01:27 +0000
commit326dddd0d0d48401d14ca93044b3fc0e35ad87d9 (patch)
tree019a45480d8cdf832f62d7176b7a10a5d0971535 /cpp/src
parentaae11121cfcf891b2365241141f9ab9cb47d3024 (diff)
downloadqpid-python-326dddd0d0d48401d14ca93044b3fc0e35ad87d9.tar.gz
Revert experimental cluster code, too close to 0.8 release.
Reverts revisions: r1023966 "Introduce broker::Cluster interface." r1024275 "Fix compile error: outline set/getCluster fucntions on Broker." r1027210 "New cluster: core framework and initial implementation of enqueue logic." git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1028055 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/cluster.mk22
-rw-r--r--cpp/src/qpid/broker/Broker.cpp6
-rw-r--r--cpp/src/qpid/broker/Broker.h5
-rw-r--r--cpp/src/qpid/broker/Cluster.h110
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp19
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp16
-rw-r--r--cpp/src/qpid/broker/NullCluster.h66
-rw-r--r--cpp/src/qpid/broker/Queue.cpp137
-rw-r--r--cpp/src/qpid/broker/Queue.h7
-rw-r--r--cpp/src/qpid/broker/QueuedMessage.h3
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp7
-rw-r--r--cpp/src/qpid/cluster/BrokerHandler.cpp96
-rw-r--r--cpp/src/qpid/cluster/BrokerHandler.h86
-rw-r--r--cpp/src/qpid/cluster/Cluster2Plugin.cpp65
-rw-r--r--cpp/src/qpid/cluster/Core.cpp68
-rw-r--r--cpp/src/qpid/cluster/Core.h95
-rw-r--r--cpp/src/qpid/cluster/EventHandler.cpp89
-rw-r--r--cpp/src/qpid/cluster/EventHandler.h85
-rw-r--r--cpp/src/qpid/cluster/LockedMap.h73
-rw-r--r--cpp/src/qpid/cluster/MessageHandler.cpp82
-rw-r--r--cpp/src/qpid/cluster/MessageHandler.h70
-rw-r--r--cpp/src/qpid/cluster/MessageId.cpp35
-rw-r--r--cpp/src/qpid/cluster/MessageId.h52
-rw-r--r--cpp/src/qpid/cluster/PollerDispatch.cpp9
-rw-r--r--cpp/src/qpid/cluster/PollerDispatch.h1
-rw-r--r--cpp/src/tests/BrokerClusterCalls.cpp435
-rw-r--r--cpp/src/tests/Makefile.am3
-rw-r--r--cpp/src/tests/cluster.mk2
-rwxr-xr-xcpp/src/tests/cluster2_tests.py66
-rwxr-xr-xcpp/src/tests/run_cluster_tests2
-rw-r--r--cpp/src/tests/test_env.sh.in1
32 files changed, 63 insertions, 1752 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index ea1672e1e1..d8e604c41a 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -504,7 +504,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Broker.cpp \
qpid/broker/Broker.h \
qpid/broker/BrokerImportExport.h \
- qpid/broker/Cluster.h \
qpid/broker/Connection.cpp \
qpid/broker/Connection.h \
qpid/broker/ConnectionFactory.cpp \
@@ -564,7 +563,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/MessageStoreModule.h \
qpid/broker/NameGenerator.cpp \
qpid/broker/NameGenerator.h \
- qpid/broker/NullCluster.h \
qpid/broker/NullMessageStore.cpp \
qpid/broker/NullMessageStore.h \
qpid/broker/OwnershipToken.h \
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index 7cd4a18c9e..2a648e968c 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -35,6 +35,7 @@ endif
if HAVE_LIBCPG
dmodule_LTLIBRARIES += cluster.la
+
cluster_la_SOURCES = \
$(CMAN_SOURCES) \
qpid/cluster/Cluster.cpp \
@@ -98,27 +99,6 @@ cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la
cluster_la_CXXFLAGS = $(AM_CXXFLAGS) -fno-strict-aliasing
cluster_la_LDFLAGS = $(PLUGINLDFLAGS)
-# Experimental new cluster plugin
-dmodule_LTLIBRARIES += cluster2.la
-cluster2_la_LIBADD = -lcpg libqpidbroker.la
-cluster2_la_LDFLAGS = $(PLUGINLDFLAGS)
-cluster2_la_SOURCES = \
- qpid/cluster/BrokerHandler.cpp \
- qpid/cluster/BrokerHandler.h \
- qpid/cluster/Cluster2Plugin.cpp \
- qpid/cluster/Core.cpp \
- qpid/cluster/Core.h \
- qpid/cluster/Cpg.cpp \
- qpid/cluster/Cpg.h \
- qpid/cluster/EventHandler.cpp \
- qpid/cluster/EventHandler.h \
- qpid/cluster/MessageHandler.cpp \
- qpid/cluster/MessageHandler.h \
- qpid/cluster/MessageId.cpp \
- qpid/cluster/MessageId.h \
- qpid/cluster/PollerDispatch.cpp \
- qpid/cluster/PollerDispatch.h
-
# The watchdog plugin and helper executable
dmodule_LTLIBRARIES += watchdog.la
watchdog_la_SOURCES = qpid/cluster/WatchDogPlugin.cpp
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index c93949e33f..33364e48df 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -24,7 +24,6 @@
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
#include "qpid/broker/MessageStoreModule.h"
-#include "qpid/broker/NullCluster.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/broker/SaslAuthenticator.h"
@@ -147,7 +146,6 @@ Broker::Broker(const Broker::Options& conf) :
conf.qmf2Support)
: 0),
store(new NullMessageStore),
- cluster(new NullCluster),
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
queues(this),
@@ -512,9 +510,5 @@ void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
const std::string Broker::TCP_TRANSPORT("tcp");
-void Broker::setCluster(std::auto_ptr<Cluster> c) { cluster = c; }
-
-Cluster& Broker::getCluster() { return *cluster; }
-
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index d589b15f19..6636b5d912 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -70,7 +70,6 @@ namespace broker {
class ExpiryPolicy;
class Message;
-class Cluster;
static const uint16_t DEFAULT_PORT=5672;
@@ -154,7 +153,6 @@ public:
std::auto_ptr<management::ManagementAgent> managementAgent;
ProtocolFactoryMap protocolFactories;
std::auto_ptr<MessageStore> store;
- std::auto_ptr<Cluster> cluster;
AclModule* acl;
DataDir dataDir;
@@ -275,9 +273,6 @@ public:
void setClusterUpdatee(bool set) { clusterUpdatee = set; }
bool isClusterUpdatee() const { return clusterUpdatee; }
- QPID_BROKER_EXTERN void setCluster(std::auto_ptr<Cluster> c);
- QPID_BROKER_EXTERN Cluster& getCluster();
-
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
ConnectionCounter& getConnectionCounter() {return connectionCounter;}
diff --git a/cpp/src/qpid/broker/Cluster.h b/cpp/src/qpid/broker/Cluster.h
deleted file mode 100644
index 4dabd98eab..0000000000
--- a/cpp/src/qpid/broker/Cluster.h
+++ /dev/null
@@ -1,110 +0,0 @@
-#ifndef QPID_BROKER_CLUSTER_H
-#define QPID_BROKER_CLUSTER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <boost/intrusive_ptr.hpp>
-
-namespace qpid {
-
-namespace framing {
-class FieldTable;
-}
-
-namespace broker {
-
-class Message;
-struct QueuedMessage;
-class Queue;
-class Exchange;
-
-/**
- * NOTE: this is part of an experimental cluster implementation that is not
- * yet fully functional. The original cluster implementation remains in place.
- * See ../cluster/new-cluster-design.txt
- *
- * Interface for cluster implementations. Functions on this interface are
- * called at relevant points in the Broker's processing.
- */
-class Cluster
-{
- public:
- virtual ~Cluster() {}
-
- // Messages
-
- /** In Exchange::route, before the message is enqueued. */
- virtual void routing(const boost::intrusive_ptr<Message>&) = 0;
-
- /** A message is delivered to a queue.
- * Called before actually pushing the message to the queue.
- *@return If true the message should be pushed to the queue now.
- * otherwise the cluster code will push the message when it is replicated.
- */
- virtual bool enqueue(Queue& queue, const boost::intrusive_ptr<Message>&) = 0;
-
- /** In Exchange::route, after all enqueues for the message. */
- virtual void routed(const boost::intrusive_ptr<Message>&) = 0;
-
- /** A message is acquired by a local consumer, it is unavailable to replicas. */
- virtual void acquire(const QueuedMessage&) = 0;
- /** A locally-acquired message is accepted, it is removed from all replicas. */
- virtual void accept(const QueuedMessage&) = 0;
-
- /** A locally-acquired message is rejected, and may be re-routed. */
- virtual void reject(const QueuedMessage&) = 0;
- /** Re-routing (if any) is complete for a rejected message. */
- virtual void rejected(const QueuedMessage&) = 0;
-
- /** A locally-acquired message is released by the consumer and re-queued. */
- virtual void release(const QueuedMessage&) = 0;
-
- /** A message is removed from the queue. It could have been
- * accepted, rejected or dropped for other reasons e.g. expired or
- * replaced on an LVQ.
- */
- virtual void drop(const QueuedMessage&) = 0;
-
- // Consumers
-
- /** A new consumer subscribes to a queue. */
- virtual void consume(const Queue&, size_t consumerCount) = 0;
- /** A consumer cancels its subscription to a queue */
- virtual void cancel(const Queue&, size_t consumerCount) = 0;
-
- // Wiring
-
- /** A queue is created */
- virtual void create(const Queue&) = 0;
- /** A queue is destroyed */
- virtual void destroy(const Queue&) = 0;
- /** An exchange is created */
- virtual void create(const Exchange&) = 0;
- /** An exchange is destroyed */
- virtual void destroy(const Exchange&) = 0;
- /** A binding is created */
- virtual void bind(const Queue&, const Exchange&, const std::string& key, const framing::FieldTable& args) = 0;
-};
-
-}} // namespace qpid::broker
-
-#endif /*!QPID_BROKER_CLUSTER_H*/
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 315b1af2a8..9443eb6ea5 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -112,7 +112,7 @@ void DeliveryRecord::complete() {
bool DeliveryRecord::accept(TransactionContext* ctxt) {
if (acquired && !ended) {
- queue->accept(ctxt, msg);
+ queue->dequeue(ctxt, msg);
setEnded();
QPID_LOG(debug, "Accepted " << id);
}
@@ -130,8 +130,19 @@ void DeliveryRecord::committed() const{
}
void DeliveryRecord::reject()
-{
- queue->reject(msg);
+{
+ Exchange::shared_ptr alternate = queue->getAlternateExchange();
+ if (alternate) {
+ DeliverableMessage delivery(msg.payload);
+ alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders());
+ QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to "
+ << alternate->getName());
+ } else {
+ //just drop it
+ QPID_LOG(info, "Dropping rejected message from " << queue->getName());
+ }
+
+ dequeue();
}
uint32_t DeliveryRecord::getCredit() const
@@ -145,7 +156,7 @@ void DeliveryRecord::acquire(DeliveryIds& results) {
results.push_back(id);
if (!acceptExpected) {
if (ended) { QPID_LOG(error, "Can't dequeue ended message"); }
- else { queue->accept(0, msg); setEnded(); }
+ else { queue->dequeue(0, msg); setEnded(); }
}
} else {
QPID_LOG(info, "Message already acquired " << id.getValue());
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index b499171418..d143471559 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -23,7 +23,6 @@
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/FedOps.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Cluster.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/broker/Queue.h"
#include "qpid/log/Statement.h"
@@ -71,23 +70,10 @@ Exchange::PreRoute::~PreRoute(){
}
}
-// Bracket a scope with calls to Cluster::routing and Cluster::routed
-struct ScopedClusterRouting {
- Broker* broker;
- boost::intrusive_ptr<Message> message;
- ScopedClusterRouting(Broker* b, boost::intrusive_ptr<Message> m)
- : broker(b), message(m) {
- if (broker) broker->getCluster().routing(message);
- }
- ~ScopedClusterRouting() {
- if (broker) broker->getCluster().routed(message);
- }
-};
-
void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
{
- ScopedClusterRouting scr(broker, &msg.getMessage());
int count = 0;
+
if (b.get()) {
// Block the content release if the message is transient AND there is more than one binding
if (!msg.getMessage().isPersistent() && b->size() > 1) {
diff --git a/cpp/src/qpid/broker/NullCluster.h b/cpp/src/qpid/broker/NullCluster.h
deleted file mode 100644
index 0e11ceef27..0000000000
--- a/cpp/src/qpid/broker/NullCluster.h
+++ /dev/null
@@ -1,66 +0,0 @@
-#ifndef QPID_BROKER_NULLCLUSTER_H
-#define QPID_BROKER_NULLCLUSTER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <qpid/broker/Cluster.h>
-
-namespace qpid {
-namespace broker {
-
-/**
- * No-op implementation of Cluster interface, installed by broker when
- * no cluster plug-in is present or clustering is disabled.
- */
-class NullCluster : public Cluster
-{
- public:
-
- // Messages
-
- virtual void routing(const boost::intrusive_ptr<Message>&) {}
- virtual bool enqueue(Queue&, const boost::intrusive_ptr<Message>&) { return true; }
- virtual void routed(const boost::intrusive_ptr<Message>&) {}
- virtual void acquire(const QueuedMessage&) {}
- virtual void accept(const QueuedMessage&) {}
- virtual void reject(const QueuedMessage&) {}
- virtual void rejected(const QueuedMessage&) {}
- virtual void release(const QueuedMessage&) {}
- virtual void drop(const QueuedMessage&) {}
-
- // Consumers
-
- virtual void consume(const Queue&, size_t) {}
- virtual void cancel(const Queue&, size_t) {}
-
- // Wiring
-
- virtual void create(const Queue&) {}
- virtual void destroy(const Queue&) {}
- virtual void create(const Exchange&) {}
- virtual void destroy(const Exchange&) {}
- virtual void bind(const Queue&, const Exchange&, const std::string&, const framing::FieldTable&) {}
-};
-
-}} // namespace qpid::broker
-
-#endif
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index c530e9cd51..e59857462c 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -20,7 +20,6 @@
*/
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Cluster.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueEvents.h"
#include "qpid/broker/Exchange.h"
@@ -146,10 +145,6 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
// Check for deferred delivery in a cluster.
if (broker && broker->deferDelivery(name, msg))
return;
- // Same thing but for the new cluster interface.
- if (broker && !broker->getCluster().enqueue(*this, msg))
- return;
-
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
@@ -169,6 +164,7 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
}else {
push(msg);
}
+ mgntEnqStats(msg);
QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
}
}
@@ -202,6 +198,7 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
+ mgntEnqStats(msg);
if (mgmtObject != 0){
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
@@ -227,7 +224,6 @@ void Queue::requeue(const QueuedMessage& msg){
}
}
}
- if (broker) broker->getCluster().release(msg);
copy.notify();
}
@@ -240,22 +236,8 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){
}
}
-// Inform the cluster of an acquired message on exit from a function
-// that does the acquiring. The calling function should set qmsg
-// to the acquired message.
-struct ClusterAcquireOnExit {
- Broker* broker;
- QueuedMessage qmsg;
- ClusterAcquireOnExit(Broker* b) : broker(b) {}
- ~ClusterAcquireOnExit() {
- if (broker && qmsg.queue) broker->getCluster().acquire(qmsg);
- }
-};
-
bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
- ClusterAcquireOnExit willAcquire(broker);
-
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
@@ -266,18 +248,16 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess
if (lastValueQueue) {
clearLVQIndex(*i);
}
- QPID_LOG(debug, "Acquired message at " << i->position << " from " << name);
- willAcquire.qmsg = *i;
+ QPID_LOG(debug,
+ "Acquired message at " << i->position << " from " << name);
messages.erase(i);
return true;
- }
+ }
QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
return false;
}
bool Queue::acquire(const QueuedMessage& msg) {
- ClusterAcquireOnExit acquire(broker);
-
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
@@ -285,17 +265,16 @@ bool Queue::acquire(const QueuedMessage& msg) {
Messages::iterator i = findAt(msg.position);
if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set
(!lastValueQueue ||
- (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
- ) {
+ (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
+ ) {
clearLVQIndex(msg);
QPID_LOG(debug,
"Match found, acquire succeeded: " <<
i->position << " == " << msg.position);
- acquire.qmsg = *i;
messages.erase(i);
return true;
- }
+ }
QPID_LOG(debug, "Acquire failed for " << msg.position);
return false;
@@ -335,8 +314,6 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
while (true) {
- ClusterAcquireOnExit willAcquire(broker); // Outside the lock
-
Mutex::ScopedLock locker(messageLock);
if (messages.empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
@@ -353,7 +330,6 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
if (c->filter(msg.payload)) {
if (c->accept(msg.payload)) {
m = msg;
- willAcquire.qmsg = msg;
popMsg(msg);
return CONSUMED;
} else {
@@ -475,51 +451,40 @@ QueuedMessage Queue::find(SequenceNumber pos) const {
return QueuedMessage();
}
-void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) {
+void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
assertClusterSafe();
- size_t consumers;
- {
- Mutex::ScopedLock locker(consumerLock);
- if(exclusive) {
+ Mutex::ScopedLock locker(consumerLock);
+ if(exclusive) {
+ throw ResourceLockedException(
+ QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
+ } else if(requestExclusive) {
+ if(consumerCount) {
throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
- } else if(requestExclusive) {
- if(consumerCount) {
- throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
- } else {
- exclusive = c->getSession();
- }
+ QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
+ } else {
+ exclusive = c->getSession();
}
- consumers = ++consumerCount;
- if (mgmtObject != 0)
- mgmtObject->inc_consumerCount ();
}
- if (broker) broker->getCluster().consume(*this, consumers);
+ consumerCount++;
+ if (mgmtObject != 0)
+ mgmtObject->inc_consumerCount ();
}
void Queue::cancel(Consumer::shared_ptr c){
removeListener(c);
- size_t consumers;
- {
- Mutex::ScopedLock locker(consumerLock);
- consumers = --consumerCount;
- if(exclusive) exclusive = 0;
- if (mgmtObject != 0)
- mgmtObject->dec_consumerCount ();
- }
- if (broker) broker->getCluster().cancel(*this, consumers);
+ Mutex::ScopedLock locker(consumerLock);
+ consumerCount--;
+ if(exclusive) exclusive = 0;
+ if (mgmtObject != 0)
+ mgmtObject->dec_consumerCount ();
}
QueuedMessage Queue::get(){
- ClusterAcquireOnExit acquire(broker); // Outside lock
-
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
if(!messages.empty()){
msg = getFront();
- acquire.qmsg = msg;
popMsg(msg);
}
return msg;
@@ -644,12 +609,10 @@ void Queue::popMsg(QueuedMessage& qmsg)
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
assertClusterSafe();
- if (!isRecovery) mgntEnqStats(msg);
- QueuedMessage qm;
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
- qm = QueuedMessage(this, msg, ++sequence);
+ QueuedMessage qm(this, msg, ++sequence);
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
LVQ::iterator i;
@@ -666,14 +629,12 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this);
if (!old) old = i->second;
i->second->setReplacementMessage(msg,this);
- // FIXME aconway 2010-10-15: it is incorrect to use qm.position below
- // should be using the position of the message being replaced.
if (isRecovery) {
//can't issue new requests for the store until
//recovery is complete
pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position));
} else {
- Mutex::ScopedUnlock u(messageLock);
+ Mutex::ScopedUnlock u(messageLock);
dequeue(0, QueuedMessage(qm.queue, old, qm.position));
}
}
@@ -831,48 +792,19 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
if (policy.get()) policy->enqueueAborted(msg);
}
-void Queue::accept(TransactionContext* ctxt, const QueuedMessage& msg) {
- if (broker) broker->getCluster().accept(msg);
- dequeue(ctxt, msg);
-}
-
-struct ScopedClusterReject {
- Broker* broker;
- const QueuedMessage& qmsg;
- ScopedClusterReject(Broker* b, const QueuedMessage& m) : broker(b), qmsg(m) {
- if (broker) broker->getCluster().reject(qmsg);
- }
- ~ScopedClusterReject() {
- if (broker) broker->getCluster().rejected(qmsg);
- }
-};
-
-void Queue::reject(const QueuedMessage &msg) {
- ScopedClusterReject scr(broker, msg);
- Exchange::shared_ptr alternate = getAlternateExchange();
- if (alternate) {
- DeliverableMessage delivery(msg.payload);
- alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders());
- QPID_LOG(info, "Routed rejected message from " << getName() << " to "
- << alternate->getName());
- } else {
- //just drop it
- QPID_LOG(info, "Dropping rejected message from " << getName());
- }
- dequeue(0, msg);
-}
-
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
ScopedUse u(barrier);
if (!u.acquired) return false;
+
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return false;
- if (!ctxt) dequeued(msg);
+ if (!ctxt) {
+ dequeued(msg);
+ }
}
- if (!ctxt && broker) broker->getCluster().drop(msg); // Outside lock
// This check prevents messages which have been forced persistent on one queue from dequeuing
// from another on which no forcing has taken place and thus causing a store error.
bool fp = msg.payload->isForcedPersistent();
@@ -889,7 +821,6 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
void Queue::dequeueCommitted(const QueuedMessage& msg)
{
- if (broker) broker->getCluster().drop(msg); // Outside lock
Mutex::ScopedLock locker(messageLock);
dequeued(msg);
if (mgmtObject != 0) {
@@ -915,8 +846,6 @@ void Queue::popAndDequeue()
*/
void Queue::dequeued(const QueuedMessage& msg)
{
- // Note: Cluster::drop does only local book-keeping, no multicast
- // So OK to call here with lock held.
if (policy.get()) policy->dequeued(msg);
mgntDeqStats(msg.payload);
if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
@@ -932,7 +861,6 @@ void Queue::create(const FieldTable& _settings)
store->create(*this, _settings);
}
configure(_settings);
- if (broker) broker->getCluster().create(*this);
}
void Queue::configure(const FieldTable& _settings, bool recovering)
@@ -1006,7 +934,6 @@ void Queue::destroy()
store->destroy(*this);
store = 0;//ensure we make no more calls to the store for this queue
}
- if (broker) broker->getCluster().destroy(*this);
}
void Queue::notifyDeleted()
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 572f3dc0e2..96c79d1b92 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -259,13 +259,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false);
void enqueueAborted(boost::intrusive_ptr<Message> msg);
-
- /** Message acknowledged, dequeue it. */
- QPID_BROKER_EXTERN void accept(TransactionContext* ctxt, const QueuedMessage &msg);
-
- /** Message rejected, dequeue it and re-route to alternate exchange if necessary. */
- QPID_BROKER_EXTERN void reject(const QueuedMessage &msg);
-
/**
* dequeue from store (only done once messages is acknowledged)
*/
diff --git a/cpp/src/qpid/broker/QueuedMessage.h b/cpp/src/qpid/broker/QueuedMessage.h
index 8cf73bda52..35e48b11f3 100644
--- a/cpp/src/qpid/broker/QueuedMessage.h
+++ b/cpp/src/qpid/broker/QueuedMessage.h
@@ -34,9 +34,10 @@ struct QueuedMessage
framing::SequenceNumber position;
Queue* queue;
- QueuedMessage(Queue* q=0) : position(0), queue(q) {}
+ QueuedMessage() : queue(0) {}
QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
payload(msg), position(sn), queue(q) {}
+ QueuedMessage(Queue* q) : queue(q) {}
};
inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; }
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index f393879c16..c91cfba2f8 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -333,7 +333,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
parent->record(record);
}
if (acquire && !ackExpected) {
- queue->accept(0, msg);
+ queue->dequeue(0, msg);
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
return true;
@@ -347,6 +347,11 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
{
assertClusterSafe();
+ // FIXME aconway 2009-06-08: if we have byte & message credit but
+ // checkCredit fails because the message is to big, we should
+ // remain on queue's listener list for possible smaller messages
+ // in future.
+ //
blocked = !(filter(msg) && checkCredit(msg));
return !blocked;
}
diff --git a/cpp/src/qpid/cluster/BrokerHandler.cpp b/cpp/src/qpid/cluster/BrokerHandler.cpp
deleted file mode 100644
index f0b930a221..0000000000
--- a/cpp/src/qpid/cluster/BrokerHandler.cpp
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "Core.h"
-#include "BrokerHandler.h"
-#include "qpid/framing/ClusterMessageRoutingBody.h"
-#include "qpid/framing/ClusterMessageRoutedBody.h"
-#include "qpid/framing/ClusterMessageEnqueueBody.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/broker/QueuedMessage.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/log/Statement.h"
-
-namespace qpid {
-namespace cluster {
-
-using namespace framing;
-using namespace broker;
-
-namespace {
-// noReplicate means the current thread is handling a message
-// received from the cluster so it should not be replciated.
-QPID_TSS bool noReplicate = false;
-
-// Sequence number of the message currently being routed.
-// 0 if we are not currently routing a message.
-QPID_TSS SequenceNumber routeSeq = 0;
-}
-
-BrokerHandler::ScopedSuppressReplication::ScopedSuppressReplication() {
- assert(!noReplicate);
- noReplicate = true;
-}
-
-BrokerHandler::ScopedSuppressReplication::~ScopedSuppressReplication() {
- assert(noReplicate);
- noReplicate = false;
-}
-
-BrokerHandler::BrokerHandler(Core& c) : core(c) {}
-
-SequenceNumber BrokerHandler::nextSequenceNumber() {
- SequenceNumber s = ++sequence;
- if (!s) s = ++sequence; // Avoid 0 on wrap-around.
- return s;
-}
-
-void BrokerHandler::routing(const boost::intrusive_ptr<Message>&) { }
-
-bool BrokerHandler::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg)
-{
- if (noReplicate) return true;
- if (!routeSeq) { // This is the first enqueue, so send the message
- routeSeq = nextSequenceNumber();
- // FIXME aconway 2010-10-20: replicate message in fixed size buffers.
- std::string data(msg->encodedSize(),char());
- framing::Buffer buf(&data[0], data.size());
- msg->encode(buf);
- core.mcast(ClusterMessageRoutingBody(ProtocolVersion(), routeSeq, data));
- core.getRoutingMap().put(routeSeq, msg);
- }
- core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), routeSeq, queue.getName()));
- // TODO aconway 2010-10-21: configable option for strict (wait
- // for CPG deliver to do local deliver) vs. loose (local deliver
- // immediately).
- return false;
-}
-
-void BrokerHandler::routed(const boost::intrusive_ptr<Message>&) {
- if (routeSeq) { // we enqueued at least one message.
- core.mcast(ClusterMessageRoutedBody(ProtocolVersion(), routeSeq));
- // Note: routingMap is cleaned up on CPG delivery in MessageHandler.
- routeSeq = 0;
- }
-}
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/BrokerHandler.h b/cpp/src/qpid/cluster/BrokerHandler.h
deleted file mode 100644
index 1a61d1fc11..0000000000
--- a/cpp/src/qpid/cluster/BrokerHandler.h
+++ /dev/null
@@ -1,86 +0,0 @@
-#ifndef QPID_CLUSTER_BROKERHANDLER_H
-#define QPID_CLUSTER_BROKERHANDLER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/broker/Cluster.h"
-#include "qpid/sys/AtomicValue.h"
-
-namespace qpid {
-namespace cluster {
-class Core;
-
-// TODO aconway 2010-10-19: experimental cluster code.
-
-/**
- * Implements broker::Cluster interface, handles events in broker code.
- */
-class BrokerHandler : public broker::Cluster
-{
- public:
- /** Suppress replication while in scope.
- * Used to prevent re-replication of messages received from the cluster.
- */
- struct ScopedSuppressReplication {
- ScopedSuppressReplication();
- ~ScopedSuppressReplication();
- };
-
- BrokerHandler(Core&);
-
- // FIXME aconway 2010-10-20: implement all points.
-
- // Messages
-
- void routing(const boost::intrusive_ptr<broker::Message>&);
- bool enqueue(broker::Queue&, const boost::intrusive_ptr<broker::Message>&);
- void routed(const boost::intrusive_ptr<broker::Message>&);
- void acquire(const broker::QueuedMessage&) {}
- void accept(const broker::QueuedMessage&) {}
- void reject(const broker::QueuedMessage&) {}
- void rejected(const broker::QueuedMessage&) {}
- void release(const broker::QueuedMessage&) {}
- void drop(const broker::QueuedMessage&) {}
-
- // Consumers
-
- void consume(const broker::Queue&, size_t) {}
- void cancel(const broker::Queue&, size_t) {}
-
- // Wiring
-
- void create(const broker::Queue&) {}
- void destroy(const broker::Queue&) {}
- void create(const broker::Exchange&) {}
- void destroy(const broker::Exchange&) {}
- void bind(const broker::Queue&, const broker::Exchange&,
- const std::string&, const framing::FieldTable&) {}
-
- private:
- SequenceNumber nextSequenceNumber();
-
- Core& core;
- sys::AtomicValue<SequenceNumber> sequence;
-};
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_BROKERHANDLER_H*/
diff --git a/cpp/src/qpid/cluster/Cluster2Plugin.cpp b/cpp/src/qpid/cluster/Cluster2Plugin.cpp
deleted file mode 100644
index 28b7dcec2e..0000000000
--- a/cpp/src/qpid/cluster/Cluster2Plugin.cpp
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <qpid/Options.h>
-#include <qpid/broker/Broker.h>
-#include "Core.h"
-
-namespace qpid {
-namespace cluster {
-using broker::Broker;
-
-// TODO aconway 2010-10-19: experimental new cluster code.
-
-/**
- * Plugin for the cluster.
- */
-struct Cluster2Plugin : public Plugin {
- struct Opts : public Options {
- Core::Settings& settings;
- Opts(Core::Settings& s) : Options("Cluster Options"), settings(s) {
- addOptions()
- ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join");
- // TODO aconway 2010-10-19: copy across other options from ClusterPlugin.h
- }
- };
-
- Core::Settings settings;
- Opts options;
- Core* core; // Core deletes itself on shutdown.
-
- Cluster2Plugin() : options(settings), core(0) {}
-
- Options* getOptions() { return &options; }
-
- void earlyInitialize(Plugin::Target& target) {
- if (settings.name.empty()) return;
- Broker* broker = dynamic_cast<Broker*>(&target);
- if (!broker) return;
- core = new Core(settings, *broker);
- }
-
- void initialize(Plugin::Target& target) {
- Broker* broker = dynamic_cast<Broker*>(&target);
- if (broker && core) core->initialize();
- }
-};
-
-static Cluster2Plugin instance; // Static initialization.
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Core.cpp b/cpp/src/qpid/cluster/Core.cpp
deleted file mode 100644
index e4127fa443..0000000000
--- a/cpp/src/qpid/cluster/Core.cpp
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "Core.h"
-#include "EventHandler.h"
-#include "BrokerHandler.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/SignalHandler.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/log/Statement.h"
-#include <sys/uio.h> // For iovec
-
-namespace qpid {
-namespace cluster {
-
-Core::Core(const Settings& s, broker::Broker& b) :
- broker(b),
- eventHandler(new EventHandler(*this))
-{
- std::auto_ptr<BrokerHandler> bh(new BrokerHandler(*this));
- brokerHandler = bh.get();
- // BrokerHandler belongs to Broker
- broker.setCluster(std::auto_ptr<broker::Cluster>(bh));
- // FIXME aconway 2010-10-20: ownership of BrokerHandler, shutdown issues.
- eventHandler->getCpg().join(s.name);
-}
-
-void Core::initialize() {}
-
-void Core::fatal() {
- // FIXME aconway 2010-10-20: error handling
- assert(0);
- broker::SignalHandler::shutdown();
-}
-
-void Core::mcast(const framing::AMQBody& body) {
- QPID_LOG(trace, "multicast: " << body);
- // FIXME aconway 2010-10-20: use Multicaster, or bring in its features.
- // here we multicast Frames rather than Events.
- framing::AMQFrame f(body);
- std::string data(f.encodedSize(), char());
- framing::Buffer buf(&data[0], data.size());
- f.encode(buf);
- iovec iov = { buf.getPointer(), buf.getSize() };
- while (!eventHandler->getCpg().mcast(&iov, 1))
- ::usleep(1000); // FIXME aconway 2010-10-20: flow control
-}
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Core.h b/cpp/src/qpid/cluster/Core.h
deleted file mode 100644
index 9976c1c906..0000000000
--- a/cpp/src/qpid/cluster/Core.h
+++ /dev/null
@@ -1,95 +0,0 @@
-#ifndef QPID_CLUSTER_CORE_H
-#define QPID_CLUSTER_CORE_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <string>
-#include <memory>
-
-#include "Cpg.h"
-#include "MessageId.h"
-#include "LockedMap.h"
-#include <qpid/broker/QueuedMessage.h>
-
-// TODO aconway 2010-10-19: experimental cluster code.
-
-namespace qpid {
-
-namespace framing{
-class AMQBody;
-}
-
-namespace broker {
-class Broker;
-}
-
-namespace cluster {
-class EventHandler;
-class BrokerHandler;
-
-/**
- * Cluster core state machine.
- * Holds together the various objects that implement cluster behavior,
- * and holds state that is shared by multiple components.
- *
- * Thread safe: called from broker connection threads and CPG dispatch threads.
- */
-class Core
-{
- public:
- /** Configuration settings */
- struct Settings {
- std::string name;
- };
-
- typedef LockedMap<SequenceNumber, boost::intrusive_ptr<broker::Message> >
- SequenceMessageMap;
-
- /** Constructed during Plugin::earlyInitialize() */
- Core(const Settings&, broker::Broker&);
-
- /** Called during Plugin::initialize() */
- void initialize();
-
- /** Shut down broker due to fatal error. Caller should log a critical message */
- void fatal();
-
- /** Multicast an event */
- void mcast(const framing::AMQBody&);
-
- broker::Broker& getBroker() { return broker; }
- EventHandler& getEventHandler() { return *eventHandler; }
- BrokerHandler& getBrokerHandler() { return *brokerHandler; }
-
- /** Map of messages that are currently being routed.
- * Used to pass messages being routed from BrokerHandler to MessageHandler
- */
- SequenceMessageMap& getRoutingMap() { return routingMap; }
- private:
- broker::Broker& broker;
- std::auto_ptr<EventHandler> eventHandler; // Handles CPG events.
- BrokerHandler* brokerHandler; // Handles broker events.
- SequenceMessageMap routingMap;
-};
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_CORE_H*/
diff --git a/cpp/src/qpid/cluster/EventHandler.cpp b/cpp/src/qpid/cluster/EventHandler.cpp
deleted file mode 100644
index 95ae285b06..0000000000
--- a/cpp/src/qpid/cluster/EventHandler.cpp
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "MessageHandler.h"
-#include "EventHandler.h"
-#include "Core.h"
-#include "types.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AllInvoker.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/log/Statement.h"
-
-namespace qpid {
-namespace cluster {
-
-EventHandler::EventHandler(Core& c) :
- core(c),
- cpg(*this), // FIXME aconway 2010-10-20: belongs on Core.
- dispatcher(cpg, core.getBroker().getPoller(), boost::bind(&Core::fatal, &core)),
- self(cpg.self()),
- messageHandler(new MessageHandler(*this))
-{
- dispatcher.start(); // FIXME aconway 2010-10-20: later in initialization?
-}
-
-EventHandler::~EventHandler() {}
-
-// Deliver CPG message.
-void EventHandler::deliver(
- cpg_handle_t /*handle*/,
- const cpg_name* /*group*/,
- uint32_t nodeid,
- uint32_t pid,
- void* msg,
- int msg_len)
-{
- sender = MemberId(nodeid, pid);
- framing::Buffer buf(static_cast<char*>(msg), msg_len);
- framing::AMQFrame frame;
- while (buf.available()) {
- frame.decode(buf);
- assert(frame.getBody());
- QPID_LOG(trace, "cluster deliver: " << *frame.getBody());
- try {
- invoke(*frame.getBody());
- }
- catch (const std::exception& e) {
- // Note: exceptions are assumed to be survivable,
- // fatal errors should log a message and call Core::fatal.
- QPID_LOG(error, e.what());
- }
- }
-}
-
-void EventHandler::invoke(const framing::AMQBody& body) {
- if (framing::invoke(*messageHandler, body).wasHandled()) return;
-}
-
-// CPG config-change callback.
-void EventHandler::configChange (
- cpg_handle_t /*handle*/,
- const cpg_name */*group*/,
- const cpg_address */*members*/, int /*nMembers*/,
- const cpg_address */*left*/, int /*nLeft*/,
- const cpg_address */*joined*/, int /*nJoined*/)
-{
- // FIXME aconway 2010-10-20: TODO
-}
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/EventHandler.h b/cpp/src/qpid/cluster/EventHandler.h
deleted file mode 100644
index 5645c3980b..0000000000
--- a/cpp/src/qpid/cluster/EventHandler.h
+++ /dev/null
@@ -1,85 +0,0 @@
-#ifndef QPID_CLUSTER_EVENTHANDLER_H
-#define QPID_CLUSTER_EVENTHANDLER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-// TODO aconway 2010-10-19: experimental cluster code.
-
-#include "types.h"
-#include "Cpg.h"
-#include "PollerDispatch.h"
-
-namespace qpid {
-
-namespace framing {
-class AMQBody;
-}
-
-namespace cluster {
-class Core;
-class MessageHandler;
-
-/**
- * Dispatch events received from CPG.
- * Thread unsafe: only called in CPG deliver thread context.
- */
-class EventHandler : public Cpg::Handler
-{
- public:
- EventHandler(Core&);
- ~EventHandler();
-
- void deliver( // CPG deliver callback.
- cpg_handle_t /*handle*/,
- const struct cpg_name *group,
- uint32_t /*nodeid*/,
- uint32_t /*pid*/,
- void* /*msg*/,
- int /*msg_len*/);
-
- void configChange( // CPG config change callback.
- cpg_handle_t /*handle*/,
- const struct cpg_name */*group*/,
- const struct cpg_address */*members*/, int /*nMembers*/,
- const struct cpg_address */*left*/, int /*nLeft*/,
- const struct cpg_address */*joined*/, int /*nJoined*/
- );
-
-
- MemberId getSender() { return sender; }
- MemberId getSelf() { return self; }
- Core& getCore() { return core; }
- Cpg& getCpg() { return cpg; }
-
- private:
- void invoke(const framing::AMQBody& body);
-
- Core& core;
- Cpg cpg;
- PollerDispatch dispatcher;
- MemberId sender; // sender of current event.
- MemberId self;
- std::auto_ptr<MessageHandler> messageHandler;
-};
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_EVENTHANDLER_H*/
diff --git a/cpp/src/qpid/cluster/LockedMap.h b/cpp/src/qpid/cluster/LockedMap.h
deleted file mode 100644
index 0736e7ac35..0000000000
--- a/cpp/src/qpid/cluster/LockedMap.h
+++ /dev/null
@@ -1,73 +0,0 @@
-#ifndef QPID_CLUSTER_LOCKEDMAP_H
-#define QPID_CLUSTER_LOCKEDMAP_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/Mutex.h"
-#include <map>
-
-namespace qpid {
-namespace cluster {
-
-/**
- * A reader-writer locked thread safe map.
- */
-template <class Key, class Value>
-class LockedMap
-{
- public:
- /** Get value associated with key, returns Value() if none. */
- Value get(const Key& key) const {
- sys::RWlock::ScopedRlock r(lock);
- typename Map::const_iterator i = map.find(key);
- if (i == map.end()) return Value();
- else return i->second;
- }
-
- /** Associate value with key, overwriting any previous value for key. */
- void put(const Key& key, const Value& value) {
- sys::RWlock::ScopedWlock w(lock);
- map[key] = value;
- }
-
- /** Associate value with key if there is not already a value associated with key.
- * Returns true if the value was added.
- */
- bool add(const Key& key, const Value& value) {
- sys::RWlock::ScopedWlock w(lock);
- return map.insert(key, value).second;
- }
-
- /** Erase the value associated with key if any. Return true if a value was erased. */
- bool erase(const Key& key) {
- sys::RWlock::ScopedWlock w(lock);
- return map.erase(key);
- }
-
- private:
- typedef std::map<Key, Value> Map;
- Map map;
- mutable sys::RWlock lock;
-};
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_LOCKEDMAP_H*/
diff --git a/cpp/src/qpid/cluster/MessageHandler.cpp b/cpp/src/qpid/cluster/MessageHandler.cpp
deleted file mode 100644
index fbbdad38a3..0000000000
--- a/cpp/src/qpid/cluster/MessageHandler.cpp
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "Core.h"
-#include "MessageHandler.h"
-#include "BrokerHandler.h"
-#include "EventHandler.h"
-#include "qpid/broker/Message.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/sys/Thread.h"
-#include <boost/shared_ptr.hpp>
-
-namespace qpid {
-namespace cluster {
-using namespace broker;
-
-MessageHandler::MessageHandler(EventHandler& e) :
- broker(e.getCore().getBroker()),
- eventHandler(e),
- brokerHandler(e.getCore().getBrokerHandler())
-{}
-
-MessageHandler::~MessageHandler() {}
-
-MemberId MessageHandler::sender() { return eventHandler.getSender(); }
-MemberId MessageHandler::self() { return eventHandler.getSelf(); }
-
-void MessageHandler::routing(uint64_t sequence, const std::string& message) {
- MessageId id(sender(), sequence);
- boost::intrusive_ptr<Message> msg;
- if (sender() == self())
- msg = eventHandler.getCore().getRoutingMap().get(sequence);
- if (!msg) {
- framing::Buffer buf(const_cast<char*>(&message[0]), message.size());
- msg = new Message;
- msg->decodeHeader(buf);
- msg->decodeContent(buf);
- }
- routingMap[id] = msg;
-}
-
-void MessageHandler::enqueue(uint64_t sequence, const std::string& q) {
- MessageId id(sender(), sequence);
- boost::shared_ptr<Queue> queue = broker.getQueues().find(q);
- if (!queue) throw Exception(QPID_MSG("Cluster message for unknown queue " << q));
- boost::intrusive_ptr<Message> msg = routingMap[id];
- if (!msg) throw Exception(QPID_MSG("Unknown cluster message for queue " << q));
- BrokerHandler::ScopedSuppressReplication ssr;
- // TODO aconway 2010-10-21: configable option for strict (wait
- // for CPG deliver to do local deliver) vs. loose (local deliver
- // immediately).
- queue->deliver(msg);
-}
-
-void MessageHandler::routed(uint64_t sequence) {
- MessageId id(sender(), sequence);
- routingMap.erase(id);
- eventHandler.getCore().getRoutingMap().erase(sequence);
-}
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/MessageHandler.h b/cpp/src/qpid/cluster/MessageHandler.h
deleted file mode 100644
index 5c32bf474e..0000000000
--- a/cpp/src/qpid/cluster/MessageHandler.h
+++ /dev/null
@@ -1,70 +0,0 @@
-#ifndef QPID_CLUSTER_MESSAGEHANDLER_H
-#define QPID_CLUSTER_MESSAGEHANDLER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-// TODO aconway 2010-10-19: experimental cluster code.
-
-#include "qpid/framing/AMQP_AllOperations.h"
-#include "MessageId.h"
-#include <boost/intrusive_ptr.hpp>
-#include <map>
-
-namespace qpid {
-
-namespace broker {
-class Message;
-class Broker;
-}
-
-namespace cluster {
-class EventHandler;
-class BrokerHandler;
-
-/**
- * Handler for message disposition events.
- */
-class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler
-{
- public:
- MessageHandler(EventHandler&);
- ~MessageHandler();
-
- void routing(uint64_t sequence, const std::string& message);
- void enqueue(uint64_t sequence, const std::string& queue);
- void routed(uint64_t sequence);
-
- private:
- typedef std::map<MessageId, boost::intrusive_ptr<broker::Message> > RoutingMap;
-
- MemberId sender();
- MemberId self();
-
- broker::Broker& broker;
- EventHandler& eventHandler;
- BrokerHandler& brokerHandler;
- RoutingMap routingMap;
-
-};
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_MESSAGEHANDLER_H*/
diff --git a/cpp/src/qpid/cluster/MessageId.cpp b/cpp/src/qpid/cluster/MessageId.cpp
deleted file mode 100644
index fbd248ed69..0000000000
--- a/cpp/src/qpid/cluster/MessageId.cpp
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "MessageId.h"
-#include <ostream>
-
-namespace qpid {
-namespace cluster {
-
-bool operator<(const MessageId& a, const MessageId& b) {
- return a.member < b.member || ((a.member == b.member) && a.sequence < b.sequence);
-}
-
-std::ostream& operator<<(std::ostream& o, const MessageId& m) {
- return o << m.member << ":" << m.sequence;
-}
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/MessageId.h b/cpp/src/qpid/cluster/MessageId.h
deleted file mode 100644
index 16bf7ddd6d..0000000000
--- a/cpp/src/qpid/cluster/MessageId.h
+++ /dev/null
@@ -1,52 +0,0 @@
-#ifndef QPID_CLUSTER_MESSAGEID_H
-#define QPID_CLUSTER_MESSAGEID_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "types.h"
-#include <iosfwd>
-
-namespace qpid {
-namespace cluster {
-
-// TODO aconway 2010-10-20: experimental new cluster code.
-
-/** Sequence number used in message identifiers */
-typedef uint64_t SequenceNumber;
-
-/**
- * Message identifier
- */
-struct MessageId {
- MemberId member; /// Member that created the message
- SequenceNumber sequence; /// Sequence number assiged by member.
- MessageId(MemberId m=MemberId(), SequenceNumber s=0) : member(m), sequence(s) {}
-};
-
-bool operator<(const MessageId&, const MessageId&);
-
-std::ostream& operator<<(std::ostream&, const MessageId&);
-
-
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_MESSAGEID_H*/
diff --git a/cpp/src/qpid/cluster/PollerDispatch.cpp b/cpp/src/qpid/cluster/PollerDispatch.cpp
index 43c171efe8..b8d94b95a5 100644
--- a/cpp/src/qpid/cluster/PollerDispatch.cpp
+++ b/cpp/src/qpid/cluster/PollerDispatch.cpp
@@ -37,11 +37,9 @@ PollerDispatch::PollerDispatch(Cpg& c, boost::shared_ptr<sys::Poller> p,
started(false)
{}
-PollerDispatch::~PollerDispatch() { stop(); }
-
-void PollerDispatch::stop() {
- if (started) dispatchHandle.stopWatch();
- started = false;
+PollerDispatch::~PollerDispatch() {
+ if (started)
+ dispatchHandle.stopWatch();
}
void PollerDispatch::start() {
@@ -56,7 +54,6 @@ void PollerDispatch::dispatch(sys::DispatchHandle& h) {
h.rewatch();
} catch (const std::exception& e) {
QPID_LOG(critical, "Error in cluster dispatch: " << e.what());
- stop();
onError();
}
}
diff --git a/cpp/src/qpid/cluster/PollerDispatch.h b/cpp/src/qpid/cluster/PollerDispatch.h
index f16d5ece95..63801e0de9 100644
--- a/cpp/src/qpid/cluster/PollerDispatch.h
+++ b/cpp/src/qpid/cluster/PollerDispatch.h
@@ -41,7 +41,6 @@ class PollerDispatch {
~PollerDispatch();
void start();
- void stop();
private:
// Poller callbacks
diff --git a/cpp/src/tests/BrokerClusterCalls.cpp b/cpp/src/tests/BrokerClusterCalls.cpp
deleted file mode 100644
index f659702387..0000000000
--- a/cpp/src/tests/BrokerClusterCalls.cpp
+++ /dev/null
@@ -1,435 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-///@file
-// Tests using a dummy broker::Cluster implementation to verify the expected
-// Cluster functions are called for various actions on the broker.
-//
-
-#include "unit_test.h"
-#include "test_tools.h"
-#include "qpid/broker/Cluster.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/Session.h"
-#include "qpid/messaging/Connection.h"
-#include "qpid/messaging/Session.h"
-#include "qpid/messaging/Sender.h"
-#include "qpid/messaging/Receiver.h"
-#include "qpid/messaging/Message.h"
-#include "qpid/messaging/Duration.h"
-#include "BrokerFixture.h"
-#include <boost/assign.hpp>
-#include <boost/format.hpp>
-
-using namespace std;
-using namespace boost;
-using namespace boost::assign;
-using namespace qpid::messaging;
-using boost::format;
-using boost::intrusive_ptr;
-
-namespace qpid {
-namespace tests {
-
-class DummyCluster : public broker::Cluster
-{
- private:
- /** Flag used to ignore events other than enqueues while routing,
- * e.g. acquires and accepts generated in a ring queue to replace an element..
- * In real impl would be a thread-local variable.
- */
- bool isRouting;
-
- void recordQm(const string& op, const broker::QueuedMessage& qm) {
- history += (format("%s(%s, %d, %s)") % op % qm.queue->getName()
- % qm.position % qm.payload->getFrames().getContent()).str();
- }
- void recordMsg(const string& op, broker::Queue& q, intrusive_ptr<broker::Message> msg) {
- history += (format("%s(%s, %s)") % op % q.getName() % msg->getFrames().getContent()).str();
- }
- void recordStr(const string& op, const string& name) {
- history += (format("%s(%s)") % op % name).str();
- }
- public:
- // Messages
-
- virtual void routing(const boost::intrusive_ptr<broker::Message>& m) {
- isRouting = true;
- history += (format("routing(%s)") % m->getFrames().getContent()).str();
- }
-
- virtual bool enqueue(broker::Queue& q, const intrusive_ptr<broker::Message>&msg) {
- recordMsg("enqueue", q, msg);
- return true;
- }
-
- virtual void routed(const boost::intrusive_ptr<broker::Message>& m) {
- history += (format("routed(%s)") % m->getFrames().getContent()).str();
- isRouting = false;
- }
- virtual void acquire(const broker::QueuedMessage& qm) {
- if (!isRouting) recordQm("acquire", qm);
- }
- virtual void accept(const broker::QueuedMessage& qm) {
- if (!isRouting) recordQm("accept", qm);
- }
- virtual void reject(const broker::QueuedMessage& qm) {
- if (!isRouting) recordQm("reject", qm);
- }
- virtual void rejected(const broker::QueuedMessage& qm) {
- if (!isRouting) recordQm("rejected", qm);
- }
- virtual void release(const broker::QueuedMessage& qm) {
- if (!isRouting) recordQm("release", qm);
- }
- virtual void drop(const broker::QueuedMessage& qm) {
- if (!isRouting) recordQm("dequeue", qm);
- }
-
- // Consumers
-
- virtual void consume(const broker::Queue& q, size_t n) {
- history += (format("consume(%s, %d)") % q.getName() % n).str();
- }
- virtual void cancel(const broker::Queue& q, size_t n) {
- history += (format("cancel(%s, %d)") % q.getName() % n).str();
- }
-
- // Wiring
-
- virtual void create(const broker::Queue& q) { recordStr("createq", q.getName()); }
- virtual void destroy(const broker::Queue& q) { recordStr("destroyq", q.getName()); }
- virtual void create(const broker::Exchange& ex) { recordStr("createex", ex.getName()); }
- virtual void destroy(const broker::Exchange& ex) { recordStr("destroyex", ex.getName()); }
- virtual void bind(const broker::Queue& q, const broker::Exchange& ex, const std::string& key, const framing::FieldTable& /*args*/) {
- history += (format("bind(%s, %s, %s)") % q.getName() % ex.getName() % key).str();
- }
- vector<string> history;
-};
-
-QPID_AUTO_TEST_SUITE(BrokerClusterCallsTestSuite)
-
-// Broker fixture with DummyCluster set up and some new API client bits.
-struct DummyClusterFixture: public BrokerFixture {
- Connection c;
- Session s;
- DummyCluster*dc;
- DummyClusterFixture() {
- broker->setCluster(auto_ptr<broker::Cluster>(new DummyCluster));
- dc = &static_cast<DummyCluster&>(broker->getCluster());
- c = Connection("localhost:"+lexical_cast<string>(getPort()));
- c.open();
- s = c.createSession();
- }
- ~DummyClusterFixture() {
- c.close();
- }
-};
-
-QPID_AUTO_TEST_CASE(testSimplePubSub) {
- DummyClusterFixture f;
- vector<string>& h = f.dc->history;
-
- // Queue creation
- Sender sender = f.s.createSender("q;{create:always,delete:always}");
- size_t i = 0;
- BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking.
- BOOST_CHECK_EQUAL(h.size(), i);
-
- // Consumer
- Receiver receiver = f.s.createReceiver("q");
- f.s.sync();
- BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)");
- BOOST_CHECK_EQUAL(h.size(), i);
-
- // Send message
- sender.send(Message("a"));
- f.s.sync();
- BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)");
- BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
- // Don't check size here as it is uncertain whether acquire has happened yet.
-
- // Acquire message
- Message m = receiver.fetch(Duration::SECOND);
- BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)");
- BOOST_CHECK_EQUAL(h.size(), i);
-
- // Acknowledge message
- f.s.acknowledge(true);
- f.s.sync();
- BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)");
- BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
- BOOST_CHECK_EQUAL(h.size(), i);
-
- // Close a consumer
- receiver.close();
- BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 0)");
- BOOST_CHECK_EQUAL(h.size(), i);
-
- // Destroy the queue
- f.c.close();
- BOOST_CHECK_EQUAL(h.at(i++), "destroyq(q)");
- BOOST_CHECK_EQUAL(h.size(), i);
-}
-
-QPID_AUTO_TEST_CASE(testReleaseReject) {
- DummyClusterFixture f;
- vector<string>& h = f.dc->history;
-
- Sender sender = f.s.createSender("q;{create:always,delete:always,node:{x-declare:{alternate-exchange:amq.fanout}}}");
- sender.send(Message("a"));
- Receiver receiver = f.s.createReceiver("q");
- Receiver altReceiver = f.s.createReceiver("amq.fanout;{link:{name:altq}}");
- Message m = receiver.fetch(Duration::SECOND);
- h.clear();
-
- // Explicit release
- f.s.release(m);
- f.s.sync();
- size_t i = 0;
- BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)");
- BOOST_CHECK_EQUAL(h.size(), i);
-
- // Implicit release on closing connection.
- Connection c("localhost:"+lexical_cast<string>(f.getPort()));
- c.open();
- Session s = c.createSession();
- Receiver r = s.createReceiver("q");
- m = r.fetch(Duration::SECOND);
- h.clear();
- i = 0;
- c.close();
- BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 1)");
- BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)");
- BOOST_CHECK_EQUAL(h.size(), i);
-
- // Reject message, goes to alternate exchange.
- m = receiver.fetch(Duration::SECOND);
- h.clear();
- i = 0;
- f.s.reject(m);
- BOOST_CHECK_EQUAL(h.at(i++), "reject(q, 1, a)");
- BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); // Routing to alt exchange
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, a)");
- BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
- BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
- BOOST_CHECK_EQUAL(h.at(i++), "rejected(q, 1, a)");
- BOOST_CHECK_EQUAL(h.size(), i);
- m = altReceiver.fetch(Duration::SECOND);
- BOOST_CHECK_EQUAL(m.getContent(), "a");
-
- // Timed out message
- h.clear();
- i = 0;
- m = Message("t");
- m.setTtl(Duration(1)); // Timeout 1ms
- sender.send(m);
- usleep(2000); // Sleep 2ms
- bool received = receiver.fetch(m, Duration::IMMEDIATE);
- BOOST_CHECK(!received); // Timed out
- BOOST_CHECK_EQUAL(h.at(i++), "routing(t)");
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)");
- BOOST_CHECK_EQUAL(h.at(i++), "routed(t)");
- BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)");
- BOOST_CHECK_EQUAL(h.size(), i);
-
- // Message replaced on LVQ
- sender = f.s.createSender("lvq;{create:always,delete:always,node:{x-declare:{arguments:{qpid.last_value_queue:1}}}}");
- m = Message("a");
- m.getProperties()["qpid.LVQ_key"] = "foo";
- sender.send(m);
- f.s.sync();
- BOOST_CHECK_EQUAL(h.at(i++), "createq(lvq)");
- BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, a)");
- BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
- BOOST_CHECK_EQUAL(h.size(), i);
-
- m = Message("b");
- m.getProperties()["qpid.LVQ_key"] = "foo";
- sender.send(m);
- f.s.sync();
- BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, b)");
- BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
- BOOST_CHECK_EQUAL(h.size(), i);
-
- receiver = f.s.createReceiver("lvq");
- BOOST_CHECK_EQUAL(receiver.fetch(Duration::SECOND).getContent(), "b");
- f.s.acknowledge(true);
- BOOST_CHECK_EQUAL(h.at(i++), "consume(lvq, 1)");
- BOOST_CHECK_EQUAL(h.at(i++), "acquire(lvq, 1, b)");
- BOOST_CHECK_EQUAL(h.at(i++), "accept(lvq, 1, b)");
- BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, b)");
- BOOST_CHECK_EQUAL(h.size(), i);
-}
-
-QPID_AUTO_TEST_CASE(testFanout) {
- DummyClusterFixture f;
- vector<string>& h = f.dc->history;
-
- Receiver r1 = f.s.createReceiver("amq.fanout;{link:{name:r1}}");
- Receiver r2 = f.s.createReceiver("amq.fanout;{link:{name:r2}}");
- Sender sender = f.s.createSender("amq.fanout");
- r1.setCapacity(0); // Don't receive immediately.
- r2.setCapacity(0);
- h.clear();
- size_t i = 0;
-
- // Send message
- sender.send(Message("a"));
- f.s.sync();
- BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
- BOOST_CHECK_EQUAL(0u, h.at(i++).find("enqueue(amq.fanout_r"));
- BOOST_CHECK_EQUAL(0u, h.at(i++).find("enqueue(amq.fanout_r"));
- BOOST_CHECK(h.at(i-1) != h.at(i-2));
- BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
- BOOST_CHECK_EQUAL(h.size(), i);
-
- // Receive messages
- Message m1 = r1.fetch(Duration::SECOND);
- f.s.acknowledge(m1, true);
- Message m2 = r2.fetch(Duration::SECOND);
- f.s.acknowledge(m2, true);
-
- BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r1, 1, a)");
- BOOST_CHECK_EQUAL(h.at(i++), "accept(amq.fanout_r1, 1, a)");
- BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r1, 1, a)");
- BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r2, 1, a)");
- BOOST_CHECK_EQUAL(h.at(i++), "accept(amq.fanout_r2, 1, a)");
- BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r2, 1, a)");
- BOOST_CHECK_EQUAL(h.size(), i);
-}
-
-QPID_AUTO_TEST_CASE(testRingQueue) {
- DummyClusterFixture f;
- vector<string>& h = f.dc->history;
-
- // FIXME aconway 2010-10-15: QPID-2908 ring queue address string is not working,
- // so we can't do this:
- // Sender sender = f.s.createSender("ring;{create:always,node:{x-declare:{arguments:{qpid.max_size:3,qpid.policy_type:ring}}}}");
- // Must use old API to declare ring queue:
- qpid::client::Connection c;
- f.open(c);
- qpid::client::Session s = c.newSession();
- qpid::framing::FieldTable args;
- args.setInt("qpid.max_size", 3);
- args.setString("qpid.policy_type","ring");
- s.queueDeclare(qpid::client::arg::queue="ring", qpid::client::arg::arguments=args);
- c.close();
- Sender sender = f.s.createSender("ring");
-
- size_t i = 0;
- // Send message
- sender.send(Message("a"));
- sender.send(Message("b"));
- sender.send(Message("c"));
- sender.send(Message("d"));
- f.s.sync();
-
- BOOST_CHECK_EQUAL(h.at(i++), "createq(ring)");
-
- BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, a)");
- BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
-
- BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, b)");
- BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
-
- BOOST_CHECK_EQUAL(h.at(i++), "routing(c)");
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, c)");
- BOOST_CHECK_EQUAL(h.at(i++), "routed(c)");
-
- BOOST_CHECK_EQUAL(h.at(i++), "routing(d)");
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, d)");
- BOOST_CHECK_EQUAL(h.at(i++), "routed(d)");
-
- Receiver receiver = f.s.createReceiver("ring");
- BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b");
- BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "c");
- BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "d");
- f.s.acknowledge(true);
-
- BOOST_CHECK_EQUAL(h.at(i++), "consume(ring, 1)");
- BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 2, b)");
- BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 3, c)");
- BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 4, d)");
- BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 2, b)");
- BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 2, b)");
- BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 3, c)");
- BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 3, c)");
- BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 4, d)");
- BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 4, d)");
-
- BOOST_CHECK_EQUAL(h.size(), i);
-}
-
-QPID_AUTO_TEST_CASE(testTransactions) {
- DummyClusterFixture f;
- vector<string>& h = f.dc->history;
- Session ts = f.c.createTransactionalSession();
- Sender sender = ts.createSender("q;{create:always,delete:always}");
- size_t i = 0;
- BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking.
- BOOST_CHECK_EQUAL(h.size(), i);
-
- sender.send(Message("a"));
- sender.send(Message("b"));
- ts.sync();
- BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
- BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
- BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
- BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
- BOOST_CHECK_EQUAL(h.size(), i); // Not replicated till commit
- ts.commit();
- // FIXME aconway 2010-10-18: As things stand the cluster is not
- // compatible with transactions
- // - enqueues occur after routing is complete
- // - no call to Cluster::enqueue, should be in Queue::process?
- // - no transaction context associated with messages in the Cluster interface.
- // - no call to Cluster::accept in Queue::dequeueCommitted
- // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)");
- // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, b)");
- BOOST_CHECK_EQUAL(h.size(), i);
-
-
- Receiver receiver = ts.createReceiver("q");
- BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "a");
- BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b");
- ts.acknowledge();
- ts.sync();
- BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)");
- BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)");
- BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 2, b)");
- BOOST_CHECK_EQUAL(h.size(), i);
- ts.commit();
- ts.sync();
- // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)");
- BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
- // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 2, b)");
- BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, b)");
- BOOST_CHECK_EQUAL(h.size(), i);
-}
-
-QPID_AUTO_TEST_SUITE_END()
-
-}} // namespace qpid::tests
-
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 2a7430b8ca..241ee0fbb1 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -123,8 +123,7 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
Variant.cpp \
Address.cpp \
ClientMessage.cpp \
- Qmf2.cpp \
- BrokerClusterCalls.cpp
+ Qmf2.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk
index bb0f5d150b..da191e8682 100644
--- a/cpp/src/tests/cluster.mk
+++ b/cpp/src/tests/cluster.mk
@@ -77,7 +77,7 @@ cluster_test_SOURCES = \
PartialFailure.cpp \
ClusterFailover.cpp
-cluster_test_LDADD=$(lib_client) $(lib_broker) $(lib_messaging) ../cluster.la -lboost_unit_test_framework
+cluster_test_LDADD=$(lib_client) $(lib_broker) ../cluster.la -lboost_unit_test_framework
qpidtest_SCRIPTS += run_cluster_tests cluster_tests.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail
diff --git a/cpp/src/tests/cluster2_tests.py b/cpp/src/tests/cluster2_tests.py
deleted file mode 100755
index e3a19ae2a0..0000000000
--- a/cpp/src/tests/cluster2_tests.py
+++ /dev/null
@@ -1,66 +0,0 @@
-#!/usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import os, signal, sys, time, imp, re, subprocess
-from qpid import datatypes, messaging
-from qpid.brokertest import *
-from qpid.harness import Skipped
-from qpid.messaging import Message
-from qpid.messaging.exceptions import Empty
-from threading import Thread, Lock
-from logging import getLogger
-from itertools import chain
-
-log = getLogger("qpid.cluster_tests")
-
-class Cluster2Tests(BrokerTest):
- """Tests for new cluster code."""
-
- def test_message_enqueue(self):
- """Test basic replication of enqueued messages."""
-
- cluster = self.cluster(2, cluster2=True, args=["--log-enable=trace+:cluster"])
-
- sn0 = cluster[0].connect().session()
- r0p = sn0.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
- r0q = sn0.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
- s0 = sn0.sender("amq.fanout");
-
- sn1 = cluster[1].connect().session()
- r1p = sn1.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
- r1q = sn1.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
-
-
- # Send messages on member 0
- content = ["a","b","c"]
- for m in content: s0.send(Message(m))
-
- # Browse on both members.
- def check(content, receiver):
- for c in content: self.assertEqual(c, receiver.fetch(1).content)
- self.assertRaises(Empty, receiver.fetch, 0)
-
- check(content, r0p)
- check(content, r0q)
- check(content, r1p)
- check(content, r1q)
-
- sn1.connection.close()
- sn0.connection.close()
diff --git a/cpp/src/tests/run_cluster_tests b/cpp/src/tests/run_cluster_tests
index 3971a39144..e136d3810a 100755
--- a/cpp/src/tests/run_cluster_tests
+++ b/cpp/src/tests/run_cluster_tests
@@ -33,5 +33,5 @@ mkdir -p $OUTDIR
CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:--i cluster_tests.StoreTests.* -I $srcdir/cluster_tests.fail}
CLUSTER_TESTS=${CLUSTER_TESTS:-$*}
-with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests -m cluster2_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1
+with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1
rm -rf $OUTDIR
diff --git a/cpp/src/tests/test_env.sh.in b/cpp/src/tests/test_env.sh.in
index 96fe6b64f4..b5c3b0fa3d 100644
--- a/cpp/src/tests/test_env.sh.in
+++ b/cpp/src/tests/test_env.sh.in
@@ -63,7 +63,6 @@ export TEST_STORE_LIB=$testmoduledir/test_store.so
exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; }
exportmodule ACL_LIB acl.so
exportmodule CLUSTER_LIB cluster.so
-exportmodule CLUSTER2_LIB cluster2.so
exportmodule REPLICATING_LISTENER_LIB replicating_listener.so
exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so
exportmodule SSLCONNECTOR_LIB sslconnector.so