diff options
author | Gordon Sim <gsim@apache.org> | 2009-01-15 11:29:38 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2009-01-15 11:29:38 +0000 |
commit | e46c3c0a19af0fd659cfe018c34db1c0dfd498c5 (patch) | |
tree | 661de23013441445a9b04276fc4b7220906e5d18 /cpp/src | |
parent | 85679201de2448430804ff02d8a47894faf34f49 (diff) | |
download | qpid-python-e46c3c0a19af0fd659cfe018c34db1c0dfd498c5.tar.gz |
QPID-1567: Initial support for asynchronous queue state replication
* Added QueueEvents class with per broker instance
* Modified qpid::broker::Queue to notify QueueEvents of enqueues and dequeues (based on configuration)
* Added replication subdir containing two plugins:
- an event listener that registers with QueueEvents and creates messages representing received
events on a replication queue
- a custom exchange type for processing messages of the format created by the listener plugin
* Added new option for controlling event generation to qpid::client::QueueOptions
* Added new queue option to qpid-config script for the same
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@734674 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 47 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueEvents.cpp | 87 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueEvents.h | 77 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/QueueOptions.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/client/QueueOptions.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/replication/ReplicatingEventListener.cpp | 122 | ||||
-rw-r--r-- | cpp/src/qpid/replication/ReplicatingEventListener.h | 66 | ||||
-rw-r--r-- | cpp/src/qpid/replication/ReplicationExchange.cpp | 139 | ||||
-rw-r--r-- | cpp/src/qpid/replication/ReplicationExchange.h | 59 | ||||
-rw-r--r-- | cpp/src/qpid/replication/constants.h | 31 | ||||
-rw-r--r-- | cpp/src/replication.mk | 46 | ||||
-rw-r--r-- | cpp/src/tests/BrokerFixture.h | 8 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 6 | ||||
-rw-r--r-- | cpp/src/tests/QueueEvents.cpp | 233 | ||||
-rwxr-xr-x | cpp/src/tests/replication_test | 98 |
20 files changed, 1045 insertions, 8 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 27592f09ed..e3331daa00 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -184,6 +184,7 @@ include qmfc.mk if HAVE_XML include xml.mk endif +include replication.mk if RDMA @@ -400,6 +401,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/NameGenerator.cpp \ qpid/broker/NullMessageStore.cpp \ qpid/broker/QueueBindings.cpp \ + qpid/broker/QueueEvents.cpp \ qpid/broker/QueuePolicy.cpp \ qpid/broker/QueueRegistry.cpp \ qpid/broker/RateTracker.cpp \ @@ -545,6 +547,7 @@ nobase_include_HEADERS = \ qpid/broker/PersistableMessage.h \ qpid/broker/PersistableQueue.h \ qpid/broker/QueueBindings.h \ + qpid/broker/QueueEvents.h \ qpid/broker/QueuedMessage.h \ qpid/broker/QueuePolicy.h \ qpid/broker/QueueRegistry.h \ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 37750f8352..42a4a7b3be 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -144,6 +144,7 @@ Broker::Broker(const Broker::Options& conf) : conf.replayHardLimit*1024), *this), queueCleaner(queues, timer), + queueEvents(poller), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { if (conf.enableMgmt) { diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index ac972b0325..247493d41c 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -32,6 +32,7 @@ #include "LinkRegistry.h" #include "SessionManager.h" #include "QueueCleaner.h" +#include "QueueEvents.h" #include "Vhost.h" #include "System.h" #include "Timer.h" @@ -128,6 +129,7 @@ class Broker : public sys::Runnable, public Plugin::Target, Vhost::shared_ptr vhostObject; System::shared_ptr systemObject; QueueCleaner queueCleaner; + QueueEvents queueEvents; void declareStandardExchange(const std::string& name, const std::string& type); @@ -172,6 +174,7 @@ class Broker : public sys::Runnable, public Plugin::Target, DtxManager& getDtxManager() { return dtxManager; } DataDir& getDataDir() { return dataDir; } Options& getOptions() { return config; } + QueueEvents& getQueueEvents() { return queueEvents; } SessionManager& getSessionManager() { return sessionManager; } const std::string& getFederationTag() const { return federationTag; } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 9089ba0c54..5acc474aa1 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -21,6 +21,7 @@ #include "Broker.h" #include "Queue.h" +#include "QueueEvents.h" #include "Exchange.h" #include "DeliverableMessage.h" #include "MessageStore.h" @@ -64,8 +65,11 @@ const std::string qpidLastValueQueue("qpid.last_value_queue"); const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse"); const std::string qpidPersistLastNode("qpid.persist_last_node"); const std::string qpidVQMatchProperty("qpid.LVQ_key"); -} +const std::string qpidQueueEventGeneration("qpid.queue_event_generation"); +const int ENQUEUE_ONLY=1; +const int ENQUEUE_AND_DEQUEUE=2; +} Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, @@ -85,7 +89,9 @@ Queue::Queue(const string& _name, bool _autodelete, inLastNodeFailure(false), persistenceId(0), policyExceeded(false), - mgmtObject(0) + mgmtObject(0), + eventMode(0), + eventMgr(0) { if (parent != 0) { @@ -207,6 +213,25 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){ } } +bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) +{ + Mutex::ScopedLock locker(messageLock); + QPID_LOG(debug, "Attempting to acquire message at " << position); + for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { + if (i->position == position) { + message = *i; + if (lastValueQueue) { + clearLVQIndex(*i); + } + messages.erase(i); + QPID_LOG(debug, "Acquired message at " << i->position << " from " << name); + return true; + } + } + QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); + return false; +} + bool Queue::acquire(const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); QPID_LOG(debug, "attempting to acquire " << msg.position); @@ -515,13 +540,15 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){ lvq[key] = msg; }else { i->second->setReplacementMessage(msg,this); - qm.payload = i->second; - dequeued(qm); + dequeued(QueuedMessage(qm.queue, i->second, qm.position)); } }else { messages.push_back(qm); listeners.populate(copy); } + if (eventMode && eventMgr) { + eventMgr->enqueued(qm); + } } copy.notify(); } @@ -659,6 +686,9 @@ void Queue::dequeued(const QueuedMessage& msg) { if (policy.get()) policy->dequeued(msg); mgntDeqStats(msg.payload); + if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) { + eventMgr->dequeued(msg); + } } @@ -698,6 +728,8 @@ void Queue::configure(const FieldTable& _settings) QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements"); + eventMode = _settings.getAsInt(qpidQueueEventGeneration); + if (mgmtObject != 0) mgmtObject->set_arguments (_settings); } @@ -898,3 +930,10 @@ void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); sequence = n; } + +int Queue::getEventMode() { return eventMode; } + +void Queue::setQueueEventManager(QueueEvents& mgr) +{ + eventMgr = &mgr; +} diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index e0bcc25fa3..394b5fd054 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -50,6 +50,7 @@ namespace qpid { namespace broker { class Broker; class MessageStore; + class QueueEvents; class QueueRegistry; class TransactionContext; class Exchange; @@ -96,6 +97,8 @@ namespace qpid { framing::SequenceNumber sequence; qmf::org::apache::qpid::broker::Queue* mgmtObject; RateTracker dequeueTracker; + int eventMode; + QueueEvents* eventMgr; void push(boost::intrusive_ptr<Message>& msg); void setPolicy(std::auto_ptr<QueuePolicy> policy); @@ -166,6 +169,7 @@ namespace qpid { void unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref); bool acquire(const QueuedMessage& msg); + bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message); /** * Delivers a message to the queue. Will record it as @@ -279,6 +283,8 @@ namespace qpid { * Used by cluster to replicate queues. */ void setPosition(framing::SequenceNumber pos); + int getEventMode(); + void setQueueEventManager(QueueEvents&); }; } } diff --git a/cpp/src/qpid/broker/QueueEvents.cpp b/cpp/src/qpid/broker/QueueEvents.cpp new file mode 100644 index 0000000000..a6517e1bfe --- /dev/null +++ b/cpp/src/qpid/broker/QueueEvents.cpp @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "QueueEvents.h" +#include "qpid/Exception.h" + +namespace qpid { +namespace broker { + +QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller) : + eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller) +{ + eventQueue.start(); +} + +QueueEvents::~QueueEvents() +{ + eventQueue.stop(); +} + +void QueueEvents::enqueued(const QueuedMessage& m) +{ + eventQueue.push(Event(ENQUEUE, m)); +} + +void QueueEvents::dequeued(const QueuedMessage& m) +{ + eventQueue.push(Event(DEQUEUE, m)); +} + +void QueueEvents::registerListener(const std::string& id, const EventListener& listener) +{ + qpid::sys::Mutex::ScopedLock l(lock); + if (listeners.find(id) == listeners.end()) { + listeners[id] = listener; + } else { + throw Exception(QPID_MSG("Event listener already registered for '" << id << "'")); + } +} + +void QueueEvents::unregisterListener(const std::string& id) +{ + qpid::sys::Mutex::ScopedLock l(lock); + if (listeners.find(id) == listeners.end()) { + throw Exception(QPID_MSG("No event listener registered for '" << id << "'")); + } else { + listeners.erase(id); + } +} + +void QueueEvents::handle(EventQueue::Queue& events) +{ + qpid::sys::Mutex::ScopedLock l(lock); + while (!events.empty()) { + for (Listeners::iterator i = listeners.begin(); i != listeners.end(); i++) { + i->second(events.front()); + } + events.pop_front(); + } +} + +void QueueEvents::shutdown() +{ + if (!eventQueue.empty() && !listeners.empty()) eventQueue.shutdown(); +} + +QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {} + + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/QueueEvents.h b/cpp/src/qpid/broker/QueueEvents.h new file mode 100644 index 0000000000..2ba69e33e6 --- /dev/null +++ b/cpp/src/qpid/broker/QueueEvents.h @@ -0,0 +1,77 @@ +#ifndef QPID_BROKER_QUEUEEVENTS_H +#define QPID_BROKER_QUEUEEVENTS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "QueuedMessage.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/PollableQueue.h" +#include <map> +#include <string> +#include <boost/function.hpp> + +namespace qpid { +namespace broker { + +/** + * Event manager for queue events. Allows queues to indicate when + * events have occured; allows listeners to register for notification + * of this. The notification happens asynchronously, in a separate + * thread. + */ +class QueueEvents +{ + public: + enum EventType {ENQUEUE, DEQUEUE}; + + struct Event + { + EventType type; + QueuedMessage msg; + + Event(EventType, const QueuedMessage&); + }; + + typedef boost::function<void (Event)> EventListener; + + QueueEvents(const boost::shared_ptr<sys::Poller>& poller); + ~QueueEvents(); + void enqueued(const QueuedMessage&); + void dequeued(const QueuedMessage&); + void registerListener(const std::string& id, const EventListener&); + void unregisterListener(const std::string& id); + //process all outstanding events + void shutdown(); + private: + typedef qpid::sys::PollableQueue<Event> EventQueue; + typedef std::map<std::string, EventListener> Listeners; + + EventQueue eventQueue; + Listeners listeners; + qpid::sys::Mutex lock;//protect listeners from concurrent access + + void handle(EventQueue::Queue& e); + +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_QUEUEEVENTS_H*/ diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 4450d56efb..0966db8162 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -362,6 +362,10 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& getBroker().getExchanges().getDefault()->bind(queue, name, 0); queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments); + //if event generation is turned on, pass in a pointer to + //the QueueEvents instance to use + if (queue->getEventMode()) queue->setQueueEventManager(getBroker().getQueueEvents()); + //handle automatic cleanup: if (exclusive) { exclusiveQueues.push_back(queue); diff --git a/cpp/src/qpid/client/QueueOptions.cpp b/cpp/src/qpid/client/QueueOptions.cpp index ac65b0bc22..5493c05044 100644 --- a/cpp/src/qpid/client/QueueOptions.cpp +++ b/cpp/src/qpid/client/QueueOptions.cpp @@ -24,6 +24,8 @@ namespace qpid { namespace client { +enum QueueEventGeneration {ENQUEUE_ONLY=1, ENQUEUE_AND_DEQUEUE=2}; + QueueOptions::QueueOptions() {} @@ -39,6 +41,7 @@ const std::string QueueOptions::strLastValueQueue("qpid.last_value_queue"); const std::string QueueOptions::strPersistLastNode("qpid.persist_last_node"); const std::string QueueOptions::strLVQMatchProperty("qpid.LVQ_key"); const std::string QueueOptions::strLastValueQueueNoBrowse("qpid.last_value_queue_no_browse"); +const std::string QueueOptions::strQueueEventMode("qpid.queue_event_generation"); QueueOptions::~QueueOptions() @@ -109,6 +112,10 @@ void QueueOptions::clearOrdering() erase(strLastValueQueue); } +void QueueOptions::enableQueueEvents(bool enqueueOnly) +{ + setInt(strQueueEventMode, enqueueOnly ? ENQUEUE_ONLY : ENQUEUE_AND_DEQUEUE); +} } } diff --git a/cpp/src/qpid/client/QueueOptions.h b/cpp/src/qpid/client/QueueOptions.h index 114e1e49c2..8c22414cbb 100644 --- a/cpp/src/qpid/client/QueueOptions.h +++ b/cpp/src/qpid/client/QueueOptions.h @@ -27,7 +27,7 @@ namespace qpid { namespace client { enum QueueSizePolicy {NONE, REJECT, FLOW_TO_DISK, RING, RING_STRICT}; -enum QueueOrderingPolicy {FIFO, LVQ, LVQ_NO_BROWSE}; +enum QueueOrderingPolicy {FIFO, LVQ, LVQ_NO_BROWSE}; /** * A help class to set options on the Queue. Create a configured args while @@ -83,6 +83,13 @@ class QueueOptions: public framing::FieldTable * Use default odering policy */ void clearOrdering(); + + /** + * Turns on event generation for this queue (either enqueue only + * or for enqueue and dequeue events); the events can then be + * processed by a regsitered broker plugin. + */ + void enableQueueEvents(bool enqueueOnly); static const std::string strMaxCountKey; static const std::string strMaxSizeKey; @@ -95,6 +102,7 @@ class QueueOptions: public framing::FieldTable static const std::string strPersistLastNode; static const std::string strLVQMatchProperty; static const std::string strLastValueQueueNoBrowse; + static const std::string strQueueEventMode; }; } diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/cpp/src/qpid/replication/ReplicatingEventListener.cpp new file mode 100644 index 0000000000..80ff77d107 --- /dev/null +++ b/cpp/src/qpid/replication/ReplicatingEventListener.cpp @@ -0,0 +1,122 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ReplicatingEventListener.h" +#include "constants.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/QueueEvents.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace replication { + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::replication::constants; + +void ReplicatingEventListener::handle(QueueEvents::Event event) +{ + //create event message and enqueue it on replication queue + FieldTable headers; + boost::intrusive_ptr<Message> message; + switch (event.type) { + case QueueEvents::ENQUEUE: + headers.setString(REPLICATION_EVENT_TYPE, ENQUEUE); + headers.setString(REPLICATION_TARGET_QUEUE, event.msg.queue->getName()); + message = createEventMessage(headers); + queue->deliver(message); + //if its an enqueue, enqueue the message itself on the + //replication queue also: + queue->deliver(event.msg.payload); + QPID_LOG(debug, "Queued 'enqueue' event on " << event.msg.queue->getName() << " for replication"); + break; + case QueueEvents::DEQUEUE: + headers.setString(REPLICATION_EVENT_TYPE, DEQUEUE); + headers.setString(REPLICATION_TARGET_QUEUE, event.msg.queue->getName()); + headers.setInt(DEQUEUED_MESSAGE_POSITION, event.msg.position); + message = createEventMessage(headers); + queue->deliver(message); + QPID_LOG(debug, "Queued 'dequeue' event from " << event.msg.queue->getName() << " for replication, (from position " + << event.msg.position << ")"); + break; + } +} + +namespace { +const std::string EMPTY; +} + +boost::intrusive_ptr<Message> ReplicatingEventListener::createEventMessage(const FieldTable& headers) +{ + boost::intrusive_ptr<Message> msg(new Message()); + AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0)); + AMQFrame header(in_place<AMQHeaderBody>()); + header.setBof(false); + header.setEof(true); + header.setBos(true); + header.setEos(true); + msg->getFrames().append(method); + msg->getFrames().append(header); + MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setApplicationHeaders(headers); + return msg; +} + +Options* ReplicatingEventListener::getOptions() +{ + return &options; +} + +void ReplicatingEventListener::initialize(Plugin::Target& target) +{ + Broker* broker = dynamic_cast<broker::Broker*>(&target); + if (broker && !options.queue.empty()) { + if (options.createQueue) { + queue = broker->getQueues().declare(options.queue).first; + } else { + queue = broker->getQueues().find(options.queue); + } + if (queue) { + QueueEvents::EventListener callback = boost::bind(&ReplicatingEventListener::handle, this, _1); + broker->getQueueEvents().registerListener(options.name, callback); + QPID_LOG(info, "Registered replicating queue event listener"); + } else { + QPID_LOG(error, "Replication queue named '" << options.queue << "' does not exist; replication plugin disabled."); + } + } +} + +void ReplicatingEventListener::earlyInitialize(Target&) {} + +ReplicatingEventListener::PluginOptions::PluginOptions() : Options("Queue Replication Options"), + name("replicator"), + createQueue(false) +{ + addOptions() + ("replication-queue", optValue(queue, "QUEUE"), "Queue on which events for other queues are recorded") + ("replication-listener-name", optValue(name, "NAME"), "name by which to register the replicating event listener") + ("create-replication-queue", optValue(createQueue), "if set, the replication will be created if it does not exist"); +} + +static ReplicatingEventListener plugin; + +}} // namespace qpid::replication diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.h b/cpp/src/qpid/replication/ReplicatingEventListener.h new file mode 100644 index 0000000000..25e2a5b7b9 --- /dev/null +++ b/cpp/src/qpid/replication/ReplicatingEventListener.h @@ -0,0 +1,66 @@ +#ifndef QPID_REPLICATION_REPLICATINGEVENTLISTENER_H +#define QPID_REPLICATION_REPLICATINGEVENTLISTENER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Plugin.h" +#include "qpid/Options.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueueEvents.h" +#include "qpid/framing/FieldTable.h" + +namespace qpid { +namespace replication { + +/** + * An event listener plugin that records queue events as messages on a + * replication queue, from where they can be consumed (e.g. by an + * inter-broker link to the corresponding QueueReplicationExchange + * plugin. + */ +class ReplicatingEventListener : public Plugin +{ + public: + Options* getOptions(); + void earlyInitialize(Plugin::Target& target); + void initialize(Plugin::Target& target); + void handle(qpid::broker::QueueEvents::Event); + private: + struct PluginOptions : public Options + { + std::string queue; + std::string name; + bool createQueue; + + PluginOptions(); + }; + + PluginOptions options; + qpid::broker::Queue::shared_ptr queue; + + boost::intrusive_ptr<qpid::broker::Message> createEventMessage(const qpid::framing::FieldTable& headers); +}; + +}} // namespace qpid::replication + +#endif /*!QPID_REPLICATION_REPLICATINGEVENTLISTENER_H*/ diff --git a/cpp/src/qpid/replication/ReplicationExchange.cpp b/cpp/src/qpid/replication/ReplicationExchange.cpp new file mode 100644 index 0000000000..abe8a4dfb6 --- /dev/null +++ b/cpp/src/qpid/replication/ReplicationExchange.cpp @@ -0,0 +1,139 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ReplicationExchange.h" +#include "constants.h" +#include "qpid/Plugin.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/ExchangeRegistry.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" +#include <boost/bind.hpp> + +namespace qpid { +namespace replication { + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::replication::constants; + +ReplicationExchange::ReplicationExchange(const std::string& name, bool durable, + const FieldTable& args, + QueueRegistry& qr, + Manageable* parent) + : Exchange(name, durable, args, parent), queues(qr), expectingEnqueue(false) {} + +std::string ReplicationExchange::getType() const { return typeName; } + +void ReplicationExchange::route(Deliverable& msg, const std::string& /*routingKey*/, const FieldTable* args) +{ + if (args) { + std::string eventType = args->getAsString(REPLICATION_EVENT_TYPE); + if (eventType == ENQUEUE) { + expectingEnqueue = true; + targetQueue = args->getAsString(REPLICATION_TARGET_QUEUE); + QPID_LOG(debug, "Recorded replicated 'enqueue' event for " << targetQueue); + return; + } else if (eventType == DEQUEUE) { + std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); + Queue::shared_ptr queue = queues.find(queueName); + SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION)); + + QueuedMessage dequeued; + if (queue->acquireMessageAt(position, dequeued)) { + queue->dequeue(0, dequeued); + QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position); + } else { + QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName); + } + + return; + } else if (!eventType.empty()) { + throw IllegalArgumentException(QPID_MSG("Illegal value for " << REPLICATION_EVENT_TYPE << ": " << eventType)); + } + } + //if we get here assume its not an event message, assume its an enqueue + if (expectingEnqueue) { + Queue::shared_ptr queue = queues.find(targetQueue); + msg.deliverTo(queue); + expectingEnqueue = false; + targetQueue.clear(); + QPID_LOG(debug, "Eenqueued replicated message onto " << targetQueue); + } else { + QPID_LOG(warning, "Dropping unexpected message"); + } +} + +bool ReplicationExchange::bind(Queue::shared_ptr /*queue*/, const std::string& /*routingKey*/, const FieldTable* /*args*/) +{ + throw NotImplementedException("Replication exchange does not support bind operation"); +} + +bool ReplicationExchange::unbind(Queue::shared_ptr /*queue*/, const std::string& /*routingKey*/, const FieldTable* /*args*/) +{ + throw NotImplementedException("Replication exchange does not support unbind operation"); +} + +bool ReplicationExchange::isBound(Queue::shared_ptr /*queue*/, const string* const /*routingKey*/, const FieldTable* const /*args*/) +{ + return false; +} + +const std::string ReplicationExchange::typeName("replication"); + + +struct ReplicationExchangePlugin : Plugin +{ + Broker* broker; + + ReplicationExchangePlugin(); + void earlyInitialize(Plugin::Target& target); + void initialize(Plugin::Target& target); + Exchange::shared_ptr create(const std::string& name, bool durable, + const framing::FieldTable& args, + management::Manageable* parent); +}; + +ReplicationExchangePlugin::ReplicationExchangePlugin() : broker(0) {} + +Exchange::shared_ptr ReplicationExchangePlugin::create(const std::string& name, bool durable, + const framing::FieldTable& args, + management::Manageable* parent) +{ + Exchange::shared_ptr e(new ReplicationExchange(name, durable, args, broker->getQueues(), parent)); + return e; +} + + +void ReplicationExchangePlugin::initialize(Plugin::Target& target) +{ + broker = dynamic_cast<broker::Broker*>(&target); + if (broker) { + ExchangeRegistry::FactoryFunction f = boost::bind(&ReplicationExchangePlugin::create, this, _1, _2, _3, _4); + broker->getExchanges().registerType(ReplicationExchange::typeName, f); + QPID_LOG(info, "Registered replication exchange"); + } +} + +void ReplicationExchangePlugin::earlyInitialize(Target&) {} + +static ReplicationExchangePlugin exchangePlugin; + +}} // namespace qpid::replication diff --git a/cpp/src/qpid/replication/ReplicationExchange.h b/cpp/src/qpid/replication/ReplicationExchange.h new file mode 100644 index 0000000000..ed2b5956b6 --- /dev/null +++ b/cpp/src/qpid/replication/ReplicationExchange.h @@ -0,0 +1,59 @@ +#ifndef QPID_REPLICATION_REPLICATIONEXCHANGE_H +#define QPID_REPLICATION_REPLICATIONEXCHANGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Exchange.h" + +namespace qpid { +namespace replication { + +/** + * A custom exchange plugin that processes incoming messages + * representing enqueue or dequeue events for particular queues and + * carries out the corresponding action to replicate that on the local + * broker. + */ +class ReplicationExchange : public qpid::broker::Exchange +{ + public: + static const std::string typeName; + + ReplicationExchange(const std::string& name, bool durable, + const qpid::framing::FieldTable& args, + qpid::broker::QueueRegistry& queues, + qpid::management::Manageable* parent = 0); + + std::string getType() const; + + void route(qpid::broker::Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); + + bool bind(qpid::broker::Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + bool unbind(qpid::broker::Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + bool isBound(qpid::broker::Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args); + private: + qpid::broker::QueueRegistry& queues; + bool expectingEnqueue; + std::string targetQueue; +}; +}} // namespace qpid::replication + +#endif /*!QPID_REPLICATION_REPLICATIONEXCHANGE_H*/ diff --git a/cpp/src/qpid/replication/constants.h b/cpp/src/qpid/replication/constants.h new file mode 100644 index 0000000000..b0cef7570c --- /dev/null +++ b/cpp/src/qpid/replication/constants.h @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +namespace replication { +namespace constants { + +const std::string REPLICATION_EVENT_TYPE("qpid.replication_event_type"); +const std::string ENQUEUE("enqueue"); +const std::string DEQUEUE("dequeue"); +const std::string REPLICATION_TARGET_QUEUE("qpid.replication_target_queue"); +const std::string DEQUEUED_MESSAGE_POSITION("qpid.dequeued_message_position"); + +}}} diff --git a/cpp/src/replication.mk b/cpp/src/replication.mk new file mode 100644 index 0000000000..6830f99d36 --- /dev/null +++ b/cpp/src/replication.mk @@ -0,0 +1,46 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Make file for building two plugins for asynchronously replicating +# queues. + +dmodule_LTLIBRARIES += replicating_listener.la replication_exchange.la + +# a queue event listener plugin that creates messages on a replication +# queue corresponding to enqueue and dequeue events: +replicating_listener_la_SOURCES = \ + qpid/replication/constants.h \ + qpid/replication/ReplicatingEventListener.cpp \ + qpid/replication/ReplicatingEventListener.h + +replicating_listener_la_LIBADD = libqpidbroker.la + +replicating_listener_la_LDFLAGS = $(PLUGINLDFLAGS) + +# a custom exchange plugin that allows an exchange to be created that +# can process the messages from a replication queue (populated on the +# source system by the replicating listener plugin above) and take the +# corresponding action on the local queues +replication_exchange_la_SOURCES = \ + qpid/replication/constants.h \ + qpid/replication/ReplicationExchange.cpp \ + qpid/replication/ReplicationExchange.h + +replication_exchange_la_LIBADD = libqpidbroker.la + +replication_exchange_la_LDFLAGS = $(PLUGINLDFLAGS) diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h index 2a4faa2fd4..205b4d90ef 100644 --- a/cpp/src/tests/BrokerFixture.h +++ b/cpp/src/tests/BrokerFixture.h @@ -66,8 +66,14 @@ struct BrokerFixture : private boost::noncopyable { brokerThread = qpid::sys::Thread(*broker); }; - ~BrokerFixture() { + void shutdownBroker() + { broker->shutdown(); + broker = BrokerPtr(); + } + + ~BrokerFixture() { + if (broker) broker->shutdown(); brokerThread.join(); } diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 8cc72da96b..314b90ba8b 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -88,7 +88,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ ForkedBroker.h \ ManagementTest.cpp \ MessageReplayTracker.cpp \ - ConsoleTest.cpp + ConsoleTest.cpp \ + QueueEvents.cpp if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp @@ -198,7 +199,7 @@ DispatcherTest_LDADD=$(lib_common) TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest -TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests +TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests replication_test EXTRA_DIST += \ run_test vg_check \ @@ -219,6 +220,7 @@ EXTRA_DIST += \ MessageUtils.h \ TestMessageStore.h \ TxMocks.h \ + replication_test \ start_cluster stop_cluster restart_cluster check_LTLIBRARIES += libdlclose_noop.la diff --git a/cpp/src/tests/QueueEvents.cpp b/cpp/src/tests/QueueEvents.cpp new file mode 100644 index 0000000000..7aea23922d --- /dev/null +++ b/cpp/src/tests/QueueEvents.cpp @@ -0,0 +1,233 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "MessageUtils.h" +#include "unit_test.h" +#include "BrokerFixture.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueueEvents.h" +#include "qpid/client/QueueOptions.h" +#include "qpid/framing/SequenceNumber.h" +#include <boost/bind.hpp> +#include <boost/format.hpp> + +QPID_AUTO_TEST_SUITE(QueueEventsSuite) + +using namespace qpid::client; +using namespace qpid::broker; +using namespace qpid::sys; +using qpid::framing::SequenceNumber; + +struct DummyListener +{ + typedef std::deque<QueueEvents::Event> Events; + + Events events; + boost::shared_ptr<Poller> poller; + + void handle(QueueEvents::Event e) + { + if (events.empty()) { + BOOST_FAIL("Unexpected event received"); + } else { + BOOST_CHECK_EQUAL(events.front().type, e.type); + BOOST_CHECK_EQUAL(events.front().msg.queue, e.msg.queue); + BOOST_CHECK_EQUAL(events.front().msg.payload, e.msg.payload); + BOOST_CHECK_EQUAL(events.front().msg.position, e.msg.position); + events.pop_front(); + } + if (events.empty() && poller) poller->shutdown(); + } + + void expect(QueueEvents::Event e) + { + events.push_back(e); + } +}; + +QPID_AUTO_TEST_CASE(testBasicEventProcessing) +{ + boost::shared_ptr<Poller> poller(new Poller()); + sys::Dispatcher dispatcher(poller); + Thread dispatchThread(dispatcher); + QueueEvents events(poller); + DummyListener listener; + listener.poller = poller; + events.registerListener("dummy", boost::bind(&DummyListener::handle, &listener, _1)); + //signal occurence of some events: + Queue queue("queue1"); + SequenceNumber id; + QueuedMessage event1(&queue, MessageUtils::createMessage(), id); + QueuedMessage event2(&queue, MessageUtils::createMessage(), ++id); + + events.enqueued(event1); + events.enqueued(event2); + events.dequeued(event1); + //define events expected by listener: + listener.expect(QueueEvents::Event(QueueEvents::ENQUEUE, event1)); + listener.expect(QueueEvents::Event(QueueEvents::ENQUEUE, event2)); + listener.expect(QueueEvents::Event(QueueEvents::DEQUEUE, event1)); + + dispatchThread.join(); + events.shutdown(); + events.unregisterListener("dummy"); +} + + +struct EventRecorder +{ + struct EventRecord + { + QueueEvents::EventType type; + std::string queue; + std::string content; + SequenceNumber position; + }; + + typedef std::deque<EventRecord> Events; + + Events events; + + void handle(QueueEvents::Event event) + { + EventRecord record; + record.type = event.type; + record.queue = event.msg.queue->getName(); + event.msg.payload->getFrames().getContent(record.content); + record.position = event.msg.position; + events.push_back(record); + } + + void check(QueueEvents::EventType type, const std::string& queue, const std::string& content, const SequenceNumber& position) + { + if (events.empty()) { + BOOST_FAIL("Missed event"); + } else { + BOOST_CHECK_EQUAL(events.front().type, type); + BOOST_CHECK_EQUAL(events.front().queue, queue); + BOOST_CHECK_EQUAL(events.front().content, content); + BOOST_CHECK_EQUAL(events.front().position, position); + events.pop_front(); + } + } + void checkEnqueue(const std::string& queue, const std::string& data, const SequenceNumber& position) + { + check(QueueEvents::ENQUEUE, queue, data, position); + } + + void checkDequeue(const std::string& queue, const std::string& data, const SequenceNumber& position) + { + check(QueueEvents::DEQUEUE, queue, data, position); + } +}; + +QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing) +{ + ProxySessionFixture fixture; + //register dummy event listener to broker + EventRecorder listener; + fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1)); + + //declare queue with event options specified + QueueOptions options; + options.enableQueueEvents(false); + std::string q("queue-events-test"); + fixture.session.queueDeclare(arg::queue=q, arg::arguments=options); + //send and consume some messages + LocalQueue incoming; + Subscription sub = fixture.subs.subscribe(incoming, q); + for (int i = 0; i < 5; i++) { + fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q)); + } + for (int i = 0; i < 3; i++) { + BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str()); + } + for (int i = 5; i < 10; i++) { + fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q)); + } + for (int i = 3; i < 10; i++) { + BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str()); + } + fixture.connection.close(); + fixture.shutdownBroker(); + + //check listener was notified of all events, and in correct order + SequenceNumber enqueueId(1); + SequenceNumber dequeueId(1); + for (int i = 0; i < 5; i++) { + listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++); + } + for (int i = 0; i < 3; i++) { + listener.checkDequeue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), dequeueId++); + } + for (int i = 5; i < 10; i++) { + listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++); + } + for (int i = 3; i < 10; i++) { + listener.checkDequeue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), dequeueId++); + } +} + +QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing_enqueuesOnly) +{ + ProxySessionFixture fixture; + //register dummy event listener to broker + EventRecorder listener; + fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1)); + + //declare queue with event options specified + QueueOptions options; + options.enableQueueEvents(true); + std::string q("queue-events-test"); + fixture.session.queueDeclare(arg::queue=q, arg::arguments=options); + //send and consume some messages + LocalQueue incoming; + Subscription sub = fixture.subs.subscribe(incoming, q); + for (int i = 0; i < 5; i++) { + fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q)); + } + for (int i = 0; i < 3; i++) { + BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str()); + } + for (int i = 5; i < 10; i++) { + fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q)); + } + for (int i = 3; i < 10; i++) { + BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str()); + } + fixture.connection.close(); + fixture.shutdownBroker(); + + //check listener was notified of all events, and in correct order + SequenceNumber enqueueId(1); + SequenceNumber dequeueId(1); + for (int i = 0; i < 5; i++) { + listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++); + } + for (int i = 5; i < 10; i++) { + listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++); + } +} + +QPID_AUTO_TEST_SUITE_END() + + diff --git a/cpp/src/tests/replication_test b/cpp/src/tests/replication_test new file mode 100755 index 0000000000..6bf8041343 --- /dev/null +++ b/cpp/src/tests/replication_test @@ -0,0 +1,98 @@ +#!/bin/sh + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Run the federation tests. +MY_DIR=`dirname \`which $0\`` +PYTHON_DIR=${MY_DIR}/../../../python + +trap stop_brokers INT TERM QUIT + +stop_brokers() { + if [[ $BROKER_A ]] ; then + ../qpidd -q --port $BROKER_A + unset BROKER_A + fi + if [[ $BROKER_B ]] ; then + ../qpidd -q --port $BROKER_B + unset BROKER_B + fi +} + +if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e ../.libs/replication_exchange.so ; then + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true > qpidd.port + BROKER_A=`cat qpidd.port` + + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so > qpidd.port + BROKER_B=`cat qpidd.port` + + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication + $PYTHON_DIR/commands/qpid-route queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication + + #create test queues + + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-a --generate-queue-events 2 + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-b --generate-queue-events 2 + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-c --generate-queue-events 1 + + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-a + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-b + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-c + + #publish and consume from test queus on broker A: + rm -f queue-*.repl + for i in `seq 1 10`; do echo Message $i for A >> queue-a-input.repl; done + for i in `seq 1 20`; do echo Message $i for B >> queue-b-input.repl; done + for i in `seq 1 15`; do echo Message $i for C >> queue-c-input.repl; done + + ./sender --port $BROKER_A --routing-key queue-a --send-eos 1 < queue-a-input.repl + ./sender --port $BROKER_A --routing-key queue-b --send-eos 1 < queue-b-input.repl + ./sender --port $BROKER_A --routing-key queue-c --send-eos 1 < queue-c-input.repl + + ./receiver --port $BROKER_A --queue queue-a --messages 5 > /dev/null + ./receiver --port $BROKER_A --queue queue-b --messages 10 > /dev/null + ./receiver --port $BROKER_A --queue queue-c --messages 10 > /dev/null + + #shutdown broker A then check that broker Bs versions of the queues are as expected + ../qpidd -q --port $BROKER_A + unset BROKER_A + + #validate replicated queues: + ./receiver --port $BROKER_B --queue queue-a > queue-a-backup.repl + ./receiver --port $BROKER_B --queue queue-b > queue-b-backup.repl + ./receiver --port $BROKER_B --queue queue-c > queue-c-backup.repl + tail -5 queue-a-input.repl > queue-a-expected.repl + tail -10 queue-b-input.repl > queue-b-expected.repl + diff queue-a-backup.repl queue-a-expected.repl || FAIL=1 + diff queue-b-backup.repl queue-b-expected.repl || FAIL=1 + diff queue-c-backup.repl queue-c-input.repl || FAIL=1 + + if [[ $FAIL ]]; then + echo replication test failed: expectations not met! + else + echo queue state replicated as expected + rm queue-*.repl + fi + + stop_brokers +else + echo "Skipping replication test, plugins not built or python utils not located" +fi + |