summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-14 16:04:56 +0000
committerAlan Conway <aconway@apache.org>2012-02-14 16:04:56 +0000
commitd298ea877c9b18ae87e9bba7baa54cf18121f18d (patch)
treee037c1e286bd5c352ad9caa29395dd361a7a0ce0
parent7a4479355a6391b4793c9362989b0a97155c26ea (diff)
downloadqpid-python-d298ea877c9b18ae87e9bba7baa54cf18121f18d.tar.gz
QPID-3603: Clean up HA log messages.
- Reduce verbosity, drop unknown event messages. - Lots of clarifications - Fix minor test bug in ha_tests.py. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-6@1244055 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/ha.mk2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp10
-rw-r--r--qpid/cpp/src/qpid/ha/Logging.cpp36
-rw-r--r--qpid/cpp/src/qpid/ha/Logging.h55
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp46
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h7
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp57
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h2
-rw-r--r--qpid/cpp/src/qpid/ha/WiringReplicator.cpp38
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py1
11 files changed, 92 insertions, 164 deletions
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index dc4e7c8d0a..d367ba2101 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -28,8 +28,6 @@ ha_la_SOURCES = \
qpid/ha/HaBroker.cpp \
qpid/ha/HaBroker.h \
qpid/ha/HaPlugin.cpp \
- qpid/ha/Logging.h \
- qpid/ha/Logging.cpp \
qpid/ha/Settings.h \
qpid/ha/QueueReplicator.h \
qpid/ha/QueueReplicator.cpp \
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 1a3b6ade84..de60484189 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -1466,7 +1466,7 @@ void Queue::query(qpid::types::Variant::Map& results) const
void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
sequence = n;
- QPID_LOG(info, "Set position to " << sequence << " on " << getName());
+ QPID_LOG(trace, "Set position to " << sequence << " on " << getName());
}
SequenceNumber Queue::getPosition() {
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 92c431ea61..36859909df 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -59,17 +59,17 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
mgmtObject->set_status("solo");
ma->addObject(mgmtObject);
}
- QPID_LOG(notice, "HA: Initialized: client-url=" << clientUrl
- << " broker-url=" << brokerUrl);
// FIXME aconway 2011-11-22: temporary hack to identify primary.
- if (s.brokerUrl != "primary")
- backup.reset(new Backup(broker, s));
+ bool isPrimary = (s.brokerUrl == "primary");
+ QPID_LOG(notice, "HA: " << (isPrimary ? "Primary" : "Backup")
+ << " initialized: client-url=" << clientUrl
+ << " broker-url=" << brokerUrl);
+ if (!isPrimary) backup.reset(new Backup(broker, s));
// Register a factory for replicating subscriptions.
broker.getConsumerFactories().add(
boost::shared_ptr<ReplicatingSubscription::Factory>(
new ReplicatingSubscription::Factory()));
}
-
HaBroker::~HaBroker() {}
Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
diff --git a/qpid/cpp/src/qpid/ha/Logging.cpp b/qpid/cpp/src/qpid/ha/Logging.cpp
deleted file mode 100644
index 7d8ee38367..0000000000
--- a/qpid/cpp/src/qpid/ha/Logging.cpp
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "Logging.h"
-#include "qpid/broker/QueuedMessage.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/framing/SequenceNumber.h"
-
-namespace qpid {
-namespace ha {
-
-QueuePos::QueuePos(const broker::QueuedMessage& qm)
- : queue(qm.queue), position(qm.position) {}
-
-std::ostream& operator<<(std::ostream& o, const QueuePos& qp) {
- return o << qp.queue->getName() << "[" << qp.position << "]";
-}
-
-}} // namesopace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Logging.h b/qpid/cpp/src/qpid/ha/Logging.h
deleted file mode 100644
index 3b12baa390..0000000000
--- a/qpid/cpp/src/qpid/ha/Logging.h
+++ /dev/null
@@ -1,55 +0,0 @@
-#ifndef QPID_HA_HAOSTREAM_H
-#define QPID_HA_HAOSTREAM_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <iosfwd>
-
-/**@file ostream helpers used in log messages. */
-
-namespace qpid {
-
-namespace broker {
-class Queue;
-class QueuedMessage;
-}
-
-namespace framing {
-class SequenceNumber;
-}
-
-namespace ha {
-
-// Other printable helpers
-
-struct QueuePos {
- const broker::Queue* queue;
- const framing::SequenceNumber& position;
- QueuePos(const broker::Queue* q, const framing::SequenceNumber& pos)
- : queue(q), position(pos) {}
- QueuePos(const broker::QueuedMessage& qm);
-};
-
-std::ostream& operator<<(std::ostream& o, const QueuePos& h);
-
-}} // namespace qpid::ha
-
-#endif /*!QPID_HA_HAOSTREAM_H*/
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 86712b4bdc..34916c2d1e 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -21,7 +21,6 @@
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
-#include "Logging.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
@@ -32,6 +31,7 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
#include <boost/shared_ptr.hpp>
+#include <ostream>
namespace {
const std::string QPID_REPLICATOR_("qpid.replicator-");
@@ -47,12 +47,19 @@ using namespace framing;
const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event");
const std::string QueueReplicator::POSITION_EVENT_KEY("qpid.position-event");
+std::string QueueReplicator::replicatorName(const std::string& queueName) {
+ return QPID_REPLICATOR_ + queueName;
+}
+
+std::ostream& operator<<(std::ostream& o, const QueueReplicator& qr) {
+ return o << "HA: Backup queue " << qr.queue->getName() << ": ";
+}
+
QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
- : Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden from management?
- queue(q), link(l)
+ : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l)
{
- QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << q->getSettings());
- // Declare the replicator bridge.
+ QPID_LOG(info, *this << "Created, settings: " << q->getSettings());
+
queue->getBroker()->getLinks().declare(
link->getHost(), link->getPort(),
false, // durable
@@ -69,12 +76,15 @@ QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<L
);
}
-QueueReplicator::~QueueReplicator() {}
+QueueReplicator::~QueueReplicator() {
+ // FIXME aconway 2011-12-21: causes race condition? Restore.
+// queue->getBroker()->getLinks().destroy(
+// link->getHost(), link->getPort(), queue->getName(), getName(), string());
+}
-// NB: This is called back ina broker connection thread when the
-// bridge is created.
-void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
- // No lock needed, no mutable member variables are used.
+// Called in a broker connection thread when the bridge is created.
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler)
+{
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
framing::FieldTable settings;
@@ -91,11 +101,12 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
queue->setPosition(0);
settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
+ // TODO aconway 2011-12-19: optimize.
settings.setInt(QPID_SYNC_FREQUENCY, 1);
peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false, "", 0, settings);
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
- QPID_LOG(debug, "HA: Backup activated bridge from " << args.i_src << " to " << args.i_dest);
+ QPID_LOG(debug, *this << "Activated bridge from " << args.i_src << " to " << args.i_dest);
}
namespace {
@@ -115,34 +126,37 @@ void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&)
QueuedMessage message;
if (queue->acquireMessageAt(n, message)) {
queue->dequeue(0, message);
- QPID_LOG(trace, "HA: Backup dequeued: "<< QueuePos(message));
+ QPID_LOG(trace, *this << "Dequeued message "<< message.position);
}
}
}
-void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable* /*args*/)
+// Called in connection thread of the queues bridge to primary.
+void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable*)
{
sys::Mutex::ScopedLock l(lock);
if (key == DEQUEUE_EVENT_KEY) {
SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
- QPID_LOG(trace, "HA: Backup received dequeues: " << dequeues);
+ QPID_LOG(trace, *this << "Received dequeues: " << dequeues);
//TODO: should be able to optimise the following
for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
dequeue(*i, l);
} else if (key == POSITION_EVENT_KEY) {
SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
+ QPID_LOG(trace, *this << "Advance position: from " << queue->getPosition()
+ << " to " << position);
assert(queue->getPosition() <= position);
//TODO aconway 2011-12-14: Optimize this?
for (SequenceNumber i = queue->getPosition(); i < position; ++i)
dequeue(i,l);
queue->setPosition(position);
- QPID_LOG(trace, "HA: Backup advanced to: " << QueuePos(queue.get(), queue->getPosition()));
} else {
- QPID_LOG(trace, "HA: Backup enqueued message: " << QueuePos(queue.get(), queue->getPosition()+1));
msg.deliverTo(queue);
+ QPID_LOG(trace, *this << "Enqueued message " << queue->getPosition());
}
}
+// Unused Exchange methods.
bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index e864d6b130..518e97f754 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -23,6 +23,7 @@
*/
#include "qpid/broker/Exchange.h"
#include "qpid/framing/SequenceSet.h"
+#include <iosfwd>
namespace qpid {
@@ -44,16 +45,18 @@ namespace ha {
* Creates a ReplicatingSubscription on the primary by passing special
* arguments to the consume command.
*
- * THREAD SAFE: Called in arbitrary connection threads.
+ * THREAD UNSAFE: Only called in the connection thread of the source queue.
*/
class QueueReplicator : public broker::Exchange
{
public:
static const std::string DEQUEUE_EVENT_KEY;
static const std::string POSITION_EVENT_KEY;
+ static std::string replicatorName(const std::string& queueName);
QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l);
~QueueReplicator();
+
std::string getType() const;
bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
@@ -67,6 +70,8 @@ class QueueReplicator : public broker::Exchange
sys::Mutex lock;
boost::shared_ptr<broker::Queue> queue;
boost::shared_ptr<broker::Link> link;
+
+ friend std::ostream& operator<<(std::ostream&, const QueueReplicator&);
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index f2fe747cf2..be7694b93c 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -20,7 +20,6 @@
*/
#include "ReplicatingSubscription.h"
-#include "Logging.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/ConnectionState.h"
@@ -43,6 +42,13 @@ const string DOLLAR("$");
const string INTERNAL("-internal");
} // namespace
+
+ostream& operator<<(ostream& o, const ReplicatingSubscription& rs) {
+ string url = rs.parent->getSession().getConnection().getUrl();
+ string qname= rs.getQueue()->getName();
+ return o << "HA: Primary: " << qname << "(" << url << "):";
+}
+
string mask(const string& in)
{
return DOLLAR + in + INTERNAL;
@@ -96,7 +102,7 @@ ReplicatingSubscription::ReplicatingSubscription(
// can be re-introduced later. Last revision with the optimization:
// r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
- QPID_LOG(debug, "HA: Started " << *this << " subscription " << name);
+ QPID_LOG(debug, *this << "Created subscription " << name);
// Note that broker::Queue::getPosition() returns the sequence
// number that will be assigned to the next message *minus 1*.
@@ -125,36 +131,39 @@ bool ReplicatingSubscription::deliver(QueuedMessage& m) {
if (position - backupPosition > 1) {
// Position has advanced because of messages dequeued ahead of us.
SequenceNumber send(position);
- // Send the position before m was enqueued.
- sendPositionEvent(--send, l);
+ --send; // Send the position before m was enqueued.
+ sendPositionEvent(send, l);
+ QPID_LOG(trace, *this << "Sending position " << send
+ << ", was " << backupPosition);
}
backupPosition = position;
}
- QPID_LOG(trace, "HA: Replicating " << QueuePos(m) << " to " << *this);
+ QPID_LOG(trace, *this << "Replicating message " << m.position);
}
return ConsumerImpl::deliver(m);
}
+ReplicatingSubscription::~ReplicatingSubscription() {}
+
+// Called in the subscription's connection thread.
void ReplicatingSubscription::cancel()
{
- QPID_LOG(debug, "HA: Cancelled " << *this);
+ QPID_LOG(debug, *this <<"Cancelled");
getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
}
-ReplicatingSubscription::~ReplicatingSubscription() {}
-
-//called before we get notified of the message being available and
-//under the message lock in the queue
+// Called before we get notified of the message being available and
+// under the message lock in the queue. Called in arbitrary connection thread.
void ReplicatingSubscription::enqueued(const QueuedMessage& m)
{
//delay completion
m.payload->getIngressCompletion().startCompleter();
}
-// Called with lock held.
+// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
{
- QPID_LOG(trace, "HA: Sending dequeues " << dequeues << " to " << *this);
+ QPID_LOG(trace, *this << "Sending dequeues " << dequeues);
string buf(dequeues.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
dequeues.encode(buffer);
@@ -163,12 +172,10 @@ void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
}
-// Called with lock held.
+// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendPositionEvent(
SequenceNumber position, const sys::Mutex::ScopedLock&l )
{
- QPID_LOG(trace, "HA: Sending position " << QueuePos(getQueue().get(), position)
- << " on " << *this);
string buf(backupPosition.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
position.encode(buffer);
@@ -209,21 +216,24 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer&
}
// Called after the message has been removed from the deque and under
-// the message lock in the queue.
+// the message lock in the queue. Called in arbitrary connection threads.
void ReplicatingSubscription::dequeued(const QueuedMessage& m)
{
+ QPID_LOG(trace, *this << "Dequeued message " << m.position);
{
sys::Mutex::ScopedLock l(lock);
dequeues.add(m.position);
- QPID_LOG(trace, "HA: Will dequeue " << QueuePos(m) << " on " << *this);
}
notify(); // Ensure a call to doDispatch
+ // FIXME aconway 2011-12-20: not thread safe to access position here,
+ // we're not in the dispatch thread.
if (m.position > position) {
m.payload->getIngressCompletion().finishCompleter();
- QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early on " << *this);
+ QPID_LOG(trace, *this << "Completed message " << m.position << " early");
}
}
+// Called in subscription's connection thread.
bool ReplicatingSubscription::doDispatch()
{
{
@@ -235,20 +245,11 @@ bool ReplicatingSubscription::doDispatch()
ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
-bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m)
-{
- return delegate.deliver(m);
-}
+bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { return delegate.deliver(m); }
void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
void ReplicatingSubscription::DelegatingConsumer::cancel() {}
OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
-
-ostream& operator<<(ostream& o, const ReplicatingSubscription& rs) {
- string url = rs.parent->getSession().getConnection().getUrl();
- return o << rs.getQueue()->getName() << " backup on " << url;
-}
-
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index c157a5b378..31246507c9 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -49,7 +49,7 @@ namespace ha {
* Runs on the primary. Delays completion of messages till the backup
* has acknowledged, informs backup of locally dequeued messages.
*
- * THREAD SAFE: Used as a consume in subscription's connection
+ * THREAD SAFE: Used as a consumer in subscription's connection
* thread, and as a QueueObserver in arbitrary connection threads.
*/
class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
index 105a83118e..4a192cd91e 100644
--- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
@@ -229,8 +229,6 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram
Variant::Map& map = i->asMap();
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
- QPID_LOG(trace, "HA: Backup received event: schema=" << schema
- << " values=" << values);
if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
@@ -244,16 +242,15 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram
Variant::Map& values = i->asMap()[VALUES].asMap();
framing::FieldTable args;
amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
- QPID_LOG(trace, "HA: Backup received response type=" << type
- << " values=" << values);
if (type == QUEUE) doResponseQueue(values);
else if (type == EXCHANGE) doResponseExchange(values);
else if (type == BINDING) doResponseBind(values);
+ else QPID_LOG(error, "HA: Backup received unknown response: type=" << type
+ << " values=" << values);
+
// FIXME aconway 2011-12-06: handle all relevant response types.
}
- } else {
- QPID_LOG(error, "HA: Backup replication got unexpected message: " << *headers);
- }
+ } else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers);
} catch (const std::exception& e) {
QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list);
}
@@ -280,11 +277,11 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
// re-create from event.
// Events are always up to date, whereas responses may be
// out of date.
- QPID_LOG(debug, "HA: Created backup queue from event: " << name);
+ QPID_LOG(debug, "HA: Backup created queue: " << name);
startQueueReplicator(result.first);
} else {
// FIXME aconway 2011-12-02: what's the right way to handle this?
- QPID_LOG(warning, "HA: Queue already exists on backup: " << name);
+ QPID_LOG(warning, "HA: Backup queue already exists: " << name);
}
}
}
@@ -293,11 +290,14 @@ void WiringReplicator::doEventQueueDelete(Variant::Map& values) {
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
if (queue && replicateLevel(queue->getSettings())) {
- QPID_LOG(debug, "HA: Deleting backup queue from event: " << name);
+ QPID_LOG(debug, "HA: Backup deleting queue: " << name);
broker.deleteQueue(
name,
values[USER].asString(),
values[RHOST].asString());
+ // FIXME aconway 2011-12-21: casuses race conditions? Restore.
+// // Also delete the QueueReplicator exchange for this queue.
+// broker.getExchanges().destroy(QueueReplicator::replicatorName(name));
}
}
@@ -316,11 +316,11 @@ void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) {
values[USER].asString(),
values[RHOST].asString()).second)
{
- QPID_LOG(debug, "HA: created backup exchange from event: " << name);
+ QPID_LOG(debug, "HA: Backup created exchange: " << name);
} else {
// FIXME aconway 2011-11-22: should delete pre-exisitng exchange
// and re-create from event. See comment in doEventQueueDeclare.
- QPID_LOG(warning, "HA: Exchange already exists on backup: " << name);
+ QPID_LOG(warning, "HA: Backup exchange already exists: " << name);
}
}
}
@@ -330,7 +330,7 @@ void WiringReplicator::doEventExchangeDelete(Variant::Map& values) {
try {
boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
if (exchange && replicateLevel(exchange->getArgs())) {
- QPID_LOG(debug, "HA: Deleting backup exchange:" << name);
+ QPID_LOG(debug, "HA: Backup deleting exchange:" << name);
broker.deleteExchange(
name,
values[USER].asString(),
@@ -352,7 +352,7 @@ void WiringReplicator::doEventBind(Variant::Map& values) {
framing::FieldTable args;
amqp_0_10::translate(values[ARGS].asMap(), args);
string key = values[KEY].asString();
- QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
+ QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
exchange->bind(queue, key, &args);
@@ -377,12 +377,12 @@ void WiringReplicator::doResponseQueue(Variant::Map& values) {
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/);
if (result.second) {
- QPID_LOG(debug, "HA: Created backup queue from response: " << values[NAME]);
+ QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]);
startQueueReplicator(result.first);
} else {
// FIXME aconway 2011-11-22: Normal to find queue already
// exists if we're failing over.
- QPID_LOG(warning, "HA: Queue already exists on backup: " << name);
+ QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << name);
}
}
@@ -400,9 +400,9 @@ void WiringReplicator::doResponseExchange(Variant::Map& values) {
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/).second)
{
- QPID_LOG(debug, "HA: Created backup exchange from response: " << values[NAME]);
+ QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME]);
} else {
- QPID_LOG(warning, "HA: Exchange already exists on backup: " << values[QNAME]);
+ QPID_LOG(warning, "HA: Backup catch-up exchange already exists: " << values[QNAME]);
}
}
@@ -442,7 +442,7 @@ void WiringReplicator::doResponseBind(Variant::Map& values) {
amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
string key = values[KEY].asString();
exchange->bind(queue, key, &args);
- QPID_LOG(debug, "HA: Created backup binding from response: exchange=" << exchange->getName()
+ QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
}
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index e9b84050b9..ed9786674a 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -108,6 +108,7 @@ class ShortTests(BrokerTest):
# Test a series of messages, enqueue all then dequeue all.
s = p.sender(queue("foo","all"))
+ self.wait(b, "foo")
msgs = [str(i) for i in range(10)]
for m in msgs: s.send(Message(m))
self.assert_browse_retry(p, "foo", msgs)