summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-01-15 11:29:38 +0000
committerGordon Sim <gsim@apache.org>2009-01-15 11:29:38 +0000
commite46c3c0a19af0fd659cfe018c34db1c0dfd498c5 (patch)
tree661de23013441445a9b04276fc4b7220906e5d18
parent85679201de2448430804ff02d8a47894faf34f49 (diff)
downloadqpid-python-e46c3c0a19af0fd659cfe018c34db1c0dfd498c5.tar.gz
QPID-1567: Initial support for asynchronous queue state replication
* Added QueueEvents class with per broker instance * Modified qpid::broker::Queue to notify QueueEvents of enqueues and dequeues (based on configuration) * Added replication subdir containing two plugins: - an event listener that registers with QueueEvents and creates messages representing received events on a replication queue - a custom exchange type for processing messages of the format created by the listener plugin * Added new option for controlling event generation to qpid::client::QueueOptions * Added new queue option to qpid-config script for the same git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@734674 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/Makefile.am3
-rw-r--r--cpp/src/qpid/broker/Broker.cpp1
-rw-r--r--cpp/src/qpid/broker/Broker.h3
-rw-r--r--cpp/src/qpid/broker/Queue.cpp47
-rw-r--r--cpp/src/qpid/broker/Queue.h6
-rw-r--r--cpp/src/qpid/broker/QueueEvents.cpp87
-rw-r--r--cpp/src/qpid/broker/QueueEvents.h77
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp4
-rw-r--r--cpp/src/qpid/client/QueueOptions.cpp7
-rw-r--r--cpp/src/qpid/client/QueueOptions.h10
-rw-r--r--cpp/src/qpid/replication/ReplicatingEventListener.cpp122
-rw-r--r--cpp/src/qpid/replication/ReplicatingEventListener.h66
-rw-r--r--cpp/src/qpid/replication/ReplicationExchange.cpp139
-rw-r--r--cpp/src/qpid/replication/ReplicationExchange.h59
-rw-r--r--cpp/src/qpid/replication/constants.h31
-rw-r--r--cpp/src/replication.mk46
-rw-r--r--cpp/src/tests/BrokerFixture.h8
-rw-r--r--cpp/src/tests/Makefile.am6
-rw-r--r--cpp/src/tests/QueueEvents.cpp233
-rwxr-xr-xcpp/src/tests/replication_test98
-rwxr-xr-xpython/commands/qpid-config13
21 files changed, 1057 insertions, 9 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 27592f09ed..e3331daa00 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -184,6 +184,7 @@ include qmfc.mk
if HAVE_XML
include xml.mk
endif
+include replication.mk
if RDMA
@@ -400,6 +401,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/NameGenerator.cpp \
qpid/broker/NullMessageStore.cpp \
qpid/broker/QueueBindings.cpp \
+ qpid/broker/QueueEvents.cpp \
qpid/broker/QueuePolicy.cpp \
qpid/broker/QueueRegistry.cpp \
qpid/broker/RateTracker.cpp \
@@ -545,6 +547,7 @@ nobase_include_HEADERS = \
qpid/broker/PersistableMessage.h \
qpid/broker/PersistableQueue.h \
qpid/broker/QueueBindings.h \
+ qpid/broker/QueueEvents.h \
qpid/broker/QueuedMessage.h \
qpid/broker/QueuePolicy.h \
qpid/broker/QueueRegistry.h \
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 37750f8352..42a4a7b3be 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -144,6 +144,7 @@ Broker::Broker(const Broker::Options& conf) :
conf.replayHardLimit*1024),
*this),
queueCleaner(queues, timer),
+ queueEvents(poller),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
if (conf.enableMgmt) {
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index ac972b0325..247493d41c 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -32,6 +32,7 @@
#include "LinkRegistry.h"
#include "SessionManager.h"
#include "QueueCleaner.h"
+#include "QueueEvents.h"
#include "Vhost.h"
#include "System.h"
#include "Timer.h"
@@ -128,6 +129,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
Vhost::shared_ptr vhostObject;
System::shared_ptr systemObject;
QueueCleaner queueCleaner;
+ QueueEvents queueEvents;
void declareStandardExchange(const std::string& name, const std::string& type);
@@ -172,6 +174,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
DtxManager& getDtxManager() { return dtxManager; }
DataDir& getDataDir() { return dataDir; }
Options& getOptions() { return config; }
+ QueueEvents& getQueueEvents() { return queueEvents; }
SessionManager& getSessionManager() { return sessionManager; }
const std::string& getFederationTag() const { return federationTag; }
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 9089ba0c54..5acc474aa1 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -21,6 +21,7 @@
#include "Broker.h"
#include "Queue.h"
+#include "QueueEvents.h"
#include "Exchange.h"
#include "DeliverableMessage.h"
#include "MessageStore.h"
@@ -64,8 +65,11 @@ const std::string qpidLastValueQueue("qpid.last_value_queue");
const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse");
const std::string qpidPersistLastNode("qpid.persist_last_node");
const std::string qpidVQMatchProperty("qpid.LVQ_key");
-}
+const std::string qpidQueueEventGeneration("qpid.queue_event_generation");
+const int ENQUEUE_ONLY=1;
+const int ENQUEUE_AND_DEQUEUE=2;
+}
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
@@ -85,7 +89,9 @@ Queue::Queue(const string& _name, bool _autodelete,
inLastNodeFailure(false),
persistenceId(0),
policyExceeded(false),
- mgmtObject(0)
+ mgmtObject(0),
+ eventMode(0),
+ eventMgr(0)
{
if (parent != 0)
{
@@ -207,6 +213,25 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){
}
}
+bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
+{
+ Mutex::ScopedLock locker(messageLock);
+ QPID_LOG(debug, "Attempting to acquire message at " << position);
+ for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
+ if (i->position == position) {
+ message = *i;
+ if (lastValueQueue) {
+ clearLVQIndex(*i);
+ }
+ messages.erase(i);
+ QPID_LOG(debug, "Acquired message at " << i->position << " from " << name);
+ return true;
+ }
+ }
+ QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
+ return false;
+}
+
bool Queue::acquire(const QueuedMessage& msg) {
Mutex::ScopedLock locker(messageLock);
QPID_LOG(debug, "attempting to acquire " << msg.position);
@@ -515,13 +540,15 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){
lvq[key] = msg;
}else {
i->second->setReplacementMessage(msg,this);
- qm.payload = i->second;
- dequeued(qm);
+ dequeued(QueuedMessage(qm.queue, i->second, qm.position));
}
}else {
messages.push_back(qm);
listeners.populate(copy);
}
+ if (eventMode && eventMgr) {
+ eventMgr->enqueued(qm);
+ }
}
copy.notify();
}
@@ -659,6 +686,9 @@ void Queue::dequeued(const QueuedMessage& msg)
{
if (policy.get()) policy->dequeued(msg);
mgntDeqStats(msg.payload);
+ if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
+ eventMgr->dequeued(msg);
+ }
}
@@ -698,6 +728,8 @@ void Queue::configure(const FieldTable& _settings)
QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
<< "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
+ eventMode = _settings.getAsInt(qpidQueueEventGeneration);
+
if (mgmtObject != 0)
mgmtObject->set_arguments (_settings);
}
@@ -898,3 +930,10 @@ void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
sequence = n;
}
+
+int Queue::getEventMode() { return eventMode; }
+
+void Queue::setQueueEventManager(QueueEvents& mgr)
+{
+ eventMgr = &mgr;
+}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index e0bcc25fa3..394b5fd054 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -50,6 +50,7 @@ namespace qpid {
namespace broker {
class Broker;
class MessageStore;
+ class QueueEvents;
class QueueRegistry;
class TransactionContext;
class Exchange;
@@ -96,6 +97,8 @@ namespace qpid {
framing::SequenceNumber sequence;
qmf::org::apache::qpid::broker::Queue* mgmtObject;
RateTracker dequeueTracker;
+ int eventMode;
+ QueueEvents* eventMgr;
void push(boost::intrusive_ptr<Message>& msg);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -166,6 +169,7 @@ namespace qpid {
void unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref);
bool acquire(const QueuedMessage& msg);
+ bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message);
/**
* Delivers a message to the queue. Will record it as
@@ -279,6 +283,8 @@ namespace qpid {
* Used by cluster to replicate queues.
*/
void setPosition(framing::SequenceNumber pos);
+ int getEventMode();
+ void setQueueEventManager(QueueEvents&);
};
}
}
diff --git a/cpp/src/qpid/broker/QueueEvents.cpp b/cpp/src/qpid/broker/QueueEvents.cpp
new file mode 100644
index 0000000000..a6517e1bfe
--- /dev/null
+++ b/cpp/src/qpid/broker/QueueEvents.cpp
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "QueueEvents.h"
+#include "qpid/Exception.h"
+
+namespace qpid {
+namespace broker {
+
+QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller) :
+ eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller)
+{
+ eventQueue.start();
+}
+
+QueueEvents::~QueueEvents()
+{
+ eventQueue.stop();
+}
+
+void QueueEvents::enqueued(const QueuedMessage& m)
+{
+ eventQueue.push(Event(ENQUEUE, m));
+}
+
+void QueueEvents::dequeued(const QueuedMessage& m)
+{
+ eventQueue.push(Event(DEQUEUE, m));
+}
+
+void QueueEvents::registerListener(const std::string& id, const EventListener& listener)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (listeners.find(id) == listeners.end()) {
+ listeners[id] = listener;
+ } else {
+ throw Exception(QPID_MSG("Event listener already registered for '" << id << "'"));
+ }
+}
+
+void QueueEvents::unregisterListener(const std::string& id)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (listeners.find(id) == listeners.end()) {
+ throw Exception(QPID_MSG("No event listener registered for '" << id << "'"));
+ } else {
+ listeners.erase(id);
+ }
+}
+
+void QueueEvents::handle(EventQueue::Queue& events)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ while (!events.empty()) {
+ for (Listeners::iterator i = listeners.begin(); i != listeners.end(); i++) {
+ i->second(events.front());
+ }
+ events.pop_front();
+ }
+}
+
+void QueueEvents::shutdown()
+{
+ if (!eventQueue.empty() && !listeners.empty()) eventQueue.shutdown();
+}
+
+QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {}
+
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/QueueEvents.h b/cpp/src/qpid/broker/QueueEvents.h
new file mode 100644
index 0000000000..2ba69e33e6
--- /dev/null
+++ b/cpp/src/qpid/broker/QueueEvents.h
@@ -0,0 +1,77 @@
+#ifndef QPID_BROKER_QUEUEEVENTS_H
+#define QPID_BROKER_QUEUEEVENTS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "QueuedMessage.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/PollableQueue.h"
+#include <map>
+#include <string>
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Event manager for queue events. Allows queues to indicate when
+ * events have occured; allows listeners to register for notification
+ * of this. The notification happens asynchronously, in a separate
+ * thread.
+ */
+class QueueEvents
+{
+ public:
+ enum EventType {ENQUEUE, DEQUEUE};
+
+ struct Event
+ {
+ EventType type;
+ QueuedMessage msg;
+
+ Event(EventType, const QueuedMessage&);
+ };
+
+ typedef boost::function<void (Event)> EventListener;
+
+ QueueEvents(const boost::shared_ptr<sys::Poller>& poller);
+ ~QueueEvents();
+ void enqueued(const QueuedMessage&);
+ void dequeued(const QueuedMessage&);
+ void registerListener(const std::string& id, const EventListener&);
+ void unregisterListener(const std::string& id);
+ //process all outstanding events
+ void shutdown();
+ private:
+ typedef qpid::sys::PollableQueue<Event> EventQueue;
+ typedef std::map<std::string, EventListener> Listeners;
+
+ EventQueue eventQueue;
+ Listeners listeners;
+ qpid::sys::Mutex lock;//protect listeners from concurrent access
+
+ void handle(EventQueue::Queue& e);
+
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_QUEUEEVENTS_H*/
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 4450d56efb..0966db8162 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -362,6 +362,10 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
getBroker().getExchanges().getDefault()->bind(queue, name, 0);
queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
+ //if event generation is turned on, pass in a pointer to
+ //the QueueEvents instance to use
+ if (queue->getEventMode()) queue->setQueueEventManager(getBroker().getQueueEvents());
+
//handle automatic cleanup:
if (exclusive) {
exclusiveQueues.push_back(queue);
diff --git a/cpp/src/qpid/client/QueueOptions.cpp b/cpp/src/qpid/client/QueueOptions.cpp
index ac65b0bc22..5493c05044 100644
--- a/cpp/src/qpid/client/QueueOptions.cpp
+++ b/cpp/src/qpid/client/QueueOptions.cpp
@@ -24,6 +24,8 @@
namespace qpid {
namespace client {
+enum QueueEventGeneration {ENQUEUE_ONLY=1, ENQUEUE_AND_DEQUEUE=2};
+
QueueOptions::QueueOptions()
{}
@@ -39,6 +41,7 @@ const std::string QueueOptions::strLastValueQueue("qpid.last_value_queue");
const std::string QueueOptions::strPersistLastNode("qpid.persist_last_node");
const std::string QueueOptions::strLVQMatchProperty("qpid.LVQ_key");
const std::string QueueOptions::strLastValueQueueNoBrowse("qpid.last_value_queue_no_browse");
+const std::string QueueOptions::strQueueEventMode("qpid.queue_event_generation");
QueueOptions::~QueueOptions()
@@ -109,6 +112,10 @@ void QueueOptions::clearOrdering()
erase(strLastValueQueue);
}
+void QueueOptions::enableQueueEvents(bool enqueueOnly)
+{
+ setInt(strQueueEventMode, enqueueOnly ? ENQUEUE_ONLY : ENQUEUE_AND_DEQUEUE);
+}
}
}
diff --git a/cpp/src/qpid/client/QueueOptions.h b/cpp/src/qpid/client/QueueOptions.h
index 114e1e49c2..8c22414cbb 100644
--- a/cpp/src/qpid/client/QueueOptions.h
+++ b/cpp/src/qpid/client/QueueOptions.h
@@ -27,7 +27,7 @@ namespace qpid {
namespace client {
enum QueueSizePolicy {NONE, REJECT, FLOW_TO_DISK, RING, RING_STRICT};
-enum QueueOrderingPolicy {FIFO, LVQ, LVQ_NO_BROWSE};
+enum QueueOrderingPolicy {FIFO, LVQ, LVQ_NO_BROWSE};
/**
* A help class to set options on the Queue. Create a configured args while
@@ -83,6 +83,13 @@ class QueueOptions: public framing::FieldTable
* Use default odering policy
*/
void clearOrdering();
+
+ /**
+ * Turns on event generation for this queue (either enqueue only
+ * or for enqueue and dequeue events); the events can then be
+ * processed by a regsitered broker plugin.
+ */
+ void enableQueueEvents(bool enqueueOnly);
static const std::string strMaxCountKey;
static const std::string strMaxSizeKey;
@@ -95,6 +102,7 @@ class QueueOptions: public framing::FieldTable
static const std::string strPersistLastNode;
static const std::string strLVQMatchProperty;
static const std::string strLastValueQueueNoBrowse;
+ static const std::string strQueueEventMode;
};
}
diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/cpp/src/qpid/replication/ReplicatingEventListener.cpp
new file mode 100644
index 0000000000..80ff77d107
--- /dev/null
+++ b/cpp/src/qpid/replication/ReplicatingEventListener.cpp
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ReplicatingEventListener.h"
+#include "constants.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/QueueEvents.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace replication {
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::replication::constants;
+
+void ReplicatingEventListener::handle(QueueEvents::Event event)
+{
+ //create event message and enqueue it on replication queue
+ FieldTable headers;
+ boost::intrusive_ptr<Message> message;
+ switch (event.type) {
+ case QueueEvents::ENQUEUE:
+ headers.setString(REPLICATION_EVENT_TYPE, ENQUEUE);
+ headers.setString(REPLICATION_TARGET_QUEUE, event.msg.queue->getName());
+ message = createEventMessage(headers);
+ queue->deliver(message);
+ //if its an enqueue, enqueue the message itself on the
+ //replication queue also:
+ queue->deliver(event.msg.payload);
+ QPID_LOG(debug, "Queued 'enqueue' event on " << event.msg.queue->getName() << " for replication");
+ break;
+ case QueueEvents::DEQUEUE:
+ headers.setString(REPLICATION_EVENT_TYPE, DEQUEUE);
+ headers.setString(REPLICATION_TARGET_QUEUE, event.msg.queue->getName());
+ headers.setInt(DEQUEUED_MESSAGE_POSITION, event.msg.position);
+ message = createEventMessage(headers);
+ queue->deliver(message);
+ QPID_LOG(debug, "Queued 'dequeue' event from " << event.msg.queue->getName() << " for replication, (from position "
+ << event.msg.position << ")");
+ break;
+ }
+}
+
+namespace {
+const std::string EMPTY;
+}
+
+boost::intrusive_ptr<Message> ReplicatingEventListener::createEventMessage(const FieldTable& headers)
+{
+ boost::intrusive_ptr<Message> msg(new Message());
+ AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0));
+ AMQFrame header(in_place<AMQHeaderBody>());
+ header.setBof(false);
+ header.setEof(true);
+ header.setBos(true);
+ header.setEos(true);
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setApplicationHeaders(headers);
+ return msg;
+}
+
+Options* ReplicatingEventListener::getOptions()
+{
+ return &options;
+}
+
+void ReplicatingEventListener::initialize(Plugin::Target& target)
+{
+ Broker* broker = dynamic_cast<broker::Broker*>(&target);
+ if (broker && !options.queue.empty()) {
+ if (options.createQueue) {
+ queue = broker->getQueues().declare(options.queue).first;
+ } else {
+ queue = broker->getQueues().find(options.queue);
+ }
+ if (queue) {
+ QueueEvents::EventListener callback = boost::bind(&ReplicatingEventListener::handle, this, _1);
+ broker->getQueueEvents().registerListener(options.name, callback);
+ QPID_LOG(info, "Registered replicating queue event listener");
+ } else {
+ QPID_LOG(error, "Replication queue named '" << options.queue << "' does not exist; replication plugin disabled.");
+ }
+ }
+}
+
+void ReplicatingEventListener::earlyInitialize(Target&) {}
+
+ReplicatingEventListener::PluginOptions::PluginOptions() : Options("Queue Replication Options"),
+ name("replicator"),
+ createQueue(false)
+{
+ addOptions()
+ ("replication-queue", optValue(queue, "QUEUE"), "Queue on which events for other queues are recorded")
+ ("replication-listener-name", optValue(name, "NAME"), "name by which to register the replicating event listener")
+ ("create-replication-queue", optValue(createQueue), "if set, the replication will be created if it does not exist");
+}
+
+static ReplicatingEventListener plugin;
+
+}} // namespace qpid::replication
diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.h b/cpp/src/qpid/replication/ReplicatingEventListener.h
new file mode 100644
index 0000000000..25e2a5b7b9
--- /dev/null
+++ b/cpp/src/qpid/replication/ReplicatingEventListener.h
@@ -0,0 +1,66 @@
+#ifndef QPID_REPLICATION_REPLICATINGEVENTLISTENER_H
+#define QPID_REPLICATION_REPLICATINGEVENTLISTENER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueEvents.h"
+#include "qpid/framing/FieldTable.h"
+
+namespace qpid {
+namespace replication {
+
+/**
+ * An event listener plugin that records queue events as messages on a
+ * replication queue, from where they can be consumed (e.g. by an
+ * inter-broker link to the corresponding QueueReplicationExchange
+ * plugin.
+ */
+class ReplicatingEventListener : public Plugin
+{
+ public:
+ Options* getOptions();
+ void earlyInitialize(Plugin::Target& target);
+ void initialize(Plugin::Target& target);
+ void handle(qpid::broker::QueueEvents::Event);
+ private:
+ struct PluginOptions : public Options
+ {
+ std::string queue;
+ std::string name;
+ bool createQueue;
+
+ PluginOptions();
+ };
+
+ PluginOptions options;
+ qpid::broker::Queue::shared_ptr queue;
+
+ boost::intrusive_ptr<qpid::broker::Message> createEventMessage(const qpid::framing::FieldTable& headers);
+};
+
+}} // namespace qpid::replication
+
+#endif /*!QPID_REPLICATION_REPLICATINGEVENTLISTENER_H*/
diff --git a/cpp/src/qpid/replication/ReplicationExchange.cpp b/cpp/src/qpid/replication/ReplicationExchange.cpp
new file mode 100644
index 0000000000..abe8a4dfb6
--- /dev/null
+++ b/cpp/src/qpid/replication/ReplicationExchange.cpp
@@ -0,0 +1,139 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ReplicationExchange.h"
+#include "constants.h"
+#include "qpid/Plugin.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/ExchangeRegistry.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace replication {
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::replication::constants;
+
+ReplicationExchange::ReplicationExchange(const std::string& name, bool durable,
+ const FieldTable& args,
+ QueueRegistry& qr,
+ Manageable* parent)
+ : Exchange(name, durable, args, parent), queues(qr), expectingEnqueue(false) {}
+
+std::string ReplicationExchange::getType() const { return typeName; }
+
+void ReplicationExchange::route(Deliverable& msg, const std::string& /*routingKey*/, const FieldTable* args)
+{
+ if (args) {
+ std::string eventType = args->getAsString(REPLICATION_EVENT_TYPE);
+ if (eventType == ENQUEUE) {
+ expectingEnqueue = true;
+ targetQueue = args->getAsString(REPLICATION_TARGET_QUEUE);
+ QPID_LOG(debug, "Recorded replicated 'enqueue' event for " << targetQueue);
+ return;
+ } else if (eventType == DEQUEUE) {
+ std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
+ Queue::shared_ptr queue = queues.find(queueName);
+ SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION));
+
+ QueuedMessage dequeued;
+ if (queue->acquireMessageAt(position, dequeued)) {
+ queue->dequeue(0, dequeued);
+ QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position);
+ } else {
+ QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName);
+ }
+
+ return;
+ } else if (!eventType.empty()) {
+ throw IllegalArgumentException(QPID_MSG("Illegal value for " << REPLICATION_EVENT_TYPE << ": " << eventType));
+ }
+ }
+ //if we get here assume its not an event message, assume its an enqueue
+ if (expectingEnqueue) {
+ Queue::shared_ptr queue = queues.find(targetQueue);
+ msg.deliverTo(queue);
+ expectingEnqueue = false;
+ targetQueue.clear();
+ QPID_LOG(debug, "Eenqueued replicated message onto " << targetQueue);
+ } else {
+ QPID_LOG(warning, "Dropping unexpected message");
+ }
+}
+
+bool ReplicationExchange::bind(Queue::shared_ptr /*queue*/, const std::string& /*routingKey*/, const FieldTable* /*args*/)
+{
+ throw NotImplementedException("Replication exchange does not support bind operation");
+}
+
+bool ReplicationExchange::unbind(Queue::shared_ptr /*queue*/, const std::string& /*routingKey*/, const FieldTable* /*args*/)
+{
+ throw NotImplementedException("Replication exchange does not support unbind operation");
+}
+
+bool ReplicationExchange::isBound(Queue::shared_ptr /*queue*/, const string* const /*routingKey*/, const FieldTable* const /*args*/)
+{
+ return false;
+}
+
+const std::string ReplicationExchange::typeName("replication");
+
+
+struct ReplicationExchangePlugin : Plugin
+{
+ Broker* broker;
+
+ ReplicationExchangePlugin();
+ void earlyInitialize(Plugin::Target& target);
+ void initialize(Plugin::Target& target);
+ Exchange::shared_ptr create(const std::string& name, bool durable,
+ const framing::FieldTable& args,
+ management::Manageable* parent);
+};
+
+ReplicationExchangePlugin::ReplicationExchangePlugin() : broker(0) {}
+
+Exchange::shared_ptr ReplicationExchangePlugin::create(const std::string& name, bool durable,
+ const framing::FieldTable& args,
+ management::Manageable* parent)
+{
+ Exchange::shared_ptr e(new ReplicationExchange(name, durable, args, broker->getQueues(), parent));
+ return e;
+}
+
+
+void ReplicationExchangePlugin::initialize(Plugin::Target& target)
+{
+ broker = dynamic_cast<broker::Broker*>(&target);
+ if (broker) {
+ ExchangeRegistry::FactoryFunction f = boost::bind(&ReplicationExchangePlugin::create, this, _1, _2, _3, _4);
+ broker->getExchanges().registerType(ReplicationExchange::typeName, f);
+ QPID_LOG(info, "Registered replication exchange");
+ }
+}
+
+void ReplicationExchangePlugin::earlyInitialize(Target&) {}
+
+static ReplicationExchangePlugin exchangePlugin;
+
+}} // namespace qpid::replication
diff --git a/cpp/src/qpid/replication/ReplicationExchange.h b/cpp/src/qpid/replication/ReplicationExchange.h
new file mode 100644
index 0000000000..ed2b5956b6
--- /dev/null
+++ b/cpp/src/qpid/replication/ReplicationExchange.h
@@ -0,0 +1,59 @@
+#ifndef QPID_REPLICATION_REPLICATIONEXCHANGE_H
+#define QPID_REPLICATION_REPLICATIONEXCHANGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Exchange.h"
+
+namespace qpid {
+namespace replication {
+
+/**
+ * A custom exchange plugin that processes incoming messages
+ * representing enqueue or dequeue events for particular queues and
+ * carries out the corresponding action to replicate that on the local
+ * broker.
+ */
+class ReplicationExchange : public qpid::broker::Exchange
+{
+ public:
+ static const std::string typeName;
+
+ ReplicationExchange(const std::string& name, bool durable,
+ const qpid::framing::FieldTable& args,
+ qpid::broker::QueueRegistry& queues,
+ qpid::management::Manageable* parent = 0);
+
+ std::string getType() const;
+
+ void route(qpid::broker::Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+ bool bind(qpid::broker::Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ bool unbind(qpid::broker::Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ bool isBound(qpid::broker::Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args);
+ private:
+ qpid::broker::QueueRegistry& queues;
+ bool expectingEnqueue;
+ std::string targetQueue;
+};
+}} // namespace qpid::replication
+
+#endif /*!QPID_REPLICATION_REPLICATIONEXCHANGE_H*/
diff --git a/cpp/src/qpid/replication/constants.h b/cpp/src/qpid/replication/constants.h
new file mode 100644
index 0000000000..b0cef7570c
--- /dev/null
+++ b/cpp/src/qpid/replication/constants.h
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace qpid {
+namespace replication {
+namespace constants {
+
+const std::string REPLICATION_EVENT_TYPE("qpid.replication_event_type");
+const std::string ENQUEUE("enqueue");
+const std::string DEQUEUE("dequeue");
+const std::string REPLICATION_TARGET_QUEUE("qpid.replication_target_queue");
+const std::string DEQUEUED_MESSAGE_POSITION("qpid.dequeued_message_position");
+
+}}}
diff --git a/cpp/src/replication.mk b/cpp/src/replication.mk
new file mode 100644
index 0000000000..6830f99d36
--- /dev/null
+++ b/cpp/src/replication.mk
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Make file for building two plugins for asynchronously replicating
+# queues.
+
+dmodule_LTLIBRARIES += replicating_listener.la replication_exchange.la
+
+# a queue event listener plugin that creates messages on a replication
+# queue corresponding to enqueue and dequeue events:
+replicating_listener_la_SOURCES = \
+ qpid/replication/constants.h \
+ qpid/replication/ReplicatingEventListener.cpp \
+ qpid/replication/ReplicatingEventListener.h
+
+replicating_listener_la_LIBADD = libqpidbroker.la
+
+replicating_listener_la_LDFLAGS = $(PLUGINLDFLAGS)
+
+# a custom exchange plugin that allows an exchange to be created that
+# can process the messages from a replication queue (populated on the
+# source system by the replicating listener plugin above) and take the
+# corresponding action on the local queues
+replication_exchange_la_SOURCES = \
+ qpid/replication/constants.h \
+ qpid/replication/ReplicationExchange.cpp \
+ qpid/replication/ReplicationExchange.h
+
+replication_exchange_la_LIBADD = libqpidbroker.la
+
+replication_exchange_la_LDFLAGS = $(PLUGINLDFLAGS)
diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h
index 2a4faa2fd4..205b4d90ef 100644
--- a/cpp/src/tests/BrokerFixture.h
+++ b/cpp/src/tests/BrokerFixture.h
@@ -66,8 +66,14 @@ struct BrokerFixture : private boost::noncopyable {
brokerThread = qpid::sys::Thread(*broker);
};
- ~BrokerFixture() {
+ void shutdownBroker()
+ {
broker->shutdown();
+ broker = BrokerPtr();
+ }
+
+ ~BrokerFixture() {
+ if (broker) broker->shutdown();
brokerThread.join();
}
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 8cc72da96b..314b90ba8b 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -88,7 +88,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
ForkedBroker.h \
ManagementTest.cpp \
MessageReplayTracker.cpp \
- ConsoleTest.cpp
+ ConsoleTest.cpp \
+ QueueEvents.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
@@ -198,7 +199,7 @@ DispatcherTest_LDADD=$(lib_common)
TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test
system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest
-TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests
+TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests replication_test
EXTRA_DIST += \
run_test vg_check \
@@ -219,6 +220,7 @@ EXTRA_DIST += \
MessageUtils.h \
TestMessageStore.h \
TxMocks.h \
+ replication_test \
start_cluster stop_cluster restart_cluster
check_LTLIBRARIES += libdlclose_noop.la
diff --git a/cpp/src/tests/QueueEvents.cpp b/cpp/src/tests/QueueEvents.cpp
new file mode 100644
index 0000000000..7aea23922d
--- /dev/null
+++ b/cpp/src/tests/QueueEvents.cpp
@@ -0,0 +1,233 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "MessageUtils.h"
+#include "unit_test.h"
+#include "BrokerFixture.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueEvents.h"
+#include "qpid/client/QueueOptions.h"
+#include "qpid/framing/SequenceNumber.h"
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+
+QPID_AUTO_TEST_SUITE(QueueEventsSuite)
+
+using namespace qpid::client;
+using namespace qpid::broker;
+using namespace qpid::sys;
+using qpid::framing::SequenceNumber;
+
+struct DummyListener
+{
+ typedef std::deque<QueueEvents::Event> Events;
+
+ Events events;
+ boost::shared_ptr<Poller> poller;
+
+ void handle(QueueEvents::Event e)
+ {
+ if (events.empty()) {
+ BOOST_FAIL("Unexpected event received");
+ } else {
+ BOOST_CHECK_EQUAL(events.front().type, e.type);
+ BOOST_CHECK_EQUAL(events.front().msg.queue, e.msg.queue);
+ BOOST_CHECK_EQUAL(events.front().msg.payload, e.msg.payload);
+ BOOST_CHECK_EQUAL(events.front().msg.position, e.msg.position);
+ events.pop_front();
+ }
+ if (events.empty() && poller) poller->shutdown();
+ }
+
+ void expect(QueueEvents::Event e)
+ {
+ events.push_back(e);
+ }
+};
+
+QPID_AUTO_TEST_CASE(testBasicEventProcessing)
+{
+ boost::shared_ptr<Poller> poller(new Poller());
+ sys::Dispatcher dispatcher(poller);
+ Thread dispatchThread(dispatcher);
+ QueueEvents events(poller);
+ DummyListener listener;
+ listener.poller = poller;
+ events.registerListener("dummy", boost::bind(&DummyListener::handle, &listener, _1));
+ //signal occurence of some events:
+ Queue queue("queue1");
+ SequenceNumber id;
+ QueuedMessage event1(&queue, MessageUtils::createMessage(), id);
+ QueuedMessage event2(&queue, MessageUtils::createMessage(), ++id);
+
+ events.enqueued(event1);
+ events.enqueued(event2);
+ events.dequeued(event1);
+ //define events expected by listener:
+ listener.expect(QueueEvents::Event(QueueEvents::ENQUEUE, event1));
+ listener.expect(QueueEvents::Event(QueueEvents::ENQUEUE, event2));
+ listener.expect(QueueEvents::Event(QueueEvents::DEQUEUE, event1));
+
+ dispatchThread.join();
+ events.shutdown();
+ events.unregisterListener("dummy");
+}
+
+
+struct EventRecorder
+{
+ struct EventRecord
+ {
+ QueueEvents::EventType type;
+ std::string queue;
+ std::string content;
+ SequenceNumber position;
+ };
+
+ typedef std::deque<EventRecord> Events;
+
+ Events events;
+
+ void handle(QueueEvents::Event event)
+ {
+ EventRecord record;
+ record.type = event.type;
+ record.queue = event.msg.queue->getName();
+ event.msg.payload->getFrames().getContent(record.content);
+ record.position = event.msg.position;
+ events.push_back(record);
+ }
+
+ void check(QueueEvents::EventType type, const std::string& queue, const std::string& content, const SequenceNumber& position)
+ {
+ if (events.empty()) {
+ BOOST_FAIL("Missed event");
+ } else {
+ BOOST_CHECK_EQUAL(events.front().type, type);
+ BOOST_CHECK_EQUAL(events.front().queue, queue);
+ BOOST_CHECK_EQUAL(events.front().content, content);
+ BOOST_CHECK_EQUAL(events.front().position, position);
+ events.pop_front();
+ }
+ }
+ void checkEnqueue(const std::string& queue, const std::string& data, const SequenceNumber& position)
+ {
+ check(QueueEvents::ENQUEUE, queue, data, position);
+ }
+
+ void checkDequeue(const std::string& queue, const std::string& data, const SequenceNumber& position)
+ {
+ check(QueueEvents::DEQUEUE, queue, data, position);
+ }
+};
+
+QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing)
+{
+ ProxySessionFixture fixture;
+ //register dummy event listener to broker
+ EventRecorder listener;
+ fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1));
+
+ //declare queue with event options specified
+ QueueOptions options;
+ options.enableQueueEvents(false);
+ std::string q("queue-events-test");
+ fixture.session.queueDeclare(arg::queue=q, arg::arguments=options);
+ //send and consume some messages
+ LocalQueue incoming;
+ Subscription sub = fixture.subs.subscribe(incoming, q);
+ for (int i = 0; i < 5; i++) {
+ fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
+ }
+ for (int i = 0; i < 3; i++) {
+ BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
+ }
+ for (int i = 5; i < 10; i++) {
+ fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
+ }
+ for (int i = 3; i < 10; i++) {
+ BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
+ }
+ fixture.connection.close();
+ fixture.shutdownBroker();
+
+ //check listener was notified of all events, and in correct order
+ SequenceNumber enqueueId(1);
+ SequenceNumber dequeueId(1);
+ for (int i = 0; i < 5; i++) {
+ listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++);
+ }
+ for (int i = 0; i < 3; i++) {
+ listener.checkDequeue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), dequeueId++);
+ }
+ for (int i = 5; i < 10; i++) {
+ listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++);
+ }
+ for (int i = 3; i < 10; i++) {
+ listener.checkDequeue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), dequeueId++);
+ }
+}
+
+QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing_enqueuesOnly)
+{
+ ProxySessionFixture fixture;
+ //register dummy event listener to broker
+ EventRecorder listener;
+ fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1));
+
+ //declare queue with event options specified
+ QueueOptions options;
+ options.enableQueueEvents(true);
+ std::string q("queue-events-test");
+ fixture.session.queueDeclare(arg::queue=q, arg::arguments=options);
+ //send and consume some messages
+ LocalQueue incoming;
+ Subscription sub = fixture.subs.subscribe(incoming, q);
+ for (int i = 0; i < 5; i++) {
+ fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
+ }
+ for (int i = 0; i < 3; i++) {
+ BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
+ }
+ for (int i = 5; i < 10; i++) {
+ fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
+ }
+ for (int i = 3; i < 10; i++) {
+ BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
+ }
+ fixture.connection.close();
+ fixture.shutdownBroker();
+
+ //check listener was notified of all events, and in correct order
+ SequenceNumber enqueueId(1);
+ SequenceNumber dequeueId(1);
+ for (int i = 0; i < 5; i++) {
+ listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++);
+ }
+ for (int i = 5; i < 10; i++) {
+ listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++);
+ }
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+
diff --git a/cpp/src/tests/replication_test b/cpp/src/tests/replication_test
new file mode 100755
index 0000000000..6bf8041343
--- /dev/null
+++ b/cpp/src/tests/replication_test
@@ -0,0 +1,98 @@
+#!/bin/sh
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Run the federation tests.
+MY_DIR=`dirname \`which $0\``
+PYTHON_DIR=${MY_DIR}/../../../python
+
+trap stop_brokers INT TERM QUIT
+
+stop_brokers() {
+ if [[ $BROKER_A ]] ; then
+ ../qpidd -q --port $BROKER_A
+ unset BROKER_A
+ fi
+ if [[ $BROKER_B ]] ; then
+ ../qpidd -q --port $BROKER_B
+ unset BROKER_B
+ fi
+}
+
+if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e ../.libs/replication_exchange.so ; then
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true > qpidd.port
+ BROKER_A=`cat qpidd.port`
+
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so > qpidd.port
+ BROKER_B=`cat qpidd.port`
+
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication
+ $PYTHON_DIR/commands/qpid-route queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
+
+ #create test queues
+
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-a --generate-queue-events 2
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-b --generate-queue-events 2
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-c --generate-queue-events 1
+
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-a
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-b
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-c
+
+ #publish and consume from test queus on broker A:
+ rm -f queue-*.repl
+ for i in `seq 1 10`; do echo Message $i for A >> queue-a-input.repl; done
+ for i in `seq 1 20`; do echo Message $i for B >> queue-b-input.repl; done
+ for i in `seq 1 15`; do echo Message $i for C >> queue-c-input.repl; done
+
+ ./sender --port $BROKER_A --routing-key queue-a --send-eos 1 < queue-a-input.repl
+ ./sender --port $BROKER_A --routing-key queue-b --send-eos 1 < queue-b-input.repl
+ ./sender --port $BROKER_A --routing-key queue-c --send-eos 1 < queue-c-input.repl
+
+ ./receiver --port $BROKER_A --queue queue-a --messages 5 > /dev/null
+ ./receiver --port $BROKER_A --queue queue-b --messages 10 > /dev/null
+ ./receiver --port $BROKER_A --queue queue-c --messages 10 > /dev/null
+
+ #shutdown broker A then check that broker Bs versions of the queues are as expected
+ ../qpidd -q --port $BROKER_A
+ unset BROKER_A
+
+ #validate replicated queues:
+ ./receiver --port $BROKER_B --queue queue-a > queue-a-backup.repl
+ ./receiver --port $BROKER_B --queue queue-b > queue-b-backup.repl
+ ./receiver --port $BROKER_B --queue queue-c > queue-c-backup.repl
+ tail -5 queue-a-input.repl > queue-a-expected.repl
+ tail -10 queue-b-input.repl > queue-b-expected.repl
+ diff queue-a-backup.repl queue-a-expected.repl || FAIL=1
+ diff queue-b-backup.repl queue-b-expected.repl || FAIL=1
+ diff queue-c-backup.repl queue-c-input.repl || FAIL=1
+
+ if [[ $FAIL ]]; then
+ echo replication test failed: expectations not met!
+ else
+ echo queue state replicated as expected
+ rm queue-*.repl
+ fi
+
+ stop_brokers
+else
+ echo "Skipping replication test, plugins not built or python utils not located"
+fi
+
diff --git a/python/commands/qpid-config b/python/commands/qpid-config
index ff3c7db46e..ff2040e994 100755
--- a/python/commands/qpid-config
+++ b/python/commands/qpid-config
@@ -37,6 +37,7 @@ _policyType = None
_lvq = False
_msgSequence = False
_ive = False
+_eventGeneration = None
FILECOUNT = "qpid.file_count"
FILESIZE = "qpid.file_size"
@@ -47,6 +48,7 @@ CLUSTER_DURABLE = "qpid.persist_last_node"
LVQ = "qpid.last_value_queue"
MSG_SEQUENCE = "qpid.msg_sequence"
IVE = "qpid.ive"
+QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"
def Usage ():
print "Usage: qpid-config [OPTIONS]"
@@ -74,6 +76,10 @@ def Usage ():
print " --max-queue-count N Maximum in-memory queue size as a number of messages"
print " --policy-type TYPE Action taken when queue limit is reached (reject, flow_to_disk, ring, ring_strict)"
print " --last-value-queue Enable LVQ behavior on the queue"
+ print " --generate-queue-events N"
+ print " If set to 1, every enqueue will generate an event that can be processed by"
+ print " registered listeners (e.g. for replication). If set to 2, events will be"
+ print " generated for enqueues and dequeues"
print
print "Add Exchange Options:"
print " --durable Exchange is durable"
@@ -193,6 +199,7 @@ class BrokerManager:
if MAX_QUEUE_COUNT in args: print "--max-queue-count=%d" % args[MAX_QUEUE_COUNT],
if POLICY_TYPE in args: print "--policy-type=%s" % args[POLICY_TYPE],
if LVQ in args and args[LVQ] == 1: print "--last-value-queue",
+ if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%d" % args[GENERATE_QUEUE_EVENTS],
print
def QueueListRecurse (self, filter):
@@ -249,6 +256,8 @@ class BrokerManager:
declArgs[CLUSTER_DURABLE] = 1
if _lvq:
declArgs[LVQ] = 1
+ if _eventGeneration:
+ declArgs[QUEUE_EVENT_GENERATION] = _eventGeneration
self.broker.getAmqpSession().queue_declare (queue=qname, durable=_durable, arguments=declArgs)
@@ -304,7 +313,7 @@ def YN (bool):
try:
longOpts = ("durable", "cluster-durable", "bindings", "broker-addr=", "file-count=",
"file-size=", "max-queue-size=", "max-queue-count=", "policy-type=",
- "last-value-queue", "sequence", "ive")
+ "last-value-queue", "sequence", "ive", "generate-queue-events=")
(optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "a:b", longOpts)
except:
Usage ()
@@ -340,6 +349,8 @@ for opt in optlist:
_msgSequence = True
if opt[0] == "--ive":
_ive = True
+ if opt[0] == "--generate-queue-events":
+ _eventGeneration = int (opt[1])
nargs = len (cargs)
bm = BrokerManager ()