diff options
author | Alan Conway <aconway@apache.org> | 2013-08-01 20:27:39 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-08-01 20:27:39 +0000 |
commit | 09ba98e9af327f339de442512a288190893f2c92 (patch) | |
tree | 82738a1e51cbf888eab5c497bd2fa1317a32515a | |
parent | 014f0f39d9cfb6242bea173eadbc0f8229ba5f7f (diff) | |
download | qpid-python-09ba98e9af327f339de442512a288190893f2c92.tar.gz |
QPID-4327: HA TX transactions, blocking wait for prepare
Backups send prepare messages to the primary; the primary delays completion of
prepare until all backups are prepared (or there is a failure).
This is NOT the production solution - blocking could cause a deadlock. We need
to introduce asynchronous completion of prepare without blocking. This
interim solution allows testing of other aspects of TX support.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1509424 13f79535-47bb-0310-9956-ffa450edef68
21 files changed, 343 insertions, 104 deletions
diff --git a/qpid/cpp/include/qpid/types/Uuid.h b/qpid/cpp/include/qpid/types/Uuid.h index 43ec160ab3..b931670d97 100644 --- a/qpid/cpp/include/qpid/types/Uuid.h +++ b/qpid/cpp/include/qpid/types/Uuid.h @@ -42,6 +42,7 @@ class QPID_TYPES_CLASS_EXTERN Uuid QPID_TYPES_EXTERN Uuid& operator=(const Uuid&); /** Copy the UUID from data16, which must point to a 16-byte UUID */ QPID_TYPES_EXTERN Uuid(const unsigned char* data16); + QPID_TYPES_EXTERN Uuid(const char* data16); /** Set to a new unique identifier. */ QPID_TYPES_EXTERN void generate(); diff --git a/qpid/cpp/src/qpid/framing/FrameSet.h b/qpid/cpp/src/qpid/framing/FrameSet.h index 9640abb7ac..4188fd9b8c 100644 --- a/qpid/cpp/src/qpid/framing/FrameSet.h +++ b/qpid/cpp/src/qpid/framing/FrameSet.h @@ -9,9 +9,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -44,6 +44,8 @@ class FrameSet public: typedef boost::shared_ptr<FrameSet> shared_ptr; + typedef Frames::iterator iterator; + typedef Frames::const_iterator const_iterator; QPID_COMMON_EXTERN FrameSet(const SequenceNumber& id); QPID_COMMON_EXTERN FrameSet(const FrameSet&); @@ -62,7 +64,7 @@ public: QPID_COMMON_EXTERN AMQMethodBody* getMethod(); QPID_COMMON_EXTERN const AMQHeaderBody* getHeaders() const; QPID_COMMON_EXTERN AMQHeaderBody* getHeaders(); - + template <class T> bool isA() const { const AMQMethodBody* method = getMethod(); return method && method->isA<T>(); @@ -71,12 +73,12 @@ public: template <class T> const T* as() const { const AMQMethodBody* method = getMethod(); return (method && method->isA<T>()) ? 
dynamic_cast<const T*>(method) : 0; - } + } template <class T> T* as() { AMQMethodBody* method = getMethod(); return (method && method->isA<T>()) ? dynamic_cast<T*>(method) : 0; - } + } template <class T> const T* getHeaderProperties() const { const AMQHeaderBody* header = getHeaders(); @@ -85,7 +87,7 @@ public: Frames::const_iterator begin() const { return parts.begin(); } Frames::const_iterator end() const { return parts.end(); } - + const SequenceNumber& getId() const { return id; } template <class P> void remove(P predicate) { diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index f882cbcbf1..47860b433f 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -130,7 +130,6 @@ const string COLON(":"); void sendQuery(const string& packageName, const string& className, const string& queueName, SessionHandler& sessionHandler) { - framing::AMQP_ServerProxy peer(sessionHandler.out); Variant::Map request; request[WHAT] = OBJECT; Variant::Map schema; @@ -649,9 +648,6 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); if (!replicationTest.getLevel(argsMap)) return; string name(values[NAME].asString()); - - if (TxReplicator::isTxQueue(name)) return; // Can't join a transaction in progress. 
- if (!queueTracker.get()) throw Exception(QPID_MSG("Unexpected queue response: " << values)); if (!queueTracker->response(name)) return; // Response is out-of-date diff --git a/qpid/cpp/src/qpid/ha/Event.cpp b/qpid/cpp/src/qpid/ha/Event.cpp index fdd8bc85cc..25e7947267 100644 --- a/qpid/cpp/src/qpid/ha/Event.cpp +++ b/qpid/cpp/src/qpid/ha/Event.cpp @@ -49,28 +49,36 @@ const string TxDequeueEvent::KEY(QPID_HA+"txde"); const string TxPrepareEvent::KEY(QPID_HA+"txpr"); const string TxCommitEvent::KEY(QPID_HA+"txcm"); const string TxRollbackEvent::KEY(QPID_HA+"txrb"); +const string TxPrepareOkEvent::KEY(QPID_HA+"txok"); +const string TxPrepareFailEvent::KEY(QPID_HA+"txno"); -broker::Message makeMessage(const string& data, const string& key) { +broker::Message makeMessage( + const string& data, const string& destination, const string& routingKey) +{ boost::intrusive_ptr<MessageTransfer> transfer(new MessageTransfer()); - AMQFrame method((MessageTransferBody(ProtocolVersion(), key, 0, 0))); + AMQFrame method((MessageTransferBody(ProtocolVersion(), destination, 0, 0))); + method.setBof(true); + method.setEof(false); + method.setBos(true); + method.setEos(true); AMQFrame header((AMQHeaderBody())); - AMQFrame content((AMQContentBody())); - Buffer buffer(const_cast<char*>(&data[0]), data.size()); - // AMQContentBody::decode is missing a const declaration, so cast it here. 
- content.castBody<AMQContentBody>()->decode( - const_cast<Buffer&>(buffer), buffer.getSize()); header.setBof(false); header.setEof(false); header.setBos(true); header.setEos(true); + AMQFrame content((AMQContentBody())); content.setBof(false); content.setEof(true); content.setBos(true); content.setEos(true); + Buffer buffer(const_cast<char*>(&data[0]), data.size()); + content.castBody<AMQContentBody>()->decode( + const_cast<Buffer&>(buffer), buffer.getSize()); transfer->getFrames().append(method); transfer->getFrames().append(header); transfer->getFrames().append(content); - transfer->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(key); + transfer->getFrames().getHeaders()-> + get<DeliveryProperties>(true)->setRoutingKey(routingKey); return broker::Message(transfer, 0); } diff --git a/qpid/cpp/src/qpid/ha/Event.h b/qpid/cpp/src/qpid/ha/Event.h index 08174bfc9d..daaa6eada3 100644 --- a/qpid/cpp/src/qpid/ha/Event.h +++ b/qpid/cpp/src/qpid/ha/Event.h @@ -33,7 +33,10 @@ namespace qpid { namespace ha { -broker::Message makeMessage(const std::string& content, const std::string& destination); +broker::Message makeMessage( + const std::string& content, + const std::string& destination, + const std::string& routingKey); /** Test if a string is an event key */ @@ -47,7 +50,8 @@ struct Event { virtual size_t encodedSize() const = 0; virtual std::string key() const = 0; // Routing key virtual void print(std::ostream& o) const = 0; - broker::Message message() const { return makeMessage(framing::encodeStr(*this), key()); } + broker::Message message(const std::string& destination=std::string()) const { + return makeMessage(framing::encodeStr(*this), destination, key()); } }; @@ -140,6 +144,37 @@ struct TxRollbackEvent : public EventBase<TxRollbackEvent> { void print(std::ostream&) const {} }; +struct TxPrepareOkEvent : public EventBase<TxPrepareOkEvent> { + static const std::string KEY; + types::Uuid broker; + TxPrepareOkEvent(const types::Uuid& 
b=types::Uuid()) : broker(b) {} + + void encode(framing::Buffer& b) const { + b.putRawData(broker.data(), broker.size()); + } + + void decode(framing::Buffer& b) { + std::string s; + b.getRawData(s, broker.size()); + broker = types::Uuid(&s[0]); + } + virtual size_t encodedSize() const { return broker.size(); } + void print(std::ostream& o) const { o << broker; } +}; + +struct TxPrepareFailEvent : public EventBase<TxPrepareFailEvent> { + static const std::string KEY; + types::Uuid broker; + TxPrepareFailEvent(const types::Uuid& b=types::Uuid()) : broker(b) {} + void encode(framing::Buffer& b) const { b.putRawData(broker.data(), broker.size()); } + void decode(framing::Buffer& b) { + std::string s; + b.getRawData(s, broker.size()); + broker = types::Uuid(&s[0]); + } + virtual size_t encodedSize() const { return broker.size(); } + void print(std::ostream& o) const { o << broker; } +}; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/FailoverExchange.cpp b/qpid/cpp/src/qpid/ha/FailoverExchange.cpp index 46cc345ab0..9c7b986bf8 100644 --- a/qpid/cpp/src/qpid/ha/FailoverExchange.cpp +++ b/qpid/cpp/src/qpid/ha/FailoverExchange.cpp @@ -117,7 +117,7 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue, sys::Mutex::Sc if (urls.empty()) return; framing::Array array = vectorToUrlArray(urls); const ProtocolVersion v; - broker::Message message(makeMessage(std::string(), typeName)); + broker::Message message(makeMessage(std::string(), typeName, typeName)); MessageTransfer& transfer = MessageTransfer::get(message); MessageProperties* props = transfer.getFrames().getHeaders()->get<framing::MessageProperties>(true); diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 5fd7814d62..4208147ff4 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -390,12 +390,16 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards) void Primary::startTx(const 
boost::shared_ptr<broker::TxBuffer>& tx) { QPID_LOG(trace, logPrefix << "Started TX transaction"); - tx->setObserver(make_shared<PrimaryTxObserver>(boost::ref(haBroker))); + shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker)); + observer->initialize(); + tx->setObserver(observer); } void Primary::startDtx(const boost::shared_ptr<broker::DtxBuffer>& dtx) { QPID_LOG(trace, logPrefix << "Started DTX transaction"); - dtx->setObserver(make_shared<PrimaryTxObserver>(boost::ref(haBroker))); + shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker)); + observer->initialize(); + dtx->setObserver(observer); } }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp index 8a8364ac22..f400dbf99d 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -26,24 +26,73 @@ #include "QueueGuard.h" #include "RemoteBackup.h" #include "ReplicatingSubscription.h" +#include "QueueReplicator.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include <boost/lexical_cast.hpp> +#include <boost/make_shared.hpp> namespace qpid { +namespace framing { +class FieldTable; +} namespace ha { using namespace std; using namespace boost; -using namespace broker; +using namespace qpid::broker; +using namespace qpid::framing; + +// Exchange to receive prepare OK events. 
+class PrimaryTxObserver::Exchange : public broker::Exchange { + public: + Exchange(const boost::shared_ptr<PrimaryTxObserver>& tx_) : + broker::Exchange(TRANSACTION_REPLICATOR_PREFIX+tx_->getId().str()), + tx(tx_) + { + dispatch[TxPrepareOkEvent::KEY] = + boost::bind(&PrimaryTxObserver::txPrepareOkEvent, tx, _1); + dispatch[TxPrepareFailEvent::KEY] = + boost::bind(&PrimaryTxObserver::txPrepareFailEvent, tx, _1); + } + + void route(Deliverable& deliverable) { + const broker::Message& message(deliverable.getMessage()); + DispatchMap::iterator i = dispatch.find(message.getRoutingKey()); + if (i != dispatch.end()) i->second(message.getContent()); + } + + bool bind(boost::shared_ptr<Queue>, const string&, const FieldTable*) { return false; } + bool unbind(boost::shared_ptr<Queue>, const string&, const FieldTable*) { return false; } + bool isBound(boost::shared_ptr<Queue>, const string* const, const FieldTable* const) { return false; } + string getType() const { return TYPE_NAME; } + + private: + static const string TYPE_NAME; + typedef function<void(const std::string&)> DispatchFn; + typedef qpid::sys::unordered_map<std::string, DispatchFn> DispatchMap; + + DispatchMap dispatch; + boost::shared_ptr<PrimaryTxObserver> tx; +}; +const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"primary-tx-observer"); PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : - haBroker(hb), broker(hb.getBroker()), - id(true) // FIXME aconway 2013-07-11: is UUID an appropriate TX ID? + haBroker(hb), broker(hb.getBroker()), id(true), failed(false) { - logPrefix = "Primary transaction "+id.str().substr(0,8)+": "; - QPID_LOG(trace, logPrefix << "started"); + logPrefix = "Primary transaction "+shortStr(id)+": "; + + // The brokers known at this point are the ones that will be included + // in the transaction. Brokers that join later are not included + // Latecomers that have replicated the transaction will be rolled back + // when the tx-queue is deleted. 
+ // + BrokerInfo::Set infoSet(haBroker.getMembership().otherBackups()); + transform(infoSet.begin(), infoSet.end(), inserter(backups, backups.begin()), + bind(&BrokerInfo::getSystemId, _1)); + QPID_LOG(trace, logPrefix << "Started on " << backups); + pair<shared_ptr<Queue>, bool> result = broker.getQueues().declare( TRANSACTION_REPLICATOR_PREFIX+id.str(), @@ -52,8 +101,13 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : txQueue = result.first; } +void PrimaryTxObserver::initialize() { + broker.getExchanges().registerExchange(make_shared<Exchange>(shared_from_this())); +} + void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m) { + sys::Mutex::ScopedLock l(lock); QPID_LOG(trace, logPrefix << "enqueue: " << LogMessageId(*q, m)); enqueues[q] += m.getReplicationId(); txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message()); @@ -63,42 +117,63 @@ void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m) void PrimaryTxObserver::dequeue( const QueuePtr& q, QueuePosition pos, ReplicationId id) { + sys::Mutex::ScopedLock l(lock); QPID_LOG(trace, logPrefix << "dequeue: " << LogMessageId(*q, pos, id)); txQueue->deliver(TxDequeueEvent(q->getName(), id).message()); } -void PrimaryTxObserver::deduplicate() { +void PrimaryTxObserver::deduplicate(sys::Mutex::ScopedLock&) { shared_ptr<Primary> primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole())); assert(primary); - // FIXME aconway 2013-07-29: need to verify which backups are *in* the transaction. - // Use cluster membership for now - BrokerInfo::Set brokers = haBroker.getMembership().getBrokers(); - types::Uuid selfId = haBroker.getMembership().getSelf(); // Tell replicating subscriptions to skip IDs in the transaction. 
- for (BrokerInfo::Set::iterator b = brokers.begin(); b != brokers.end(); ++b) { - if (b->getSystemId() == selfId) continue; + for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b) for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q) - primary->skip(b->getSystemId(), q->first, q->second); - } + primary->skip(*b, q->first, q->second); } bool PrimaryTxObserver::prepare() { - // FIXME aconway 2013-07-23: need to delay completion of prepare till all - // backups have prepared. - QPID_LOG(trace, logPrefix << "prepare"); - deduplicate(); + sys::Mutex::ScopedLock l(lock); + // FIXME aconway 2013-07-23: WRONG blocking. Need async completion. + QPID_LOG(trace, logPrefix << "Prepare"); + deduplicate(l); txQueue->deliver(TxPrepareEvent().message()); - return true; + while (!isPrepared(l)) lock.wait(); + return !failed; } void PrimaryTxObserver::commit() { - QPID_LOG(trace, logPrefix << "commit"); + sys::Mutex::ScopedLock l(lock); + QPID_LOG(trace, logPrefix << "Commit"); txQueue->deliver(TxCommitEvent().message()); } void PrimaryTxObserver::rollback() { - QPID_LOG(trace, logPrefix << "rollback"); + sys::Mutex::ScopedLock l(lock); + QPID_LOG(trace, logPrefix << "Rollback"); txQueue->deliver(TxRollbackEvent().message()); } +void PrimaryTxObserver::txPrepareOkEvent(const string& data) { + QPID_LOG(critical, logPrefix << "FIXME data: " << data); + sys::Mutex::ScopedLock l(lock); + types::Uuid backup = decodeStr<TxPrepareOkEvent>(data).broker; + QPID_LOG(critical, logPrefix << "FIXME backup: " << backup); + QPID_LOG(trace, logPrefix << "Backup prepared ok: " << backup); + prepared.insert(backup); + lock.notify(); +} + +void PrimaryTxObserver::txPrepareFailEvent(const string& data) { + sys::Mutex::ScopedLock l(lock); + types::Uuid backup = decodeStr<TxPrepareFailEvent>(data).broker; + QPID_LOG(error, logPrefix << "Backup prepare failed: " << backup); + prepared.insert(backup); + failed = true; + lock.notify(); +} + +bool 
PrimaryTxObserver::isPrepared(sys::Mutex::ScopedLock&) { + return (prepared == backups || failed); +} + }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h index 3681c6b750..413c627d68 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h @@ -26,13 +26,18 @@ #include "qpid/broker/TransactionObserver.h" #include "qpid/log/Statement.h" -#include "qpid/framing/Uuid.h" +#include "qpid/types/Uuid.h" #include "qpid/sys/unordered_map.h" +#include "qpid/sys/Monitor.h" #include <boost/functional/hash.hpp> +#include <boost/enable_shared_from_this.hpp> + namespace qpid { namespace broker { class Broker; +class Message; +class Consumer; } namespace ha { @@ -46,30 +51,51 @@ class HaBroker; * A TxReplicator on the backup replicates the tx-queue and creates * a TxBuffer on the backup equivalent to the one on the primary. * - * THREAD UNSAFE: called sequentially in the context of a transaction. + * Also observes the tx-queue for prepare-complete messages and + * subscription cancellations. + * + * THREAD SAFE: called in user connection thread for TX events, + * and in backup connection threads for prepare-completed events + * and unsubscriptions. */ -class PrimaryTxObserver : public broker::TransactionObserver { +class PrimaryTxObserver : public broker::TransactionObserver, + public boost::enable_shared_from_this<PrimaryTxObserver> +{ public: PrimaryTxObserver(HaBroker&); + /** Call immediately after constructor, uses shared_from_this. 
*/ + void initialize(); + void enqueue(const QueuePtr&, const broker::Message&); void dequeue(const QueuePtr& queue, QueuePosition, ReplicationId); bool prepare(); void commit(); void rollback(); + types::Uuid getId() const { return id; } + private: + class Exchange; typedef qpid::sys::unordered_map< QueuePtr, ReplicationIdSet, boost::hash<QueuePtr> > QueueIdsMap; - void deduplicate(); + void deduplicate(sys::Mutex::ScopedLock&); + void txPrepareOkEvent(const std::string& data); + void txPrepareFailEvent(const std::string& data); + void consumerRemoved(const broker::Consumer&); + bool isPrepared(sys::Mutex::ScopedLock&); + sys::Monitor lock; std::string logPrefix; HaBroker& haBroker; broker::Broker& broker; - framing::Uuid id; + types::Uuid id; QueuePtr txQueue; QueueIdsMap enqueues; + bool failed; + UuidSet backups; + UuidSet prepared; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 28e9dc4120..9149567cf2 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -107,9 +107,13 @@ QueueReplicator::QueueReplicator(HaBroker& hb, : Exchange(replicatorName(q->getName()), 0, q->getBroker()), haBroker(hb), brokerInfo(hb.getBrokerInfo()), + link(l), + queue(q), + sessionHandler(0), logPrefix("Backup of "+q->getName()+": "), - queue(q), link(l), subscribed(false), - settings(hb.getSettings()), destroyed(false), + subscribed(false), + settings(hb.getSettings()), + destroyed(false), nextId(0), maxId(0) { args.setString(QPID_REPLICATE, printable(NONE).str()); @@ -176,22 +180,23 @@ void QueueReplicator::destroy() { if (destroyed) return; destroyed = true; QPID_LOG(debug, logPrefix << "Destroyed"); + bridge2 = bridge; // call close outside the lock. // Need to drop shared pointers to avoid pointer cycles keeping this in memory. 
queue.reset(); link.reset(); bridge.reset(); getBroker()->getExchanges().destroy(getName()); - bridge2 = bridge; } if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock. } // Called in a broker connection thread when the bridge is created. // Note: called with the Link lock held. -void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { +void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) { Mutex::ScopedLock l(lock); if (destroyed) return; // Already destroyed - AMQP_ServerProxy peer(sessionHandler.out); + sessionHandler = &sessionHandler_; + AMQP_ServerProxy peer(sessionHandler->out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); FieldTable arguments; arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 90f38ce7e1..cbc950d4bc 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -72,11 +72,8 @@ class QueueReplicator : public broker::Exchange, void activate(); // Must be called immediately after constructor. std::string getType() const; - bool bind(boost::shared_ptr<broker::Queue - >, const std::string&, const framing::FieldTable*); - bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); + void route(broker::Deliverable&); - bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); // Set if the queue has ever been subscribed to, used for auto-delete cleanup. void setSubscribed() { subscribed = true; } @@ -86,16 +83,26 @@ class QueueReplicator : public broker::Exchange, ReplicationId getMaxId(); + // No-op unused Exchange virtual functions. 
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); + bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); + bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); + protected: typedef boost::function<void(const std::string&, sys::Mutex::ScopedLock&)> DispatchFn; typedef qpid::sys::unordered_map<std::string, DispatchFn> DispatchMap; virtual void deliver(const broker::Message&); + virtual void destroy(); // Called when the queue is destroyed. sys::Mutex lock; HaBroker& haBroker; const BrokerInfo brokerInfo; DispatchMap dispatch; + boost::shared_ptr<broker::Link> link; + boost::shared_ptr<broker::Bridge> bridge; + boost::shared_ptr<broker::Queue> queue; + broker::SessionHandler* sessionHandler; private: typedef qpid::sys::unordered_map< @@ -104,7 +111,6 @@ class QueueReplicator : public broker::Exchange, class QueueObserver; void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler); - void destroy(); // Called when the queue is destroyed. 
// Dispatch functions void dequeueEvent(const std::string& data, sys::Mutex::ScopedLock&); @@ -112,9 +118,6 @@ class QueueReplicator : public broker::Exchange, std::string logPrefix; std::string bridgeName; - boost::shared_ptr<broker::Queue> queue; - boost::shared_ptr<broker::Link> link; - boost::shared_ptr<broker::Bridge> bridge; bool subscribed; const Settings& settings; diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index 776a584bc8..0993c6ea39 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -66,8 +66,6 @@ bool RemoteBackup::isReady() { } void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) { - // Ignore transaction queues for purposes of catch-up calculation - if (TxReplicator::isTxQueue(q->getName())) return; if (replicationTest.getLevel(*q) == ALL) { QPID_LOG(debug, logPrefix << "Catch-up queue" << (createGuard ? " and guard" : "") << ": " << q->getName()); diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 8a4a48bb4e..71993bcb12 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -25,7 +25,6 @@ #include "BrokerInfo.h" #include "qpid/broker/SemanticState.h" #include "qpid/broker/ConsumerFactory.h" -#include "qpid/types/Uuid.h" #include <iosfwd> namespace qpid { diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp index 31c68dfe45..58963149fc 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp @@ -28,15 +28,20 @@ #include "HaBroker.h" #include "types.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Link.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/SessionHandler.h" #include "qpid/broker/TxBuffer.h" #include "qpid/broker/TxAccept.h" +#include "qpid/broker/amqp_0_10/Connection.h" #include 
"qpid/framing/BufferTypes.h" #include "qpid/log/Statement.h" #include <boost/shared_ptr.hpp> #include <boost/bind.hpp> #include <boost/make_shared.hpp> +#include "qpid/broker/amqp_0_10/MessageTransfer.h" +#include "qpid/framing/MessageTransferBody.h" namespace qpid { namespace ha { @@ -45,14 +50,18 @@ using namespace std; using namespace boost; using namespace qpid::broker; using namespace qpid::framing; +using qpid::broker::amqp_0_10::MessageTransfer; +using qpid::types::Uuid; namespace { const string QPID_HA(QPID_HA_PREFIX); -const string TYPE_NAME(QPID_HA+"tx-queue-replicator"); +const string TYPE_NAME(QPID_HA+"tx-replicator"); const string PREFIX(TRANSACTION_REPLICATOR_PREFIX); + } // namespace + bool TxReplicator::isTxQueue(const string& q) { return startsWith(q, PREFIX); } @@ -69,13 +78,13 @@ TxReplicator::TxReplicator( const boost::shared_ptr<broker::Queue>& txQueue, const boost::shared_ptr<broker::Link>& link) : QueueReplicator(hb, txQueue, link), - id(getTxId(txQueue->getName())), txBuffer(new broker::TxBuffer), - broker(hb.getBroker()), - store(broker.hasStore() ? &broker.getStore() : 0), + store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0), + channel(link->nextChannel()), dequeueState(hb.getBroker().getQueues()) { - logPrefix = "Backup of transaction "+id+": "; + string shortId = getTxId(txQueue->getName()).substr(0, 8); + logPrefix = "Backup of transaction "+shortId+": "; if (!store) throw Exception(QPID_MSG(logPrefix << "No message store loaded.")); boost::shared_ptr<Backup> backup = dynamic_pointer_cast<Backup>(hb.getRole()); @@ -95,17 +104,45 @@ TxReplicator::TxReplicator( boost::bind(&TxReplicator::rollback, this, _1, _2); } +TxReplicator::~TxReplicator() { + link->returnChannel(channel); +} + +// Send a message to the primary tx. 
+void TxReplicator::sendMessage(const broker::Message& msg, sys::Mutex::ScopedLock&) { + assert(sessionHandler); + const MessageTransfer& transfer(MessageTransfer::get(msg)); + for (FrameSet::const_iterator i = transfer.getFrames().begin(); + i != transfer.getFrames().end(); + ++i) + { + sessionHandler->out.handle(const_cast<AMQFrame&>(*i)); + } +} + void TxReplicator::deliver(const broker::Message& m_) { + sys::Mutex::ScopedLock l(lock); // Deliver message to the target queue, not the tx-queue. broker::Message m(m_); m.setReplicationId(enq.id); // Use replicated id. - boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(enq.queue); + boost::shared_ptr<broker::Queue> queue = + haBroker.getBroker().getQueues().get(enq.queue); QPID_LOG(trace, logPrefix << "Deliver " << LogMessageId(*queue, m)); DeliverableMessage dm(m, txBuffer.get()); dm.deliverTo(queue); } +void TxReplicator::destroy() { + { + sys::Mutex::ScopedLock l(lock); + if (context.get()) store->abort(*context); + txBuffer->rollback(); + } + QueueReplicator::destroy(); +} + void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) { + sys::Mutex::ScopedLock l(lock); TxEnqueueEvent e; decodeStr(data, e); QPID_LOG(trace, logPrefix << "Enqueue: " << e); @@ -113,6 +150,7 @@ void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) { } void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) { + sys::Mutex::ScopedLock l(lock); TxDequeueEvent e; decodeStr(data, e); QPID_LOG(trace, logPrefix << "Dequeue: " << e); @@ -132,9 +170,6 @@ bool TxReplicator::DequeueState::addRecord( const ReplicationIdSet& rids) { if (rids.contains(m.getReplicationId())) { - // FIXME aconway 2013-07-24: - // - Do we need to acquire before creating a DR? - // - Are the parameters to DeliveryRecord ok? 
DeliveryRecord dr(cursor, m.getSequence(), m.getReplicationId(), queue, string() /*tag*/, boost::shared_ptr<Consumer>(), @@ -142,7 +177,7 @@ bool TxReplicator::DequeueState::addRecord( false /*accepted*/, false /*credit.isWindowMode()*/, 0 /*credit*/); - // Fake record ids, unique within this transaction. + // Generate record ids, unique within this transaction. dr.setId(nextId++); records.push_back(dr); recordIds += dr.getId(); @@ -163,30 +198,40 @@ boost::shared_ptr<TxAccept> TxReplicator::DequeueState::makeAccept() { return make_shared<TxAccept>(cref(recordIds), ref(records)); } -void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock&) { +void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock& l) { QPID_LOG(trace, logPrefix << "Prepare"); txBuffer->enlist(dequeueState.makeAccept()); context = store->begin(); - txBuffer->prepare(context.get()); - // FIXME aconway 2013-07-26: notify the primary of prepare outcome. + if (txBuffer->prepare(context.get())) { + QPID_LOG(trace, logPrefix << "Prepared OK"); + QPID_LOG(critical, logPrefix << "FIXME Prepared OK " << haBroker.getSystemId()); + QPID_LOG(critical, logPrefix << "FIXME Prepared ok "<< + encodeStr(TxPrepareOkEvent(haBroker.getSystemId()))); + sendMessage(TxPrepareOkEvent(haBroker.getSystemId()).message(queue->getName()), l); + } else { + QPID_LOG(trace, logPrefix << "Prepare failed"); + sendMessage(TxPrepareFailEvent(haBroker.getSystemId()).message(queue->getName()), l); + } } -void TxReplicator::commit(const string&, sys::Mutex::ScopedLock&) { +void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) { QPID_LOG(trace, logPrefix << "Commit"); if (context.get()) store->commit(*context); txBuffer->commit(); - end(); + end(l); } -void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock&) { +void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) { QPID_LOG(trace, logPrefix << "Rollback"); if (context.get()) store->abort(*context); txBuffer->rollback(); - end(); 
+ end(l); } -void TxReplicator::end(){ - // FIXME aconway 2013-07-26: destroying the txqueue (auto-delete?) will - // destroy this via QueueReplicator::destroy +void TxReplicator::end(sys::Mutex::ScopedLock&) { + // Destroy the tx-queue, which will destroy this via QueueReplicator destroy. + // FIXME aconway 2013-08-01: what about connection & user ID for destroy? + haBroker.getBroker().getQueues().destroy(queue->getName()); } + }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h index c4df8c13f9..41555ef094 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.h +++ b/qpid/cpp/src/qpid/ha/TxReplicator.h @@ -58,12 +58,14 @@ class TxReplicator : public QueueReplicator { static std::string getTxId(const std::string& queue); TxReplicator(HaBroker&, const QueuePtr& txQueue, const LinkPtr& link); + ~TxReplicator(); std::string getType() const; protected: void deliver(const broker::Message&); + void destroy(); private: @@ -72,21 +74,21 @@ class TxReplicator : public QueueReplicator { typedef qpid::sys::unordered_map<std::string, DispatchFunction> DispatchMap; typedef qpid::sys::unordered_map<std::string, ReplicationIdSet> DequeueMap; + void sendMessage(const broker::Message&, sys::Mutex::ScopedLock&); void enqueue(const std::string& data, sys::Mutex::ScopedLock&); void dequeue(const std::string& data, sys::Mutex::ScopedLock&); void prepare(const std::string& data, sys::Mutex::ScopedLock&); void commit(const std::string& data, sys::Mutex::ScopedLock&); void rollback(const std::string& data, sys::Mutex::ScopedLock&); - void end(); + void end(sys::Mutex::ScopedLock&); std::string logPrefix; - const std::string id; TxEnqueueEvent enq; // Enqueue data for next deliver. 
boost::shared_ptr<broker::TxBuffer> txBuffer; - broker::Broker& broker; broker::MessageStore* store; boost::shared_ptr<BrokerReplicator> brokerReplicator; std::auto_ptr<broker::TransactionContext> context; + framing::ChannelId channel; // Channel to send prepare-complete. // Class to process dequeues and create DeliveryRecords to populate a // TxAccept. @@ -103,7 +105,8 @@ class TxReplicator : public QueueReplicator { typedef framing::SequenceSet IdSet; typedef qpid::sys::unordered_map<std::string, ReplicationIdSet> EventMap; - bool addRecord(const broker::Message& m, const boost::shared_ptr<broker::Queue>&, + bool addRecord(const broker::Message& m, + const boost::shared_ptr<broker::Queue>&, const ReplicationIdSet& ); void addRecords(const DequeueMap::value_type& entry); diff --git a/qpid/cpp/src/qpid/ha/types.cpp b/qpid/cpp/src/qpid/ha/types.cpp index 10d6bd4e3b..500a35051a 100644 --- a/qpid/cpp/src/qpid/ha/types.cpp +++ b/qpid/cpp/src/qpid/ha/types.cpp @@ -87,9 +87,11 @@ istream& operator>>(istream& i, EnumBase& e) { return i; } -ostream& operator<<(ostream& o, const IdSet& ids) { +ostream& operator<<(ostream& o, const UuidSet& ids) { ostream_iterator<qpid::types::Uuid> out(o, " "); + o << "{ "; copy(ids.begin(), ids.end(), out); + o << "}"; return o; } diff --git a/qpid/cpp/src/qpid/ha/types.h b/qpid/cpp/src/qpid/ha/types.h index 3af095d470..91e43137d2 100644 --- a/qpid/cpp/src/qpid/ha/types.h +++ b/qpid/cpp/src/qpid/ha/types.h @@ -117,9 +117,9 @@ extern const char* TRANSACTION_REPLICATOR_PREFIX; bool startsWith(const std::string& name, const std::string& prefix); /** Define IdSet type, not a typedef so we can overload operator << */ -class IdSet : public std::set<types::Uuid> {}; +class UuidSet : public std::set<types::Uuid> {}; -std::ostream& operator<<(std::ostream& o, const IdSet& ids); +std::ostream& operator<<(std::ostream& o, const UuidSet& ids); // Use type names to distinguish Positions from Replication Ids typedef framing::SequenceNumber 
QueuePosition; @@ -139,5 +139,8 @@ struct LogMessageId { }; std::ostream& operator<<(std::ostream&, const LogMessageId&); +/** Return short version of human-readable UUID. */ +inline std::string shortStr(const types::Uuid& uuid) { return uuid.str().substr(0,8); } + }} // qpid::ha #endif /*!QPID_HA_ENUM_H*/ diff --git a/qpid/cpp/src/qpid/types/Uuid.cpp b/qpid/cpp/src/qpid/types/Uuid.cpp index 1d6fbf430a..875e5925a9 100644 --- a/qpid/cpp/src/qpid/types/Uuid.cpp +++ b/qpid/cpp/src/qpid/types/Uuid.cpp @@ -52,6 +52,11 @@ Uuid::Uuid(const unsigned char* uuid) ::memcpy(bytes, uuid, Uuid::SIZE); } +Uuid::Uuid(const char* uuid) +{ + ::memcpy(bytes, uuid, Uuid::SIZE); +} + Uuid& Uuid::operator=(const Uuid& other) { if (this == &other) return *this; diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 602a62ca17..cceb9795eb 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -265,16 +265,19 @@ acl allow all all class HaCluster(object): _cluster_count = 0 - def __init__(self, test, n, promote=True, wait=True, args=[], **kwargs): + def __init__(self, test, n, promote=True, wait=True, args=[], s_args=[], **kwargs): """Start a cluster of n brokers. @test: The test being run @n: start n brokers @promote: promote self[0] to primary @wait: wait for primary active and backups ready. Ignored if promote=False + @args: args for all brokers in the cluster. + @s_args: args for specific brokers: s_args[i] for broker i. 
""" self.test = test self.args = args + self.s_args = s_args self.kwargs = kwargs self._ports = [HaPort(test) for i in xrange(n)] self._set_url() @@ -294,9 +297,12 @@ class HaCluster(object): self.broker_id += 1 return name - def _ha_broker(self, ha_port, name): + def _ha_broker(self, i, name): + args = self.args + if i < len(self.s_args): args += self.s_args[i] + ha_port = self._ports[i] b = HaBroker(ha_port.test, ha_port, brokers_url=self.url, name=name, - args=self.args, **self.kwargs) + args=args, **self.kwargs) b.ready() return b @@ -308,7 +314,7 @@ class HaCluster(object): self._ports.append(HaPort(self.test)) self._set_url() self._update_urls() - b = self._ha_broker(self._ports[i], self.next_name()) + b = self._ha_broker(i, self.next_name()) self._brokers.append(b) return b @@ -334,7 +340,7 @@ class HaCluster(object): a separate log file: foo.n.log""" if self._ports[i].stopped: raise Exception("Restart after final kill: %s"%(self)) b = self._brokers[i] - self._brokers[i] = self._ha_broker(self._ports[i], b.name) + self._brokers[i] = self._ha_broker(i, b.name) self._brokers[i].ready() def bounce(self, i, promote_next=True): diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 55715639a4..e97614d785 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1381,7 +1381,18 @@ class TransactionTests(BrokerTest): self.assertEqual(open_read(cluster[0].store_log), expect) self.assertEqual(open_read(cluster[1].store_log), expect) -# FIXME aconway 2013-07-23: test with partial acknowledgement. + def test_tx_backup_fail(self): + # FIXME aconway 2013-07-31: check exception types, reduce timeout. 
+ cluster = HaCluster( + self, 2, test_store=True, s_args=[[],["--test-store-throw=bang"]]) + c = cluster[0].connect() + tx = c.session(transactional=True) + s = tx.sender("q;{create:always,node:{durable:true}}") + for m in ["foo","bang","bar"]: s.send(Message(m, durable=True)) + self.assertRaises(Exception, tx.commit) + for b in cluster: b.assert_browse_backup("q", []) + self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n") + self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n") if __name__ == "__main__": outdir = "ha_tests.tmp" diff --git a/qpid/cpp/src/tests/test_store.cpp b/qpid/cpp/src/tests/test_store.cpp index fc44889f33..e299161c68 100644 --- a/qpid/cpp/src/tests/test_store.cpp +++ b/qpid/cpp/src/tests/test_store.cpp @@ -63,12 +63,18 @@ struct TestStoreOptions : public Options { string name; string dump; string events; + vector<string> throwMsg; // Throw exception if message content matches. 
TestStoreOptions() : Options("Test Store Options") { addOptions() - ("test-store-name", optValue(name, "NAME"), "Name of test store instance.") - ("test-store-dump", optValue(dump, "FILE"), "File to dump enqueued messages.") - ("test-store-events", optValue(events, "FILE"), "File to log events, 1 line per event.") + ("test-store-name", optValue(name, "NAME"), + "Name of test store instance.") + ("test-store-dump", optValue(dump, "FILE"), + "File to dump enqueued messages.") + ("test-store-events", optValue(events, "FILE"), + "File to log events, 1 line per event.") + ("test-store-throw", optValue(throwMsg, "CONTENT"), + "Throw exception if message content matches.") ; } }; @@ -91,7 +97,8 @@ class TestStore : public NullMessageStore { { QPID_LOG(info, "TestStore name=" << name << " dump=" << options.dump - << " events=" << options.events); + << " events=" << options.events + << " throw messages =" << options.throwMsg.size()); if (!options.dump.empty()) dump.reset(new ofstream(options.dump.c_str())); @@ -148,7 +155,8 @@ class TestStore : public NullMessageStore { const PersistableQueue& queue) { QPID_LOG(debug, "TestStore enqueue " << queue.getName()); - qpid::broker::amqp_0_10::MessageTransfer* msg = dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get()); + qpid::broker::amqp_0_10::MessageTransfer* msg = + dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get()); assert(msg); ostringstream o; @@ -170,7 +178,11 @@ class TestStore : public NullMessageStore { string data = msg->getFrames().getContent(); size_t i = string::npos; size_t j = string::npos; - if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0 + const vector<string>& throwMsg(options.throwMsg); + if (find(throwMsg.begin(), throwMsg.end(), data) != throwMsg.end()) { + throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data)); + } + else if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0 && 
(i = data.find(name+"[")) != string::npos && (j = data.find("]", i)) != string::npos) { |