diff options
| author | Alan Conway <aconway@apache.org> | 2013-11-12 16:58:52 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-11-12 16:58:52 +0000 |
| commit | f87a61f06c3aa3d866cd3cc2fccf003276f6949a (patch) | |
| tree | be0c23f69d8a8de9f66b814a58121baa14ec8382 /cpp/src/qpid/ha/BrokerReplicator.cpp | |
| parent | 8f88e1e89b43c6d86851fe2fa183ff4ea58d352b (diff) | |
| download | qpid-python-f87a61f06c3aa3d866cd3cc2fccf003276f6949a.tar.gz | |
QPID-5275: HA transactions failing in qpid-cluster-benchmark
The test was failing due to incorrect handling of the transaction lifecycle:
- Failing to handle the automatic rollback of the empty TX at session close.
- Deleting the tx-q before all backups were finished with it.
The fixes include
- Make tx-q auto-delete, deleted only when the TxReplicators cancel their subscriptions.
- Use markInUse/releaseFromUse on the primary to keep the tx-q until the primary is done.
- Count TxReplicators for auto-delete (unlike normal QueueReplicators)
- Improved error handling and log messages
- Handle *incoming* exceptions on a federation link by passing to ErrorListener
- QueueReplicator catches incoming not-found and resource-deleted exceptions
- close the backup bridge, handle race between subscribe and delete.
- Simplify QueueSnapshots, remove need for snapshot map.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1541146 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/BrokerReplicator.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 13 |
1 files changed, 10 insertions, 3 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index a59c874594..d27d5e84b3 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -188,6 +188,12 @@ class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener { void executionException(framing::execution::ErrorCode, const std::string& msg) { QPID_LOG(error, logPrefix << "Execution error: " << msg); } + + void incomingExecutionException( + framing::execution::ErrorCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Incoming execution error: " << msg); + } + void detach() { QPID_LOG(debug, logPrefix << "Session detached."); } @@ -453,7 +459,7 @@ void BrokerReplicator::route(Deliverable& msg) { if (msg.getMessage().getPropertyAsString(QMF_CONTENT) == EVENT) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); - QPID_LOG(trace, "Broker replicator event: " << map); + QPID_LOG(debug, "Broker replicator event: " << map); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); std::string key = (schema[PACKAGE_NAME].asString() + @@ -465,7 +471,7 @@ void BrokerReplicator::route(Deliverable& msg) { } else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); - QPID_LOG(trace, "Broker replicator response: " << map); + QPID_LOG(debug, "Broker replicator response: " << map); string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString(); Variant::Map& values = map[VALUES].asMap(); framing::FieldTable args; @@ -758,7 +764,7 @@ const string REPLICATE_DEFAULT="replicateDefault"; // Received the ha-broker configuration object for the primary broker. void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { try { - QPID_LOG(trace, logPrefix << "HA Broker response: " << values); + QPID_LOG(debug, logPrefix << "HA Broker response: " << values); ReplicateLevel mine = haBroker.getSettings().replicateDefault.get(); ReplicateLevel primary = replicationTest.getLevel(values[REPLICATE_DEFAULT].asString()); if (mine != primary) @@ -882,6 +888,7 @@ string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; void BrokerReplicator::disconnectedExchange(boost::shared_ptr<Exchange> ex) { boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); + // FIXME aconway 2013-11-01: move logic with releaseFromUse to QueueReplicator if (qr) { qr->disconnect(); if (TxReplicator::isTxQueue(qr->getQueue()->getName())) { |
