summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-19 23:08:11 +0000
committerAlan Conway <aconway@apache.org>2012-01-19 23:08:11 +0000
commite12d6247b844db1b307674f6c9ab10fdcd4beb0a (patch)
tree47cbc20a126c52b2f863d04aff958e8fc7039da0
parent9cd80cb66a9b832db519d03d75b6de21011c0c2f (diff)
downloadqpid-python-e12d6247b844db1b307674f6c9ab10fdcd4beb0a.tar.gz
QPID-3603: Logging improvements for bridges, links and HA classes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233679 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp15
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp32
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp9
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h1
-rw-r--r--qpid/cpp/src/qpid/ha/WiringReplicator.cpp23
5 files changed, 43 insertions, 37 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 02034811bf..402b19c9a4 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -75,7 +75,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
agent->addObject(mgmtObject);
}
- QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest);
+ QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest);
}
Bridge::~Bridge()
@@ -114,7 +114,7 @@ void Bridge::create(Connection& c)
peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
- QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest);
+ QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest);
} else {
FieldTable queueSettings;
@@ -148,9 +148,9 @@ void Bridge::create(Connection& c)
if (exchange.get() == 0)
throw Exception("Exchange not found for dynamic route");
exchange->registerDynamicBridge(this);
- QPID_LOG(debug, "Activated dynamic route for exchange " << args.i_src);
+ QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src);
} else {
- QPID_LOG(debug, "Activated static route from exchange " << args.i_src << " to " << args.i_dest);
+ QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest);
}
}
if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking();
@@ -162,15 +162,16 @@ void Bridge::cancel(Connection&)
peer->getMessage().cancel(args.i_dest);
peer->getSession().detach(name);
}
+ QPID_LOG(debug, "Cancelled bridge " << name);
}
void Bridge::closed()
{
if (args.i_dynamic) {
- Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
- if (exchange.get() != 0)
- exchange->removeDynamicBridge(this);
+ Exchange::shared_ptr exchange = link->getBroker()->getExchanges().find(args.i_src);
+ if (exchange.get()) exchange->removeDynamicBridge(this);
}
+ QPID_LOG(debug, "Closed bridge " << name);
}
void Bridge::destroy()
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index cfa8dfda87..314b97daf1 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -221,26 +221,28 @@ void Link::add(Bridge::shared_ptr bridge)
void Link::cancel(Bridge::shared_ptr bridge)
{
- Mutex::ScopedLock mutex(lock);
+ bool needIOProcessing = false;
+ {
+ Mutex::ScopedLock mutex(lock);
- for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
- if ((*i).get() == bridge.get()) {
- created.erase(i);
- break;
+ for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ created.erase(i);
+ break;
+ }
}
- }
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if ((*i).get() == bridge.get()) {
- cancellations.push_back(bridge);
- bridge->closed();
- active.erase(i);
- break;
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ cancellations.push_back(bridge);
+ bridge->closed();
+ active.erase(i);
+ break;
+ }
}
+ needIOProcessing = !cancellations.empty();
}
-
- if (!cancellations.empty()) {
+ if (needIOProcessing)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
- }
}
void Link::ioThreadProcessing()
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index e11fb8eb37..b6f140e86b 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -65,6 +65,7 @@ void QueueReplicator::activate() {
// Take a reference to myself to ensure not deleted before initializeBridge
// is called.
self = shared_from_this();
+ // Note this may create a new bridge or use an existing one.
queue->getBroker()->getLinks().declare(
link->getHost(), link->getPort(),
false, // durable
@@ -77,7 +78,8 @@ void QueueReplicator::activate() {
"", // excludes
false, // dynamic
0, // sync?
- // Include shared_ptr to self to ensure we not deleted before initializeBridge is called.
+ // Include shared_ptr to self to ensure we are not deleted
+ // before initializeBridge is called.
boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, self)
);
}
@@ -88,6 +90,7 @@ void QueueReplicator::deactivate() {
sys::Mutex::ScopedLock l(lock);
queue->getBroker()->getLinks().destroy(
link->getHost(), link->getPort(), queue->getName(), getName(), string());
+ QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName);
}
// Called in a broker connection thread when the bridge is created.
@@ -95,7 +98,7 @@ void QueueReplicator::deactivate() {
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler,
boost::shared_ptr<QueueReplicator> /*self*/) {
sys::Mutex::ScopedLock l(lock);
-
+ bridgeName = bridge.getName();
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
framing::FieldTable settings;
@@ -117,7 +120,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false, "", 0, settings);
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
- QPID_LOG(debug, logPrefix << "Activated bridge from " << args.i_src << " to " << args.i_dest);
+ QPID_LOG(debug, logPrefix << "Activated bridge " << bridgeName);
}
namespace {
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 9d0ea307ef..4846d01b76 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -75,6 +75,7 @@ class QueueReplicator : public broker::Exchange,
void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
std::string logPrefix;
+ std::string bridgeName;
sys::Mutex lock;
boost::shared_ptr<broker::Queue> queue;
boost::shared_ptr<broker::Link> link;
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
index 58cacb16f4..b86b7cec4a 100644
--- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
@@ -260,7 +260,7 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
string name = values[QNAME].asString();
Variant::Map argsMap = values[ARGS].asMap();
if (values[DISP] == CREATED && replicateLevel(argsMap)) {
- framing::FieldTable args;
+ framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
std::pair<boost::shared_ptr<Queue>, bool> result =
broker.createQueue(
@@ -287,21 +287,20 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
}
void WiringReplicator::doEventQueueDelete(Variant::Map& values) {
+ // The remote queue has already been deleted so replicator
+ // sessions may be closed by a "queue deleted" exception.
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
if (queue && replicateLevel(queue->getSettings())) {
QPID_LOG(debug, "HA: Backup deleting queue: " << name);
- broker.deleteQueue(
- name,
- values[USER].asString(),
- values[RHOST].asString());
- // Delete the QueueReplicator exchange for this queue.
- boost::shared_ptr<broker::Exchange> ex =
- broker.getExchanges().find(QueueReplicator::replicatorName(name));
- boost::shared_ptr<QueueReplicator> qr =
- boost::dynamic_pointer_cast<QueueReplicator>(ex);
+ string rname = QueueReplicator::replicatorName(name);
+ boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname);
+ boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex);
if (qr) qr->deactivate();
- broker.getExchanges().destroy(QueueReplicator::replicatorName(name));
+ // QueueReplicator's bridge is now queued for destruction but may not
+ // actually be destroyed, deleting the exhange
+ broker.getExchanges().destroy(rname);
+ broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString());
}
}
@@ -455,8 +454,8 @@ void WiringReplicator::doResponseBind(Variant::Map& values) {
void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
if (replicateLevel(queue->getSettings()) == RL_ALL) {
boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
- qr->activate();
broker.getExchanges().registerExchange(qr);
+ qr->activate();
}
}