summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp27
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.h2
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.cpp23
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.h4
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py7
7 files changed, 42 insertions, 28 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
index 631718e7ae..606a8cceae 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -121,7 +121,9 @@ Queue::shared_ptr QueueRegistry::find(const string& name){
Queue::shared_ptr QueueRegistry::get(const string& name) {
Queue::shared_ptr q = find(name);
- if (!q) throw framing::NotFoundException(QPID_MSG("Queue not found: "<<name));
+ if (!q) {
+ throw framing::NotFoundException(QPID_MSG("Queue not found: "<<name));
+ }
return q;
}
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 1e09caedb6..eb1206437a 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -269,7 +269,8 @@ class BrokerReplicator::UpdateTracker {
void clean(const std::string& name) {
QPID_LOG(info, "Backup: Deleted " << type << " " << name <<
": no longer exists on primary");
- cleanFn(name);
+ try { cleanFn(name); }
+ catch (const framing::NotFoundException&) {}
}
std::string type;
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index 04eede6fe0..41494694de 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -83,7 +83,7 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
replicationTest(hb.getSettings().replicateDefault.get()),
id(true),
exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
- failed(false), ended(false)
+ failed(false), ended(false), complete(false)
{
logPrefix = "Primary transaction "+shortStr(id)+": ";
@@ -165,7 +165,7 @@ void PrimaryTxObserver::commit() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Commit");
txQueue->deliver(TxCommitEvent().message());
- ended = true;
+ complete = true;
end(l);
}
@@ -173,16 +173,25 @@ void PrimaryTxObserver::rollback() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Rollback");
txQueue->deliver(TxRollbackEvent().message());
- ended = true;
+ complete = true;
end(l);
}
void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) {
- // Don't destroy the tx-queue if there are connected subscriptions.
- if (ended && unfinished.empty()) {
- haBroker.getBroker().deleteQueue(
- txQueue->getName(), haBroker.getUserId(), string());
- broker.getExchanges().destroy(getExchangeName());
+ // Don't destroy the tx-queue until the transaction is complete and there
+ // are no connected subscriptions.
+ if (!ended && complete && unfinished.empty()) {
+ ended = true;
+ try {
+ haBroker.getBroker().deleteQueue(txQueue->getName(), haBroker.getUserId(), string());
+ } catch (const std::exception& e) {
+ QPID_LOG(error, logPrefix << "Deleting transaction queue: " << e.what());
+ }
+ try {
+ broker.getExchanges().destroy(getExchangeName());
+ } catch (const std::exception& e) {
+ QPID_LOG(error, logPrefix << "Deleting transaction exchange: " << e.what());
+ }
}
}
@@ -207,7 +216,7 @@ void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) {
sys::Mutex::ScopedLock l(lock);
types::Uuid backup = rs.getBrokerInfo().getSystemId();
if (unprepared.find(backup) != unprepared.end()) {
- ended = failed = true; // Canceled before prepared.
+ complete = failed = true; // Canceled before prepared.
unprepared.erase(backup); // Consider it prepared-fail
}
unfinished.erase(backup);
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
index 2a378e1413..fb9db25e85 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
@@ -103,7 +103,7 @@ class PrimaryTxObserver : public broker::TransactionObserver,
std::string exchangeName;
QueuePtr txQueue;
QueueIdsMap enqueues;
- bool failed, ended;
+ bool failed, ended, complete;
UuidSet members; // All members of transaction.
UuidSet unprepared; // Members that have not yet responded to prepare.
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
index 0ec0643491..63301a92f5 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
@@ -35,6 +35,7 @@
#include "qpid/broker/TxBuffer.h"
#include "qpid/broker/TxAccept.h"
#include "qpid/broker/amqp_0_10/Connection.h"
+#include "qpid/broker/DeliverableMessage.h"
#include "qpid/framing/BufferTypes.h"
#include "qpid/log/Statement.h"
#include <boost/shared_ptr.hpp>
@@ -79,7 +80,7 @@ TxReplicator::TxReplicator(
txBuffer(new broker::TxBuffer),
store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0),
channel(link->nextChannel()),
- complete(false),
+ complete(false), ignore(false),
dequeueState(hb.getBroker().getQueues())
{
string id(getTxId(txQueue->getName()));
@@ -119,6 +120,10 @@ void TxReplicator::sendMessage(const broker::Message& msg, sys::Mutex::ScopedLoc
}
}
+void TxReplicator::route(broker::Deliverable& deliverable) {
+ if (!ignore) QueueReplicator::route(deliverable);
+}
+
void TxReplicator::deliver(const broker::Message& m_) {
sys::Mutex::ScopedLock l(lock);
// Deliver message to the target queue, not the tx-queue.
@@ -215,30 +220,28 @@ void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) {
end(l);
}
-void TxReplicator::members(const string& data, sys::Mutex::ScopedLock& l) {
+void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) {
TxMembersEvent e;
decodeStr(data, e);
QPID_LOG(debug, logPrefix << "Members: " << e.members);
if (!e.members.count(haBroker.getMembership().getSelf().getSystemId())) {
- QPID_LOG(debug, logPrefix << "Not a member of transaction, terminating");
- end(l);
+ QPID_LOG(info, logPrefix << "Not participating in transaction");
+ ignore = true;
}
}
void TxReplicator::end(sys::Mutex::ScopedLock&) {
complete = true;
if (!getQueue()) return; // Already destroyed
- // Destroy the tx-queue, which will destroy this via QueueReplicator destroy.
- // Need to do this now to cancel the subscription to the primary tx-queue
- // which informs the primary that we have completed the transaction.
- haBroker.getBroker().deleteQueue(
- getQueue()->getName(), haBroker.getUserId(), string());
+ // Destroy will cancel the subscription to the primary tx-queue which
+ // informs the primary that we have completed the transaction.
+ destroy();
}
void TxReplicator::destroy() {
QueueReplicator::destroy();
sys::Mutex::ScopedLock l(lock);
- if (!complete) rollback(string(), l);
+ if (!ignore && !complete) rollback(string(), l);
}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h
index 4d2eb2f242..214b2a8a5f 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.h
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.h
@@ -36,6 +36,7 @@ class TxAccept;
class DtxBuffer;
class Broker;
class MessageStore;
+class Deliverable;
}
namespace ha {
@@ -63,6 +64,7 @@ class TxReplicator : public QueueReplicator {
std::string getType() const;
// QueueReplicator overrides
+ void route(broker::Deliverable& deliverable);
void destroy();
protected:
@@ -91,7 +93,7 @@ class TxReplicator : public QueueReplicator {
broker::MessageStore* store;
std::auto_ptr<broker::TransactionContext> context;
framing::ChannelId channel; // Channel to send prepare-complete.
- bool complete;
+ bool complete, ignore;
// Class to process dequeues and create DeliveryRecords to populate a
// TxAccept.
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 17a60a2c76..5fd0e2fa40 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1467,13 +1467,10 @@ class TransactionTests(BrokerTest):
tx = cluster[0].connect().session(transactional=True)
s = tx.sender("q;{create:always}")
s.send("foo")
- tx_q = cluster[0].agent().tx_queues()[0]
cluster.restart(1)
- # Verify the new member should not be in the transaction.
- # but should receive the result of the transaction via normal replication.
- cluster[1].wait_no_queue(tx_q)
tx.commit()
- for b in cluster: b.assert_browse_backup("q", ["foo"])
+ # The new member is not in the tx but receives the results normal replication.
+ for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b)
if __name__ == "__main__":
outdir = "ha_tests.tmp"