diff options
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueRegistry.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp | 27 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/TxReplicator.cpp | 23 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/TxReplicator.h | 4 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 7 |
7 files changed, 42 insertions, 28 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp index 631718e7ae..606a8cceae 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -121,7 +121,9 @@ Queue::shared_ptr QueueRegistry::find(const string& name){ Queue::shared_ptr QueueRegistry::get(const string& name) { Queue::shared_ptr q = find(name); - if (!q) throw framing::NotFoundException(QPID_MSG("Queue not found: "<<name)); + if (!q) { + throw framing::NotFoundException(QPID_MSG("Queue not found: "<<name)); + } return q; } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 1e09caedb6..eb1206437a 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -269,7 +269,8 @@ class BrokerReplicator::UpdateTracker { void clean(const std::string& name) { QPID_LOG(info, "Backup: Deleted " << type << " " << name << ": no longer exists on primary"); - cleanFn(name); + try { cleanFn(name); } + catch (const framing::NotFoundException&) {} } std::string type; diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp index 04eede6fe0..41494694de 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -83,7 +83,7 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : replicationTest(hb.getSettings().replicateDefault.get()), id(true), exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()), - failed(false), ended(false) + failed(false), ended(false), complete(false) { logPrefix = "Primary transaction "+shortStr(id)+": "; @@ -165,7 +165,7 @@ void PrimaryTxObserver::commit() { sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, logPrefix << "Commit"); txQueue->deliver(TxCommitEvent().message()); - ended = true; + complete = true; end(l); } @@ -173,16 +173,25 @@ void PrimaryTxObserver::rollback() { sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, logPrefix << "Rollback"); txQueue->deliver(TxRollbackEvent().message()); - ended = true; + complete = true; end(l); } void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) { - // Don't destroy the tx-queue if there are connected subscriptions. - if (ended && unfinished.empty()) { - haBroker.getBroker().deleteQueue( - txQueue->getName(), haBroker.getUserId(), string()); - broker.getExchanges().destroy(getExchangeName()); + // Don't destroy the tx-queue until the transaction is complete and there + // are no connected subscriptions. + if (!ended && complete && unfinished.empty()) { + ended = true; + try { + haBroker.getBroker().deleteQueue(txQueue->getName(), haBroker.getUserId(), string()); + } catch (const std::exception& e) { + QPID_LOG(error, logPrefix << "Deleting transaction queue: " << e.what()); + } + try { + broker.getExchanges().destroy(getExchangeName()); + } catch (const std::exception& e) { + QPID_LOG(error, logPrefix << "Deleting transaction exchange: " << e.what()); + } } } @@ -207,7 +216,7 @@ void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) { sys::Mutex::ScopedLock l(lock); types::Uuid backup = rs.getBrokerInfo().getSystemId(); if (unprepared.find(backup) != unprepared.end()) { - ended = failed = true; // Canceled before prepared. + complete = failed = true; // Canceled before prepared. unprepared.erase(backup); // Consider it prepared-fail } unfinished.erase(backup); diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h index 2a378e1413..fb9db25e85 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h @@ -103,7 +103,7 @@ class PrimaryTxObserver : public broker::TransactionObserver, std::string exchangeName; QueuePtr txQueue; QueueIdsMap enqueues; - bool failed, ended; + bool failed, ended, complete; UuidSet members; // All members of transaction. UuidSet unprepared; // Members that have not yet responded to prepare. diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp index 0ec0643491..63301a92f5 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp @@ -35,6 +35,7 @@ #include "qpid/broker/TxBuffer.h" #include "qpid/broker/TxAccept.h" #include "qpid/broker/amqp_0_10/Connection.h" +#include "qpid/broker/DeliverableMessage.h" #include "qpid/framing/BufferTypes.h" #include "qpid/log/Statement.h" #include <boost/shared_ptr.hpp> @@ -79,7 +80,7 @@ TxReplicator::TxReplicator( txBuffer(new broker::TxBuffer), store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0), channel(link->nextChannel()), - complete(false), + complete(false), ignore(false), dequeueState(hb.getBroker().getQueues()) { string id(getTxId(txQueue->getName())); @@ -119,6 +120,10 @@ void TxReplicator::sendMessage(const broker::Message& msg, sys::Mutex::ScopedLoc } } +void TxReplicator::route(broker::Deliverable& deliverable) { + if (!ignore) QueueReplicator::route(deliverable); +} + void TxReplicator::deliver(const broker::Message& m_) { sys::Mutex::ScopedLock l(lock); // Deliver message to the target queue, not the tx-queue. @@ -215,30 +220,28 @@ void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) { end(l); } -void TxReplicator::members(const string& data, sys::Mutex::ScopedLock& l) { +void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) { TxMembersEvent e; decodeStr(data, e); QPID_LOG(debug, logPrefix << "Members: " << e.members); if (!e.members.count(haBroker.getMembership().getSelf().getSystemId())) { - QPID_LOG(debug, logPrefix << "Not a member of transaction, terminating"); - end(l); + QPID_LOG(info, logPrefix << "Not participating in transaction"); + ignore = true; } } void TxReplicator::end(sys::Mutex::ScopedLock&) { complete = true; if (!getQueue()) return; // Already destroyed - // Destroy the tx-queue, which will destroy this via QueueReplicator destroy. - // Need to do this now to cancel the subscription to the primary tx-queue - // which informs the primary that we have completed the transaction. - haBroker.getBroker().deleteQueue( - getQueue()->getName(), haBroker.getUserId(), string()); + // Destroy will cancel the subscription to the primary tx-queue which + // informs the primary that we have completed the transaction. + destroy(); } void TxReplicator::destroy() { QueueReplicator::destroy(); sys::Mutex::ScopedLock l(lock); - if (!complete) rollback(string(), l); + if (!ignore && !complete) rollback(string(), l); } }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h index 4d2eb2f242..214b2a8a5f 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.h +++ b/qpid/cpp/src/qpid/ha/TxReplicator.h @@ -36,6 +36,7 @@ class TxAccept; class DtxBuffer; class Broker; class MessageStore; +class Deliverable; } namespace ha { @@ -63,6 +64,7 @@ class TxReplicator : public QueueReplicator { std::string getType() const; // QueueReplicator overrides + void route(broker::Deliverable& deliverable); void destroy(); protected: @@ -91,7 +93,7 @@ class TxReplicator : public QueueReplicator { broker::MessageStore* store; std::auto_ptr<broker::TransactionContext> context; framing::ChannelId channel; // Channel to send prepare-complete. - bool complete; + bool complete, ignore; // Class to process dequeues and create DeliveryRecords to populate a // TxAccept. diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 17a60a2c76..5fd0e2fa40 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1467,13 +1467,10 @@ class TransactionTests(BrokerTest): tx = cluster[0].connect().session(transactional=True) s = tx.sender("q;{create:always}") s.send("foo") - tx_q = cluster[0].agent().tx_queues()[0] cluster.restart(1) - # Verify the new member should not be in the transaction. - # but should receive the result of the transaction via normal replication. - cluster[1].wait_no_queue(tx_q) tx.commit() - for b in cluster: b.assert_browse_backup("q", ["foo"]) + # The new member is not in the tx but receives the results normal replication. + for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b) if __name__ == "__main__": outdir = "ha_tests.tmp" |