diff options
Diffstat (limited to 'cpp')
27 files changed, 422 insertions, 133 deletions
diff --git a/cpp/rubygen/framing.0-10/Proxy.rb b/cpp/rubygen/framing.0-10/Proxy.rb index e647a8da0b..97d0df7c58 100755 --- a/cpp/rubygen/framing.0-10/Proxy.rb +++ b/cpp/rubygen/framing.0-10/Proxy.rb @@ -37,7 +37,7 @@ class ProxyGen < CppGen def inner_class_decl(c) cname=c.name.caps - cpp_class(cname, "Proxy") { + cpp_class(cname, "public Proxy") { gen <<EOS public: #{cname}(FrameHandler& f) : Proxy(f) {} @@ -92,7 +92,7 @@ EOS genl namespace("qpid::framing") { genl "#{@classname}::#{@classname}(FrameHandler& f) :" - gen " Proxy(f)" + gen " Proxy(f)" @amqp.classes.each { |c| gen ",\n "+proxy_member(c)+"(f)" } genl "{}\n" @amqp.classes.each { |c| inner_class_defn(c) } diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index 0e57e4b3f1..2f6b59e901 100644 --- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -299,6 +299,12 @@ void SessionHandler::sendCommandPoint(const SessionPoint& point) { } } +void SessionHandler::markReadyToSend() { + if (!sendReady) { + sendReady = true; + } +} + void SessionHandler::sendTimeout(uint32_t t) { checkAttached(); peer.requestTimeout(t); diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.h b/cpp/src/qpid/amqp_0_10/SessionHandler.h index 016de454cc..967e89c984 100644 --- a/cpp/src/qpid/amqp_0_10/SessionHandler.h +++ b/cpp/src/qpid/amqp_0_10/SessionHandler.h @@ -60,6 +60,7 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler, void sendAttach(bool force); void sendTimeout(uint32_t t); void sendFlush(); + void markReadyToSend();//TODO: only needed for inter-broker bridge; cleanup /** True if the handler is ready to send and receive */ bool ready() const; diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index f9cb7ccd3c..6129f13ede 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -68,7 +68,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, mgmtObject = new _qmf::Bridge (agent, this, link, id, args.i_durable, args.i_src, args.i_dest, args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, - args.i_tag, args.i_excludes, args.i_dynamic); + args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync); if (!args.i_durable) agent->addObject(mgmtObject); } @@ -81,6 +81,8 @@ Bridge::~Bridge() void Bridge::create(ConnectionState& c) { + FieldTable options; + if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync); connState = &c; if (args.i_srcIsLocal) { if (args.i_dynamic) @@ -103,7 +105,7 @@ void Bridge::create(ConnectionState& c) session->commandPoint(0,0); if (args.i_srcIsQueue) { - peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options); peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); } else { @@ -194,9 +196,10 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) buffer.getShortString(id); buffer.getShortString(excludes); bool dynamic(buffer.getOctet()); + uint16_t sync = buffer.getShort(); return links.declare(host, port, durable, src, dest, key, - is_queue, is_local, id, excludes, dynamic).first; + is_queue, is_local, id, excludes, dynamic, sync).first; } void Bridge::encode(Buffer& buffer) const @@ -213,6 +216,7 @@ void Bridge::encode(Buffer& buffer) const buffer.putShortString(args.i_tag); buffer.putShortString(args.i_excludes); buffer.putOctet(args.i_dynamic ? 1 : 0); + buffer.putShort(args.i_sync); } uint32_t Bridge::encodedSize() const @@ -228,7 +232,8 @@ uint32_t Bridge::encodedSize() const + 1 // srcIsLocal + args.i_tag.size() + 1 + args.i_excludes.size() + 1 - + 1; // dynamic + + 1 // dynamic + + 2; // sync } management::ManagementObject* Bridge::GetManagementObject (void) const diff --git a/cpp/src/qpid/broker/DeliveryAdapter.h b/cpp/src/qpid/broker/DeliveryAdapter.h index 0e9d7d3929..d661b7caa5 100644 --- a/cpp/src/qpid/broker/DeliveryAdapter.h +++ b/cpp/src/qpid/broker/DeliveryAdapter.h @@ -43,7 +43,7 @@ class DeliveryRecord; class DeliveryAdapter { public: - virtual void deliver(DeliveryRecord&) = 0; + virtual void deliver(DeliveryRecord&, bool sync) = 0; virtual ~DeliveryAdapter(){} }; diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index b0c060aea5..90ec67f477 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -81,7 +81,7 @@ void DeliveryRecord::redeliver(SemanticState* const session) { requeue(); }else{ msg.payload->redeliver();//mark as redelivered - session->deliver(*this); + session->deliver(*this, false); } } } diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index bda9c80f0b..92417608b7 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -125,17 +125,19 @@ void Link::startConnectionLH () void Link::established () { - Mutex::ScopedLock mutex(lock); stringstream addr; addr << host << ":" << port; QPID_LOG (info, "Inter-broker link established to " << addr.str()); agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str())); - setStateLH(STATE_OPERATIONAL); - currentInterval = 1; - visitCount = 0; - if (closing) - destroy(); + { + Mutex::ScopedLock mutex(lock); + setStateLH(STATE_OPERATIONAL); + currentInterval = 1; + visitCount = 0; + if (closing) + destroy(); + } } void Link::closed (int, std::string text) @@ -170,9 +172,9 @@ void Link::closed (int, std::string text) void Link::destroy () { + Bridges toDelete; { Mutex::ScopedLock mutex(lock); - Bridges toDelete; AclModule* acl = getBroker()->getAcl(); std::string userID = getUsername() + "@" + getBroker()->getOptions().realm; @@ -195,12 +197,11 @@ void Link::destroy () for (Bridges::iterator i = created.begin(); i != created.end(); i++) toDelete.push_back(*i); created.clear(); - - // Now delete all bridges on this link. - for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++) - (*i)->destroy(); - toDelete.clear(); } + // Now delete all bridges on this link (don't hold the lock for this). + for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++) + (*i)->destroy(); + toDelete.clear(); links->destroy (host, port); } @@ -386,7 +387,7 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te links->declare (host, port, iargs.i_durable, iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes, - iargs.i_dynamic); + iargs.i_dynamic, iargs.i_sync); if (result.second && iargs.i_durable) store->create(*result.first); diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index 960e9f21ba..274e673bd3 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -92,7 +92,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, bool isLocal, std::string& tag, std::string& excludes, - bool dynamic) + bool dynamic, + uint16_t sync) { Mutex::ScopedLock locker(lock); stringstream keystream; @@ -121,6 +122,7 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, args.i_tag = tag; args.i_excludes = excludes; args.i_dynamic = dynamic; + args.i_sync = sync; bridge = Bridge::shared_ptr (new Bridge (l->second.get(), l->second->nextChannel(), diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h index d563412cc1..d52b7a7394 100644 --- a/cpp/src/qpid/broker/LinkRegistry.h +++ b/cpp/src/qpid/broker/LinkRegistry.h @@ -85,7 +85,8 @@ namespace broker { bool isLocal, std::string& id, std::string& excludes, - bool dynamic); + bool dynamic, + uint16_t sync); void destroy(const std::string& host, const uint16_t port); void destroy(const std::string& host, diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index d9896b388b..f9f75679e5 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -256,7 +256,9 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, arguments(_arguments), msgCredit(0), byteCredit(0), - notifyEnabled(true) {} + notifyEnabled(true), + syncFrequency(_arguments.getAsInt("qpid.sync_frequency")), + deliveryCount(0) {} OwnershipToken* SemanticState::ConsumerImpl::getSession() { @@ -267,7 +269,9 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { allocateCredit(msg.payload); DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing); - parent->deliver(record); + bool sync = syncFrequency && ++deliveryCount >= syncFrequency; + if (sync) deliveryCount = 0;//reset + parent->deliver(record, sync); if (!ackExpected) record.setEnded();//allows message to be released now its been delivered if (windowing || ackExpected || !acquire) { parent->record(record); @@ -449,9 +453,9 @@ void SemanticState::recover(bool requeue) } } -void SemanticState::deliver(DeliveryRecord& msg) +void SemanticState::deliver(DeliveryRecord& msg, bool sync) { - return deliveryAdapter.deliver(msg); + return deliveryAdapter.deliver(msg, sync); } SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination) diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 340017ddf0..a1bee23fd2 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -77,6 +77,8 @@ class SemanticState : public sys::OutputTask, uint32_t msgCredit; uint32_t byteCredit; bool notifyEnabled; + const int syncFrequency; + int deliveryCount; bool checkCredit(boost::intrusive_ptr<Message>& msg); void allocateCredit(boost::intrusive_ptr<Message>& msg); @@ -197,7 +199,7 @@ class SemanticState : public sys::OutputTask, void suspendDtx(const std::string& xid); void resumeDtx(const std::string& xid); void recover(bool requeue); - void deliver(DeliveryRecord& message); + void deliver(DeliveryRecord& message, bool sync); void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired); void release(DeliveryId first, DeliveryId last, bool setRedelivered); void reject(DeliveryId first, DeliveryId last); diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 163102d008..84102fb015 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -89,13 +89,14 @@ void SessionHandler::readyToSend() { // in the bridge. // void SessionHandler::attached(const std::string& name) { - if (session.get()) + if (session.get()) { checkName(name); - else { + } else { SessionId id(connection.getUserId(), name); SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig(); session.reset(new SessionState(connection.getBroker(), *this, id, config)); -} + markReadyToSend(); + } } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 4f088fdf4c..d0804d66b9 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -175,7 +175,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN } if (method->isSync()) { incomplete.process(enqueuedOp, true); - sendCompletion(); + sendAcceptAndCompletion(); } } @@ -207,18 +207,27 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) //hold up execution until async enqueue is complete if (msg->getFrames().getMethod()->isSync()) { incomplete.process(enqueuedOp, true); - sendCompletion(); + sendAcceptAndCompletion(); } else { incomplete.process(enqueuedOp, false); } } } +void SessionState::sendAcceptAndCompletion() +{ + if (!accepted.empty()) { + getProxy().getMessage().accept(accepted); + accepted.clear(); + } + sendCompletion(); +} + void SessionState::enqueued(boost::intrusive_ptr<Message> msg) { receiverCompleted(msg->getCommandId()); - if (msg->requiresAccept()) - getProxy().getMessage().accept(SequenceSet(msg->getCommandId())); + if (msg->requiresAccept()) + accepted.add(msg->getCommandId()); } void SessionState::handleIn(AMQFrame& frame) { @@ -240,16 +249,23 @@ void SessionState::handleOut(AMQFrame& frame) { handler->out(frame); } -void SessionState::deliver(DeliveryRecord& msg) +void SessionState::deliver(DeliveryRecord& msg, bool sync) { uint32_t maxFrameSize = getConnection().getFrameMax(); assert(senderGetCommandPoint().offset == 0); SequenceNumber commandId = senderGetCommandPoint().command; msg.deliver(getProxy().getHandler(), commandId, maxFrameSize); assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint. + if (sync) { + AMQP_ClientProxy::Execution& p(getProxy().getExecution()); + Proxy::ScopedSync s(p); + p.sync(); + } } -void SessionState::sendCompletion() { handler->sendCompletion(); } +void SessionState::sendCompletion() { + handler->sendCompletion(); +} void SessionState::senderCompleted(const SequenceSet& commands) { qpid::SessionState::senderCompleted(commands); diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 035a444127..f5f1bde2a2 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -93,12 +93,12 @@ class SessionState : public qpid::SessionState, void sendCompletion(); //delivery adapter methods: - void deliver(DeliveryRecord&); + void deliver(DeliveryRecord&, bool sync); // Manageable entry points management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t - ManagementMethod (uint32_t methodId, management::Args& args, std::string&); + ManagementMethod (uint32_t methodId, management::Args& args, std::string&); void readyToSend(); @@ -119,6 +119,8 @@ class SessionState : public qpid::SessionState, void handleInLast(framing::AMQFrame& frame); void handleOutLast(framing::AMQFrame& frame); + void sendAcceptAndCompletion(); + Broker& broker; SessionHandler* handler; sys::AbsTime expiry; // Used by SessionManager. @@ -128,7 +130,8 @@ class SessionState : public qpid::SessionState, IncompleteMessageList incomplete; IncompleteMessageList::CompletionListener enqueuedOp; qmf::org::apache::qpid::broker::Session* mgmtObject; - + qpid::framing::SequenceSet accepted; + friend class SessionManager; }; diff --git a/cpp/src/qpid/framing/Proxy.cpp b/cpp/src/qpid/framing/Proxy.cpp index 6b37fb368d..6c3724bcdb 100644 --- a/cpp/src/qpid/framing/Proxy.cpp +++ b/cpp/src/qpid/framing/Proxy.cpp @@ -18,15 +18,21 @@ #include "Proxy.h" #include "AMQFrame.h" +#include "AMQMethodBody.h" +#include "qpid/log/Statement.h" namespace qpid { namespace framing { -Proxy::Proxy(FrameHandler& h) : out(&h) {} +Proxy::Proxy(FrameHandler& h) : out(&h), sync(false) {} Proxy::~Proxy() {} void Proxy::send(const AMQBody& b) { + if (sync) { + const AMQMethodBody* m = dynamic_cast<const AMQMethodBody*>(&b); + if (m) m->setSync(sync); + } AMQFrame f(b); out->handle(f); } @@ -39,4 +45,7 @@ FrameHandler& Proxy::getHandler() { return *out; } void Proxy::setHandler(FrameHandler& f) { out=&f; } +Proxy::ScopedSync::ScopedSync(Proxy& p) : proxy(p) { proxy.sync = true; } +Proxy::ScopedSync::~ScopedSync() { proxy.sync = false; } + }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/Proxy.h b/cpp/src/qpid/framing/Proxy.h index 3dc082097a..5e2c886af2 100644 --- a/cpp/src/qpid/framing/Proxy.h +++ b/cpp/src/qpid/framing/Proxy.h @@ -33,6 +33,14 @@ class AMQBody; class Proxy { public: + class ScopedSync + { + Proxy& proxy; + public: + ScopedSync(Proxy& p); + ~ScopedSync(); + }; + Proxy(FrameHandler& h); virtual ~Proxy(); @@ -42,9 +50,9 @@ class Proxy FrameHandler& getHandler(); void setHandler(FrameHandler&); - private: FrameHandler* out; + bool sync; }; }} // namespace qpid::framing diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/cpp/src/qpid/replication/ReplicatingEventListener.cpp index 80ff77d107..d50ef852ef 100644 --- a/cpp/src/qpid/replication/ReplicatingEventListener.cpp +++ b/cpp/src/qpid/replication/ReplicatingEventListener.cpp @@ -23,6 +23,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/QueueEvents.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FrameHandler.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" @@ -35,27 +36,14 @@ using namespace qpid::replication::constants; void ReplicatingEventListener::handle(QueueEvents::Event event) { - //create event message and enqueue it on replication queue - FieldTable headers; - boost::intrusive_ptr<Message> message; switch (event.type) { case QueueEvents::ENQUEUE: - headers.setString(REPLICATION_EVENT_TYPE, ENQUEUE); - headers.setString(REPLICATION_TARGET_QUEUE, event.msg.queue->getName()); - message = createEventMessage(headers); - queue->deliver(message); - //if its an enqueue, enqueue the message itself on the - //replication queue also: - queue->deliver(event.msg.payload); - QPID_LOG(debug, "Queued 'enqueue' event on " << event.msg.queue->getName() << " for replication"); + deliverEnqueueMessage(event.msg); + QPID_LOG(debug, "Queuing 'enqueue' event on " << event.msg.queue->getName() << " for replication"); break; case QueueEvents::DEQUEUE: - headers.setString(REPLICATION_EVENT_TYPE, DEQUEUE); - headers.setString(REPLICATION_TARGET_QUEUE, event.msg.queue->getName()); - headers.setInt(DEQUEUED_MESSAGE_POSITION, event.msg.position); - message = createEventMessage(headers); - queue->deliver(message); - QPID_LOG(debug, "Queued 'dequeue' event from " << event.msg.queue->getName() << " for replication, (from position " + deliverDequeueMessage(event.msg); + QPID_LOG(debug, "Queuing 'dequeue' event from " << event.msg.queue->getName() << " for replication, (from position " << event.msg.position << ")"); break; } @@ -65,20 +53,64 @@ namespace { const std::string EMPTY; } -boost::intrusive_ptr<Message> ReplicatingEventListener::createEventMessage(const FieldTable& headers) +void ReplicatingEventListener::deliverDequeueMessage(const QueuedMessage& dequeued) +{ + FieldTable headers; + headers.setString(REPLICATION_TARGET_QUEUE, dequeued.queue->getName()); + headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence); + headers.setInt(REPLICATION_EVENT_TYPE, DEQUEUE); + headers.setInt(DEQUEUED_MESSAGE_POSITION, dequeued.position); + boost::intrusive_ptr<Message> msg(createMessage(headers)); + queue->deliver(msg); +} + +void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueued) +{ + boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload)); + FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders(); + headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName()); + headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence); + headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE); + queue->deliver(msg); +} + +boost::intrusive_ptr<Message> ReplicatingEventListener::createMessage(const FieldTable& headers) +{ + boost::intrusive_ptr<Message> msg(new Message()); + AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0)); + AMQFrame header(in_place<AMQHeaderBody>()); + header.setBof(false); + header.setEof(true); + header.setBos(true); + header.setEos(true); + msg->getFrames().append(method); + msg->getFrames().append(header); + MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setApplicationHeaders(headers); + return msg; +} + +struct AppendingHandler : FrameHandler +{ + boost::intrusive_ptr<Message> msg; + + AppendingHandler(boost::intrusive_ptr<Message> m) : msg(m) {} + + void handle(AMQFrame& f) + { + msg->getFrames().append(f); + } +}; + +boost::intrusive_ptr<Message> ReplicatingEventListener::cloneMessage(Queue& queue, boost::intrusive_ptr<Message> original) { - boost::intrusive_ptr<Message> msg(new Message()); - AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0)); - AMQFrame header(in_place<AMQHeaderBody>()); - header.setBof(false); - header.setEof(true); - header.setBos(true); - header.setEos(true); - msg->getFrames().append(method); - msg->getFrames().append(header); - MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); - props->setApplicationHeaders(headers); - return msg; + boost::intrusive_ptr<Message> copy(new Message()); + AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0)); + AppendingHandler handler(copy); + handler.handle(method); + original->sendHeader(handler, std::numeric_limits<int16_t>::max()); + original->sendContent(queue, handler, std::numeric_limits<int16_t>::max()); + return copy; } Options* ReplicatingEventListener::getOptions() diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.h b/cpp/src/qpid/replication/ReplicatingEventListener.h index 25e2a5b7b9..7616c7ac8a 100644 --- a/cpp/src/qpid/replication/ReplicatingEventListener.h +++ b/cpp/src/qpid/replication/ReplicatingEventListener.h @@ -28,6 +28,7 @@ #include "qpid/broker/Queue.h" #include "qpid/broker/QueueEvents.h" #include "qpid/framing/FieldTable.h" +#include "qpid/framing/SequenceNumber.h" namespace qpid { namespace replication { @@ -57,8 +58,14 @@ class ReplicatingEventListener : public Plugin PluginOptions options; qpid::broker::Queue::shared_ptr queue; + qpid::framing::SequenceNumber sequence; - boost::intrusive_ptr<qpid::broker::Message> createEventMessage(const qpid::framing::FieldTable& headers); + void deliverDequeueMessage(const qpid::broker::QueuedMessage& enqueued); + void deliverEnqueueMessage(const qpid::broker::QueuedMessage& enqueued); + + boost::intrusive_ptr<qpid::broker::Message> createMessage(const qpid::framing::FieldTable& headers); + boost::intrusive_ptr<qpid::broker::Message> cloneMessage(qpid::broker::Queue& queue, + boost::intrusive_ptr<qpid::broker::Message> original); }; }} // namespace qpid::replication diff --git a/cpp/src/qpid/replication/ReplicationExchange.cpp b/cpp/src/qpid/replication/ReplicationExchange.cpp index abe8a4dfb6..639cfb5d2e 100644 --- a/cpp/src/qpid/replication/ReplicationExchange.cpp +++ b/cpp/src/qpid/replication/ReplicationExchange.cpp @@ -38,46 +38,75 @@ ReplicationExchange::ReplicationExchange(const std::string& name, bool durable, const FieldTable& args, QueueRegistry& qr, Manageable* parent) - : Exchange(name, durable, args, parent), queues(qr), expectingEnqueue(false) {} + : Exchange(name, durable, args, parent), queues(qr), init(false) {} std::string ReplicationExchange::getType() const { return typeName; } void ReplicationExchange::route(Deliverable& msg, const std::string& /*routingKey*/, const FieldTable* args) { if (args) { - std::string eventType = args->getAsString(REPLICATION_EVENT_TYPE); - if (eventType == ENQUEUE) { - expectingEnqueue = true; - targetQueue = args->getAsString(REPLICATION_TARGET_QUEUE); - QPID_LOG(debug, "Recorded replicated 'enqueue' event for " << targetQueue); - return; - } else if (eventType == DEQUEUE) { - std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); - Queue::shared_ptr queue = queues.find(queueName); - SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION)); - - QueuedMessage dequeued; - if (queue->acquireMessageAt(position, dequeued)) { - queue->dequeue(0, dequeued); - QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position); - } else { - QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName); + int eventType = args->getAsInt(REPLICATION_EVENT_TYPE); + if (eventType) { + if (isDuplicate(args)) return; + switch (eventType) { + case ENQUEUE: + handleEnqueueEvent(args, msg); + return; + case DEQUEUE: + handleDequeueEvent(args); + return; + default: + throw IllegalArgumentException(QPID_MSG("Illegal value for " << REPLICATION_EVENT_TYPE << ": " << eventType)); } - - return; - } else if (!eventType.empty()) { - throw IllegalArgumentException(QPID_MSG("Illegal value for " << REPLICATION_EVENT_TYPE << ": " << eventType)); } + } else { + QPID_LOG(warning, "Dropping unexpected message with no headers"); } - //if we get here assume its not an event message, assume its an enqueue - if (expectingEnqueue) { - Queue::shared_ptr queue = queues.find(targetQueue); - msg.deliverTo(queue); - expectingEnqueue = false; - targetQueue.clear(); - QPID_LOG(debug, "Eenqueued replicated message onto " << targetQueue); +} + +void ReplicationExchange::handleEnqueueEvent(const FieldTable* args, Deliverable& msg) +{ + std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); + Queue::shared_ptr queue = queues.find(queueName); + FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders(); + headers.erase(REPLICATION_TARGET_QUEUE); + headers.erase(REPLICATION_EVENT_SEQNO); + headers.erase(REPLICATION_EVENT_TYPE); + msg.deliverTo(queue); + QPID_LOG(debug, "Enqueued replicated message onto " << queue); +} + +void ReplicationExchange::handleDequeueEvent(const FieldTable* args) +{ + std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); + Queue::shared_ptr queue = queues.find(queueName); + SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION)); + + QueuedMessage dequeued; + if (queue->acquireMessageAt(position, dequeued)) { + queue->dequeue(0, dequeued); + QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position); + } else { + QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName); + } +} + +bool ReplicationExchange::isDuplicate(const FieldTable* args) +{ + SequenceNumber seqno(args->getAsInt(REPLICATION_EVENT_SEQNO)); + if (!init) { + init = true; + sequence = seqno; + return false; + } else if (seqno > sequence) { + if (seqno - sequence > 1) { + QPID_LOG(error, "Gap in replication event sequence between: " << sequence << " and " << seqno); + } + sequence = seqno; + return false; } else { - QPID_LOG(warning, "Dropping unexpected message"); + QPID_LOG(info, "Duplicate detected: seqno=" << seqno << " (last seqno=" << sequence << ")"); + return true; } } diff --git a/cpp/src/qpid/replication/ReplicationExchange.h b/cpp/src/qpid/replication/ReplicationExchange.h index ed2b5956b6..897e4a954e 100644 --- a/cpp/src/qpid/replication/ReplicationExchange.h +++ b/cpp/src/qpid/replication/ReplicationExchange.h @@ -22,6 +22,7 @@ * */ #include "qpid/broker/Exchange.h" +#include "qpid/framing/SequenceNumber.h" namespace qpid { namespace replication { @@ -51,8 +52,12 @@ class ReplicationExchange : public qpid::broker::Exchange bool isBound(qpid::broker::Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args); private: qpid::broker::QueueRegistry& queues; - bool expectingEnqueue; - std::string targetQueue; + qpid::framing::SequenceNumber sequence; + bool init; + + bool isDuplicate(const qpid::framing::FieldTable* args); + void handleEnqueueEvent(const qpid::framing::FieldTable* args, qpid::broker::Deliverable& msg); + void handleDequeueEvent(const qpid::framing::FieldTable* args); }; }} // namespace qpid::replication diff --git a/cpp/src/qpid/replication/constants.h b/cpp/src/qpid/replication/constants.h index b0cef7570c..fb7085c570 100644 --- a/cpp/src/qpid/replication/constants.h +++ b/cpp/src/qpid/replication/constants.h @@ -22,10 +22,12 @@ namespace qpid { namespace replication { namespace constants { -const std::string REPLICATION_EVENT_TYPE("qpid.replication_event_type"); -const std::string ENQUEUE("enqueue"); -const std::string DEQUEUE("dequeue"); -const std::string REPLICATION_TARGET_QUEUE("qpid.replication_target_queue"); -const std::string DEQUEUED_MESSAGE_POSITION("qpid.dequeued_message_position"); +const std::string REPLICATION_EVENT_TYPE("qpid.replication.type"); +const std::string REPLICATION_EVENT_SEQNO("qpid.replication.seqno"); +const std::string REPLICATION_TARGET_QUEUE("qpid.replication.target_queue"); +const std::string DEQUEUED_MESSAGE_POSITION("qpid.replication.message"); + +const int ENQUEUE(1); +const int DEQUEUE(2); }}} diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 314b90ba8b..ff2ee060c2 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -89,7 +89,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ ManagementTest.cpp \ MessageReplayTracker.cpp \ ConsoleTest.cpp \ - QueueEvents.cpp + QueueEvents.cpp \ + ProxyTest.cpp if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp @@ -249,7 +250,7 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers) # Longer running stability tests, not run by default check: target. # Not run under valgrind, too slow -LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak +LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak reliable_replication_test EXTRA_DIST+=$(LONG_TESTS) run_perftest check-long: $(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND= diff --git a/cpp/src/tests/ProxyTest.cpp b/cpp/src/tests/ProxyTest.cpp new file mode 100644 index 0000000000..9007f3dc97 --- /dev/null +++ b/cpp/src/tests/ProxyTest.cpp @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/ExecutionSyncBody.h" +#include "qpid/framing/Proxy.h" +#include <alloca.h> + +#include "unit_test.h" + +using namespace qpid::framing; + +QPID_AUTO_TEST_SUITE(ProxyTestSuite) + + +QPID_AUTO_TEST_CASE(testScopedSync) +{ + struct DummyHandler : FrameHandler + { + void handle(AMQFrame& f) { + AMQMethodBody* m = f.getMethod(); + BOOST_CHECK(m); + BOOST_CHECK(m->isA<ExecutionSyncBody>()); + BOOST_CHECK(m->isSync()); + } + }; + DummyHandler f; + Proxy p(f); + Proxy::ScopedSync s(p); + p.send(ExecutionSyncBody(p.getVersion())); +} + +QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/QueueEvents.cpp b/cpp/src/tests/QueueEvents.cpp index 7aea23922d..f6b76b69ba 100644 --- a/cpp/src/tests/QueueEvents.cpp +++ b/cpp/src/tests/QueueEvents.cpp @@ -168,7 +168,7 @@ QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing) BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str()); } fixture.connection.close(); - fixture.shutdownBroker(); + fixture.broker->getQueueEvents().shutdown(); //check listener was notified of all events, and in correct order SequenceNumber enqueueId(1); @@ -215,7 +215,7 @@ QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing_enqueuesOnly) BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str()); } fixture.connection.close(); - fixture.shutdownBroker(); + fixture.broker->getQueueEvents().shutdown(); //check listener was notified of all events, and in correct order SequenceNumber enqueueId(1); diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index ad82964007..9b0be8f979 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -64,7 +64,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False) + result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] @@ -88,7 +88,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False) + result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] @@ -135,7 +135,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False) + result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] @@ -195,7 +195,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False) + result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] @@ -244,8 +244,8 @@ class FederationTests(TestBase010): l_link = self.qmf.getObjects(_class="link", _broker=l_broker)[0] r_link = self.qmf.getObjects(_class="link", _broker=r_broker)[0] - l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False) - r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False) + l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0) + r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0) self.assertEqual(l_res.status, 0) self.assertEqual(r_res.status, 0) @@ -296,7 +296,7 @@ class FederationTests(TestBase010): link = qmf.getObjects(_class="link")[0] result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "my-bridge-id", - "exclude-me,also-exclude-me", False, False, False) + "exclude-me,also-exclude-me", False, False, False, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] @@ -354,7 +354,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True) + result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] sleep(5) @@ -401,7 +401,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True) + result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] sleep(5) @@ -448,7 +448,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True) + result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] sleep(5) @@ -478,8 +478,7 @@ class FederationTests(TestBase010): sleep(3) self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) self.assertEqual(len(qmf.getObjects(_class="link")), 0) - - + def getProperty(self, msg, name): for h in msg.headers: if hasattr(h, name): return getattr(h, name) diff --git a/cpp/src/tests/reliable_replication_test b/cpp/src/tests/reliable_replication_test new file mode 100755 index 0000000000..1a2fa917b3 --- /dev/null +++ b/cpp/src/tests/reliable_replication_test @@ -0,0 +1,98 @@ +#!/bin/sh + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Test reliability of the replication feature in the face of link +# failures: +MY_DIR=`dirname \`which $0\`` +PYTHON_DIR=${MY_DIR}/../../../python + +trap stop_brokers EXIT + +stop_brokers() { + if [[ $BROKER_A ]] ; then + ../qpidd -q --port $BROKER_A + unset BROKER_A + fi + if [[ $BROKER_B ]] ; then + ../qpidd -q --port $BROKER_B + unset BROKER_B + fi +} + +setup() { + rm -f replication-source.log replication-dest.log + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true --log-enable trace+ --log-to-file replication-source.log --log-to-stderr 0 > qpidd.port + BROKER_A=`cat qpidd.port` + + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 > qpidd.port + BROKER_B=`cat qpidd.port` + + #../qpidd --port 5555 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true --log-enable trace+ --log-to-file replication-source.log --log-to-stderr 0 & + #BROKER_A=5555 + + #../qpidd --port 6666 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 & + #BROKER_B=6666 + echo "Testing replication from port $BROKER_A to port $BROKER_B" + export PYTHONPATH=$PYTHON_DIR + + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication + $PYTHON_DIR/commands/qpid-route --ack 500 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication + + #create test queue (only replicate enqueues for this test): + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-a --generate-queue-events 1 + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-a +} + +send() { + ./sender --port $BROKER_A --routing-key queue-a --send-eos 1 < replicated.expected +} + +receive() { + rm -f replicated.actual + ./receiver --port $BROKER_B --queue queue-a > replicated.actual +} + +bounce_link() { + echo "Destroying link..." + $PYTHON_DIR/commands/qpid-route link del "localhost:$BROKER_B" "localhost:$BROKER_A" + echo "Link destroyed; recreating route..." + sleep 2 + $PYTHON_DIR/commands/qpid-route --ack 500 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication + echo "Route re-established" +} + +if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e ../.libs/replication_exchange.so ; then + setup + for i in `seq 1 100000`; do echo Message $i; done > replicated.expected + send & + receive & + for i in `seq 1 5`; do sleep 10; bounce_link; done; + wait + #check that received list is identical to sent list + diff replicated.actual replicated.expected || FAIL=1 + if [[ $FAIL ]]; then + echo reliable replication test failed: expectations not met! + else + echo replication reliable in the face of link failures + rm -f replication.actual replication.expected replication-source.log replication-dest.log + fi +fi + diff --git a/cpp/src/tests/replication_test b/cpp/src/tests/replication_test index 931078c047..9b6e5cfb29 100755 --- a/cpp/src/tests/replication_test +++ b/cpp/src/tests/replication_test @@ -19,7 +19,7 @@ # under the License. # -# Run the federation tests. +# Run a test of the replication feature MY_DIR=`dirname \`which $0\`` PYTHON_DIR=${MY_DIR}/../../../python @@ -37,15 +37,18 @@ stop_brokers() { } if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e ../.libs/replication_exchange.so ; then - ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true > qpidd.port + rm -f queue-*.repl replication-*.log #cleanup from any earlier runs + + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true --log-enable info+ --log-to-file replication-source.log --log-to-stderr 0 > qpidd.port BROKER_A=`cat qpidd.port` - ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so > qpidd.port + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 > qpidd.port BROKER_B=`cat qpidd.port` export PYTHONPATH=$PYTHON_DIR + echo "Running replication test between localhost:$BROKER_A and localhost:$BROKER_B" $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication - $PYTHON_DIR/commands/qpid-route queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication + $PYTHON_DIR/commands/qpid-route --ack 5 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication #create test queues @@ -58,7 +61,6 @@ if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-c #publish and consume from test queus on broker A: - rm -f queue-*.repl for i in `seq 1 10`; do echo Message $i for A >> queue-a-input.repl; done for i in `seq 1 20`; do echo Message $i for B >> queue-b-input.repl; done for i in `seq 1 15`; do echo Message $i for C >> queue-c-input.repl; done @@ -79,6 +81,9 @@ if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e ./receiver --port $BROKER_B --queue queue-a > queue-a-backup.repl ./receiver --port $BROKER_B --queue queue-b > queue-b-backup.repl ./receiver --port $BROKER_B --queue queue-c > queue-c-backup.repl + + stop_brokers + tail -5 queue-a-input.repl > queue-a-expected.repl tail -10 queue-b-input.repl > queue-b-expected.repl diff queue-a-backup.repl queue-a-expected.repl || FAIL=1 @@ -87,12 +92,12 @@ if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e if [[ $FAIL ]]; then echo replication test failed: expectations not met! + exit 1 else echo queue state replicated as expected - rm queue-*.repl + rm -f queue-*.repl replication-*.log fi - stop_brokers else echo "Skipping replication test, plugins not built or python utils not located" fi |