From 67b401962dadbf206ad598b4175fc82cac986ed8 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 19 Jan 2012 23:06:50 +0000 Subject: QPID-3603: Clean up HA log messages. - Reduce verbosity, drop unknown event messages. - Lots of clarifications - Fix minor test bug in ha_tests.py. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233669 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/ha.mk | 2 - qpid/cpp/src/qpid/broker/Queue.cpp | 2 +- qpid/cpp/src/qpid/ha/HaBroker.cpp | 10 ++--- qpid/cpp/src/qpid/ha/Logging.cpp | 36 --------------- qpid/cpp/src/qpid/ha/Logging.h | 55 ----------------------- qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 46 ++++++++++++------- qpid/cpp/src/qpid/ha/QueueReplicator.h | 7 ++- qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 57 ++++++++++++------------ qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 2 +- qpid/cpp/src/qpid/ha/WiringReplicator.cpp | 38 ++++++++-------- qpid/cpp/src/tests/ha_tests.py | 1 + 11 files changed, 92 insertions(+), 164 deletions(-) delete mode 100644 qpid/cpp/src/qpid/ha/Logging.cpp delete mode 100644 qpid/cpp/src/qpid/ha/Logging.h diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk index dc4e7c8d0a..d367ba2101 100644 --- a/qpid/cpp/src/ha.mk +++ b/qpid/cpp/src/ha.mk @@ -28,8 +28,6 @@ ha_la_SOURCES = \ qpid/ha/HaBroker.cpp \ qpid/ha/HaBroker.h \ qpid/ha/HaPlugin.cpp \ - qpid/ha/Logging.h \ - qpid/ha/Logging.cpp \ qpid/ha/Settings.h \ qpid/ha/QueueReplicator.h \ qpid/ha/QueueReplicator.cpp \ diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index b95c27982f..4298e14627 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -1324,7 +1324,7 @@ void Queue::query(qpid::types::Variant::Map& results) const void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); sequence = n; - QPID_LOG(info, "Set position to " << sequence << " on " << getName()); + QPID_LOG(trace, "Set position to " << sequence << " on " << getName()); } SequenceNumber Queue::getPosition() { diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 92c431ea61..36859909df 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -59,17 +59,17 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) mgmtObject->set_status("solo"); ma->addObject(mgmtObject); } - QPID_LOG(notice, "HA: Initialized: client-url=" << clientUrl - << " broker-url=" << brokerUrl); // FIXME aconway 2011-11-22: temporary hack to identify primary. - if (s.brokerUrl != "primary") - backup.reset(new Backup(broker, s)); + bool isPrimary = (s.brokerUrl == "primary"); + QPID_LOG(notice, "HA: " << (isPrimary ? "Primary" : "Backup") + << " initialized: client-url=" << clientUrl + << " broker-url=" << brokerUrl); + if (!isPrimary) backup.reset(new Backup(broker, s)); // Register a factory for replicating subscriptions. broker.getConsumerFactories().add( boost::shared_ptr( new ReplicatingSubscription::Factory())); } - HaBroker::~HaBroker() {} Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) { diff --git a/qpid/cpp/src/qpid/ha/Logging.cpp b/qpid/cpp/src/qpid/ha/Logging.cpp deleted file mode 100644 index 7d8ee38367..0000000000 --- a/qpid/cpp/src/qpid/ha/Logging.cpp +++ /dev/null @@ -1,36 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "Logging.h" -#include "qpid/broker/QueuedMessage.h" -#include "qpid/broker/Queue.h" -#include "qpid/framing/SequenceNumber.h" - -namespace qpid { -namespace ha { - -QueuePos::QueuePos(const broker::QueuedMessage& qm) - : queue(qm.queue), position(qm.position) {} - -std::ostream& operator<<(std::ostream& o, const QueuePos& qp) { - return o << qp.queue->getName() << "[" << qp.position << "]"; -} - -}} // namesopace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Logging.h b/qpid/cpp/src/qpid/ha/Logging.h deleted file mode 100644 index 3b12baa390..0000000000 --- a/qpid/cpp/src/qpid/ha/Logging.h +++ /dev/null @@ -1,55 +0,0 @@ -#ifndef QPID_HA_HAOSTREAM_H -#define QPID_HA_HAOSTREAM_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include - -/**@file ostream helpers used in log messages. */ - -namespace qpid { - -namespace broker { -class Queue; -class QueuedMessage; -} - -namespace framing { -class SequenceNumber; -} - -namespace ha { - -// Other printable helpers - -struct QueuePos { - const broker::Queue* queue; - const framing::SequenceNumber& position; - QueuePos(const broker::Queue* q, const framing::SequenceNumber& pos) - : queue(q), position(pos) {} - QueuePos(const broker::QueuedMessage& qm); -}; - -std::ostream& operator<<(std::ostream& o, const QueuePos& h); - -}} // namespace qpid::ha - -#endif /*!QPID_HA_HAOSTREAM_H*/ diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 86712b4bdc..34916c2d1e 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -21,7 +21,6 @@ #include "QueueReplicator.h" #include "ReplicatingSubscription.h" -#include "Logging.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Link.h" @@ -32,6 +31,7 @@ #include "qpid/framing/FieldTable.h" #include "qpid/log/Statement.h" #include +#include namespace { const std::string QPID_REPLICATOR_("qpid.replicator-"); @@ -47,12 +47,19 @@ using namespace framing; const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event"); const std::string QueueReplicator::POSITION_EVENT_KEY("qpid.position-event"); +std::string QueueReplicator::replicatorName(const std::string& queueName) { + return QPID_REPLICATOR_ + queueName; +} + +std::ostream& operator<<(std::ostream& o, const QueueReplicator& qr) { + return o << "HA: Backup queue " << qr.queue->getName() << ": "; +} + QueueReplicator::QueueReplicator(boost::shared_ptr q, boost::shared_ptr l) - : Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden from management? - queue(q), link(l) + : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l) { - QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << q->getSettings()); - // Declare the replicator bridge. + QPID_LOG(info, *this << "Created, settings: " << q->getSettings()); + queue->getBroker()->getLinks().declare( link->getHost(), link->getPort(), false, // durable @@ -69,12 +76,15 @@ QueueReplicator::QueueReplicator(boost::shared_ptr q, boost::shared_ptrgetBroker()->getLinks().destroy( +// link->getHost(), link->getPort(), queue->getName(), getName(), string()); +} -// NB: This is called back ina broker connection thread when the -// bridge is created. -void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { - // No lock needed, no mutable member variables are used. +// Called in a broker connection thread when the bridge is created. +void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) +{ framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); framing::FieldTable settings; @@ -91,11 +101,12 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa queue->setPosition(0); settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); + // TODO aconway 2011-12-19: optimize. settings.setInt(QPID_SYNC_FREQUENCY, 1); peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false, "", 0, settings); peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); - QPID_LOG(debug, "HA: Backup activated bridge from " << args.i_src << " to " << args.i_dest); + QPID_LOG(debug, *this << "Activated bridge from " << args.i_src << " to " << args.i_dest); } namespace { @@ -115,34 +126,37 @@ void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) QueuedMessage message; if (queue->acquireMessageAt(n, message)) { queue->dequeue(0, message); - QPID_LOG(trace, "HA: Backup dequeued: "<< QueuePos(message)); + QPID_LOG(trace, *this << "Dequeued message "<< message.position); } } } -void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable* /*args*/) +// Called in connection thread of the queues bridge to primary. +void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable*) { sys::Mutex::ScopedLock l(lock); if (key == DEQUEUE_EVENT_KEY) { SequenceSet dequeues = decodeContent(msg.getMessage()); - QPID_LOG(trace, "HA: Backup received dequeues: " << dequeues); + QPID_LOG(trace, *this << "Received dequeues: " << dequeues); //TODO: should be able to optimise the following for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++) dequeue(*i, l); } else if (key == POSITION_EVENT_KEY) { SequenceNumber position = decodeContent(msg.getMessage()); + QPID_LOG(trace, *this << "Advance position: from " << queue->getPosition() + << " to " << position); assert(queue->getPosition() <= position); //TODO aconway 2011-12-14: Optimize this? for (SequenceNumber i = queue->getPosition(); i < position; ++i) dequeue(i,l); queue->setPosition(position); - QPID_LOG(trace, "HA: Backup advanced to: " << QueuePos(queue.get(), queue->getPosition())); } else { - QPID_LOG(trace, "HA: Backup enqueued message: " << QueuePos(queue.get(), queue->getPosition()+1)); msg.deliverTo(queue); + QPID_LOG(trace, *this << "Enqueued message " << queue->getPosition()); } } +// Unused Exchange methods. bool QueueReplicator::bind(boost::shared_ptr, const std::string&, const FieldTable*) { return false; } bool QueueReplicator::unbind(boost::shared_ptr, const std::string&, const FieldTable*) { return false; } bool QueueReplicator::isBound(boost::shared_ptr, const std::string* const, const FieldTable* const) { return false; } diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index e864d6b130..518e97f754 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -23,6 +23,7 @@ */ #include "qpid/broker/Exchange.h" #include "qpid/framing/SequenceSet.h" +#include namespace qpid { @@ -44,16 +45,18 @@ namespace ha { * Creates a ReplicatingSubscription on the primary by passing special * arguments to the consume command. * - * THREAD SAFE: Called in arbitrary connection threads. + * THREAD UNSAFE: Only called in the connection thread of the source queue. */ class QueueReplicator : public broker::Exchange { public: static const std::string DEQUEUE_EVENT_KEY; static const std::string POSITION_EVENT_KEY; + static std::string replicatorName(const std::string& queueName); QueueReplicator(boost::shared_ptr q, boost::shared_ptr l); ~QueueReplicator(); + std::string getType() const; bool bind(boost::shared_ptr, const std::string&, const framing::FieldTable*); bool unbind(boost::shared_ptr, const std::string&, const framing::FieldTable*); @@ -67,6 +70,8 @@ class QueueReplicator : public broker::Exchange sys::Mutex lock; boost::shared_ptr queue; boost::shared_ptr link; + + friend std::ostream& operator<<(std::ostream&, const QueueReplicator&); }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index f2fe747cf2..be7694b93c 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -20,7 +20,6 @@ */ #include "ReplicatingSubscription.h" -#include "Logging.h" #include "qpid/broker/Queue.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/ConnectionState.h" @@ -43,6 +42,13 @@ const string DOLLAR("$"); const string INTERNAL("-internal"); } // namespace + +ostream& operator<<(ostream& o, const ReplicatingSubscription& rs) { + string url = rs.parent->getSession().getConnection().getUrl(); + string qname= rs.getQueue()->getName(); + return o << "HA: Primary: " << qname << "(" << url << "):"; +} + string mask(const string& in) { return DOLLAR + in + INTERNAL; @@ -96,7 +102,7 @@ ReplicatingSubscription::ReplicatingSubscription( // can be re-introduced later. Last revision with the optimization: // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. - QPID_LOG(debug, "HA: Started " << *this << " subscription " << name); + QPID_LOG(debug, *this << "Created subscription " << name); // Note that broker::Queue::getPosition() returns the sequence // number that will be assigned to the next message *minus 1*. @@ -125,36 +131,39 @@ bool ReplicatingSubscription::deliver(QueuedMessage& m) { if (position - backupPosition > 1) { // Position has advanced because of messages dequeued ahead of us. SequenceNumber send(position); - // Send the position before m was enqueued. - sendPositionEvent(--send, l); + --send; // Send the position before m was enqueued. + sendPositionEvent(send, l); + QPID_LOG(trace, *this << "Sending position " << send + << ", was " << backupPosition); } backupPosition = position; } - QPID_LOG(trace, "HA: Replicating " << QueuePos(m) << " to " << *this); + QPID_LOG(trace, *this << "Replicating message " << m.position); } return ConsumerImpl::deliver(m); } +ReplicatingSubscription::~ReplicatingSubscription() {} + +// Called in the subscription's connection thread. void ReplicatingSubscription::cancel() { - QPID_LOG(debug, "HA: Cancelled " << *this); + QPID_LOG(debug, *this <<"Cancelled"); getQueue()->removeObserver(boost::dynamic_pointer_cast(shared_from_this())); } -ReplicatingSubscription::~ReplicatingSubscription() {} - -//called before we get notified of the message being available and -//under the message lock in the queue +// Called before we get notified of the message being available and +// under the message lock in the queue. Called in arbitrary connection thread. void ReplicatingSubscription::enqueued(const QueuedMessage& m) { //delay completion m.payload->getIngressCompletion().startCompleter(); } -// Called with lock held. +// Called with lock held. Called in subscription's connection thread. void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l) { - QPID_LOG(trace, "HA: Sending dequeues " << dequeues << " to " << *this); + QPID_LOG(trace, *this << "Sending dequeues " << dequeues); string buf(dequeues.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); dequeues.encode(buffer); @@ -163,12 +172,10 @@ void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l) sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l); } -// Called with lock held. +// Called with lock held. Called in subscription's connection thread. void ReplicatingSubscription::sendPositionEvent( SequenceNumber position, const sys::Mutex::ScopedLock&l ) { - QPID_LOG(trace, "HA: Sending position " << QueuePos(getQueue().get(), position) - << " on " << *this); string buf(backupPosition.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); position.encode(buffer); @@ -209,21 +216,24 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& } // Called after the message has been removed from the deque and under -// the message lock in the queue. +// the message lock in the queue. Called in arbitrary connection threads. void ReplicatingSubscription::dequeued(const QueuedMessage& m) { + QPID_LOG(trace, *this << "Dequeued message " << m.position); { sys::Mutex::ScopedLock l(lock); dequeues.add(m.position); - QPID_LOG(trace, "HA: Will dequeue " << QueuePos(m) << " on " << *this); } notify(); // Ensure a call to doDispatch + // FIXME aconway 2011-12-20: not thread safe to access position here, + // we're not in the dispatch thread. if (m.position > position) { m.payload->getIngressCompletion().finishCompleter(); - QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early on " << *this); + QPID_LOG(trace, *this << "Completed message " << m.position << " early"); } } +// Called in subscription's connection thread. bool ReplicatingSubscription::doDispatch() { { @@ -235,20 +245,11 @@ bool ReplicatingSubscription::doDispatch() ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {} ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {} -bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) -{ - return delegate.deliver(m); -} +bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { return delegate.deliver(m); } void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); } bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr msg) { return delegate.filter(msg); } bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr msg) { return delegate.accept(msg); } void ReplicatingSubscription::DelegatingConsumer::cancel() {} OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); } - -ostream& operator<<(ostream& o, const ReplicatingSubscription& rs) { - string url = rs.parent->getSession().getConnection().getUrl(); - return o << rs.getQueue()->getName() << " backup on " << url; -} - }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index c157a5b378..31246507c9 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -49,7 +49,7 @@ namespace ha { * Runs on the primary. Delays completion of messages till the backup * has acknowledged, informs backup of locally dequeued messages. * - * THREAD SAFE: Used as a consume in subscription's connection + * THREAD SAFE: Used as a consumer in subscription's connection * thread, and as a QueueObserver in arbitrary connection threads. */ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp index 105a83118e..4a192cd91e 100644 --- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp @@ -229,8 +229,6 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram Variant::Map& map = i->asMap(); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); - QPID_LOG(trace, "HA: Backup received event: schema=" << schema - << " values=" << values); if (match(schema)) doEventQueueDeclare(values); else if (match(schema)) doEventQueueDelete(values); else if (match(schema)) doEventExchangeDeclare(values); @@ -244,16 +242,15 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram Variant::Map& values = i->asMap()[VALUES].asMap(); framing::FieldTable args; amqp_0_10::translate(values[ARGUMENTS].asMap(), args); - QPID_LOG(trace, "HA: Backup received response type=" << type - << " values=" << values); if (type == QUEUE) doResponseQueue(values); else if (type == EXCHANGE) doResponseExchange(values); else if (type == BINDING) doResponseBind(values); + else QPID_LOG(error, "HA: Backup received unknown response: type=" << type + << " values=" << values); + // FIXME aconway 2011-12-06: handle all relevant response types. } - } else { - QPID_LOG(error, "HA: Backup replication got unexpected message: " << *headers); - } + } else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers); } catch (const std::exception& e) { QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list); } @@ -280,11 +277,11 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) { // re-create from event. // Events are always up to date, whereas responses may be // out of date. - QPID_LOG(debug, "HA: Created backup queue from event: " << name); + QPID_LOG(debug, "HA: Backup created queue: " << name); startQueueReplicator(result.first); } else { // FIXME aconway 2011-12-02: what's the right way to handle this? - QPID_LOG(warning, "HA: Queue already exists on backup: " << name); + QPID_LOG(warning, "HA: Backup queue already exists: " << name); } } } @@ -293,11 +290,14 @@ void WiringReplicator::doEventQueueDelete(Variant::Map& values) { string name = values[QNAME].asString(); boost::shared_ptr queue = broker.getQueues().find(name); if (queue && replicateLevel(queue->getSettings())) { - QPID_LOG(debug, "HA: Deleting backup queue from event: " << name); + QPID_LOG(debug, "HA: Backup deleting queue: " << name); broker.deleteQueue( name, values[USER].asString(), values[RHOST].asString()); + // FIXME aconway 2011-12-21: casuses race conditions? Restore. +// // Also delete the QueueReplicator exchange for this queue. +// broker.getExchanges().destroy(QueueReplicator::replicatorName(name)); } } @@ -316,11 +316,11 @@ void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) { values[USER].asString(), values[RHOST].asString()).second) { - QPID_LOG(debug, "HA: created backup exchange from event: " << name); + QPID_LOG(debug, "HA: Backup created exchange: " << name); } else { // FIXME aconway 2011-11-22: should delete pre-exisitng exchange // and re-create from event. See comment in doEventQueueDeclare. - QPID_LOG(warning, "HA: Exchange already exists on backup: " << name); + QPID_LOG(warning, "HA: Backup exchange already exists: " << name); } } } @@ -330,7 +330,7 @@ void WiringReplicator::doEventExchangeDelete(Variant::Map& values) { try { boost::shared_ptr exchange = broker.getExchanges().find(name); if (exchange && replicateLevel(exchange->getArgs())) { - QPID_LOG(debug, "HA: Deleting backup exchange:" << name); + QPID_LOG(debug, "HA: Backup deleting exchange:" << name); broker.deleteExchange( name, values[USER].asString(), @@ -352,7 +352,7 @@ void WiringReplicator::doEventBind(Variant::Map& values) { framing::FieldTable args; amqp_0_10::translate(values[ARGS].asMap(), args); string key = values[KEY].asString(); - QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName() + QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); exchange->bind(queue, key, &args); @@ -377,12 +377,12 @@ void WiringReplicator::doResponseQueue(Variant::Map& values) { ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/); if (result.second) { - QPID_LOG(debug, "HA: Created backup queue from response: " << values[NAME]); + QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]); startQueueReplicator(result.first); } else { // FIXME aconway 2011-11-22: Normal to find queue already // exists if we're failing over. - QPID_LOG(warning, "HA: Queue already exists on backup: " << name); + QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << name); } } @@ -400,9 +400,9 @@ void WiringReplicator::doResponseExchange(Variant::Map& values) { ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/).second) { - QPID_LOG(debug, "HA: Created backup exchange from response: " << values[NAME]); + QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME]); } else { - QPID_LOG(warning, "HA: Exchange already exists on backup: " << values[QNAME]); + QPID_LOG(warning, "HA: Backup catch-up exchange already exists: " << values[QNAME]); } } @@ -442,7 +442,7 @@ void WiringReplicator::doResponseBind(Variant::Map& values) { amqp_0_10::translate(values[ARGUMENTS].asMap(), args); string key = values[KEY].asString(); exchange->bind(queue, key, &args); - QPID_LOG(debug, "HA: Created backup binding from response: exchange=" << exchange->getName() + QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); } diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index e9b84050b9..ed9786674a 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -108,6 +108,7 @@ class ShortTests(BrokerTest): # Test a series of messages, enqueue all then dequeue all. s = p.sender(queue("foo","all")) + self.wait(b, "foo") msgs = [str(i) for i in range(10)] for m in msgs: s.send(Message(m)) self.assert_browse_retry(p, "foo", msgs) -- cgit v1.2.1