From 0630ea05003e6c530b9dde889e8296b12e67e41b Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 12 Nov 2013 16:58:52 +0000 Subject: QPID-5275: HA transactions failing in qpid-cluster-benchmark The test was failing due to incorrect handling of the transaction lifecycle: - Failing to handle the automatic rollback of the empty TX at session close. - Deleting the tx-q before all backups were finished with it. The fixes include - Make tx-q auto-delete, deleted only when the TxReplicators cancel their subscriptions. - Use markInUse/releaseFromUse on the primary to keep the tx-q until the primary is done. - Count TxReplicators for auto-delete (unlike normal QueueReplicators) - Improved error handling and log messages - Handle *incoming* exceptions on a federation link by passing to ErrorListener - QueueReplicator catches incoming not-found and resource-deleted exceptions - close the backup bridge, handle race between subscribe and delete. - Simplify QueueSnapshots, remove need for snapshot map. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1541146 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/CMakeLists.txt | 2 + qpid/cpp/src/qpid/broker/Bridge.cpp | 6 + qpid/cpp/src/qpid/broker/Bridge.h | 1 + qpid/cpp/src/qpid/broker/Link.cpp | 1 + qpid/cpp/src/qpid/broker/SemanticState.cpp | 2 +- qpid/cpp/src/qpid/broker/SemanticState.h | 4 +- qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 9 +- qpid/cpp/src/qpid/broker/SessionHandler.cpp | 7 + qpid/cpp/src/qpid/broker/SessionHandler.h | 30 +++- qpid/cpp/src/qpid/broker/SessionState.h | 2 + qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 13 +- qpid/cpp/src/qpid/ha/Event.cpp | 2 +- qpid/cpp/src/qpid/ha/Event.h | 14 +- qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp | 170 +++++++++++++-------- qpid/cpp/src/qpid/ha/PrimaryTxObserver.h | 18 ++- qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 50 +++--- qpid/cpp/src/qpid/ha/QueueReplicator.h | 5 + qpid/cpp/src/qpid/ha/QueueSnapshots.h | 32 ++-- qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 95 ++++++------ qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 146 ++++++++++-------- qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp | 45 ++++++ qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h | 50 ++++++ qpid/cpp/src/qpid/ha/TxReplicator.cpp | 58 ++++--- qpid/cpp/src/qpid/ha/TxReplicator.h | 4 +- qpid/cpp/src/tests/brokertest.py | 22 ++- qpid/cpp/src/tests/ha_tests.py | 63 +++++--- 26 files changed, 552 insertions(+), 299 deletions(-) create mode 100644 qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp create mode 100644 qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 1da108b727..248798977f 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -623,6 +623,8 @@ if (BUILD_HA) qpid/ha/StandAlone.h qpid/ha/StatusCheck.cpp qpid/ha/StatusCheck.h + qpid/ha/TxReplicatingSubscription.cpp + qpid/ha/TxReplicatingSubscription.h qpid/ha/PrimaryTxObserver.cpp qpid/ha/PrimaryTxObserver.h qpid/ha/types.cpp diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 6b34898158..b7fbb1a9aa 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -442,6 +442,12 @@ void Bridge::executionException( if (errorListener) errorListener->executionException(code, msg); } +void Bridge::incomingExecutionException( + framing::execution::ErrorCode code, const std::string& msg) +{ + if (errorListener) errorListener->incomingExecutionException(code, msg); +} + void Bridge::detach() { detached = true; if (errorListener) errorListener->detach(); diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index 604a8473f3..3eac8f2af2 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -111,6 +111,7 @@ class Bridge : public PersistableConfig, void connectionException(framing::connection::CloseCode code, const std::string& msg); void channelException(framing::session::DetachCode, const std::string& msg); void executionException(framing::execution::ErrorCode, const std::string& msg); + void incomingExecutionException(framing::execution::ErrorCode, const std::string& msg); void detach(); void setErrorListener(boost::shared_ptr e) { errorListener = e; } diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index fe1cac8aab..fc200adbea 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -273,6 +273,7 @@ class DetachedCallback : public SessionHandler::ErrorListener { void connectionException(framing::connection::CloseCode, const std::string&) {} void channelException(framing::session::DetachCode, const std::string&) {} void executionException(framing::execution::ErrorCode, const std::string&) {} + void incomingExecutionException(framing::execution::ErrorCode, const std::string& ) {} void detach() {} private: const std::string name; diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 4d5ea3b5b6..8d7acda673 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -206,9 +206,9 @@ void SemanticState::rollback() { if (!txBuffer) throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); + StartTxOnExit e(*this); session.rollbackTx(); // Just to update statistics txBuffer->rollback(); - startTx(); // Start a new TX automatically. } void SemanticState::selectDtx() diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 4375a3f0f1..8fb796add7 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -125,6 +125,7 @@ class SemanticState : private boost::noncopyable { SessionContext& getSession(); const SessionContext& getSession() const; + SessionState& getSessionState() { return session; } const boost::shared_ptr find(const std::string& destination) const; bool find(const std::string& destination, boost::shared_ptr&) const; @@ -200,8 +201,9 @@ class SemanticStateConsumerImpl : public Consumer, public sys::OutputTask, protected: mutable qpid::sys::Mutex lock; SemanticState* const parent; - private: const boost::shared_ptr queue; + + private: const bool ackExpected; const bool acquire; bool blocked; diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 9a8110d54f..5b1a6aa267 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -536,16 +536,17 @@ void SessionAdapter::ExecutionHandlerImpl::result(const SequenceNumber& /*comman //TODO: but currently never used client->server } -void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t /*errorCode*/, +void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t errorCode, const SequenceNumber& /*commandId*/, uint8_t /*classCode*/, uint8_t /*commandCode*/, uint8_t /*fieldIndex*/, - const std::string& /*description*/, + const std::string& description, const framing::FieldTable& /*errorInfo*/) { - //TODO: again, not really used client->server but may be important - //for inter-broker links + broker::SessionHandler* s = state.getSessionState().getHandler(); + if (s) s->incomingExecutionException( + framing::execution::ErrorCode(errorCode), description); } diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index ff7693e93a..a8f5734af7 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -68,6 +68,13 @@ void SessionHandler::executionException( errorListener->executionException(code, msg); } +void SessionHandler::incomingExecutionException( + framing::execution::ErrorCode code, const std::string& msg) +{ + if (errorListener) + errorListener->incomingExecutionException(code, msg); +} + amqp_0_10::Connection& SessionHandler::getConnection() { return connection; } const amqp_0_10::Connection& SessionHandler::getConnection() const { return connection; } diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index 3ee1538ccd..cb81084014 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -46,12 +46,23 @@ class SessionHandler : public qpid::amqp_0_10::SessionHandler { class ErrorListener { public: virtual ~ErrorListener() {} + + /** Called when there is an outgoing connection-exception */ virtual void connectionException( framing::connection::CloseCode code, const std::string& msg) = 0; + /** Called when there is an outgoing channel-exception */ virtual void channelException( framing::session::DetachCode, const std::string& msg) = 0; + /** Called when there is an outgoing execution-exception */ virtual void executionException( framing::execution::ErrorCode, const std::string& msg) = 0; + + /** Called when there is an incoming execution-exception. + * Useful for inter-broker bridges. + */ + virtual void incomingExecutionException( + framing::execution::ErrorCode, const std::string& msg) = 0; + /** Called when it is safe to delete the ErrorListener. */ virtual void detach() = 0; }; @@ -77,15 +88,18 @@ class SessionHandler : public qpid::amqp_0_10::SessionHandler { void setErrorListener(boost::shared_ptr e) { errorListener = e; } + // Called by SessionAdapter + void incomingExecutionException(framing::execution::ErrorCode, const std::string& msg); + protected: - virtual void setState(const std::string& sessionName, bool force); - virtual qpid::SessionState* getState(); - virtual framing::FrameHandler* getInHandler(); - virtual void connectionException(framing::connection::CloseCode code, const std::string& msg); - virtual void channelException(framing::session::DetachCode, const std::string& msg); - virtual void executionException(framing::execution::ErrorCode, const std::string& msg); - virtual void detaching(); - virtual void readyToSend(); + void setState(const std::string& sessionName, bool force); + qpid::SessionState* getState(); + framing::FrameHandler* getInHandler(); + void connectionException(framing::connection::CloseCode code, const std::string& msg); + void channelException(framing::session::DetachCode, const std::string& msg); + void executionException(framing::execution::ErrorCode, const std::string& msg); + void detaching(); + void readyToSend(); private: struct SetChannelProxy : public framing::AMQP_ClientProxy { // Proxy that sets the channel. diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 23dc3b897d..ca6d6bf530 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -82,6 +82,8 @@ class SessionState : public qpid::SessionState, void attach(SessionHandler& handler); void disableOutput(); + SessionHandler* getHandler() { return handler; } + /** @pre isAttached() */ framing::AMQP_ClientProxy& getProxy(); diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index a59c874594..d27d5e84b3 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -188,6 +188,12 @@ class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener { void executionException(framing::execution::ErrorCode, const std::string& msg) { QPID_LOG(error, logPrefix << "Execution error: " << msg); } + + void incomingExecutionException( + framing::execution::ErrorCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Incoming execution error: " << msg); + } + void detach() { QPID_LOG(debug, logPrefix << "Session detached."); } @@ -453,7 +459,7 @@ void BrokerReplicator::route(Deliverable& msg) { if (msg.getMessage().getPropertyAsString(QMF_CONTENT) == EVENT) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); - QPID_LOG(trace, "Broker replicator event: " << map); + QPID_LOG(debug, "Broker replicator event: " << map); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); std::string key = (schema[PACKAGE_NAME].asString() + @@ -465,7 +471,7 @@ void BrokerReplicator::route(Deliverable& msg) { } else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); - QPID_LOG(trace, "Broker replicator response: " << map); + QPID_LOG(debug, "Broker replicator response: " << map); string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString(); Variant::Map& values = map[VALUES].asMap(); framing::FieldTable args; @@ -758,7 +764,7 @@ const string REPLICATE_DEFAULT="replicateDefault"; // Received the ha-broker configuration object for the primary broker. void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { try { - QPID_LOG(trace, logPrefix << "HA Broker response: " << values); + QPID_LOG(debug, logPrefix << "HA Broker response: " << values); ReplicateLevel mine = haBroker.getSettings().replicateDefault.get(); ReplicateLevel primary = replicationTest.getLevel(values[REPLICATE_DEFAULT].asString()); if (mine != primary) @@ -882,6 +888,7 @@ string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; void BrokerReplicator::disconnectedExchange(boost::shared_ptr ex) { boost::shared_ptr qr(boost::dynamic_pointer_cast(ex)); + // FIXME aconway 2013-11-01: move logic with releaseFromUse to QueueReplicator if (qr) { qr->disconnect(); if (TxReplicator::isTxQueue(qr->getQueue()->getName())) { diff --git a/qpid/cpp/src/qpid/ha/Event.cpp b/qpid/cpp/src/qpid/ha/Event.cpp index 8265a6edd3..ff336d0b2b 100644 --- a/qpid/cpp/src/qpid/ha/Event.cpp +++ b/qpid/cpp/src/qpid/ha/Event.cpp @@ -51,7 +51,7 @@ const string TxCommitEvent::KEY(QPID_HA+"txcom"); const string TxRollbackEvent::KEY(QPID_HA+"txrb"); const string TxPrepareOkEvent::KEY(QPID_HA+"txok"); const string TxPrepareFailEvent::KEY(QPID_HA+"txno"); -const string TxMembersEvent::KEY(QPID_HA+"txmem"); +const string TxBackupsEvent::KEY(QPID_HA+"txmem"); broker::Message makeMessage( const string& data, const string& destination, const string& routingKey) diff --git a/qpid/cpp/src/qpid/ha/Event.h b/qpid/cpp/src/qpid/ha/Event.h index f292499e7b..7b96e36f64 100644 --- a/qpid/cpp/src/qpid/ha/Event.h +++ b/qpid/cpp/src/qpid/ha/Event.h @@ -178,14 +178,14 @@ struct TxPrepareFailEvent : public EventBase { void print(std::ostream& o) const { o << broker; } }; -struct TxMembersEvent : public EventBase { +struct TxBackupsEvent : public EventBase { static const std::string KEY; - UuidSet members; - TxMembersEvent(const UuidSet& s=UuidSet()) : members(s) {} - void encode(framing::Buffer& b) const { b.put(members); } - void decode(framing::Buffer& b) { b.get(members); } - size_t encodedSize() const { return members.encodedSize(); } - void print(std::ostream& o) const { o << members; } + UuidSet backups; + TxBackupsEvent(const UuidSet& s=UuidSet()) : backups(s) {} + void encode(framing::Buffer& b) const { b.put(backups); } + void decode(framing::Buffer& b) { b.get(backups); } + size_t encodedSize() const { return backups.encodedSize(); } + void print(std::ostream& o) const { o << backups; } }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp index 416bb329a6..a32334bcf9 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -30,6 +30,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" +#include "qpid/framing/reply_exceptions.h" #include #include @@ -40,8 +41,9 @@ class FieldTable; namespace ha { using namespace std; -using namespace qpid::broker; -using namespace qpid::framing; +using namespace sys; +using namespace broker; +using namespace framing; using types::Uuid; // Exchange to receive prepare OK events. @@ -51,6 +53,7 @@ class PrimaryTxObserver::Exchange : public broker::Exchange { broker::Exchange(tx_->getExchangeName()), tx(tx_) { + args.setString(QPID_REPLICATE, printable(NONE).str()); // Set replication arg. dispatch[TxPrepareOkEvent::KEY] = boost::bind(&PrimaryTxObserver::txPrepareOkEvent, tx, _1); dispatch[TxPrepareFailEvent::KEY] = @@ -72,7 +75,7 @@ class PrimaryTxObserver::Exchange : public broker::Exchange { private: static const string TYPE_NAME; typedef boost::function DispatchFn; - typedef qpid::sys::unordered_map DispatchMap; + typedef unordered_map DispatchMap; DispatchMap dispatch; boost::shared_ptr tx; @@ -83,50 +86,62 @@ const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"prim PrimaryTxObserver::PrimaryTxObserver( Primary& p, HaBroker& hb, const boost::intrusive_ptr& tx ) : + state(SENDING), primary(p), haBroker(hb), broker(hb.getBroker()), replicationTest(hb.getSettings().replicateDefault.get()), txBuffer(tx), id(true), - exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()), - complete(false) + exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()) { logPrefix = "Primary transaction "+shortStr(id)+": "; // The brokers known at this point are the ones that will be included // in the transaction. Brokers that join later are not included. // - BrokerInfo::Set backups(haBroker.getMembership().otherBackups()); - std::transform(backups.begin(), backups.end(), inserter(members, members.begin()), + BrokerInfo::Set backups_(haBroker.getMembership().otherBackups()); + std::transform(backups_.begin(), backups_.end(), inserter(backups, backups.begin()), boost::bind(&BrokerInfo::getSystemId, _1)); + // Delay completion of TX untill all backups have responded to prepare. + incomplete = backups; + for (size_t i = 0; i < incomplete.size(); ++i) + txBuffer->startCompleter(); + QPID_LOG(debug, logPrefix << "Started TX " << id); - QPID_LOG(debug, logPrefix << "Members: " << members); - unprepared = unfinished = members; + QPID_LOG(debug, logPrefix << "Backups: " << backups); +} +void PrimaryTxObserver::initialize() { + boost::shared_ptr ex(new Exchange(shared_from_this())); + broker.getExchanges().registerExchange(ex); pair result = broker.getQueues().declare( exchangeName, QueueSettings(/*durable*/false, /*autodelete*/true)); - assert(result.second); + if (!result.second) + throw InvalidArgumentException( + QPID_MSG(logPrefix << "TX replication queue already exists.")); txQueue = result.first; - txQueue->deliver(TxMembersEvent(members).message()); + txQueue->markInUse(true); // Prevent auto-delete till we are done. + txQueue->deliver(TxBackupsEvent(backups).message()); + } + PrimaryTxObserver::~PrimaryTxObserver() { QPID_LOG(debug, logPrefix << "Ended"); } -void PrimaryTxObserver::initialize() { - boost::shared_ptr ex(new Exchange(shared_from_this())); - FieldTable args = ex->getArgs(); - args.setString(QPID_REPLICATE, printable(NONE).str()); // Set replication arg. - broker.getExchanges().registerExchange(ex); +void PrimaryTxObserver::checkState(State expect, const std::string& msg) { + if (state != expect) + throw IllegalStateException(QPID_MSG(logPrefix << "Illegal state: " << msg)); } void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m) { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues. QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m)); + checkState(SENDING, "Too late for enqueue"); enqueues[q] += m.getReplicationId(); txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message()); txQueue->deliver(m); @@ -136,7 +151,8 @@ void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m) void PrimaryTxObserver::dequeue( const QueuePtr& q, QueuePosition pos, ReplicationId id) { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + checkState(SENDING, "Too late for dequeue"); if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues. QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id)); txQueue->deliver(TxDequeueEvent(q->getName(), id).message()); @@ -163,14 +179,14 @@ struct Skip { } // namespace bool PrimaryTxObserver::prepare() { - QPID_LOG(debug, logPrefix << "Prepare " << members); + QPID_LOG(debug, logPrefix << "Prepare " << backups); vector skips; { - sys::Mutex::ScopedLock l(lock); - for (size_t i = 0; i < members.size(); ++i) txBuffer->startCompleter(); - + Mutex::ScopedLock l(lock); + checkState(SENDING, "Too late for prepare"); + state = PREPARING; // Tell replicating subscriptions to skip IDs in the transaction. - for (UuidSet::iterator b = members.begin(); b != members.end(); ++b) + for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b) for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q) skips.push_back(Skip(*b, q->first, q->second)); } @@ -183,69 +199,91 @@ bool PrimaryTxObserver::prepare() { void PrimaryTxObserver::commit() { QPID_LOG(debug, logPrefix << "Commit"); - sys::Mutex::ScopedLock l(lock); - txQueue->deliver(TxCommitEvent().message()); - complete = true; - end(l); + Mutex::ScopedLock l(lock); + checkState(PREPARING, "Cannot commit, not preparing"); + if (incomplete.size() == 0) { + txQueue->deliver(TxCommitEvent().message()); + end(l); + } else { + txQueue->deliver(TxRollbackEvent().message()); + end(l); + throw PreconditionFailedException( + QPID_MSG(logPrefix << "Cannot commit, " << incomplete.size() + << " incomplete backups")); + } } void PrimaryTxObserver::rollback() { QPID_LOG(debug, logPrefix << "Rollback"); - sys::Mutex::ScopedLock l(lock); - txQueue->deliver(TxRollbackEvent().message()); - complete = true; - end(l); -} - -void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) { - // Don't destroy the tx-queue until the transaction is complete and there - // are no connected subscriptions. - if (txBuffer && complete && unfinished.empty()) { - txBuffer = 0; // Break pointer cycle. - try { - haBroker.getBroker().deleteQueue(txQueue->getName(), haBroker.getUserId(), string()); - } catch (const std::exception& e) { - QPID_LOG(error, logPrefix << "Deleting transaction queue: " << e.what()); - } - try { - broker.getExchanges().destroy(getExchangeName()); - } catch (const std::exception& e) { - QPID_LOG(error, logPrefix << "Deleting transaction exchange: " << e.what()); - } + Mutex::ScopedLock l(lock); + if (state != ENDED) { + txQueue->deliver(TxRollbackEvent().message()); + end(l); } } +void PrimaryTxObserver::end(Mutex::ScopedLock&) { + if (state == ENDED) return; + state = ENDED; + // If there are no outstanding completions, break pointer cycle here. + // Otherwise break it in cancel() when the remaining completions are done. + if (incomplete.empty()) txBuffer = 0; + txQueue->releaseFromUse(true); // txQueue will auto-delete + txQueue.reset(); + try { + broker.getExchanges().destroy(getExchangeName()); + } catch (const std::exception& e) { + QPID_LOG(error, logPrefix << "Deleting transaction exchange: " << e.what()); + } +} + +bool PrimaryTxObserver::completed(const Uuid& id, Mutex::ScopedLock&) { + if (incomplete.erase(id)) { + txBuffer->finishCompleter(); + return true; + } + return false; +} + +bool PrimaryTxObserver::error(const Uuid& id, const char* msg, Mutex::ScopedLock& l) +{ + if (incomplete.find(id) != incomplete.end()) { + // Note: setError before completed since completed may trigger completion. + txBuffer->setError(QPID_MSG(logPrefix << msg << id)); + completed(id, l); + return true; + } + return false; +} + void PrimaryTxObserver::txPrepareOkEvent(const string& data) { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); types::Uuid backup = decodeStr(data).broker; - if (unprepared.erase(backup)) { + if (completed(backup, l)) { QPID_LOG(debug, logPrefix << "Backup prepared ok: " << backup); - txBuffer->finishCompleter(); + } else { + QPID_LOG(error, logPrefix << "Unexpected prepare-ok response from " << backup); } } void PrimaryTxObserver::txPrepareFailEvent(const string& data) { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); types::Uuid backup = decodeStr(data).broker; - if (unprepared.erase(backup)) { - QPID_LOG(error, logPrefix << "Prepare failed on backup: " << backup); - txBuffer->setError( - QPID_MSG(logPrefix << "Prepare failed on backup: " << backup)); - txBuffer->finishCompleter(); + if (error(backup, "Prepare failed on backup: ", l)) { + QPID_LOG(error, logPrefix << "Prepare failed on backup " << backup); + } else { + QPID_LOG(error, logPrefix << "Unexpected prepare-fail response from " << backup); } } void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); types::Uuid backup = rs.getBrokerInfo().getSystemId(); - if (unprepared.erase(backup) ){ - complete = true; // Cancelled before prepared. - txBuffer->setError( - QPID_MSG(logPrefix << "Backup disconnected: " << rs.getBrokerInfo())); - txBuffer->finishCompleter(); - } - unfinished.erase(backup); - end(l); + QPID_LOG(debug, logPrefix << "Backup disconnected: " << backup); + // Normally the backup should be completed before it is cancelled. + if (completed(backup, l)) error(backup, "Unexpected disconnect:", l); + // Break the pointer cycle if backups have completed and we are done with txBuffer. + if (state == ENDED && incomplete.empty()) txBuffer = 0; } }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h index cd6c88ad41..31b2b84b0a 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h @@ -90,13 +90,21 @@ class PrimaryTxObserver : public broker::TransactionObserver, typedef qpid::sys::unordered_map< QueuePtr, ReplicationIdSet, Hasher > QueueIdsMap; - void membership(const BrokerInfo::Map&); + enum State { + SENDING, ///< Sending TX messages and acks + PREPARING, ///< Prepare sent, waiting for response + ENDED ///< Commit or rollback sent, local transaction ended. + }; + + void checkState(State expect, const std::string& msg); void end(sys::Mutex::ScopedLock&); void txPrepareOkEvent(const std::string& data); void txPrepareFailEvent(const std::string& data); - + bool completed(const types::Uuid& id, sys::Mutex::ScopedLock&); + bool error(const types::Uuid& id, const char* msg, sys::Mutex::ScopedLock& l); sys::Monitor lock; + State state; std::string logPrefix; Primary& primary; HaBroker& haBroker; @@ -110,10 +118,8 @@ class PrimaryTxObserver : public broker::TransactionObserver, std::string exchangeName; QueuePtr txQueue; QueueIdsMap enqueues; - bool complete; - UuidSet members; // All members of transaction. - UuidSet unprepared; // Members that have not yet responded to prepare. - UuidSet unfinished; // Members that have not yet disconnected. + UuidSet backups; // All backups of transaction. + UuidSet incomplete; // Incomplete backups (not yet responded to prepare) }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 8037559c3d..cc6c8a3f30 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -46,18 +46,13 @@ namespace qpid { namespace ha { using namespace broker; using namespace framing; +using namespace framing::execution; using namespace std; using std::exception; using sys::Mutex; const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency"); -namespace { -const string QPID_HA(QPID_HA_PREFIX); -const std::string TYPE_NAME(QPID_HA+"queue-replicator"); -} - - std::string QueueReplicator::replicatorName(const std::string& queueName) { return QUEUE_REPLICATOR_PREFIX + queueName; } @@ -68,20 +63,21 @@ bool QueueReplicator::isReplicatorName(const std::string& name) { class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { public: - ErrorListener(const std::string& prefix) : logPrefix(prefix) {} - void connectionException(framing::connection::CloseCode, const std::string& msg) { - QPID_LOG(error, logPrefix << "Connection error: " << msg); - } - void channelException(framing::session::DetachCode, const std::string& msg) { - QPID_LOG(error, logPrefix << "Channel error: " << msg); - } - void executionException(framing::execution::ErrorCode, const std::string& msg) { - QPID_LOG(error, logPrefix << "Execution error: " << msg); + ErrorListener(const boost::shared_ptr& qr) + : queueReplicator(qr), logPrefix(qr->logPrefix) {} + + void connectionException(framing::connection::CloseCode, const std::string&) {} + void channelException(framing::session::DetachCode, const std::string&) {} + void executionException(framing::execution::ErrorCode, const std::string&) {} + + void incomingExecutionException(ErrorCode e, const std::string& msg) { + queueReplicator->incomingExecutionException(e, msg); } void detach() { QPID_LOG(debug, logPrefix << "Session detached"); } private: + boost::shared_ptr queueReplicator; std::string logPrefix; }; @@ -128,6 +124,8 @@ QueueReplicator::QueueReplicator(HaBroker& hb, boost::bind(&QueueReplicator::idEvent, this, _1, _2); } +QueueReplicator::~QueueReplicator() {} + // This must be called immediately after the constructor. // It has to be separate so we can call shared_from_this(). void QueueReplicator::activate() { @@ -161,7 +159,7 @@ void QueueReplicator::activate() { ); bridge = result.first; bridge->setErrorListener( - boost::shared_ptr(new ErrorListener(logPrefix))); + boost::shared_ptr(new ErrorListener(shared_from_this()))); // Enable callback to destroy() queue->addObserver( @@ -173,8 +171,6 @@ void QueueReplicator::disconnect() { sessionHandler = 0; } -QueueReplicator::~QueueReplicator() {} - // Called from Queue::destroyed() void QueueReplicator::destroy() { boost::shared_ptr bridge2; // To call outside of lock @@ -200,7 +196,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa AMQP_ServerProxy peer(sessionHandler->out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); FieldTable arguments; - arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); + arguments.setString(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, getType()); arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize? arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable()); arguments.setString(ReplicatingSubscription::QPID_ID_SET, @@ -289,12 +285,26 @@ ReplicationId QueueReplicator::getMaxId() { return maxId; } +void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) { + if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) { + // If the queue is destroyed at the same time we are subscribing, we may + // get a not-found or resource-deleted exception before the + // BrokerReplicator gets the queue-delete event. Shut down the bridge by + // calling destroy(), we can let the BrokerReplicator delete the queue + // when the queue-delete arrives. + QPID_LOG(debug, logPrefix << "Deleted on primary: " << msg); + destroy(); + } + else + QPID_LOG(error, logPrefix << "Incoming execution exception: " << msg); +} + // Unused Exchange methods. bool QueueReplicator::bind(boost::shared_ptr, const std::string&, const FieldTable*) { return false; } bool QueueReplicator::unbind(boost::shared_ptr, const std::string&, const FieldTable*) { return false; } bool QueueReplicator::isBound(boost::shared_ptr, const std::string* const, const FieldTable* const) { return false; } bool QueueReplicator::hasBindings() { return false; } -std::string QueueReplicator::getType() const { return TYPE_NAME; } +std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; } }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index cbb36757f6..6fd140fde3 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -118,6 +118,9 @@ class QueueReplicator : public broker::Exchange, void dequeueEvent(const std::string& data, sys::Mutex::ScopedLock&); void idEvent(const std::string& data, sys::Mutex::ScopedLock&); + void incomingExecutionException(framing::execution::ErrorCode e, + const std::string& msg); + std::string logPrefix; std::string bridgeName; @@ -127,6 +130,8 @@ class QueueReplicator : public broker::Exchange, ReplicationIdSet idSet; // Set of replicationIds on the queue. ReplicationId nextId; // ID for next message to arrive. ReplicationId maxId; // Max ID used so far. + + friend class ErrorListener; }; diff --git a/qpid/cpp/src/qpid/ha/QueueSnapshots.h b/qpid/cpp/src/qpid/ha/QueueSnapshots.h index d067b983d1..e49fe5c7e7 100644 --- a/qpid/cpp/src/qpid/ha/QueueSnapshots.h +++ b/qpid/cpp/src/qpid/ha/QueueSnapshots.h @@ -32,6 +32,7 @@ #include "qpid/sys/Mutex.h" #include +#include namespace qpid { namespace ha { @@ -44,35 +45,28 @@ class QueueSnapshots : public broker::BrokerObserver { public: boost::shared_ptr get(const boost::shared_ptr& q) const { - sys::Mutex::ScopedLock l(lock); - SnapshotMap::const_iterator i = snapshots.find(q); - return i != snapshots.end() ? i->second : boost::shared_ptr(); + boost::shared_ptr qs; + q->eachObserver( + boost::bind(QueueSnapshots::saveQueueSnapshot, _1, boost::ref(qs))); + return qs; } // BrokerObserver overrides. void queueCreate(const boost::shared_ptr& q) { - sys::Mutex::ScopedLock l(lock); - boost::shared_ptr observer(new QueueSnapshot); - snapshots[q] = observer; - q->addObserver(observer); + q->addObserver(boost::make_shared()); } void queueDestroy(const boost::shared_ptr& q) { - sys::Mutex::ScopedLock l(lock); - SnapshotMap::iterator i = snapshots.find(q); - if (i != snapshots.end()) { - q->removeObserver(i->second); - snapshots.erase(i); - } + q->removeObserver(get(q)); } private: - typedef qpid::sys::unordered_map, - boost::shared_ptr, - Hasher > - > SnapshotMap; - SnapshotMap snapshots; - mutable sys::Mutex lock; + static void saveQueueSnapshot( + const boost::shared_ptr& observer, + boost::shared_ptr& out) + { + if (!out) out = boost::dynamic_pointer_cast(observer); + } }; diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index cdfe9dd888..d0b93da85f 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -25,15 +25,16 @@ #include "QueueReplicator.h" #include "QueueSnapshots.h" #include "ReplicatingSubscription.h" +#include "TxReplicatingSubscription.h" #include "Primary.h" #include "HaBroker.h" #include "qpid/assert.h" #include "qpid/broker/Queue.h" -#include "qpid/broker/QueueObserver.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" #include "qpid/types/Uuid.h" #include @@ -47,22 +48,12 @@ using namespace broker; using namespace std; using sys::Mutex; using broker::amqp_0_10::MessageTransfer; - -const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription"); -const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info"); -const string ReplicatingSubscription::QPID_ID_SET("qpid.ha-info"); - -class ReplicatingSubscription::QueueObserver : public broker::QueueObserver { - public: - QueueObserver(ReplicatingSubscription& rs_) : rs(rs_) {} - void enqueued(const broker::Message&) {} - void dequeued(const broker::Message& m) { rs.dequeued(m.getReplicationId()); } - void acquired(const broker::Message&) {} - void requeued(const broker::Message&) {} - private: - ReplicatingSubscription& rs; -}; - +namespace { const string QPID_HA(QPID_HA_PREFIX); } +const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION(QPID_HA+"repsub"); +const string ReplicatingSubscription::QPID_BROKER_INFO(QPID_HA+"info"); +const string ReplicatingSubscription::QPID_ID_SET(QPID_HA+"ids"); +const string ReplicatingSubscription::QPID_QUEUE_REPLICATOR(QPID_HA+"qrep"); +const string ReplicatingSubscription::QPID_TX_REPLICATOR(QPID_HA+"txrep"); /* Called by SemanticState::consume to create a consumer */ boost::shared_ptr @@ -79,13 +70,20 @@ ReplicatingSubscription::Factory::create( const framing::FieldTable& arguments ) { boost::shared_ptr rs; - if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) { + std::string type = arguments.getAsString(QPID_REPLICATING_SUBSCRIPTION); + if (type == QPID_QUEUE_REPLICATOR) { rs.reset(new ReplicatingSubscription( haBroker, parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); - rs->initialize(); } + else if (type == QPID_TX_REPLICATOR) { + rs.reset(new TxReplicatingSubscription( + haBroker, + parent, name, queue, ack, acquire, exclusive, tag, + resumeId, resumeTtl, arguments)); + } + if (rs) rs->initialize(); return rs; } @@ -100,7 +98,7 @@ ReplicatingSubscription::ReplicatingSubscription( HaBroker& hb, SemanticState* parent, const string& name, - Queue::shared_ptr queue, + Queue::shared_ptr queue_, bool ack, bool /*acquire*/, bool exclusive, @@ -108,16 +106,22 @@ ReplicatingSubscription::ReplicatingSubscription( const string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments -) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag, +) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag, resumeId, resumeTtl, arguments), position(0), ready(false), cancelled(false), haBroker(hb), primary(boost::dynamic_pointer_cast(haBroker.getRole())) -{ +{} + +// Called in subscription's connection thread when the subscription is created. +// Separate from ctor because we need to use shared_from_this +// +void ReplicatingSubscription::initialize() { try { FieldTable ft; - if (!arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft)) - throw Exception("Replicating subscription does not have broker info: " + tag); + if (!getArguments().getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft)) + throw InvalidArgumentException( + logPrefix+"Can't subscribe, no broker info: "+getTag()); info.assign(ft); // Set a log prefix message that identifies the remote broker. @@ -147,10 +151,17 @@ ReplicatingSubscription::ReplicatingSubscription( // However we must attach the observer _before_ we snapshot for // initial dequeues to be sure we don't miss any dequeues // between the snapshot and attaching the observer. - observer.reset(new QueueObserver(*this)); - queue->addObserver(observer); - ReplicationIdSet primaryIds = haBroker.getQueueSnapshots()->get(queue)->snapshot(); - std::string backupStr = arguments.getAsString(ReplicatingSubscription::QPID_ID_SET); + queue->addObserver( + boost::dynamic_pointer_cast(shared_from_this())); + boost::shared_ptr snapshot = haBroker.getQueueSnapshots()->get(queue); + // There may be no snapshot if the queue is being deleted concurrently. + if (!snapshot) { + queue->removeObserver( + boost::dynamic_pointer_cast(shared_from_this())); + throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted"); + } + ReplicationIdSet primaryIds = snapshot->snapshot(); + std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET); ReplicationIdSet backupIds; if (!backupStr.empty()) backupIds = decodeStr(backupStr); @@ -172,23 +183,7 @@ ReplicatingSubscription::ReplicatingSubscription( << ", on backup " << skip); checkReady(l); } - } - catch (const std::exception& e) { - QPID_LOG(error, logPrefix << "Creation error: " << e.what() - << ": arguments=" << getArguments()); - throw; - } -} -ReplicatingSubscription::~ReplicatingSubscription() {} - - -// Called in subscription's connection thread when the subscription is created. -// Called separate from ctor because sending events requires -// shared_from_this -// -void ReplicatingSubscription::initialize() { - try { if (primary) primary->addReplica(*this); Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently. // Send initial dequeues to the backup. @@ -196,12 +191,14 @@ void ReplicatingSubscription::initialize() { sendDequeueEvent(l); } catch (const std::exception& e) { - QPID_LOG(error, logPrefix << "Initialization error: " << e.what() - << ": arguments=" << getArguments()); + QPID_LOG(error, logPrefix << "Subscribe failed: " << e.what()); throw; } } +ReplicatingSubscription::~ReplicatingSubscription() {} + + // True if the next position for the ReplicatingSubscription is a guarded position. bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) { return position+1 >= guard->getFirst(); @@ -258,7 +255,8 @@ void ReplicatingSubscription::cancel() } QPID_LOG(debug, logPrefix << "Cancelled"); if (primary) primary->removeReplica(*this); - getQueue()->removeObserver(observer); + getQueue()->removeObserver( + boost::dynamic_pointer_cast(shared_from_this())); guard->cancel(); ConsumerImpl::cancel(); } @@ -289,8 +287,9 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l) // Called after the message has been removed // from the deque and under the messageLock in the queue. Called in // arbitrary connection threads. -void ReplicatingSubscription::dequeued(ReplicationId id) +void ReplicatingSubscription::dequeued(const broker::Message& m) { + ReplicationId id = m.getReplicationId(); QPID_LOG(trace, logPrefix << "Dequeued ID " << id); { Mutex::ScopedLock l(lock); diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 71993bcb12..0df6f0b411 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -25,6 +25,8 @@ #include "BrokerInfo.h" #include "qpid/broker/SemanticState.h" #include "qpid/broker/ConsumerFactory.h" +#include "qpid/broker/QueueObserver.h" +#include #include namespace qpid { @@ -65,81 +67,91 @@ class Primary; * * ReplicatingSubscription makes calls on QueueGuard, but not vice-versa. */ -class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl +class ReplicatingSubscription : + public broker::SemanticState::ConsumerImpl, + public broker::QueueObserver { - public: - typedef broker::SemanticState::ConsumerImpl ConsumerImpl; - - class Factory : public broker::ConsumerFactory { - public: - Factory(HaBroker& hb) : haBroker(hb) {} - - HaBroker& getHaBroker() const { return haBroker; } - - boost::shared_ptr create( - broker::SemanticState* parent, - const std::string& name, boost::shared_ptr , - bool ack, bool acquire, bool exclusive, const std::string& tag, - const std::string& resumeId, uint64_t resumeTtl, - const framing::FieldTable& arguments); - private: - HaBroker& haBroker; - }; - - // Argument names for consume command. - static const std::string QPID_REPLICATING_SUBSCRIPTION; - static const std::string QPID_BROKER_INFO; - static const std::string QPID_ID_SET; - - ReplicatingSubscription(HaBroker& haBroker, +public: +typedef broker::SemanticState::ConsumerImpl ConsumerImpl; + +class Factory : public broker::ConsumerFactory { +public: +Factory(HaBroker& hb) : haBroker(hb) {} + +HaBroker& getHaBroker() const { return haBroker; } + +boost::shared_ptr create( +broker::SemanticState* parent, + const std::string& name, boost::shared_ptr , + bool ack, bool acquire, bool exclusive, const std::string& tag, + const std::string& resumeId, uint64_t resumeTtl, + const framing::FieldTable& arguments); +private: +HaBroker& haBroker; +}; + +// Argument names for consume command. +static const std::string QPID_REPLICATING_SUBSCRIPTION; +static const std::string QPID_BROKER_INFO; +static const std::string QPID_ID_SET; +// Replicator types: argument values for QPID_REPLICATING_SUBSCRIPTION argument. +static const std::string QPID_QUEUE_REPLICATOR; +static const std::string QPID_TX_REPLICATOR; + +ReplicatingSubscription(HaBroker& haBroker, broker::SemanticState* parent, const std::string& name, boost::shared_ptr , bool ack, bool acquire, bool exclusive, const std::string& tag, const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); - ~ReplicatingSubscription(); - - - // Consumer overrides. - bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg); - void cancel(); - void acknowledged(const broker::DeliveryRecord&); - bool browseAcquired() const { return true; } - // Hide the "queue deleted" error for a ReplicatingSubscription when a - // queue is deleted, this is normal and not an error. - bool hideDeletedError() { return true; } - // Not counted for auto deletion and immediate message purposes. - bool isCounted() { return false; } - - /** Initialization that must be done separately from construction - * because it requires a shared_ptr to this to exist. - */ - void initialize(); - - BrokerInfo getBrokerInfo() const { return info; } - - /** Skip replicating enqueue of of ids. */ - void addSkip(const ReplicationIdSet& ids); - - protected: - bool doDispatch(); - - private: - class QueueObserver; - friend class QueueObserver; - - std::string logPrefix; - QueuePosition position; - ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event. - ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues. - ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged. - bool ready; - bool cancelled; - BrokerInfo info; - boost::shared_ptr guard; +~ReplicatingSubscription(); + + +// Consumer overrides. +bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg); +void cancel(); +void acknowledged(const broker::DeliveryRecord&); +bool browseAcquired() const { return true; } +// Hide the "queue deleted" error for a ReplicatingSubscription when a +// queue is deleted, this is normal and not an error. +bool hideDeletedError() { return true; } + +// QueueObserver overrides +void enqueued(const broker::Message&) {} +void dequeued(const broker::Message&); +void acquired(const broker::Message&) {} +void requeued(const broker::Message&) {} + +/** A ReplicatingSubscription is a passive observer, not counted for auto + * deletion and immediate message purposes. + */ +bool isCounted() { return false; } + +/** Initialization that must be done separately from construction + * because it requires a shared_ptr to this to exist. + */ +void initialize(); + +BrokerInfo getBrokerInfo() const { return info; } + +/** Skip replicating enqueue of of ids. */ +void addSkip(const ReplicationIdSet& ids); + +protected: +bool doDispatch(); + +private: +std::string logPrefix; +QueuePosition position; +ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event. +ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues. +ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged. +bool ready; +bool cancelled; +BrokerInfo info; +boost::shared_ptr guard; HaBroker& haBroker; - boost::shared_ptr observer; boost::shared_ptr primary; bool isGuarded(sys::Mutex::ScopedLock&); diff --git a/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp new file mode 100644 index 0000000000..15b33fe89d --- /dev/null +++ b/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "TxReplicatingSubscription.h" + +namespace qpid { +namespace ha { +using namespace std; +using namespace broker; + +TxReplicatingSubscription::TxReplicatingSubscription( + HaBroker& hb, + SemanticState* parent, + const string& name, + boost::shared_ptr queue, + bool ack, + bool acquire, + bool exclusive, + const string& tag, + const string& resumeId, + uint64_t resumeTtl, + const framing::FieldTable& arguments +) : ReplicatingSubscription(hb, parent, name, queue, ack, acquire, exclusive, tag, + resumeId, resumeTtl, arguments) +{} + +}} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h new file mode 100644 index 0000000000..a363d262a0 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h @@ -0,0 +1,50 @@ +#ifndef QPID_HA_TXREPLICATINGSUBSCRIPTION_H +#define QPID_HA_TXREPLICATINGSUBSCRIPTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ReplicatingSubscription.h" + +namespace qpid { +namespace ha { + +/** + * Replicating subscription for a TX queue. + */ +class TxReplicatingSubscription : public ReplicatingSubscription +{ + public: + TxReplicatingSubscription(HaBroker& haBroker, + broker::SemanticState* parent, + const std::string& name, boost::shared_ptr , + bool ack, bool acquire, bool exclusive, const std::string& tag, + const std::string& resumeId, uint64_t resumeTtl, + const framing::FieldTable& arguments); + + /** A TxReplicatingSubscription is counted for auto-delete so we can clean + * up the TX queue when all backups are done. + */ + bool isCounted() { return true; } +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_TXREPLICATINGSUBSCRIPTION_H*/ diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp index 63301a92f5..95afdb9759 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp @@ -26,6 +26,7 @@ #include "BrokerReplicator.h" #include "Event.h" #include "HaBroker.h" +#include "ReplicatingSubscription.h" #include "types.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Link.h" @@ -53,10 +54,7 @@ using qpid::broker::amqp_0_10::MessageTransfer; using qpid::types::Uuid; namespace { -const string QPID_HA(QPID_HA_PREFIX); -const string TYPE_NAME(QPID_HA+"tx-replicator"); const string PREFIX(TRANSACTION_REPLICATOR_PREFIX); - } // namespace @@ -70,17 +68,16 @@ string TxReplicator::getTxId(const string& q) { return q.substr(PREFIX.size()); } -string TxReplicator::getType() const { return TYPE_NAME; } +string TxReplicator::getType() const { return ReplicatingSubscription::QPID_TX_REPLICATOR; } TxReplicator::TxReplicator( HaBroker& hb, const boost::shared_ptr& txQueue, const boost::shared_ptr& link) : QueueReplicator(hb, txQueue, link), - txBuffer(new broker::TxBuffer), store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0), channel(link->nextChannel()), - complete(false), ignore(false), + ended(false), dequeueState(hb.getBroker().getQueues()) { string id(getTxId(txQueue->getName())); @@ -100,8 +97,8 @@ TxReplicator::TxReplicator( boost::bind(&TxReplicator::commit, this, _1, _2); dispatch[TxRollbackEvent::KEY] = boost::bind(&TxReplicator::rollback, this, _1, _2); - dispatch[TxMembersEvent::KEY] = - boost::bind(&TxReplicator::members, this, _1, _2); + dispatch[TxBackupsEvent::KEY] = + boost::bind(&TxReplicator::backups, this, _1, _2); } TxReplicator::~TxReplicator() { @@ -121,11 +118,12 @@ void TxReplicator::sendMessage(const broker::Message& msg, sys::Mutex::ScopedLoc } void TxReplicator::route(broker::Deliverable& deliverable) { - if (!ignore) QueueReplicator::route(deliverable); + QueueReplicator::route(deliverable); } void TxReplicator::deliver(const broker::Message& m_) { sys::Mutex::ScopedLock l(lock); + if (!txBuffer) return; // Deliver message to the target queue, not the tx-queue. broker::Message m(m_); m.setReplicationId(enq.id); // Use replicated id. @@ -138,6 +136,7 @@ void TxReplicator::deliver(const broker::Message& m_) { void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) { sys::Mutex::ScopedLock l(lock); + if (!txBuffer) return; TxEnqueueEvent e; decodeStr(data, e); QPID_LOG(trace, logPrefix << "Enqueue: " << e); @@ -146,6 +145,7 @@ void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) { void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) { sys::Mutex::ScopedLock l(lock); + if (!txBuffer) return; TxDequeueEvent e; decodeStr(data, e); QPID_LOG(trace, logPrefix << "Dequeue: " << e); @@ -195,18 +195,20 @@ boost::shared_ptr TxReplicator::DequeueState::makeAccept() { } void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock& l) { + if (!txBuffer) return; txBuffer->enlist(dequeueState.makeAccept()); context = store->begin(); if (txBuffer->prepare(context.get())) { - QPID_LOG(debug, logPrefix << "Prepared OK"); + QPID_LOG(debug, logPrefix << "Local prepare OK"); sendMessage(TxPrepareOkEvent(haBroker.getSystemId()).message(queue->getName()), l); } else { - QPID_LOG(debug, logPrefix << "Prepare failed"); + QPID_LOG(debug, logPrefix << "Local prepare failed"); sendMessage(TxPrepareFailEvent(haBroker.getSystemId()).message(queue->getName()), l); } } void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) { + if (!txBuffer) return; QPID_LOG(debug, logPrefix << "Commit"); if (context.get()) store->commit(*context); txBuffer->commit(); @@ -214,34 +216,44 @@ void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) { } void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) { + if (!txBuffer) return; QPID_LOG(debug, logPrefix << "Rollback"); if (context.get()) store->abort(*context); txBuffer->rollback(); end(l); } -void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) { - TxMembersEvent e; +void TxReplicator::backups(const string& data, sys::Mutex::ScopedLock& l) { + TxBackupsEvent e; decodeStr(data, e); - QPID_LOG(debug, logPrefix << "Members: " << e.members); - if (!e.members.count(haBroker.getMembership().getSelf().getSystemId())) { + if (!e.backups.count(haBroker.getMembership().getSelf().getSystemId())) { QPID_LOG(info, logPrefix << "Not participating in transaction"); - ignore = true; + end(l); + } else { + QPID_LOG(debug, logPrefix << "Backups: " << e.backups); + txBuffer = new broker::TxBuffer; } } void TxReplicator::end(sys::Mutex::ScopedLock&) { - complete = true; - if (!getQueue()) return; // Already destroyed - // Destroy will cancel the subscription to the primary tx-queue which - // informs the primary that we have completed the transaction. - destroy(); + ended = true; + txBuffer.reset(); + // QueueReplicator::destroy cancels subscription to the primary tx-queue + // which allows the primary to clean up resources. + sys::Mutex::ScopedUnlock u(lock); + QueueReplicator::destroy(); } +// Called when the tx queue is deleted. void TxReplicator::destroy() { + { + sys::Mutex::ScopedLock l(lock); + if (!ended) { + QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback."); + rollback(string(), l); + } + } QueueReplicator::destroy(); - sys::Mutex::ScopedLock l(lock); - if (!ignore && !complete) rollback(string(), l); } }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h index bd80443e50..9d80ecb8d3 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.h +++ b/qpid/cpp/src/qpid/ha/TxReplicator.h @@ -84,7 +84,7 @@ class TxReplicator : public QueueReplicator { void prepare(const std::string& data, sys::Mutex::ScopedLock&); void commit(const std::string& data, sys::Mutex::ScopedLock&); void rollback(const std::string& data, sys::Mutex::ScopedLock&); - void members(const std::string& data, sys::Mutex::ScopedLock&); + void backups(const std::string& data, sys::Mutex::ScopedLock&); void end(sys::Mutex::ScopedLock&); std::string logPrefix; @@ -93,7 +93,7 @@ class TxReplicator : public QueueReplicator { broker::MessageStore* store; std::auto_ptr context; framing::ChannelId channel; // Channel to send prepare-complete. - bool complete, ignore; + bool empty, ended; // Class to process dequeues and create DeliveryRecords to populate a // TxAccept. diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index d1f020a945..28e7f6b182 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -381,6 +381,16 @@ class Broker(Popen): "Broker %s not responding: (%s)%s"%( self.name,e,error_line(self.log, 5))) + def assert_log_clean(self, ignore=None): + log = open(self.get_log()) + try: + error = re.compile("] error|] critical") + if ignore: ignore = re.compile(ignore) + else: ignore = re.compile("\000") # Won't match anything + for line in log.readlines(): + assert not error.search(line) or ignore.search(line), "Errors in log file %s: %s"%(log, line) + finally: log.close() + def browse(session, queue, timeout=0, transform=lambda m: m.content): """Return a list with the contents of each message on queue.""" r = session.receiver("%s;{mode:browse}"%(queue)) @@ -549,7 +559,11 @@ class NumberedSender(Thread): self.condition.release() self.write_message(self.sent) self.sent += 1 - except Exception: self.error = RethrownException(self.sender.pname) + except Exception, e: + self.error = RethrownException( + "%s: (%s)%s"%(self.sender.pname,e, + error_line(self.sender.outfile("err")))) + def notify_received(self, count): """Called by receiver to enable flow control. count = messages received so far.""" @@ -612,8 +626,10 @@ class NumberedReceiver(Thread): if self.sender: self.sender.notify_received(self.received) m = self.read_message() - except Exception: - self.error = RethrownException(self.receiver.pname) + except Exception, e: + self.error = RethrownException( + "%s: (%s)%s"%(self.receiver.pname,e, + error_line(self.receiver.outfile("err")))) def check(self): """Raise an exception if there has been an error""" diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 79024d48e3..138868f64e 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -30,19 +30,10 @@ from qpidtoollibs import BrokerAgent, EventHelper log = getLogger(__name__) -def grep(filename, regexp): - for line in open(filename).readlines(): - if (regexp.search(line)): return True - return False class HaBrokerTest(BrokerTest): """Base class for HA broker tests""" - def assert_log_no_errors(self, broker): - log = broker.get_log() - if grep(log, re.compile("] error|] critical")): - self.fail("Errors in log file %s"%(log)) - class ReplicationTests(HaBrokerTest): """Correctness tests for HA replication.""" @@ -838,7 +829,7 @@ acl deny all all # It is possible for the backup to attempt to subscribe after the queue # is deleted. This is not an error, but is logged as an error on the primary. # The backup does not log this as an error so we only check the backup log for errors. - self.assert_log_no_errors(cluster[1]) + cluster[1].assert_log_clean() def test_missed_recreate(self): """If a queue or exchange is destroyed and one with the same name re-created @@ -1003,6 +994,32 @@ class LongTests(HaBrokerTest): dead = filter(lambda b: not b.is_running(), brokers) if dead: raise Exception("Brokers not running: %s"%dead) + def test_tx_send_receive(self): + brokers = HaCluster(self, 3) + sender = self.popen( + ["qpid-send", + "--broker", brokers[0].host_port(), + "--address", "q;{create:always}", + "--messages=1000", + "--tx=10" + ]) + receiver = self.popen( + ["qpid-receive", + "--broker", brokers[0].host_port(), + "--address", "q;{create:always}", + "--messages=990", + "--timeout=10", + "--tx=10" + ]) + self.assertEqual(sender.wait(), 0) + self.assertEqual(receiver.wait(), 0) + expect = [long(i) for i in range(991, 1001)] + sn = lambda m: m.properties["sn"] + brokers[0].assert_browse("q", expect, transform=sn) + brokers[1].assert_browse_backup("q", expect, transform=sn) + brokers[2].assert_browse_backup("q", expect, transform=sn) + + def test_qmf_order(self): """QPID 4402: HA QMF events can be out of order. This test mimics the test described in the JIRA. Two threads repeatedly @@ -1352,12 +1369,14 @@ class TransactionTests(HaBrokerTest): def assert_tx_clean(self, b): """Verify that there are no transaction artifacts (exchanges, queues, subscriptions) on b.""" - queues=[] - def txq(): queues = b.agent().tx_queues(); return not queues - assert retry(txq), "%s: unexpected %s"%(b,queues) - subs=[] - def txs(): subs = self.tx_subscriptions(b); return not subs - assert retry(txs), "%s: unexpected %s"%(b,subs) + class FunctionCache: # Call a function and cache the result. + def __init__(self, f): self.f, self.value = f, None + def __call__(self): self.value = self.f(); return self.value + + txq= FunctionCache(b.agent().tx_queues) + assert retry(lambda: not txq()), "%s: unexpected %s"%(b, txq.value) + txsub = FunctionCache(lambda: self.tx_subscriptions(b)) + assert retry(lambda: not txsub()), "%s: unexpected %s"%(b, txsub.value) # TODO aconway 2013-10-15: TX exchanges don't show up in management. def assert_simple_commit_outcome(self, b, tx_queues): @@ -1462,18 +1481,22 @@ class TransactionTests(HaBrokerTest): self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster]) cluster[1].kill(final=False) s.send("b") - self.assert_commit_raises(tx) - for b in [cluster[0],cluster[2]]: self.assert_tx_clean(b) + tx.commit() + tx.close() + for b in [cluster[0],cluster[2]]: + self.assert_tx_clean(b) + b.assert_browse_backup("q", ["a","b"], msg=b) # Joining tx = cluster[0].connect().session(transactional=True) s = tx.sender("q;{create:always}") s.send("foo") - cluster.restart(1) + cluster.restart(1) # Not a part of the current transaction. tx.commit() tx.close() for b in cluster: self.assert_tx_clean(b) # The new member is not in the tx but receives the results normal replication. - for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b) + for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b) + # FIXME aconway 2013-11-07: assert_log_clean def test_tx_block_threads(self): """Verify that TXs blocked in commit don't deadlock.""" -- cgit v1.2.1