summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-09-06 21:45:43 +0000
committerAlan Conway <aconway@apache.org>2011-09-06 21:45:43 +0000
commit4acf969531264568693cfaf86492203a1ecf3634 (patch)
treec96ae7c251985e579527d462152061e94fa4a1fe
parent1c52d0b8deda196e257de7436a247921d9482046 (diff)
downloadqpid-python-4acf969531264568693cfaf86492203a1ecf3634.tar.gz
QPID-2920: First cut experimental prototype for new cluster.
Experimental code to investigate & measure performance of new cluster design ideas. Experimental classes are in src/qpid/cluster/exp. New broker::Cluster interface provides call points for cluster. Similar to store but has more operations, may be merged at a future point. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-1@1165874 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/design_docs/new-cluster-plan.txt7
-rw-r--r--qpid/cpp/src/Makefile.am2
-rw-r--r--qpid/cpp/src/cluster.mk24
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp25
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h5
-rw-r--r--qpid/cpp/src/qpid/broker/Cluster.h104
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp18
-rw-r--r--qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp13
-rw-r--r--qpid/cpp/src/qpid/broker/ExchangeRegistry.h1
-rw-r--r--qpid/cpp/src/qpid/broker/NullCluster.h66
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp73
-rw-r--r--qpid/cpp/src/qpid/broker/QueuedMessage.h13
-rw-r--r--qpid/cpp/src/qpid/broker/RecoverableExchange.h6
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp18
-rw-r--r--qpid/cpp/src/qpid/cluster/PollerDispatch.cpp9
-rw-r--r--qpid/cpp/src/qpid/cluster/PollerDispatch.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp154
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h86
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp65
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Core.cpp75
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Core.h93
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp123
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/EventHandler.h95
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/HandlerBase.cpp36
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/HandlerBase.h54
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/LockedMap.h73
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp105
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.h73
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/README.txt2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp111
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/WiringHandler.h75
-rw-r--r--qpid/cpp/src/qpid/cluster/types.h9
-rw-r--r--qpid/cpp/src/tests/BrokerClusterCalls.cpp419
-rw-r--r--qpid/cpp/src/tests/Makefile.am51
-rw-r--r--qpid/cpp/src/tests/brokertest.py23
-rw-r--r--qpid/cpp/src/tests/cluster.mk2
-rwxr-xr-xqpid/cpp/src/tests/cluster2_tests.py116
-rwxr-xr-xqpid/cpp/src/tests/run_cluster_tests2
-rw-r--r--qpid/cpp/src/tests/test_env.sh.in1
-rw-r--r--qpid/cpp/xml/cluster.xml58
40 files changed, 2199 insertions, 87 deletions
diff --git a/qpid/cpp/design_docs/new-cluster-plan.txt b/qpid/cpp/design_docs/new-cluster-plan.txt
index 781876e55a..571c3d865c 100644
--- a/qpid/cpp/design_docs/new-cluster-plan.txt
+++ b/qpid/cpp/design_docs/new-cluster-plan.txt
@@ -146,10 +146,6 @@ reject(QueuedMessage):
isRejecting = true
mcast reject(qmsg)
-# FIXME no longer needed?
-drop(QueuedMessage)
- cleanup(qmsg)
-
*** MessageHandler and mcast messages
Types:
- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; }
@@ -348,6 +344,9 @@ For 0-10 can use channel numbers & send whole frames packed into larger buffer.
Extend broker::Cluster interface to capture transaction context and completion.
Sequence number to generate per-node tx IDs.
Replicate transaction completion.
+** TODO [#B] Management support
+- Replicate management methods that modify queues - e.g. move, purge.
+- Report connections - local only or cluster-wide?
** TODO [#B] Batch CPG multicast messages
The new cluster design involves a lot of small multicast messages,
they need to be batched into larger CPG messages for efficiency.
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 2663987f75..c6fd021609 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -520,6 +520,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Broker.cpp \
qpid/broker/Broker.h \
qpid/broker/BrokerImportExport.h \
+ qpid/broker/Cluster.h \
qpid/broker/Connection.cpp \
qpid/broker/Connection.h \
qpid/broker/ConnectionFactory.cpp \
@@ -589,6 +590,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/PriorityQueue.cpp \
qpid/broker/NameGenerator.cpp \
qpid/broker/NameGenerator.h \
+ qpid/broker/NullCluster.h \
qpid/broker/NullMessageStore.cpp \
qpid/broker/NullMessageStore.h \
qpid/broker/OwnershipToken.h \
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index 3ce4ce25b3..9543da12b7 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -101,6 +101,30 @@ cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la
cluster_la_CXXFLAGS = $(AM_CXXFLAGS) -fno-strict-aliasing
cluster_la_LDFLAGS = $(PLUGINLDFLAGS)
+# Experimental new cluster plugin
+dmodule_LTLIBRARIES += cluster2.la
+cluster2_la_LIBADD = -lcpg libqpidbroker.la
+cluster2_la_LDFLAGS = $(PLUGINLDFLAGS)
+cluster2_la_SOURCES = \
+ qpid/cluster/Cpg.cpp \
+ qpid/cluster/Cpg.h \
+ qpid/cluster/PollerDispatch.cpp \
+ qpid/cluster/PollerDispatch.h \
+ qpid/cluster/exp/BrokerHandler.cpp \
+ qpid/cluster/exp/BrokerHandler.h \
+ qpid/cluster/exp/Cluster2Plugin.cpp \
+ qpid/cluster/exp/Core.cpp \
+ qpid/cluster/exp/Core.h \
+ qpid/cluster/exp/EventHandler.cpp \
+ qpid/cluster/exp/EventHandler.h \
+ qpid/cluster/exp/HandlerBase.cpp \
+ qpid/cluster/exp/HandlerBase.h \
+ qpid/cluster/exp/MessageHandler.cpp \
+ qpid/cluster/exp/MessageHandler.h \
+ qpid/cluster/exp/WiringHandler.cpp \
+ qpid/cluster/exp/WiringHandler.h
+
+
# The watchdog plugin and helper executable
dmoduleexec_LTLIBRARIES += watchdog.la
watchdog_la_SOURCES = qpid/cluster/WatchDogPlugin.cpp
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 598c43b1d8..1004fa2bcd 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -25,6 +25,7 @@
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
#include "qpid/broker/MessageStoreModule.h"
+#include "qpid/broker/NullCluster.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/broker/SaslAuthenticator.h"
@@ -176,6 +177,7 @@ Broker::Broker(const Broker::Options& conf) :
conf.qmf2Support)
: 0),
store(new NullMessageStore),
+ cluster(new NullCluster),
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
queues(this),
@@ -757,7 +759,6 @@ void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
const std::string Broker::TCP_TRANSPORT("tcp");
-
std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
const std::string& name,
bool durable,
@@ -811,10 +812,11 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
void Broker::deleteQueue(const std::string& name, const std::string& userId,
const std::string& connectionId, QueueFunctor check)
{
- if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) {
- throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
+ if ((userId.size() || connectionId.size()) && // Skip ACL check if ID is empty.
+ acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) {
+ throw framing::UnauthorizedAccessException(
+ QPID_MSG("ACL denied queue delete request from " << userId));
}
-
Queue::shared_ptr queue = queues.find(name);
if (queue) {
if (check) check(queue);
@@ -878,6 +880,7 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
ManagementAgent::toMap(arguments),
"created"));
}
+ getCluster().create(*result.first);
}
return result;
}
@@ -885,8 +888,8 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
void Broker::deleteExchange(const std::string& name, const std::string& userId,
const std::string& connectionId)
{
- if (acl) {
- if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
+ if ((userId.size() || connectionId.size()) && // Skip ACL check if ID is empty.
+ acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL)) {
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId));
}
@@ -902,7 +905,7 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId,
if (managementAgent.get())
managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name));
-
+ getCluster().destroy(*exchange);
}
void Broker::bind(const std::string& queueName,
@@ -937,6 +940,7 @@ void Broker::bind(const std::string& queueName,
queueName, key, ManagementAgent::toMap(arguments)));
}
}
+ getCluster().bind(*queue, *exchange, key, arguments);
}
}
@@ -965,14 +969,19 @@ void Broker::unbind(const std::string& queueName,
} else {
if (exchange->unbind(queue, key, 0)) {
if (exchange->isDurable() && queue->isDurable()) {
- store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
+ store->unbind(*exchange, *queue, key, framing::FieldTable());
}
if (managementAgent.get()) {
managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key));
}
+ getCluster().unbind(*queue, *exchange, key, framing::FieldTable());
}
}
}
+void Broker::setCluster(std::auto_ptr<Cluster> c) { cluster = c; }
+
+Cluster& Broker::getCluster() { return *cluster; }
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 40f7b6273f..76d049df75 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -72,6 +72,7 @@ namespace broker {
class ConnectionState;
class ExpiryPolicy;
class Message;
+class Cluster;
static const uint16_t DEFAULT_PORT=5672;
@@ -165,6 +166,7 @@ public:
std::auto_ptr<management::ManagementAgent> managementAgent;
ProtocolFactoryMap protocolFactories;
std::auto_ptr<MessageStore> store;
+ std::auto_ptr<Cluster> cluster;
AclModule* acl;
DataDir dataDir;
@@ -294,6 +296,9 @@ public:
bool isClusterUpdatee() const { return clusterUpdatee; }
void setClusterUpdatee(bool set) { clusterUpdatee = set; }
+ QPID_BROKER_EXTERN void setCluster(std::auto_ptr<Cluster> c);
+ QPID_BROKER_EXTERN Cluster& getCluster();
+
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
ConnectionCounter& getConnectionCounter() {return connectionCounter;}
diff --git a/qpid/cpp/src/qpid/broker/Cluster.h b/qpid/cpp/src/qpid/broker/Cluster.h
new file mode 100644
index 0000000000..9bbf245498
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/Cluster.h
@@ -0,0 +1,104 @@
+#ifndef QPID_BROKER_CLUSTER_H
+#define QPID_BROKER_CLUSTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+
+namespace framing {
+class FieldTable;
+}
+
+namespace broker {
+
+class Message;
+struct QueuedMessage;
+class Queue;
+class Exchange;
+
+/**
+ * NOTE: this is part of an experimental cluster implementation that is not
+ * yet fully functional. The original cluster implementation remains in place.
+ * See ../cluster/new-cluster-design.txt
+ *
+ * Interface for cluster implementations. Functions on this interface are
+ * called at relevant points in the Broker's processing.
+ */
+class Cluster
+{
+ public:
+ virtual ~Cluster() {}
+
+ // Messages
+
+ /** In Exchange::route, before the message is enqueued. */
+ virtual void routing(const boost::intrusive_ptr<Message>&) = 0;
+
+ /** A message is delivered to a queue.
+ * Called before actually pushing the message to the queue.
+ *@return If true the message should be pushed to the queue now.
+ * otherwise the cluster code will push the message when it is replicated.
+ */
+ virtual bool enqueue(Queue& queue, const boost::intrusive_ptr<Message>&) = 0;
+
+ /** In Exchange::route, after all enqueues for the message. */
+ virtual void routed(const boost::intrusive_ptr<Message>&) = 0;
+
+ /** A message is acquired by a local consumer, it is unavailable to replicas. */
+ virtual void acquire(const QueuedMessage&) = 0;
+
+ /** A locally-acquired message is released by the consumer and re-queued. */
+ virtual void release(const QueuedMessage&) = 0;
+
+ /** A message is removed from the queue. */
+ virtual void dequeue(const QueuedMessage&) = 0;
+
+ // Consumers
+
+ /** A new consumer subscribes to a queue. */
+ virtual void consume(const Queue&, size_t consumerCount) = 0;
+ /** A consumer cancels its subscription to a queue */
+ virtual void cancel(const Queue&, size_t consumerCount) = 0;
+
+ // Wiring
+
+ /** A queue is created */
+ virtual void create(const Queue&) = 0;
+ /** A queue is destroyed */
+ virtual void destroy(const Queue&) = 0;
+ /** An exchange is created */
+ virtual void create(const Exchange&) = 0;
+ /** An exchange is destroyed */
+ virtual void destroy(const Exchange&) = 0;
+ /** A binding is created */
+ virtual void bind(const Queue&, const Exchange&,
+ const std::string& key, const framing::FieldTable& args) = 0;
+ /** A binding is removed */
+ virtual void unbind(const Queue&, const Exchange&,
+ const std::string& key, const framing::FieldTable& args) = 0;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CLUSTER_H*/
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index d68845062d..6ec68fbf47 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -24,6 +24,9 @@
#include "qpid/broker/Exchange.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/FedOps.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Cluster.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/MessageProperties.h"
#include "qpid/framing/reply_exceptions.h"
@@ -102,10 +105,23 @@ class ExInfo {
};
}
+// Bracket a scope with calls to Cluster::routing and Cluster::routed
+struct ScopedClusterRouting {
+ Broker* broker;
+ boost::intrusive_ptr<Message> message;
+ ScopedClusterRouting(Broker* b, boost::intrusive_ptr<Message> m)
+ : broker(b), message(m) {
+ if (broker) broker->getCluster().routing(message);
+ }
+ ~ScopedClusterRouting() {
+ if (broker) broker->getCluster().routed(message);
+ }
+};
+
void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
{
+ ScopedClusterRouting scr(broker, &msg.getMessage());
int count = 0;
-
if (b.get()) {
// Block the content release if the message is transient AND there is more than one binding
if (!msg.getMessage().isPersistent() && b->size() > 1) {
diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
index 1c8d26c4f7..54e6d5302c 100644
--- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -24,6 +24,9 @@
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
#include "qpid/broker/TopicExchange.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Cluster.h"
+#include "qpid/log/Statement.h"
#include "qpid/management/ManagementDirectExchange.h"
#include "qpid/management/ManagementTopicExchange.h"
#include "qpid/framing/reply_exceptions.h"
@@ -88,11 +91,15 @@ void ExchangeRegistry::destroy(const string& name){
}
Exchange::shared_ptr ExchangeRegistry::get(const string& name){
+ Exchange::shared_ptr ex = find(name);
+ if (!ex) throw framing::NotFoundException(QPID_MSG("Exchange not found: " << name));
+ return ex;
+}
+
+Exchange::shared_ptr ExchangeRegistry::find(const string& name){
RWlock::ScopedRlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
- if (i == exchanges.end())
- throw framing::NotFoundException(QPID_MSG("Exchange not found: " << name));
- return i->second;
+ return (i == exchanges.end()) ? Exchange::shared_ptr() : i->second;
}
bool ExchangeRegistry::registerExchange(const Exchange::shared_ptr& ex) {
diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
index 2b75a8f3cf..5f15ad22e6 100644
--- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
+++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -55,6 +55,7 @@ class ExchangeRegistry{
const qpid::framing::FieldTable& args = framing::FieldTable());
QPID_BROKER_EXTERN void destroy(const std::string& name);
QPID_BROKER_EXTERN Exchange::shared_ptr get(const std::string& name);
+ QPID_BROKER_EXTERN Exchange::shared_ptr find(const std::string& name);
Exchange::shared_ptr getDefault();
/**
diff --git a/qpid/cpp/src/qpid/broker/NullCluster.h b/qpid/cpp/src/qpid/broker/NullCluster.h
new file mode 100644
index 0000000000..995ec57058
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/NullCluster.h
@@ -0,0 +1,66 @@
+#ifndef QPID_BROKER_NULLCLUSTER_H
+#define QPID_BROKER_NULLCLUSTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/broker/Cluster.h>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * No-op implementation of Cluster interface, installed by broker when
+ * no cluster plug-in is present or clustering is disabled.
+ */
+class NullCluster : public Cluster
+{
+ public:
+
+ // Messages
+
+ virtual void routing(const boost::intrusive_ptr<Message>&) {}
+ virtual bool enqueue(Queue&, const boost::intrusive_ptr<Message>&) { return true; }
+ virtual void routed(const boost::intrusive_ptr<Message>&) {}
+ virtual void acquire(const QueuedMessage&) {}
+ virtual void release(const QueuedMessage&) {}
+ virtual void dequeue(const QueuedMessage&) {}
+
+ // Consumers
+
+ virtual void consume(const Queue&, size_t) {}
+ virtual void cancel(const Queue&, size_t) {}
+
+ // Wiring
+
+ virtual void create(const Queue&) {}
+ virtual void destroy(const Queue&) {}
+ virtual void create(const Exchange&) {}
+ virtual void destroy(const Exchange&) {}
+ virtual void bind(const Queue&, const Exchange&,
+ const std::string&, const framing::FieldTable&) {}
+ virtual void unbind(const Queue&, const Exchange&,
+ const std::string&, const framing::FieldTable&) {}
+};
+
+}} // namespace qpid::broker
+
+#endif
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index dd3f982699..20d9361909 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Cluster.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueEvents.h"
#include "qpid/broker/Exchange.h"
@@ -152,6 +153,10 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
// Check for deferred delivery in a cluster.
if (broker && broker->deferDelivery(name, msg))
return;
+ // Same thing but for the new cluster interface.
+ if (broker && !broker->getCluster().enqueue(*this, msg))
+ return;
+
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
@@ -221,16 +226,33 @@ void Queue::requeue(const QueuedMessage& msg){
}
}
}
+ if (broker) broker->getCluster().release(msg);
copy.notify();
}
+// Inform the cluster of an acquired message on exit from a function
+// that does the acquiring. ClusterAcquireOnExit is declared *before*
+// any locks are taken. The calling function sets qmsg to the acquired
+// message with a lock held, but the call to Cluster::acquire() will
+// be outside the lock.
+struct ClusterAcquireOnExit {
+ Broker* broker;
+ QueuedMessage qmsg;
+ ClusterAcquireOnExit(Broker* b) : broker(b) {}
+ ~ClusterAcquireOnExit() {
+ if (broker && qmsg.queue) broker->getCluster().acquire(qmsg);
+ }
+};
+
bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
+ ClusterAcquireOnExit willAcquire(broker); // Outside lock
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
if (messages->remove(position, message)) {
QPID_LOG(debug, "Acquired message at " << position << " from " << name);
+ willAcquire.qmsg = message;
return true;
} else {
QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
@@ -277,6 +299,7 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
while (true) {
+ ClusterAcquireOnExit willAcquire(broker); // Outside the lock
Mutex::ScopedLock locker(messageLock);
if (messages->empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
@@ -293,6 +316,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
if (c->filter(msg.payload)) {
if (c->accept(msg.payload)) {
m = msg;
+ willAcquire.qmsg = msg;
pop();
return CONSUMED;
} else {
@@ -377,42 +401,51 @@ QueuedMessage Queue::find(SequenceNumber pos) const {
return msg;
}
-void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
+void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) {
assertClusterSafe();
- Mutex::ScopedLock locker(consumerLock);
- if(exclusive) {
- throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
- } else if(requestExclusive) {
- if(consumerCount) {
+ size_t consumers;
+ {
+ Mutex::ScopedLock locker(consumerLock);
+ if(exclusive) {
throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
- } else {
- exclusive = c->getSession();
+ QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
+ } else if(requestExclusive) {
+ if(consumerCount) {
+ throw ResourceLockedException(
+ QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
+ } else {
+ exclusive = c->getSession();
+ }
}
+ consumers = ++consumerCount;
}
- consumerCount++;
if (mgmtObject != 0)
mgmtObject->inc_consumerCount ();
//reset auto deletion timer if necessary
if (autoDeleteTimeout && autoDeleteTask) {
autoDeleteTask->cancel();
}
+ if (broker) broker->getCluster().consume(*this, consumers);
}
void Queue::cancel(Consumer::shared_ptr c){
removeListener(c);
- Mutex::ScopedLock locker(consumerLock);
- consumerCount--;
- if(exclusive) exclusive = 0;
- if (mgmtObject != 0)
- mgmtObject->dec_consumerCount ();
+ size_t consumers;
+ {
+ Mutex::ScopedLock locker(consumerLock);
+ consumers = --consumerCount;
+ if(exclusive) exclusive = 0;
+ if (mgmtObject != 0)
+ mgmtObject->dec_consumerCount ();
+ }
+ if (broker) broker->getCluster().cancel(*this, consumers);
}
QueuedMessage Queue::get(){
+ ClusterAcquireOnExit willAcquire(broker); // Outside lock
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
- messages->pop(msg);
+ if (messages->pop(msg)) willAcquire.qmsg = msg;
return msg;
}
@@ -519,6 +552,7 @@ void Queue::pop()
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
assertClusterSafe();
+ QueuedMessage qm;
QueueListeners::NotificationSet copy;
QueuedMessage removed;
bool dequeueRequired = false;
@@ -658,7 +692,6 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
ScopedUse u(barrier);
if (!u.acquired) return false;
-
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return false;
@@ -666,6 +699,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
dequeued(msg);
}
}
+ if (!ctxt && broker) broker->getCluster().dequeue(msg); // Outside lock
// This check prevents messages which have been forced persistent on one queue from dequeuing
// from another on which no forcing has taken place and thus causing a store error.
bool fp = msg.payload->isForcedPersistent();
@@ -682,6 +716,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
void Queue::dequeueCommitted(const QueuedMessage& msg)
{
+ if (broker) broker->getCluster().dequeue(msg); // Outside lock
Mutex::ScopedLock locker(messageLock);
dequeued(msg);
if (mgmtObject != 0) {
@@ -726,6 +761,7 @@ void Queue::create(const FieldTable& _settings)
store->create(*this, _settings);
}
configureImpl(_settings);
+ if (broker) broker->getCluster().create(*this);
}
@@ -848,6 +884,7 @@ void Queue::destroyed()
}
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
notifyDeleted();
+ if (broker) broker->getCluster().destroy(*this);
}
void Queue::notifyDeleted()
diff --git a/qpid/cpp/src/qpid/broker/QueuedMessage.h b/qpid/cpp/src/qpid/broker/QueuedMessage.h
index 35e48b11f3..d1b0c1b41c 100644
--- a/qpid/cpp/src/qpid/broker/QueuedMessage.h
+++ b/qpid/cpp/src/qpid/broker/QueuedMessage.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,13 +34,12 @@ struct QueuedMessage
framing::SequenceNumber position;
Queue* queue;
- QueuedMessage() : queue(0) {}
- QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
+ QueuedMessage(Queue* q=0) : position(0), queue(q) {}
+ QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
payload(msg), position(sn), queue(q) {}
- QueuedMessage(Queue* q) : queue(q) {}
-
+
};
- inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; }
+ inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; }
}}
diff --git a/qpid/cpp/src/qpid/broker/RecoverableExchange.h b/qpid/cpp/src/qpid/broker/RecoverableExchange.h
index ca6cc1541e..ee0848ebed 100644
--- a/qpid/cpp/src/qpid/broker/RecoverableExchange.h
+++ b/qpid/cpp/src/qpid/broker/RecoverableExchange.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,6 +44,8 @@ public:
const std::string& routingKey,
qpid::framing::FieldTable& args) = 0;
virtual ~RecoverableExchange() {};
+
+ virtual const std::string& getName() const = 0;
};
}}
diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index d08409695e..9db366fd20 100644
--- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -62,7 +62,7 @@ class RecoverableQueueImpl : public RecoverableQueue
public:
RecoverableQueueImpl(const boost::shared_ptr<Queue>& _queue) : queue(_queue) {}
~RecoverableQueueImpl() {};
- void setPersistenceId(uint64_t id);
+ void setPersistenceId(uint64_t id);
uint64_t getPersistenceId() const;
const std::string& getName() const;
void setExternalQueueStore(ExternalQueueStore* inst);
@@ -80,6 +80,7 @@ public:
RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {}
void setPersistenceId(uint64_t id);
void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args);
+ const std::string& getName() const;
};
class RecoverableConfigImpl : public RecoverableConfig
@@ -133,7 +134,7 @@ RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buff
return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message));
}
-RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
+RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
std::auto_ptr<TPCTransactionContext> txn)
{
DtxBuffer::shared_ptr buffer(new DtxBuffer());
@@ -202,7 +203,7 @@ void RecoverableQueueImpl::setPersistenceId(uint64_t id)
{
queue->setPersistenceId(id);
}
-
+
uint64_t RecoverableQueueImpl::getPersistenceId() const
{
return queue->getPersistenceId();
@@ -212,7 +213,7 @@ const std::string& RecoverableQueueImpl::getName() const
{
return queue->getName();
}
-
+
void RecoverableQueueImpl::setExternalQueueStore(ExternalQueueStore* inst)
{
queue->setExternalQueueStore(inst);
@@ -245,6 +246,11 @@ void RecoverableExchangeImpl::bind(const string& queueName,
queue->bound(exchange->getName(), key, args);
}
+const std::string& RecoverableExchangeImpl::getName() const
+{
+ return exchange->getName();
+}
+
void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
{
buffer->enlist(TxOp::shared_ptr(new RecoveredDequeue(queue, msg)));
diff --git a/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp b/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
index b8d94b95a5..43c171efe8 100644
--- a/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
+++ b/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
@@ -37,9 +37,11 @@ PollerDispatch::PollerDispatch(Cpg& c, boost::shared_ptr<sys::Poller> p,
started(false)
{}
-PollerDispatch::~PollerDispatch() {
- if (started)
- dispatchHandle.stopWatch();
+PollerDispatch::~PollerDispatch() { stop(); }
+
+void PollerDispatch::stop() {
+ if (started) dispatchHandle.stopWatch();
+ started = false;
}
void PollerDispatch::start() {
@@ -54,6 +56,7 @@ void PollerDispatch::dispatch(sys::DispatchHandle& h) {
h.rewatch();
} catch (const std::exception& e) {
QPID_LOG(critical, "Error in cluster dispatch: " << e.what());
+ stop();
onError();
}
}
diff --git a/qpid/cpp/src/qpid/cluster/PollerDispatch.h b/qpid/cpp/src/qpid/cluster/PollerDispatch.h
index 63801e0de9..f16d5ece95 100644
--- a/qpid/cpp/src/qpid/cluster/PollerDispatch.h
+++ b/qpid/cpp/src/qpid/cluster/PollerDispatch.h
@@ -41,6 +41,7 @@ class PollerDispatch {
~PollerDispatch();
void start();
+ void stop();
private:
// Poller callbacks
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp
new file mode 100644
index 0000000000..a6eb12ed57
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "BrokerHandler.h"
+#include "qpid/framing/ClusterMessageRoutingBody.h"
+#include "qpid/framing/ClusterMessageRoutedBody.h"
+#include "qpid/framing/ClusterMessageEnqueueBody.h"
+#include "qpid/framing/ClusterMessageDequeueBody.h"
+#include "qpid/framing/ClusterWiringCreateQueueBody.h"
+#include "qpid/framing/ClusterWiringCreateExchangeBody.h"
+#include "qpid/framing/ClusterWiringDestroyQueueBody.h"
+#include "qpid/framing/ClusterWiringDestroyExchangeBody.h"
+#include "qpid/framing/ClusterWiringBindBody.h"
+#include "qpid/framing/ClusterWiringUnbindBody.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+using namespace broker;
+
+namespace {
+// noReplicate means the current thread is handling a message
+// received from the cluster so it should not be replciated.
+QPID_TSS bool tssNoReplicate = false;
+
+// Routing ID of the message being routed in the current thread.
+// 0 if we are not currently routing a message.
+QPID_TSS RoutingId tssRoutingId = 0;
+}
+
+BrokerHandler::ScopedSuppressReplication::ScopedSuppressReplication() {
+ assert(!tssNoReplicate);
+ tssNoReplicate = true;
+}
+
+BrokerHandler::ScopedSuppressReplication::~ScopedSuppressReplication() {
+ assert(tssNoReplicate);
+ tssNoReplicate = false;
+}
+
+BrokerHandler::BrokerHandler(Core& c) : core(c) {}
+
+RoutingId BrokerHandler::nextRoutingId() {
+ RoutingId id = ++routingId;
+ if (id == 0) id = ++routingId; // Avoid 0 on wrap-around.
+ return id;
+}
+
+void BrokerHandler::routing(const boost::intrusive_ptr<Message>&) { }
+
+bool BrokerHandler::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg)
+{
+ if (tssNoReplicate) return true;
+ if (!tssRoutingId) { // This is the first enqueue, so send the message
+ tssRoutingId = nextRoutingId();
+ // FIXME aconway 2010-10-20: replicate message in fixed size buffers.
+ std::string data(msg->encodedSize(),char());
+ framing::Buffer buf(&data[0], data.size());
+ msg->encode(buf);
+ core.mcast(ClusterMessageRoutingBody(ProtocolVersion(), tssRoutingId, data));
+ core.getRoutingMap().put(tssRoutingId, msg);
+ }
+ core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), tssRoutingId, queue.getName()));
+ // TODO aconway 2010-10-21: configable option for strict (wait
+ // for CPG deliver to do local deliver) vs. loose (local deliver
+ // immediately).
+ return false;
+}
+
+void BrokerHandler::routed(const boost::intrusive_ptr<Message>&) {
+ if (tssRoutingId) { // we enqueued at least one message.
+ core.mcast(ClusterMessageRoutedBody(ProtocolVersion(), tssRoutingId));
+ // Note: routingMap is cleaned up on CPG delivery in MessageHandler.
+ tssRoutingId = 0;
+ }
+}
+
+void BrokerHandler::dequeue(const broker::QueuedMessage& qm) {
+ if (tssNoReplicate) return;
+ // FIXME aconway 2010-10-28: we also need to delay completion of the
+ // ack that caused this dequeue until self-delivery of the mcast below.
+ core.mcast(ClusterMessageDequeueBody(
+ ProtocolVersion(), qm.queue->getName(), qm.position));
+}
+
+void BrokerHandler::create(const broker::Queue& q) {
+ if (tssNoReplicate) return;
+ std::string data(q.encodedSize(), '\0');
+ framing::Buffer buf(&data[0], data.size());
+ q.encode(buf);
+ core.mcast(ClusterWiringCreateQueueBody(ProtocolVersion(), data));
+}
+
+void BrokerHandler::destroy(const broker::Queue& q) {
+ if (tssNoReplicate) return;
+ core.mcast(ClusterWiringDestroyQueueBody(ProtocolVersion(), q.getName()));
+}
+
+void BrokerHandler::create(const broker::Exchange& ex) {
+ if (tssNoReplicate) return;
+ std::string data(ex.encodedSize(), '\0');
+ framing::Buffer buf(&data[0], data.size());
+ ex.encode(buf);
+ core.mcast(ClusterWiringCreateExchangeBody(ProtocolVersion(), data));
+}
+
+void BrokerHandler::destroy(const broker::Exchange& ex) {
+ if (tssNoReplicate) return;
+ core.mcast(ClusterWiringDestroyExchangeBody(ProtocolVersion(), ex.getName()));
+}
+
+void BrokerHandler::bind(const broker::Queue& q, const broker::Exchange& ex,
+ const std::string& key, const framing::FieldTable& args)
+{
+ if (tssNoReplicate) return;
+ core.mcast(ClusterWiringBindBody(
+ ProtocolVersion(), q.getName(), ex.getName(), key, args));
+}
+
+void BrokerHandler::unbind(const broker::Queue& q, const broker::Exchange& ex,
+ const std::string& key, const framing::FieldTable& args)
+{
+ if (tssNoReplicate) return;
+ core.mcast(ClusterWiringUnbindBody(
+ ProtocolVersion(), q.getName(), ex.getName(), key, args));
+}
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h b/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h
new file mode 100644
index 0000000000..c53688125a
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h
@@ -0,0 +1,86 @@
+#ifndef QPID_CLUSTER_BROKERHANDLER_H
+#define QPID_CLUSTER_BROKERHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Cluster.h"
+#include "qpid/sys/AtomicValue.h"
+
+namespace qpid {
+namespace cluster {
+class Core;
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+/**
+ * Implements broker::Cluster interface, handles events in broker code.
+ */
+class BrokerHandler : public broker::Cluster
+{
+ public:
+ /** Suppress replication while in scope.
+ * Used to prevent re-replication of messages received from the cluster.
+ */
+ struct ScopedSuppressReplication {
+ ScopedSuppressReplication();
+ ~ScopedSuppressReplication();
+ };
+
+ BrokerHandler(Core&);
+
+ // FIXME aconway 2010-10-20: implement all points.
+
+ // Messages
+
+ void routing(const boost::intrusive_ptr<broker::Message>&);
+ bool enqueue(broker::Queue&, const boost::intrusive_ptr<broker::Message>&);
+ void routed(const boost::intrusive_ptr<broker::Message>&);
+ void acquire(const broker::QueuedMessage&) {}
+ void release(const broker::QueuedMessage&) {}
+ void dequeue(const broker::QueuedMessage&);
+
+ // Consumers
+
+ void consume(const broker::Queue&, size_t) {}
+ void cancel(const broker::Queue&, size_t) {}
+
+ // Wiring
+
+ void create(const broker::Queue&);
+ void destroy(const broker::Queue&);
+ void create(const broker::Exchange&);
+ void destroy(const broker::Exchange&);
+ void bind(const broker::Queue&, const broker::Exchange&,
+ const std::string&, const framing::FieldTable&);
+ void unbind(const broker::Queue&, const broker::Exchange&,
+ const std::string&, const framing::FieldTable&);
+
+
+ private:
+ uint32_t nextRoutingId();
+
+ Core& core;
+ sys::AtomicValue<uint32_t> routingId;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_BROKERHANDLER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
new file mode 100644
index 0000000000..28b7dcec2e
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <qpid/Options.h>
+#include <qpid/broker/Broker.h>
+#include "Core.h"
+
+namespace qpid {
+namespace cluster {
+using broker::Broker;
+
+// TODO aconway 2010-10-19: experimental new cluster code.
+
+/**
+ * Plugin for the cluster.
+ */
+struct Cluster2Plugin : public Plugin {
+ struct Opts : public Options {
+ Core::Settings& settings;
+ Opts(Core::Settings& s) : Options("Cluster Options"), settings(s) {
+ addOptions()
+ ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join");
+ // TODO aconway 2010-10-19: copy across other options from ClusterPlugin.h
+ }
+ };
+
+ Core::Settings settings;
+ Opts options;
+ Core* core; // Core deletes itself on shutdown.
+
+ Cluster2Plugin() : options(settings), core(0) {}
+
+ Options* getOptions() { return &options; }
+
+ void earlyInitialize(Plugin::Target& target) {
+ if (settings.name.empty()) return;
+ Broker* broker = dynamic_cast<Broker*>(&target);
+ if (!broker) return;
+ core = new Core(settings, *broker);
+ }
+
+ void initialize(Plugin::Target& target) {
+ Broker* broker = dynamic_cast<Broker*>(&target);
+ if (broker && core) core->initialize();
+ }
+};
+
+static Cluster2Plugin instance; // Static initialization.
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
new file mode 100644
index 0000000000..93ed96b9d8
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "EventHandler.h"
+#include "BrokerHandler.h"
+#include "WiringHandler.h"
+#include "MessageHandler.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/SignalHandler.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/log/Statement.h"
+#include <sys/uio.h> // For iovec
+
+namespace qpid {
+namespace cluster {
+
+Core::Core(const Settings& s, broker::Broker& b) :
+ broker(b),
+ eventHandler(new EventHandler(*this))
+{
+ eventHandler->add(boost::shared_ptr<HandlerBase>(new WiringHandler(*eventHandler)));
+ eventHandler->add(boost::shared_ptr<HandlerBase>(new MessageHandler(*eventHandler)));
+
+ std::auto_ptr<BrokerHandler> bh(new BrokerHandler(*this));
+ brokerHandler = bh.get();
+ // BrokerHandler belongs to Broker
+ broker.setCluster(std::auto_ptr<broker::Cluster>(bh));
+ eventHandler->start();
+ eventHandler->getCpg().join(s.name);
+ // TODO aconway 2010-11-18: logging standards
+ QPID_LOG(notice, "cluster: joined " << s.name << ", member-id="<< eventHandler->getSelf());
+}
+
+void Core::initialize() {}
+
+void Core::fatal() {
+ // FIXME aconway 2010-10-20: error handling
+ assert(0);
+ broker::SignalHandler::shutdown();
+}
+
+void Core::mcast(const framing::AMQBody& body) {
+ QPID_LOG(trace, "cluster multicast: " << body);
+ // FIXME aconway 2010-10-20: use Multicaster, or bring in its features.
+ // here we multicast Frames rather than Events.
+ framing::AMQFrame f(body);
+ std::string data(f.encodedSize(), char());
+ framing::Buffer buf(&data[0], data.size());
+ f.encode(buf);
+ iovec iov = { buf.getPointer(), buf.getSize() };
+ while (!eventHandler->getCpg().mcast(&iov, 1))
+ ::usleep(1000); // FIXME aconway 2010-10-20: flow control
+}
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.h b/qpid/cpp/src/qpid/cluster/exp/Core.h
new file mode 100644
index 0000000000..3e53e0a65b
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/Core.h
@@ -0,0 +1,93 @@
+#ifndef QPID_CLUSTER_CORE_H
+#define QPID_CLUSTER_CORE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <memory>
+#include "LockedMap.h"
+#include "qpid/cluster/types.h"
+#include "qpid/cluster/Cpg.h"
+#include "qpid/broker/QueuedMessage.h"
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+namespace qpid {
+
+namespace framing{
+class AMQBody;
+}
+
+namespace broker {
+class Broker;
+}
+
+namespace cluster {
+class EventHandler;
+class BrokerHandler;
+
+/**
+ * Cluster core state machine.
+ * Holds together the various objects that implement cluster behavior,
+ * and holds state that is shared by multiple components.
+ *
+ * Thread safe: called from broker connection threads and CPG dispatch threads.
+ */
+class Core
+{
+ public:
+ /** Configuration settings */
+ struct Settings {
+ std::string name;
+ };
+
+ typedef LockedMap<RoutingId, boost::intrusive_ptr<broker::Message> > RoutingMap;
+
+ /** Constructed during Plugin::earlyInitialize() */
+ Core(const Settings&, broker::Broker&);
+
+ /** Called during Plugin::initialize() */
+ void initialize();
+
+ /** Shut down broker due to fatal error. Caller should log a critical message */
+ void fatal();
+
+ /** Multicast an event */
+ void mcast(const framing::AMQBody&);
+
+ broker::Broker& getBroker() { return broker; }
+ EventHandler& getEventHandler() { return *eventHandler; }
+ BrokerHandler& getBrokerHandler() { return *brokerHandler; }
+
+ /** Map of messages that are currently being routed.
+ * Used to pass messages being routed from BrokerHandler to MessageHandler
+ */
+ RoutingMap& getRoutingMap() { return routingMap; }
+ private:
+ broker::Broker& broker;
+ std::auto_ptr<EventHandler> eventHandler; // Handles CPG events.
+ BrokerHandler* brokerHandler; // Handles broker events.
+ RoutingMap routingMap;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_CORE_H*/
diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
new file mode 100644
index 0000000000..c0e3e5fc42
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "EventHandler.h"
+#include "HandlerBase.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/cluster/types.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+EventHandler::EventHandler(Core& c) :
+ core(c),
+ cpg(*this), // FIXME aconway 2010-10-20: belongs on Core.
+ dispatcher(cpg, core.getBroker().getPoller(), boost::bind(&Core::fatal, &core)),
+ self(cpg.self())
+{}
+
+EventHandler::~EventHandler() {}
+
+void EventHandler::add(const boost::shared_ptr<HandlerBase>& handler) {
+ handlers.push_back(handler);
+}
+
+void EventHandler::start() {
+ dispatcher.start();
+}
+
+// Print member ID or "self" if member is self
+struct PrettyId {
+ MemberId id, self;
+ PrettyId(const MemberId& id_, const MemberId& self_) : id(id_), self(self_) {}
+};
+
+std::ostream& operator<<(std::ostream& o, const PrettyId& id) {
+ if (id.id == id.self) return o << "self";
+ else return o << id.id;
+}
+
+// Deliver CPG message.
+void EventHandler::deliver(
+ cpg_handle_t /*handle*/,
+ const cpg_name* /*group*/,
+ uint32_t nodeid,
+ uint32_t pid,
+ void* msg,
+ int msg_len)
+{
+ sender = MemberId(nodeid, pid);
+ framing::Buffer buf(static_cast<char*>(msg), msg_len);
+ framing::AMQFrame frame;
+ while (buf.available()) {
+ frame.decode(buf);
+ assert(frame.getBody());
+ QPID_LOG(trace, "cluster deliver: " << PrettyId(sender, self) << " "
+ << *frame.getBody());
+ try {
+ invoke(*frame.getBody());
+ } catch (const std::exception& e) {
+ // Note: exceptions are assumed to be survivable,
+ // fatal errors should log a message and call Core::fatal.
+ QPID_LOG(error, e.what());
+ }
+ }
+}
+
+void EventHandler::invoke(const framing::AMQBody& body) {
+ for (Handlers::iterator i = handlers.begin(); i != handlers.end(); ++i)
+ if ((*i)->invoke(body)) return;
+ QPID_LOG(error, "Cluster received unknown control: " << body );
+ assert(0); // Error handling
+}
+
+struct PrintAddrs {
+ PrintAddrs(const cpg_address* a, int n ) : addrs(a), count(n) {}
+ const cpg_address* addrs;
+ int count;
+};
+
+std::ostream& operator<<(std::ostream& o, const PrintAddrs& pa) {
+ for (const cpg_address* a = pa.addrs; a != pa.addrs+pa.count; ++a)
+ o << MemberId(*a) << " ";
+ return o;
+}
+
+// CPG config-change callback.
+void EventHandler::configChange (
+ cpg_handle_t /*handle*/,
+ const cpg_name */*group*/,
+ const cpg_address *members, int nMembers,
+ const cpg_address *left, int nLeft,
+ const cpg_address *joined, int nJoined)
+{
+ // FIXME aconway 2010-10-20: TODO
+ QPID_LOG(notice, "cluster: new membership: " << PrintAddrs(members, nMembers));
+ QPID_LOG_IF(notice, nLeft, "cluster: members left: " << PrintAddrs(left, nLeft));
+ QPID_LOG_IF(notice, nJoined, "cluster: members joined: " << PrintAddrs(joined, nJoined));
+}
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
new file mode 100644
index 0000000000..b946c27084
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
@@ -0,0 +1,95 @@
+#ifndef QPID_CLUSTER_EVENTHANDLER_H
+#define QPID_CLUSTER_EVENTHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+#include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/PollerDispatch.h"
+#include "qpid/cluster/types.h"
+#include <boost/shared_ptr.hpp>
+#include <vector>
+
+namespace qpid {
+
+namespace framing {
+class AMQBody;
+}
+
+namespace cluster {
+class Core;
+class HandlerBase;
+
+/**
+ * Dispatch events received from a CPG group.
+ * A container for Handler objects that handle specific cluster.xml classes.
+ * Thread unsafe: only called in its own CPG deliver thread context.
+ */
+class EventHandler : public Cpg::Handler
+{
+ public:
+ EventHandler(Core&);
+ ~EventHandler();
+
+ /** Add a handler */
+ void add(const boost::shared_ptr<HandlerBase>&);
+
+ /** Start polling */
+ void start();
+
+ void deliver( // CPG deliver callback.
+ cpg_handle_t /*handle*/,
+ const struct cpg_name *group,
+ uint32_t /*nodeid*/,
+ uint32_t /*pid*/,
+ void* /*msg*/,
+ int /*msg_len*/);
+
+ void configChange( // CPG config change callback.
+ cpg_handle_t /*handle*/,
+ const struct cpg_name */*group*/,
+ const struct cpg_address */*members*/, int /*nMembers*/,
+ const struct cpg_address */*left*/, int /*nLeft*/,
+ const struct cpg_address */*joined*/, int /*nJoined*/
+ );
+
+ MemberId getSender() { return sender; }
+ MemberId getSelf() { return self; }
+ Core& getCore() { return core; }
+ Cpg& getCpg() { return cpg; }
+
+ private:
+ void invoke(const framing::AMQBody& body);
+
+ Core& core;
+ Cpg cpg;
+ PollerDispatch dispatcher;
+ MemberId sender; // sender of current event.
+ MemberId self;
+
+ typedef std::vector<boost::shared_ptr<HandlerBase> > Handlers;
+ Handlers handlers;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_EVENTHANDLER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/exp/HandlerBase.cpp b/qpid/cpp/src/qpid/cluster/exp/HandlerBase.cpp
new file mode 100644
index 0000000000..c738fb2993
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/HandlerBase.cpp
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "HandlerBase.h"
+#include "EventHandler.h"
+
+namespace qpid {
+namespace cluster {
+
+HandlerBase::HandlerBase(EventHandler& eh) : eventHandler(eh) {}
+
+HandlerBase::~HandlerBase() {}
+
+MemberId HandlerBase::sender() { return eventHandler.getSender(); }
+
+MemberId HandlerBase::self() { return eventHandler.getSelf(); }
+
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h b/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h
new file mode 100644
index 0000000000..455375be5b
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h
@@ -0,0 +1,54 @@
+#ifndef QPID_CLUSTER_HANDLERBASE_H
+#define QPID_CLUSTER_HANDLERBASE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/cluster/types.h"
+
+namespace qpid {
+
+namespace framing {
+class AMQBody;
+}
+
+namespace cluster {
+class EventHandler;
+
+/**
+ * Base class for handlers of events, children of the EventHandler.
+ */
+class HandlerBase
+{
+ public:
+ HandlerBase(EventHandler&);
+ virtual ~HandlerBase();
+
+ virtual bool invoke(const framing::AMQBody& body) = 0;
+
+ protected:
+ EventHandler& eventHandler;
+ MemberId sender();
+ MemberId self();
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_HANDLERBASE_H*/
diff --git a/qpid/cpp/src/qpid/cluster/exp/LockedMap.h b/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
new file mode 100644
index 0000000000..0736e7ac35
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
@@ -0,0 +1,73 @@
+#ifndef QPID_CLUSTER_LOCKEDMAP_H
+#define QPID_CLUSTER_LOCKEDMAP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Mutex.h"
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * A reader-writer locked thread safe map.
+ */
+template <class Key, class Value>
+class LockedMap
+{
+ public:
+ /** Get value associated with key, returns Value() if none. */
+ Value get(const Key& key) const {
+ sys::RWlock::ScopedRlock r(lock);
+ typename Map::const_iterator i = map.find(key);
+ if (i == map.end()) return Value();
+ else return i->second;
+ }
+
+ /** Associate value with key, overwriting any previous value for key. */
+ void put(const Key& key, const Value& value) {
+ sys::RWlock::ScopedWlock w(lock);
+ map[key] = value;
+ }
+
+ /** Associate value with key if there is not already a value associated with key.
+ * Returns true if the value was added.
+ */
+ bool add(const Key& key, const Value& value) {
+ sys::RWlock::ScopedWlock w(lock);
+ return map.insert(key, value).second;
+ }
+
+ /** Erase the value associated with key if any. Return true if a value was erased. */
+ bool erase(const Key& key) {
+ sys::RWlock::ScopedWlock w(lock);
+ return map.erase(key);
+ }
+
+ private:
+ typedef std::map<Key, Value> Map;
+ Map map;
+ mutable sys::RWlock lock;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_LOCKEDMAP_H*/
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
new file mode 100644
index 0000000000..d4095e5bc1
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "MessageHandler.h"
+#include "BrokerHandler.h"
+#include "EventHandler.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace cluster {
+using namespace broker;
+
+MessageHandler::MessageHandler(EventHandler& e) :
+ HandlerBase(e),
+ broker(e.getCore().getBroker())
+{}
+
+bool MessageHandler::invoke(const framing::AMQBody& body) {
+ return framing::invoke(*this, body).wasHandled();
+}
+
+void MessageHandler::routing(RoutingId routingId, const std::string& message) {
+ if (sender() == self()) return; // Already in getCore().getRoutingMap()
+ boost::intrusive_ptr<Message> msg = new Message;
+ // FIXME aconway 2010-10-28: decode message in bounded-size buffers.
+ framing::Buffer buf(const_cast<char*>(&message[0]), message.size());
+ msg->decodeHeader(buf);
+ msg->decodeContent(buf);
+ memberMap[sender()].routingMap[routingId] = msg;
+}
+
+boost::shared_ptr<broker::Queue> MessageHandler::findQueue(
+ const std::string& q, const char* msg)
+{
+ boost::shared_ptr<Queue> queue = broker.getQueues().find(q);
+ if (!queue) throw Exception(QPID_MSG(msg << ": unknown queue " << q));
+ return queue;
+}
+
+void MessageHandler::enqueue(RoutingId routingId, const std::string& q) {
+ boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed");
+ boost::intrusive_ptr<Message> msg;
+ if (sender() == self())
+ msg = eventHandler.getCore().getRoutingMap().get(routingId);
+ else
+ msg = memberMap[sender()].routingMap[routingId];
+ if (!msg) throw Exception(QPID_MSG("Cluster enqueue on " << q
+ << " failed: unknown message"));
+ BrokerHandler::ScopedSuppressReplication ssr;
+ queue->deliver(msg);
+}
+
+void MessageHandler::routed(RoutingId routingId) {
+ if (sender() == self())
+ eventHandler.getCore().getRoutingMap().erase(routingId);
+ else
+ memberMap[sender()].routingMap.erase(routingId);
+}
+
+void MessageHandler::dequeue(const std::string& q, uint32_t position) {
+ if (sender() == self()) {
+ // FIXME aconway 2010-10-28: we should complete the ack that initiated
+ // the dequeue at this point, see BrokerHandler::dequeue
+ return;
+ }
+ boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed");
+ BrokerHandler::ScopedSuppressReplication ssr;
+ QueuedMessage qm;
+ // FIXME aconway 2010-10-28: when we replicate acquires, the acquired
+ // messages will be stored by MessageHandler::acquire.
+ if (queue->acquireMessageAt(position, qm)) {
+ assert(qm.position.getValue() == position);
+ assert(qm.payload);
+ queue->dequeue(0, qm);
+ }
+}
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
new file mode 100644
index 0000000000..f87f22a1ec
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
@@ -0,0 +1,73 @@
+#ifndef QPID_CLUSTER_MESSAGEHANDLER_H
+#define QPID_CLUSTER_MESSAGEHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+#include "HandlerBase.h"
+#include "qpid/framing/AMQP_AllOperations.h"
+#include <boost/intrusive_ptr.hpp>
+#include <map>
+
+namespace qpid {
+
+namespace broker {
+class Message;
+class Broker;
+class Queue;
+}
+
+namespace cluster {
+class EventHandler;
+class BrokerHandler;
+
+/**
+ * Handler for message disposition events.
+ */
+class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler,
+ public HandlerBase
+{
+ public:
+ MessageHandler(EventHandler&);
+
+ bool invoke(const framing::AMQBody& body);
+
+ void routing(uint32_t routingId, const std::string& message);
+ void enqueue(uint32_t routingId, const std::string& queue);
+ void routed(uint32_t routingId);
+ void dequeue(const std::string& queue, uint32_t position);
+ private:
+ struct Member {
+ typedef std::map<uint32_t, boost::intrusive_ptr<broker::Message> > RoutingMap;
+ RoutingMap routingMap;
+ };
+ typedef std::map<MemberId, Member> MemberMap;
+
+ boost::shared_ptr<broker::Queue> findQueue(const std::string& q, const char* msg);
+
+ broker::Broker& broker;
+ MemberMap memberMap;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_MESSAGEHANDLER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/exp/README.txt b/qpid/cpp/src/qpid/cluster/exp/README.txt
new file mode 100644
index 0000000000..97f2a10d84
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/README.txt
@@ -0,0 +1,2 @@
+
+Experimental code to test ideas about a new cluster design.
diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
new file mode 100644
index 0000000000..04a76b9758
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "WiringHandler.h"
+#include "EventHandler.h"
+#include "BrokerHandler.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/ExchangeRegistry.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace cluster {
+using namespace broker;
+using framing::FieldTable;
+
+WiringHandler::WiringHandler(EventHandler& e) :
+ HandlerBase(e),
+ broker(e.getCore().getBroker()),
+ recovery(broker.getQueues(), broker.getExchanges(),
+ broker.getLinks(), broker.getDtxManager())
+{}
+
+bool WiringHandler::invoke(const framing::AMQBody& body) {
+ return framing::invoke(*this, body).wasHandled();
+}
+
+void WiringHandler::createQueue(const std::string& data) {
+ if (sender() == self()) return;
+ BrokerHandler::ScopedSuppressReplication ssr;
+ framing::Buffer buf(const_cast<char*>(&data[0]), data.size());
+ // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*()
+ RecoverableQueue::shared_ptr queue = recovery.recoverQueue(buf);
+ QPID_LOG(debug, "cluster: create queue " << queue->getName());
+}
+
+void WiringHandler::destroyQueue(const std::string& name) {
+ if (sender() == self()) return;
+ QPID_LOG(debug, "cluster: destroy queue " << name);
+ BrokerHandler::ScopedSuppressReplication ssr;
+ broker.deleteQueue(name, std::string(), std::string());
+}
+
+void WiringHandler::createExchange(const std::string& data) {
+ if (sender() == self()) return;
+ BrokerHandler::ScopedSuppressReplication ssr;
+ framing::Buffer buf(const_cast<char*>(&data[0]), data.size());
+ // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*()
+ RecoverableExchange::shared_ptr exchange = recovery.recoverExchange(buf);
+ QPID_LOG(debug, "cluster: create exchange " << exchange->getName());
+}
+
+void WiringHandler::destroyExchange(const std::string& name) {
+ if (sender() == self()) return;
+ QPID_LOG(debug, "cluster: destroy exchange " << name);
+ BrokerHandler::ScopedSuppressReplication ssr;
+ broker.getExchanges().destroy(name);
+}
+
+void WiringHandler::bind(
+ const std::string& queueName, const std::string& exchangeName,
+ const std::string& routingKey, const FieldTable& arguments)
+{
+ if (sender() == self()) return;
+ QPID_LOG(debug, "cluster: bind queue=" << queueName
+ << " exchange=" << exchangeName
+ << " key=" << routingKey
+ << " arguments=" << arguments);
+ BrokerHandler::ScopedSuppressReplication ssr;
+ broker.bind(queueName, exchangeName, routingKey, arguments, std::string(), std::string());
+}
+
+void WiringHandler::unbind(
+ const std::string& queueName, const std::string& exchangeName,
+ const std::string& routingKey, const FieldTable& arguments)
+{
+ if (sender() == self()) return;
+ QPID_LOG(debug, "cluster: unbind queue=" << queueName
+ << " exchange=" << exchangeName
+ << " key=" << routingKey
+ << " arguments=" << arguments);
+ BrokerHandler::ScopedSuppressReplication ssr;
+ broker.unbind(queueName, exchangeName, routingKey, std::string(), std::string());
+}
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
new file mode 100644
index 0000000000..e375cf6a95
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
@@ -0,0 +1,75 @@
+#ifndef QPID_CLUSTER_WIRINGHANDLER_H
+#define QPID_CLUSTER_WIRINGHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+#include "HandlerBase.h"
+#include "qpid/broker/RecoveryManagerImpl.h"
+#include "qpid/framing/AMQP_AllOperations.h"
+#include <boost/intrusive_ptr.hpp>
+#include <map>
+
+namespace qpid {
+
+namespace framing {
+class FieldTable;
+}
+
+namespace broker {
+class Broker;
+}
+
+namespace cluster {
+class EventHandler;
+
+
+/**
+ * Handler for wiring disposition events.
+ */
+class WiringHandler : public framing::AMQP_AllOperations::ClusterWiringHandler,
+ public HandlerBase
+{
+ public:
+ WiringHandler(EventHandler&);
+
+ bool invoke(const framing::AMQBody& body);
+
+ void createQueue(const std::string& data);
+ void destroyQueue(const std::string& name);
+ void createExchange(const std::string& data);
+ void destroyExchange(const std::string& name);
+ void bind(const std::string& queue, const std::string& exchange,
+ const std::string& routingKey, const framing::FieldTable& arguments);
+ void unbind(const std::string& queue, const std::string& exchange,
+ const std::string& routingKey, const framing::FieldTable& arguments);
+
+
+ private:
+ broker::Broker& broker;
+ broker::RecoveryManagerImpl recovery;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_WIRINGHANDLER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h
index bfb4fd5b9e..dec377b173 100644
--- a/qpid/cpp/src/qpid/cluster/types.h
+++ b/qpid/cpp/src/qpid/cluster/types.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,7 +38,7 @@ extern "C" {
# include <corosync/cpg.h>
#else
# error "No cpg.h header file available"
-#endif
+#endif
}
namespace qpid {
@@ -79,6 +79,9 @@ std::ostream& operator<<(std::ostream&, const ConnectionId&);
std::ostream& operator<<(std::ostream&, EventType);
+/** Number to identify a message being routed. */
+typedef uint32_t RoutingId;
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_TYPES_H*/
diff --git a/qpid/cpp/src/tests/BrokerClusterCalls.cpp b/qpid/cpp/src/tests/BrokerClusterCalls.cpp
new file mode 100644
index 0000000000..aa02d22267
--- /dev/null
+++ b/qpid/cpp/src/tests/BrokerClusterCalls.cpp
@@ -0,0 +1,419 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+///@file
+// Tests using a dummy broker::Cluster implementation to verify the expected
+// Cluster functions are called for various actions on the broker.
+//
+
+#include "unit_test.h"
+#include "test_tools.h"
+#include "qpid/broker/Cluster.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Duration.h"
+#include "BrokerFixture.h"
+#include <boost/assign.hpp>
+#include <boost/format.hpp>
+
+using namespace std;
+using namespace boost;
+using namespace boost::assign;
+using namespace qpid::messaging;
+using boost::format;
+using boost::intrusive_ptr;
+
+namespace qpid {
+namespace tests {
+
+class DummyCluster : public broker::Cluster
+{
+ private:
+ /** Flag used to ignore events other than enqueues while routing,
+ * e.g. acquires and accepts generated in a ring queue to replace an element..
+ * In real impl would be a thread-local variable.
+ */
+ bool isRouting;
+
+ void recordQm(const string& op, const broker::QueuedMessage& qm) {
+ history += (format("%s(%s, %d, %s)") % op % qm.queue->getName()
+ % qm.position % qm.payload->getFrames().getContent()).str();
+ }
+ void recordMsg(const string& op, broker::Queue& q, intrusive_ptr<broker::Message> msg) {
+ history += (format("%s(%s, %s)") % op % q.getName() % msg->getFrames().getContent()).str();
+ }
+ void recordStr(const string& op, const string& name) {
+ history += (format("%s(%s)") % op % name).str();
+ }
+ public:
+ // Messages
+
+ virtual void routing(const boost::intrusive_ptr<broker::Message>& m) {
+ isRouting = true;
+ history += (format("routing(%s)") % m->getFrames().getContent()).str();
+ }
+
+ virtual bool enqueue(broker::Queue& q, const intrusive_ptr<broker::Message>&msg) {
+ recordMsg("enqueue", q, msg);
+ return true;
+ }
+
+ virtual void routed(const boost::intrusive_ptr<broker::Message>& m) {
+ history += (format("routed(%s)") % m->getFrames().getContent()).str();
+ isRouting = false;
+ }
+ virtual void acquire(const broker::QueuedMessage& qm) {
+ if (!isRouting) recordQm("acquire", qm);
+ }
+ virtual void release(const broker::QueuedMessage& qm) {
+ if (!isRouting) recordQm("release", qm);
+ }
+ virtual void dequeue(const broker::QueuedMessage& qm) {
+ if (!isRouting) recordQm("dequeue", qm);
+ }
+
+ // Consumers
+
+ virtual void consume(const broker::Queue& q, size_t n) {
+ history += (format("consume(%s, %d)") % q.getName() % n).str();
+ }
+ virtual void cancel(const broker::Queue& q, size_t n) {
+ history += (format("cancel(%s, %d)") % q.getName() % n).str();
+ }
+
+ // Wiring
+
+ virtual void create(const broker::Queue& q) { recordStr("createq", q.getName()); }
+ virtual void destroy(const broker::Queue& q) { recordStr("destroyq", q.getName()); }
+ virtual void create(const broker::Exchange& ex) { recordStr("createex", ex.getName()); }
+ virtual void destroy(const broker::Exchange& ex) { recordStr("destroyex", ex.getName()); }
+ virtual void bind(const broker::Queue& q, const broker::Exchange& ex, const std::string& key, const framing::FieldTable& /*args*/) {
+ history += (format("bind(%s, %s, %s)") % q.getName() % ex.getName() % key).str();
+ }
+ virtual void unbind(const broker::Queue& q, const broker::Exchange& ex, const std::string& key, const framing::FieldTable& /*args*/) {
+ history += (format("unbind(%s, %s, %s)")% q.getName()%ex.getName()%key).str();
+ }
+ vector<string> history;
+};
+
+QPID_AUTO_TEST_SUITE(BrokerClusterCallsTestSuite)
+
+// Broker fixture with DummyCluster set up and some new API client bits.
+struct DummyClusterFixture: public BrokerFixture {
+ Connection c;
+ Session s;
+ DummyCluster*dc;
+ DummyClusterFixture() {
+ broker->setCluster(auto_ptr<broker::Cluster>(new DummyCluster));
+ dc = &static_cast<DummyCluster&>(broker->getCluster());
+ c = Connection("localhost:"+lexical_cast<string>(getPort()));
+ c.open();
+ s = c.createSession();
+ }
+ ~DummyClusterFixture() {
+ c.close();
+ }
+};
+
+QPID_AUTO_TEST_CASE(testSimplePubSub) {
+ DummyClusterFixture f;
+ vector<string>& h = f.dc->history;
+
+ // Queue creation
+ Sender sender = f.s.createSender("q;{create:always,delete:always}");
+ size_t i = 0;
+ BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking.
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Consumer
+ Receiver receiver = f.s.createReceiver("q");
+ f.s.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Send message
+ sender.send(Message("a"));
+ f.s.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+ // Don't check size here as it is uncertain whether acquire has happened yet.
+
+ // Acquire message
+ Message m = receiver.fetch(Duration::SECOND);
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Acknowledge message
+ f.s.acknowledge(true);
+ f.s.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Close a consumer
+ receiver.close();
+ BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 0)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Destroy the queue
+ f.c.close();
+ BOOST_CHECK_EQUAL(h.at(i++), "destroyq(q)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testReleaseReject) {
+ DummyClusterFixture f;
+ vector<string>& h = f.dc->history;
+
+ Sender sender = f.s.createSender("q;{create:always,delete:always,node:{x-declare:{alternate-exchange:amq.fanout}}}");
+ sender.send(Message("a"));
+ Receiver receiver = f.s.createReceiver("q");
+ Receiver altReceiver = f.s.createReceiver("amq.fanout;{link:{name:altq}}");
+ Message m = receiver.fetch(Duration::SECOND);
+ h.clear();
+
+ // Explicit release
+ f.s.release(m);
+ f.s.sync();
+ size_t i = 0;
+ BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Implicit release on closing connection.
+ Connection c("localhost:"+lexical_cast<string>(f.getPort()));
+ c.open();
+ Session s = c.createSession();
+ Receiver r = s.createReceiver("q");
+ m = r.fetch(Duration::SECOND);
+ h.clear();
+ i = 0;
+ c.close();
+ BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 1)");
+ BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Reject message, goes to alternate exchange.
+ m = receiver.fetch(Duration::SECOND);
+ h.clear();
+ i = 0;
+ f.s.reject(m);
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); // Routing to alt exchange
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+ m = altReceiver.fetch(Duration::SECOND);
+ BOOST_CHECK_EQUAL(m.getContent(), "a");
+
+ // Timed out message
+ h.clear();
+ i = 0;
+ m = Message("t");
+ m.setTtl(Duration(1)); // Timeout 1ms
+ sender.send(m);
+ usleep(2000); // Sleep 2ms
+ bool received = receiver.fetch(m, Duration::IMMEDIATE);
+ BOOST_CHECK(!received); // Timed out
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(t)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(t)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Message replaced on LVQ
+ sender = f.s.createSender("lvq;{create:always,delete:always,node:{x-declare:{arguments:{qpid.last_value_queue:1}}}}");
+ m = Message("a");
+ m.getProperties()["qpid.LVQ_key"] = "foo";
+ sender.send(m);
+ f.s.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "createq(lvq)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ m = Message("b");
+ m.getProperties()["qpid.LVQ_key"] = "foo";
+ sender.send(m);
+ f.s.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ receiver = f.s.createReceiver("lvq");
+ BOOST_CHECK_EQUAL(receiver.fetch(Duration::SECOND).getContent(), "b");
+ f.s.acknowledge(true);
+ BOOST_CHECK_EQUAL(h.at(i++), "consume(lvq, 1)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(lvq, 1, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, b)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testFanout) {
+ DummyClusterFixture f;
+ vector<string>& h = f.dc->history;
+
+ Receiver r1 = f.s.createReceiver("amq.fanout;{link:{name:r1}}");
+ Receiver r2 = f.s.createReceiver("amq.fanout;{link:{name:r2}}");
+ Sender sender = f.s.createSender("amq.fanout");
+ r1.setCapacity(0); // Don't receive immediately.
+ r2.setCapacity(0);
+ h.clear();
+ size_t i = 0;
+
+ // Send message
+ sender.send(Message("a"));
+ f.s.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+ BOOST_CHECK_EQUAL(0u, h.at(i++).find("enqueue(amq.fanout_r"));
+ BOOST_CHECK_EQUAL(0u, h.at(i++).find("enqueue(amq.fanout_r"));
+ BOOST_CHECK(h.at(i-1) != h.at(i-2));
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Receive messages
+ Message m1 = r1.fetch(Duration::SECOND);
+ f.s.acknowledge(m1, true);
+ Message m2 = r2.fetch(Duration::SECOND);
+ f.s.acknowledge(m2, true);
+
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r1, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r1, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r2, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r2, 1, a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testRingQueue) {
+ DummyClusterFixture f;
+ vector<string>& h = f.dc->history;
+
+ // FIXME aconway 2010-10-15: QPID-2908 ring queue address string is not working,
+ // so we can't do this:
+ // Sender sender = f.s.createSender("ring;{create:always,node:{x-declare:{arguments:{qpid.max_size:3,qpid.policy_type:ring}}}}");
+ // Must use old API to declare ring queue:
+ qpid::client::Connection c;
+ f.open(c);
+ qpid::client::Session s = c.newSession();
+ qpid::framing::FieldTable args;
+ args.setInt("qpid.max_size", 3);
+ args.setString("qpid.policy_type","ring");
+ s.queueDeclare(qpid::client::arg::queue="ring", qpid::client::arg::arguments=args);
+ c.close();
+ Sender sender = f.s.createSender("ring");
+
+ size_t i = 0;
+ // Send message
+ sender.send(Message("a"));
+ sender.send(Message("b"));
+ sender.send(Message("c"));
+ sender.send(Message("d"));
+ f.s.sync();
+
+ BOOST_CHECK_EQUAL(h.at(i++), "createq(ring)");
+
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
+
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(c)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, c)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(c)");
+
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(d)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, d)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(d)");
+
+ Receiver receiver = f.s.createReceiver("ring");
+ BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b");
+ BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "c");
+ BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "d");
+ f.s.acknowledge(true);
+
+ BOOST_CHECK_EQUAL(h.at(i++), "consume(ring, 1)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 2, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 3, c)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 4, d)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 2, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 3, c)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 4, d)");
+
+ BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testTransactions) {
+ DummyClusterFixture f;
+ vector<string>& h = f.dc->history;
+ Session ts = f.c.createTransactionalSession();
+ Sender sender = ts.createSender("q;{create:always,delete:always}");
+ size_t i = 0;
+ BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking.
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ sender.send(Message("a"));
+ sender.send(Message("b"));
+ ts.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
+ BOOST_CHECK_EQUAL(h.size(), i); // Not replicated till commit
+ ts.commit();
+ // FIXME aconway 2010-10-18: As things stand the cluster is not
+ // compatible with transactions
+ // - enqueues occur after routing is complete
+ // - no call to Cluster::enqueue, should be in Queue::process?
+ // - no transaction context associated with messages in the Cluster interface.
+ // - no call to Cluster::accept in Queue::dequeueCommitted
+ // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)");
+ // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, b)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+
+ Receiver receiver = ts.createReceiver("q");
+ BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "a");
+ BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b");
+ ts.acknowledge();
+ ts.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 2, b)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+ ts.commit();
+ ts.sync();
+ // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
+ // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 2, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, b)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index cd569e901c..bd75432d57 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -25,7 +25,7 @@ QMF_GEN=$(top_srcdir)/managementgen/qmf-gen
abs_builddir=@abs_builddir@
abs_srcdir=@abs_srcdir@
-extra_libs =
+extra_libs =
lib_client = $(abs_builddir)/../libqpidclient.la
lib_messaging = $(abs_builddir)/../libqpidmessaging.la
lib_common = $(abs_builddir)/../libqpidcommon.la
@@ -36,7 +36,7 @@ lib_qmf2 = $(abs_builddir)/../libqmf2.la
#
# Initialize variables that are incremented with +=
-#
+#
check_PROGRAMS=
check_LTLIBRARIES=
TESTS=
@@ -61,9 +61,9 @@ tmodule_LTLIBRARIES=
# Unit test program
#
# Unit tests are built as a single program to reduce valgrind overhead
-# when running the tests. If you want to build a subset of the tests do
+# when running the tests. If you want to build a subset of the tests do
# rm -f unit_test; make unit_test unit_test_OBJECTS="unit_test.o SelectedTest.o"
-#
+#
TESTS+=unit_test
check_PROGRAMS+=unit_test
@@ -124,7 +124,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
Variant.cpp \
Address.cpp \
ClientMessage.cpp \
- Qmf2.cpp
+ Qmf2.cpp \
+ BrokerClusterCalls.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
@@ -188,32 +189,32 @@ qpid_send_LDADD = $(lib_messaging)
qpidtest_PROGRAMS+=qpid-perftest
qpid_perftest_SOURCES=qpid-perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h
qpid_perftest_INCLUDES=$(PUBLIC_INCLUDES)
-qpid_perftest_LDADD=$(lib_client)
+qpid_perftest_LDADD=$(lib_client)
qpidtest_PROGRAMS+=qpid-txtest
qpid_txtest_INCLUDES=$(PUBLIC_INCLUDES)
qpid_txtest_SOURCES=qpid-txtest.cpp TestOptions.h ConnectionOptions.h
-qpid_txtest_LDADD=$(lib_client)
+qpid_txtest_LDADD=$(lib_client)
qpidtest_PROGRAMS+=qpid-latency-test
qpid_latency_test_INCLUDES=$(PUBLIC_INCLUDES)
qpid_latency_test_SOURCES=qpid-latency-test.cpp TestOptions.h ConnectionOptions.h
-qpid_latency_test_LDADD=$(lib_client)
+qpid_latency_test_LDADD=$(lib_client)
qpidtest_PROGRAMS+=qpid-client-test
qpid_client_test_INCLUDES=$(PUBLIC_INCLUDES)
qpid_client_test_SOURCES=qpid-client-test.cpp TestOptions.h ConnectionOptions.h
-qpid_client_test_LDADD=$(lib_client)
+qpid_client_test_LDADD=$(lib_client)
qpidtest_PROGRAMS+=qpid-topic-listener
qpid_topic_listener_INCLUDES=$(PUBLIC_INCLUDES)
qpid_topic_listener_SOURCES=qpid-topic-listener.cpp TestOptions.h ConnectionOptions.h
-qpid_topic_listener_LDADD=$(lib_client)
+qpid_topic_listener_LDADD=$(lib_client)
qpidtest_PROGRAMS+=qpid-topic-publisher
qpid_topic_publisher_INCLUDES=$(PUBLIC_INCLUDES)
qpid_topic_publisher_SOURCES=qpid-topic-publisher.cpp TestOptions.h ConnectionOptions.h
-qpid_topic_publisher_LDADD=$(lib_client)
+qpid_topic_publisher_LDADD=$(lib_client)
qpidtest_PROGRAMS+=qpid-ping
qpid_ping_INCLUDES=$(PUBLIC_INCLUDES)
@@ -232,17 +233,17 @@ echotest_LDADD=$(lib_client)
check_PROGRAMS+=publish
publish_INCLUDES=$(PUBLIC_INCLUDES)
publish_SOURCES=publish.cpp TestOptions.h ConnectionOptions.h
-publish_LDADD=$(lib_client)
+publish_LDADD=$(lib_client)
check_PROGRAMS+=consume
consume_INCLUDES=$(PUBLIC_INCLUDES)
consume_SOURCES=consume.cpp TestOptions.h ConnectionOptions.h
-consume_LDADD=$(lib_client)
+consume_LDADD=$(lib_client)
check_PROGRAMS+=header_test
header_test_INCLUDES=$(PUBLIC_INCLUDES)
header_test_SOURCES=header_test.cpp TestOptions.h ConnectionOptions.h
-header_test_LDADD=$(lib_client)
+header_test_LDADD=$(lib_client)
check_PROGRAMS+=failover_soak
failover_soak_INCLUDES=$(PUBLIC_INCLUDES)
@@ -251,28 +252,28 @@ failover_soak_LDADD=$(lib_client) $(lib_broker)
check_PROGRAMS+=declare_queues
declare_queues_INCLUDES=$(PUBLIC_INCLUDES)
-declare_queues_SOURCES=declare_queues.cpp
-declare_queues_LDADD=$(lib_client)
+declare_queues_SOURCES=declare_queues.cpp
+declare_queues_LDADD=$(lib_client)
check_PROGRAMS+=replaying_sender
replaying_sender_INCLUDES=$(PUBLIC_INCLUDES)
-replaying_sender_SOURCES=replaying_sender.cpp
-replaying_sender_LDADD=$(lib_client)
+replaying_sender_SOURCES=replaying_sender.cpp
+replaying_sender_LDADD=$(lib_client)
check_PROGRAMS+=resuming_receiver
resuming_receiver_INCLUDES=$(PUBLIC_INCLUDES)
-resuming_receiver_SOURCES=resuming_receiver.cpp
-resuming_receiver_LDADD=$(lib_client)
+resuming_receiver_SOURCES=resuming_receiver.cpp
+resuming_receiver_LDADD=$(lib_client)
check_PROGRAMS+=txshift
txshift_INCLUDES=$(PUBLIC_INCLUDES)
txshift_SOURCES=txshift.cpp TestOptions.h ConnectionOptions.h
-txshift_LDADD=$(lib_client)
+txshift_LDADD=$(lib_client)
check_PROGRAMS+=txjob
txjob_INCLUDES=$(PUBLIC_INCLUDES)
txjob_SOURCES=txjob.cpp TestOptions.h ConnectionOptions.h
-txjob_LDADD=$(lib_client)
+txjob_LDADD=$(lib_client)
check_PROGRAMS+=PollerTest
PollerTest_SOURCES=PollerTest.cpp
@@ -295,7 +296,7 @@ TESTS_ENVIRONMENT = \
VALGRIND=$(VALGRIND) \
LIBTOOL="$(LIBTOOL)" \
QPID_DATA_DIR= \
- $(srcdir)/run_test
+ $(srcdir)/run_test
system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest
TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_federation_sys_tests \
@@ -342,7 +343,8 @@ EXTRA_DIST += \
start_broker.ps1 \
stop_broker.ps1 \
topictest.ps1 \
- run_queue_flow_limit_tests
+ run_queue_flow_limit_tests \
+ run_cluster_authentication_test
check_LTLIBRARIES += libdlclose_noop.la
libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir)
@@ -366,6 +368,7 @@ EXTRA_DIST+= \
run_failover_soak \
reliable_replication_test \
federated_cluster_test_with_node_failure \
+ run_cluster_authentication_soak \
sasl_test_setup.sh
check-long:
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 16d7fb0b78..3e96adc8bf 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -401,17 +401,25 @@ class Cluster:
_cluster_count = 0
- def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
+ def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True,
+ cluster2=False, show_cmd=False):
+ if cluster2:
+ cluster_name = "--cluster2-name"
+ cluster_lib = BrokerTest.cluster2_lib
+ else:
+ cluster_name = "--cluster-name"
+ cluster_lib = BrokerTest.cluster_lib
self.test = test
self._brokers=[]
self.name = "cluster%d" % Cluster._cluster_count
Cluster._cluster_count += 1
# Use unique cluster name
self.args = copy(args)
- self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
+ self.args += [ cluster_name,
+ "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
- assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in"
- self.args += [ "--load-module", BrokerTest.cluster_lib ]
+ assert cluster_lib, "Cannot locate cluster plug-in"
+ self.args += [ "--load-module", cluster_lib ]
self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd)
def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0, show_cmd=False):
@@ -440,6 +448,7 @@ class BrokerTest(TestCase):
# Environment settings.
qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
cluster_lib = os.getenv("CLUSTER_LIB")
+ cluster2_lib = os.getenv("CLUSTER2_LIB")
xml_lib = os.getenv("XML_LIB")
qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
qpid_route_exec = os.getenv("QPID_ROUTE_EXEC")
@@ -490,9 +499,11 @@ class BrokerTest(TestCase):
raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
return b
- def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
+ def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, cluster2=False,
+ show_cmd=False):
"""Create and return a cluster ready for use"""
- cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd)
+ cluster = Cluster(self, count, args, expect=expect, wait=wait, cluster2=cluster2,
+ show_cmd=show_cmd)
return cluster
def browse(self, session, queue, timeout=0):
diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk
index 7d17dd7bde..bf5064e74c 100644
--- a/qpid/cpp/src/tests/cluster.mk
+++ b/qpid/cpp/src/tests/cluster.mk
@@ -92,7 +92,7 @@ cluster_test_SOURCES = \
PartialFailure.cpp \
ClusterFailover.cpp
-cluster_test_LDADD=$(lib_client) $(lib_broker) ../cluster.la -lboost_unit_test_framework
+cluster_test_LDADD=$(lib_client) $(lib_broker) $(lib_messaging) ../cluster.la -lboost_unit_test_framework
qpidtest_SCRIPTS += run_cluster_tests brokertest.py cluster_tests.py cluster_test_logs.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail
qpidtest_SCRIPTS += $(CLUSTER_TEST_SCRIPTS_LIST)
diff --git a/qpid/cpp/src/tests/cluster2_tests.py b/qpid/cpp/src/tests/cluster2_tests.py
new file mode 100755
index 0000000000..f17dfe2961
--- /dev/null
+++ b/qpid/cpp/src/tests/cluster2_tests.py
@@ -0,0 +1,116 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, signal, sys, time, imp, re, subprocess
+from qpid import datatypes, messaging
+from brokertest import *
+from qpid.harness import Skipped
+from qpid.messaging import Message
+from qpid.messaging.exceptions import *
+from threading import Thread, Lock
+from logging import getLogger
+from itertools import chain
+
+log = getLogger("qpid.cluster_tests")
+
+class Cluster2Tests(BrokerTest):
+ """Tests for new cluster code."""
+
+ def verify_content(self, content, receiver):
+ for c in content: self.assertEqual(c, receiver.fetch(1).content)
+ self.assertRaises(Empty, receiver.fetch, 0)
+
+ def test_message_enqueue(self):
+ """Test basic replication of enqueued messages.
+ Verify that fanout messages are replicated correctly.
+ """
+
+ cluster = self.cluster(2, cluster2=True)
+
+ sn0 = cluster[0].connect().session()
+ r0p = sn0.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
+ r0q = sn0.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
+ s0 = sn0.sender("amq.fanout");
+
+ sn1 = cluster[1].connect().session()
+ r1p = sn1.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
+ r1q = sn1.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
+
+
+ # Send messages on member 0
+ content = ["a","b","c"]
+ for m in content: s0.send(Message(m))
+
+ # Browse on both members.
+ self.verify_content(content, r0p)
+ self.verify_content(content, r0q)
+ self.verify_content(content, r1p)
+ self.verify_content(content, r1q)
+
+ sn1.connection.close()
+ sn0.connection.close()
+
+ def test_message_dequeue(self):
+ """Test replication of dequeues"""
+ cluster = self.cluster(2, cluster2=True)
+ sn0 = cluster[0].connect().session()
+ s0 = sn0.sender("q;{create:always,delete:always}")
+ r0 = sn0.receiver("q")
+ sn1 = cluster[1].connect().session()
+ r1 = sn1.receiver("q;{create:always}") # Not yet replicating wiring.
+
+ content = ["a","b","c"]
+ for m in content: s0.send(Message(m))
+ # Verify enqueued on cluster[1]
+ self.verify_content(content, sn1.receiver("q;{mode:browse}"))
+ # Dequeue on cluster[0]
+ self.assertEqual(r0.fetch(1).content, "a")
+ sn0.acknowledge(sync=True)
+
+ # Verify dequeued on cluster[0] and cluster[1]
+ self.verify_content(["b", "c"], sn0.receiver("q;{mode:browse}"))
+ self.verify_content(["b", "c"], sn1.receiver("q;{mode:browse}"))
+
+ def test_wiring(self):
+ """Test replication of wiring"""
+ cluster = self.cluster(2, cluster2=True)
+ sn0 = cluster[0].connect().session()
+ sn1 = cluster[1].connect().session()
+
+ # Test creation of queue, exchange, binding
+ r0ex = sn0.receiver("ex; {create:always, delete:always, node:{type:topic, x-declare:{name:ex, type:'direct'}}}")
+ r0q = sn0.receiver("q; {create:always, delete:always, link:{x-bindings:[{exchange:ex,queue:q,key:k}]}}")
+
+ # Verify objects were created on member 1
+ r1 = sn1.receiver("q") # Queue
+ s1ex = sn1.sender("ex/k; {node:{type:topic}}"); # Exchange
+ s1ex.send(Message("x")) # Binding with key k
+ self.assertEqual(r1.fetch(1).content, "x")
+
+ # Test destroy.
+ r0q.close() # Delete queue q
+ self.assertRaises(NotFound, sn1.receiver, "q")
+ r0ex.close() # Delete exchange ex
+ # FIXME aconway 2010-11-05: this does not raise NotFound, sn1 is caching "ex"
+ # self.assertRaises(NotFound, sn1.sender, "ex")
+ # Have to create a new session.
+ self.assertRaises(NotFound, cluster[1].connect().session().receiver, "ex")
+
+ # FIXME aconway 2010-10-29: test unbind, may need to use old API.
diff --git a/qpid/cpp/src/tests/run_cluster_tests b/qpid/cpp/src/tests/run_cluster_tests
index e136d3810a..3971a39144 100755
--- a/qpid/cpp/src/tests/run_cluster_tests
+++ b/qpid/cpp/src/tests/run_cluster_tests
@@ -33,5 +33,5 @@ mkdir -p $OUTDIR
CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:--i cluster_tests.StoreTests.* -I $srcdir/cluster_tests.fail}
CLUSTER_TESTS=${CLUSTER_TESTS:-$*}
-with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1
+with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests -m cluster2_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1
rm -rf $OUTDIR
diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in
index 842d7729cb..67b4df14bf 100644
--- a/qpid/cpp/src/tests/test_env.sh.in
+++ b/qpid/cpp/src/tests/test_env.sh.in
@@ -63,6 +63,7 @@ export TEST_STORE_LIB=$testmoduledir/test_store.so
exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; }
exportmodule ACL_LIB acl.so
exportmodule CLUSTER_LIB cluster.so
+exportmodule CLUSTER2_LIB cluster2.so
exportmodule REPLICATING_LISTENER_LIB replicating_listener.so
exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so
exportmodule SSLCONNECTOR_LIB sslconnector.so
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 899625f5ec..b782a6d606 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -326,4 +326,62 @@
</class>
+
+ <!-- TODO aconway 2010-10-20: Experimental classes for new cluster. -->
+
+ <!-- Message delivery and disposition -->
+ <class name="cluster-message" code="0x82">
+ <!-- FIXME aconway 2010-10-19: create message in fragments -->
+ <control name="routing" code="0x1">
+ <field name="routing-id" type="uint32"/>
+ <field name="message" type="str32"/>
+ </control>
+
+ <control name="enqueue" code="0x2">
+ <field name="routing-id" type="uint32"/>
+ <field name="queue" type="queue.name"/>
+ </control>
+
+ <control name="routed" code="0x3">
+ <field name="routing-id" type="uint32"/>
+ </control>
+
+ <control name="dequeue" code="0x4">
+ <field name="queue" type="queue.name"/>
+ <field name="position" type="uint32"/>
+ </control>
+ </class>
+
+ <class name="cluster-wiring" code="0x83">
+ <control name="create-queue" code="0x1">
+ <field name="data" type="str32"/>
+ </control>
+
+ <control name="destroy-queue" code="0x2">
+ <field name="name" type="queue.name"/>
+ </control>
+
+ <control name="create-exchange" code="0x3">
+ <field name="data" type="str32"/>
+ </control>
+
+ <control name="destroy-exchange" code="0x4">
+ <field name="name" type="exchange.name"/>
+ </control>
+
+ <control name="bind" code="0x5">
+ <field name="queue" type="queue.name"/>
+ <field name="exchange" type="exchange.name"/>
+ <field name="binding-key" type="str8"/>
+ <field name="arguments" type="map"/>
+ </control>
+
+ <control name="unbind" code="0x6">
+ <field name="queue" type="queue.name"/>
+ <field name="exchange" type="exchange.name"/>
+ <field name="binding-key" type="str8"/>
+ <field name="arguments" type="map"/>
+ </control>
+
+ </class>
</amqp>