summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-08-01 20:27:39 +0000
committerAlan Conway <aconway@apache.org>2013-08-01 20:27:39 +0000
commit882f6f224ccca9b674de2fb2dae26d439a713b2b (patch)
treed5bcef3dc971792fd675b447d20eb0ae123dadfc
parent1ac985d9b3a50d8c7691e534945a4829bd57017a (diff)
downloadqpid-python-882f6f224ccca9b674de2fb2dae26d439a713b2b.tar.gz
QPID-4327: HA TX transactions, blocking wait for prepare
Backups send prepare messages to primary, primary delays completion of prepare till all are prepared (or there is a failure). This is NOT the production solution - blocking could cause a deadlock. We need to introduce asynchronous completion of prepare without blocking. This interim solution allows testing on other aspects of TX support. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1509424 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/include/qpid/types/Uuid.h1
-rw-r--r--cpp/src/qpid/framing/FrameSet.h14
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp4
-rw-r--r--cpp/src/qpid/ha/Event.cpp24
-rw-r--r--cpp/src/qpid/ha/Event.h39
-rw-r--r--cpp/src/qpid/ha/FailoverExchange.cpp2
-rw-r--r--cpp/src/qpid/ha/Primary.cpp8
-rw-r--r--cpp/src/qpid/ha/PrimaryTxObserver.cpp117
-rw-r--r--cpp/src/qpid/ha/PrimaryTxObserver.h36
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp15
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.h19
-rw-r--r--cpp/src/qpid/ha/RemoteBackup.cpp2
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.h1
-rw-r--r--cpp/src/qpid/ha/TxReplicator.cpp85
-rw-r--r--cpp/src/qpid/ha/TxReplicator.h11
-rw-r--r--cpp/src/qpid/ha/types.cpp4
-rw-r--r--cpp/src/qpid/ha/types.h7
-rw-r--r--cpp/src/qpid/types/Uuid.cpp5
-rwxr-xr-xcpp/src/tests/ha_test.py16
-rwxr-xr-xcpp/src/tests/ha_tests.py13
-rw-r--r--cpp/src/tests/test_store.cpp24
21 files changed, 343 insertions, 104 deletions
diff --git a/cpp/include/qpid/types/Uuid.h b/cpp/include/qpid/types/Uuid.h
index 43ec160ab3..b931670d97 100644
--- a/cpp/include/qpid/types/Uuid.h
+++ b/cpp/include/qpid/types/Uuid.h
@@ -42,6 +42,7 @@ class QPID_TYPES_CLASS_EXTERN Uuid
QPID_TYPES_EXTERN Uuid& operator=(const Uuid&);
/** Copy the UUID from data16, which must point to a 16-byte UUID */
QPID_TYPES_EXTERN Uuid(const unsigned char* data16);
+ QPID_TYPES_EXTERN Uuid(const char* data16);
/** Set to a new unique identifier. */
QPID_TYPES_EXTERN void generate();
diff --git a/cpp/src/qpid/framing/FrameSet.h b/cpp/src/qpid/framing/FrameSet.h
index 9640abb7ac..4188fd9b8c 100644
--- a/cpp/src/qpid/framing/FrameSet.h
+++ b/cpp/src/qpid/framing/FrameSet.h
@@ -9,9 +9,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,6 +44,8 @@ class FrameSet
public:
typedef boost::shared_ptr<FrameSet> shared_ptr;
+ typedef Frames::iterator iterator;
+ typedef Frames::const_iterator const_iterator;
QPID_COMMON_EXTERN FrameSet(const SequenceNumber& id);
QPID_COMMON_EXTERN FrameSet(const FrameSet&);
@@ -62,7 +64,7 @@ public:
QPID_COMMON_EXTERN AMQMethodBody* getMethod();
QPID_COMMON_EXTERN const AMQHeaderBody* getHeaders() const;
QPID_COMMON_EXTERN AMQHeaderBody* getHeaders();
-
+
template <class T> bool isA() const {
const AMQMethodBody* method = getMethod();
return method && method->isA<T>();
@@ -71,12 +73,12 @@ public:
template <class T> const T* as() const {
const AMQMethodBody* method = getMethod();
return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0;
- }
+ }
template <class T> T* as() {
AMQMethodBody* method = getMethod();
return (method && method->isA<T>()) ? dynamic_cast<T*>(method) : 0;
- }
+ }
template <class T> const T* getHeaderProperties() const {
const AMQHeaderBody* header = getHeaders();
@@ -85,7 +87,7 @@ public:
Frames::const_iterator begin() const { return parts.begin(); }
Frames::const_iterator end() const { return parts.end(); }
-
+
const SequenceNumber& getId() const { return id; }
template <class P> void remove(P predicate) {
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index f882cbcbf1..47860b433f 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -130,7 +130,6 @@ const string COLON(":");
void sendQuery(const string& packageName, const string& className, const string& queueName,
SessionHandler& sessionHandler)
{
- framing::AMQP_ServerProxy peer(sessionHandler.out);
Variant::Map request;
request[WHAT] = OBJECT;
Variant::Map schema;
@@ -649,9 +648,6 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicationTest.getLevel(argsMap)) return;
string name(values[NAME].asString());
-
- if (TxReplicator::isTxQueue(name)) return; // Can't join a transaction in progress.
-
if (!queueTracker.get())
throw Exception(QPID_MSG("Unexpected queue response: " << values));
if (!queueTracker->response(name)) return; // Response is out-of-date
diff --git a/cpp/src/qpid/ha/Event.cpp b/cpp/src/qpid/ha/Event.cpp
index fdd8bc85cc..25e7947267 100644
--- a/cpp/src/qpid/ha/Event.cpp
+++ b/cpp/src/qpid/ha/Event.cpp
@@ -49,28 +49,36 @@ const string TxDequeueEvent::KEY(QPID_HA+"txde");
const string TxPrepareEvent::KEY(QPID_HA+"txpr");
const string TxCommitEvent::KEY(QPID_HA+"txcm");
const string TxRollbackEvent::KEY(QPID_HA+"txrb");
+const string TxPrepareOkEvent::KEY(QPID_HA+"txok");
+const string TxPrepareFailEvent::KEY(QPID_HA+"txno");
-broker::Message makeMessage(const string& data, const string& key) {
+broker::Message makeMessage(
+ const string& data, const string& destination, const string& routingKey)
+{
boost::intrusive_ptr<MessageTransfer> transfer(new MessageTransfer());
- AMQFrame method((MessageTransferBody(ProtocolVersion(), key, 0, 0)));
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), destination, 0, 0)));
+ method.setBof(true);
+ method.setEof(false);
+ method.setBos(true);
+ method.setEos(true);
AMQFrame header((AMQHeaderBody()));
- AMQFrame content((AMQContentBody()));
- Buffer buffer(const_cast<char*>(&data[0]), data.size());
- // AMQContentBody::decode is missing a const declaration, so cast it here.
- content.castBody<AMQContentBody>()->decode(
- const_cast<Buffer&>(buffer), buffer.getSize());
header.setBof(false);
header.setEof(false);
header.setBos(true);
header.setEos(true);
+ AMQFrame content((AMQContentBody()));
content.setBof(false);
content.setEof(true);
content.setBos(true);
content.setEos(true);
+ Buffer buffer(const_cast<char*>(&data[0]), data.size());
+ content.castBody<AMQContentBody>()->decode(
+ const_cast<Buffer&>(buffer), buffer.getSize());
transfer->getFrames().append(method);
transfer->getFrames().append(header);
transfer->getFrames().append(content);
- transfer->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(key);
+ transfer->getFrames().getHeaders()->
+ get<DeliveryProperties>(true)->setRoutingKey(routingKey);
return broker::Message(transfer, 0);
}
diff --git a/cpp/src/qpid/ha/Event.h b/cpp/src/qpid/ha/Event.h
index 08174bfc9d..daaa6eada3 100644
--- a/cpp/src/qpid/ha/Event.h
+++ b/cpp/src/qpid/ha/Event.h
@@ -33,7 +33,10 @@
namespace qpid {
namespace ha {
-broker::Message makeMessage(const std::string& content, const std::string& destination);
+broker::Message makeMessage(
+ const std::string& content,
+ const std::string& destination,
+ const std::string& routingKey);
/** Test if a string is an event key */
@@ -47,7 +50,8 @@ struct Event {
virtual size_t encodedSize() const = 0;
virtual std::string key() const = 0; // Routing key
virtual void print(std::ostream& o) const = 0;
- broker::Message message() const { return makeMessage(framing::encodeStr(*this), key()); }
+ broker::Message message(const std::string& destination=std::string()) const {
+ return makeMessage(framing::encodeStr(*this), destination, key()); }
};
@@ -140,6 +144,37 @@ struct TxRollbackEvent : public EventBase<TxRollbackEvent> {
void print(std::ostream&) const {}
};
+struct TxPrepareOkEvent : public EventBase<TxPrepareOkEvent> {
+ static const std::string KEY;
+ types::Uuid broker;
+ TxPrepareOkEvent(const types::Uuid& b=types::Uuid()) : broker(b) {}
+
+ void encode(framing::Buffer& b) const {
+ b.putRawData(broker.data(), broker.size());
+ }
+
+ void decode(framing::Buffer& b) {
+ std::string s;
+ b.getRawData(s, broker.size());
+ broker = types::Uuid(&s[0]);
+ }
+ virtual size_t encodedSize() const { return broker.size(); }
+ void print(std::ostream& o) const { o << broker; }
+};
+
+struct TxPrepareFailEvent : public EventBase<TxPrepareFailEvent> {
+ static const std::string KEY;
+ types::Uuid broker;
+ TxPrepareFailEvent(const types::Uuid& b=types::Uuid()) : broker(b) {}
+ void encode(framing::Buffer& b) const { b.putRawData(broker.data(), broker.size()); }
+ void decode(framing::Buffer& b) {
+ std::string s;
+ b.getRawData(s, broker.size());
+ broker = types::Uuid(&s[0]);
+ }
+ virtual size_t encodedSize() const { return broker.size(); }
+ void print(std::ostream& o) const { o << broker; }
+};
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/FailoverExchange.cpp b/cpp/src/qpid/ha/FailoverExchange.cpp
index 46cc345ab0..9c7b986bf8 100644
--- a/cpp/src/qpid/ha/FailoverExchange.cpp
+++ b/cpp/src/qpid/ha/FailoverExchange.cpp
@@ -117,7 +117,7 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue, sys::Mutex::Sc
if (urls.empty()) return;
framing::Array array = vectorToUrlArray(urls);
const ProtocolVersion v;
- broker::Message message(makeMessage(std::string(), typeName));
+ broker::Message message(makeMessage(std::string(), typeName, typeName));
MessageTransfer& transfer = MessageTransfer::get(message);
MessageProperties* props =
transfer.getFrames().getHeaders()->get<framing::MessageProperties>(true);
diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp
index 5fd7814d62..4208147ff4 100644
--- a/cpp/src/qpid/ha/Primary.cpp
+++ b/cpp/src/qpid/ha/Primary.cpp
@@ -390,12 +390,16 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards)
void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) {
QPID_LOG(trace, logPrefix << "Started TX transaction");
- tx->setObserver(make_shared<PrimaryTxObserver>(boost::ref(haBroker)));
+ shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker));
+ observer->initialize();
+ tx->setObserver(observer);
}
void Primary::startDtx(const boost::shared_ptr<broker::DtxBuffer>& dtx) {
QPID_LOG(trace, logPrefix << "Started DTX transaction");
- dtx->setObserver(make_shared<PrimaryTxObserver>(boost::ref(haBroker)));
+ shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker));
+ observer->initialize();
+ dtx->setObserver(observer);
}
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index 8a8364ac22..f400dbf99d 100644
--- a/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -26,24 +26,73 @@
#include "QueueGuard.h"
#include "RemoteBackup.h"
#include "ReplicatingSubscription.h"
+#include "QueueReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include <boost/lexical_cast.hpp>
+#include <boost/make_shared.hpp>
namespace qpid {
+namespace framing {
+class FieldTable;
+}
namespace ha {
using namespace std;
using namespace boost;
-using namespace broker;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+// Exchange to receive prepare OK events.
+class PrimaryTxObserver::Exchange : public broker::Exchange {
+ public:
+ Exchange(const boost::shared_ptr<PrimaryTxObserver>& tx_) :
+ broker::Exchange(TRANSACTION_REPLICATOR_PREFIX+tx_->getId().str()),
+ tx(tx_)
+ {
+ dispatch[TxPrepareOkEvent::KEY] =
+ boost::bind(&PrimaryTxObserver::txPrepareOkEvent, tx, _1);
+ dispatch[TxPrepareFailEvent::KEY] =
+ boost::bind(&PrimaryTxObserver::txPrepareFailEvent, tx, _1);
+ }
+
+ void route(Deliverable& deliverable) {
+ const broker::Message& message(deliverable.getMessage());
+ DispatchMap::iterator i = dispatch.find(message.getRoutingKey());
+ if (i != dispatch.end()) i->second(message.getContent());
+ }
+
+ bool bind(boost::shared_ptr<Queue>, const string&, const FieldTable*) { return false; }
+ bool unbind(boost::shared_ptr<Queue>, const string&, const FieldTable*) { return false; }
+ bool isBound(boost::shared_ptr<Queue>, const string* const, const FieldTable* const) { return false; }
+ string getType() const { return TYPE_NAME; }
+
+ private:
+ static const string TYPE_NAME;
+ typedef function<void(const std::string&)> DispatchFn;
+ typedef qpid::sys::unordered_map<std::string, DispatchFn> DispatchMap;
+
+ DispatchMap dispatch;
+ boost::shared_ptr<PrimaryTxObserver> tx;
+};
+const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"primary-tx-observer");
PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
- haBroker(hb), broker(hb.getBroker()),
- id(true) // FIXME aconway 2013-07-11: is UUID an appropriate TX ID?
+ haBroker(hb), broker(hb.getBroker()), id(true), failed(false)
{
- logPrefix = "Primary transaction "+id.str().substr(0,8)+": ";
- QPID_LOG(trace, logPrefix << "started");
+ logPrefix = "Primary transaction "+shortStr(id)+": ";
+
+ // The brokers known at this point are the ones that will be included
+ // in the transaction. Brokers that join later are not included
+ // Latecomers that have replicated the transaction will be rolled back
+ // when the tx-queue is deleted.
+ //
+ BrokerInfo::Set infoSet(haBroker.getMembership().otherBackups());
+ transform(infoSet.begin(), infoSet.end(), inserter(backups, backups.begin()),
+ bind(&BrokerInfo::getSystemId, _1));
+ QPID_LOG(trace, logPrefix << "Started on " << backups);
+
pair<shared_ptr<Queue>, bool> result =
broker.getQueues().declare(
TRANSACTION_REPLICATOR_PREFIX+id.str(),
@@ -52,8 +101,13 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
txQueue = result.first;
}
+void PrimaryTxObserver::initialize() {
+ broker.getExchanges().registerExchange(make_shared<Exchange>(shared_from_this()));
+}
+
void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m)
{
+ sys::Mutex::ScopedLock l(lock);
QPID_LOG(trace, logPrefix << "enqueue: " << LogMessageId(*q, m));
enqueues[q] += m.getReplicationId();
txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message());
@@ -63,42 +117,63 @@ void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m)
void PrimaryTxObserver::dequeue(
const QueuePtr& q, QueuePosition pos, ReplicationId id)
{
+ sys::Mutex::ScopedLock l(lock);
QPID_LOG(trace, logPrefix << "dequeue: " << LogMessageId(*q, pos, id));
txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
}
-void PrimaryTxObserver::deduplicate() {
+void PrimaryTxObserver::deduplicate(sys::Mutex::ScopedLock&) {
shared_ptr<Primary> primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()));
assert(primary);
- // FIXME aconway 2013-07-29: need to verify which backups are *in* the transaction.
- // Use cluster membership for now
- BrokerInfo::Set brokers = haBroker.getMembership().getBrokers();
- types::Uuid selfId = haBroker.getMembership().getSelf();
// Tell replicating subscriptions to skip IDs in the transaction.
- for (BrokerInfo::Set::iterator b = brokers.begin(); b != brokers.end(); ++b) {
- if (b->getSystemId() == selfId) continue;
+ for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b)
for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q)
- primary->skip(b->getSystemId(), q->first, q->second);
- }
+ primary->skip(*b, q->first, q->second);
}
bool PrimaryTxObserver::prepare() {
- // FIXME aconway 2013-07-23: need to delay completion of prepare till all
- // backups have prepared.
- QPID_LOG(trace, logPrefix << "prepare");
- deduplicate();
+ sys::Mutex::ScopedLock l(lock);
+ // FIXME aconway 2013-07-23: WRONG blocking. Need async completion.
+ QPID_LOG(trace, logPrefix << "Prepare");
+ deduplicate(l);
txQueue->deliver(TxPrepareEvent().message());
- return true;
+ while (!isPrepared(l)) lock.wait();
+ return !failed;
}
void PrimaryTxObserver::commit() {
- QPID_LOG(trace, logPrefix << "commit");
+ sys::Mutex::ScopedLock l(lock);
+ QPID_LOG(trace, logPrefix << "Commit");
txQueue->deliver(TxCommitEvent().message());
}
void PrimaryTxObserver::rollback() {
- QPID_LOG(trace, logPrefix << "rollback");
+ sys::Mutex::ScopedLock l(lock);
+ QPID_LOG(trace, logPrefix << "Rollback");
txQueue->deliver(TxRollbackEvent().message());
}
+void PrimaryTxObserver::txPrepareOkEvent(const string& data) {
+ QPID_LOG(critical, logPrefix << "FIXME data: " << data);
+ sys::Mutex::ScopedLock l(lock);
+ types::Uuid backup = decodeStr<TxPrepareOkEvent>(data).broker;
+ QPID_LOG(critical, logPrefix << "FIXME backup: " << backup);
+ QPID_LOG(trace, logPrefix << "Backup prepared ok: " << backup);
+ prepared.insert(backup);
+ lock.notify();
+}
+
+void PrimaryTxObserver::txPrepareFailEvent(const string& data) {
+ sys::Mutex::ScopedLock l(lock);
+ types::Uuid backup = decodeStr<TxPrepareFailEvent>(data).broker;
+ QPID_LOG(error, logPrefix << "Backup prepare failed: " << backup);
+ prepared.insert(backup);
+ failed = true;
+ lock.notify();
+}
+
+bool PrimaryTxObserver::isPrepared(sys::Mutex::ScopedLock&) {
+ return (prepared == backups || failed);
+}
+
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.h b/cpp/src/qpid/ha/PrimaryTxObserver.h
index 3681c6b750..413c627d68 100644
--- a/cpp/src/qpid/ha/PrimaryTxObserver.h
+++ b/cpp/src/qpid/ha/PrimaryTxObserver.h
@@ -26,13 +26,18 @@
#include "qpid/broker/TransactionObserver.h"
#include "qpid/log/Statement.h"
-#include "qpid/framing/Uuid.h"
+#include "qpid/types/Uuid.h"
#include "qpid/sys/unordered_map.h"
+#include "qpid/sys/Monitor.h"
#include <boost/functional/hash.hpp>
+#include <boost/enable_shared_from_this.hpp>
+
namespace qpid {
namespace broker {
class Broker;
+class Message;
+class Consumer;
}
namespace ha {
@@ -46,30 +51,51 @@ class HaBroker;
* A TxReplicator on the backup replicates the tx-queue and creates
* a TxBuffer on the backup equivalent to the one on the primary.
*
- * THREAD UNSAFE: called sequentially in the context of a transaction.
+ * Also observes the tx-queue for prepare-complete messages and
+ * subscription cancellations.
+ *
+ * THREAD SAFE: called in user connection thread for TX events,
+ * and in backup connection threads for prepare-completed events
+ * and unsubscriptions.
*/
-class PrimaryTxObserver : public broker::TransactionObserver {
+class PrimaryTxObserver : public broker::TransactionObserver,
+ public boost::enable_shared_from_this<PrimaryTxObserver>
+{
public:
PrimaryTxObserver(HaBroker&);
+ /** Call immediately after constructor, uses shared_from_this. */
+ void initialize();
+
void enqueue(const QueuePtr&, const broker::Message&);
void dequeue(const QueuePtr& queue, QueuePosition, ReplicationId);
bool prepare();
void commit();
void rollback();
+ types::Uuid getId() const { return id; }
+
private:
+ class Exchange;
typedef qpid::sys::unordered_map<
QueuePtr, ReplicationIdSet, boost::hash<QueuePtr> > QueueIdsMap;
- void deduplicate();
+ void deduplicate(sys::Mutex::ScopedLock&);
+ void txPrepareOkEvent(const std::string& data);
+ void txPrepareFailEvent(const std::string& data);
+ void consumerRemoved(const broker::Consumer&);
+ bool isPrepared(sys::Mutex::ScopedLock&);
+ sys::Monitor lock;
std::string logPrefix;
HaBroker& haBroker;
broker::Broker& broker;
- framing::Uuid id;
+ types::Uuid id;
QueuePtr txQueue;
QueueIdsMap enqueues;
+ bool failed;
+ UuidSet backups;
+ UuidSet prepared;
};
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index 28e9dc4120..9149567cf2 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -107,9 +107,13 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
haBroker(hb),
brokerInfo(hb.getBrokerInfo()),
+ link(l),
+ queue(q),
+ sessionHandler(0),
logPrefix("Backup of "+q->getName()+": "),
- queue(q), link(l), subscribed(false),
- settings(hb.getSettings()), destroyed(false),
+ subscribed(false),
+ settings(hb.getSettings()),
+ destroyed(false),
nextId(0), maxId(0)
{
args.setString(QPID_REPLICATE, printable(NONE).str());
@@ -176,22 +180,23 @@ void QueueReplicator::destroy() {
if (destroyed) return;
destroyed = true;
QPID_LOG(debug, logPrefix << "Destroyed");
+ bridge2 = bridge; // call close outside the lock.
// Need to drop shared pointers to avoid pointer cycles keeping this in memory.
queue.reset();
link.reset();
bridge.reset();
getBroker()->getExchanges().destroy(getName());
- bridge2 = bridge;
}
if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock.
}
// Called in a broker connection thread when the bridge is created.
// Note: called with the Link lock held.
-void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) {
Mutex::ScopedLock l(lock);
if (destroyed) return; // Already destroyed
- AMQP_ServerProxy peer(sessionHandler.out);
+ sessionHandler = &sessionHandler_;
+ AMQP_ServerProxy peer(sessionHandler->out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
FieldTable arguments;
arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h
index 90f38ce7e1..cbc950d4bc 100644
--- a/cpp/src/qpid/ha/QueueReplicator.h
+++ b/cpp/src/qpid/ha/QueueReplicator.h
@@ -72,11 +72,8 @@ class QueueReplicator : public broker::Exchange,
void activate(); // Must be called immediately after constructor.
std::string getType() const;
- bool bind(boost::shared_ptr<broker::Queue
- >, const std::string&, const framing::FieldTable*);
- bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+
void route(broker::Deliverable&);
- bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
// Set if the queue has ever been subscribed to, used for auto-delete cleanup.
void setSubscribed() { subscribed = true; }
@@ -86,16 +83,26 @@ class QueueReplicator : public broker::Exchange,
ReplicationId getMaxId();
+ // No-op unused Exchange virtual functions.
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+
protected:
typedef boost::function<void(const std::string&, sys::Mutex::ScopedLock&)> DispatchFn;
typedef qpid::sys::unordered_map<std::string, DispatchFn> DispatchMap;
virtual void deliver(const broker::Message&);
+ virtual void destroy(); // Called when the queue is destroyed.
sys::Mutex lock;
HaBroker& haBroker;
const BrokerInfo brokerInfo;
DispatchMap dispatch;
+ boost::shared_ptr<broker::Link> link;
+ boost::shared_ptr<broker::Bridge> bridge;
+ boost::shared_ptr<broker::Queue> queue;
+ broker::SessionHandler* sessionHandler;
private:
typedef qpid::sys::unordered_map<
@@ -104,7 +111,6 @@ class QueueReplicator : public broker::Exchange,
class QueueObserver;
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
- void destroy(); // Called when the queue is destroyed.
// Dispatch functions
void dequeueEvent(const std::string& data, sys::Mutex::ScopedLock&);
@@ -112,9 +118,6 @@ class QueueReplicator : public broker::Exchange,
std::string logPrefix;
std::string bridgeName;
- boost::shared_ptr<broker::Queue> queue;
- boost::shared_ptr<broker::Link> link;
- boost::shared_ptr<broker::Bridge> bridge;
bool subscribed;
const Settings& settings;
diff --git a/cpp/src/qpid/ha/RemoteBackup.cpp b/cpp/src/qpid/ha/RemoteBackup.cpp
index 776a584bc8..0993c6ea39 100644
--- a/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -66,8 +66,6 @@ bool RemoteBackup::isReady() {
}
void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) {
- // Ignore transaction queues for purposes of catch-up calculation
- if (TxReplicator::isTxQueue(q->getName())) return;
if (replicationTest.getLevel(*q) == ALL) {
QPID_LOG(debug, logPrefix << "Catch-up queue"
<< (createGuard ? " and guard" : "") << ": " << q->getName());
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.h b/cpp/src/qpid/ha/ReplicatingSubscription.h
index 8a4a48bb4e..71993bcb12 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -25,7 +25,6 @@
#include "BrokerInfo.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/ConsumerFactory.h"
-#include "qpid/types/Uuid.h"
#include <iosfwd>
namespace qpid {
diff --git a/cpp/src/qpid/ha/TxReplicator.cpp b/cpp/src/qpid/ha/TxReplicator.cpp
index 31c68dfe45..58963149fc 100644
--- a/cpp/src/qpid/ha/TxReplicator.cpp
+++ b/cpp/src/qpid/ha/TxReplicator.cpp
@@ -28,15 +28,20 @@
#include "HaBroker.h"
#include "types.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Link.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/TxBuffer.h"
#include "qpid/broker/TxAccept.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/framing/BufferTypes.h"
#include "qpid/log/Statement.h"
#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+#include "qpid/framing/MessageTransferBody.h"
namespace qpid {
namespace ha {
@@ -45,14 +50,18 @@ using namespace std;
using namespace boost;
using namespace qpid::broker;
using namespace qpid::framing;
+using qpid::broker::amqp_0_10::MessageTransfer;
+using qpid::types::Uuid;
namespace {
const string QPID_HA(QPID_HA_PREFIX);
-const string TYPE_NAME(QPID_HA+"tx-queue-replicator");
+const string TYPE_NAME(QPID_HA+"tx-replicator");
const string PREFIX(TRANSACTION_REPLICATOR_PREFIX);
+
} // namespace
+
bool TxReplicator::isTxQueue(const string& q) {
return startsWith(q, PREFIX);
}
@@ -69,13 +78,13 @@ TxReplicator::TxReplicator(
const boost::shared_ptr<broker::Queue>& txQueue,
const boost::shared_ptr<broker::Link>& link) :
QueueReplicator(hb, txQueue, link),
- id(getTxId(txQueue->getName())),
txBuffer(new broker::TxBuffer),
- broker(hb.getBroker()),
- store(broker.hasStore() ? &broker.getStore() : 0),
+ store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0),
+ channel(link->nextChannel()),
dequeueState(hb.getBroker().getQueues())
{
- logPrefix = "Backup of transaction "+id+": ";
+ string shortId = getTxId(txQueue->getName()).substr(0, 8);
+ logPrefix = "Backup of transaction "+shortId+": ";
if (!store) throw Exception(QPID_MSG(logPrefix << "No message store loaded."));
boost::shared_ptr<Backup> backup = dynamic_pointer_cast<Backup>(hb.getRole());
@@ -95,17 +104,45 @@ TxReplicator::TxReplicator(
boost::bind(&TxReplicator::rollback, this, _1, _2);
}
+TxReplicator::~TxReplicator() {
+ link->returnChannel(channel);
+}
+
+// Send a message to the primary tx.
+void TxReplicator::sendMessage(const broker::Message& msg, sys::Mutex::ScopedLock&) {
+ assert(sessionHandler);
+ const MessageTransfer& transfer(MessageTransfer::get(msg));
+ for (FrameSet::const_iterator i = transfer.getFrames().begin();
+ i != transfer.getFrames().end();
+ ++i)
+ {
+ sessionHandler->out.handle(const_cast<AMQFrame&>(*i));
+ }
+}
+
void TxReplicator::deliver(const broker::Message& m_) {
+ sys::Mutex::ScopedLock l(lock);
// Deliver message to the target queue, not the tx-queue.
broker::Message m(m_);
m.setReplicationId(enq.id); // Use replicated id.
- boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(enq.queue);
+ boost::shared_ptr<broker::Queue> queue =
+ haBroker.getBroker().getQueues().get(enq.queue);
QPID_LOG(trace, logPrefix << "Deliver " << LogMessageId(*queue, m));
DeliverableMessage dm(m, txBuffer.get());
dm.deliverTo(queue);
}
+void TxReplicator::destroy() {
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (context.get()) store->abort(*context);
+ txBuffer->rollback();
+ }
+ QueueReplicator::destroy();
+}
+
void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) {
+ sys::Mutex::ScopedLock l(lock);
TxEnqueueEvent e;
decodeStr(data, e);
QPID_LOG(trace, logPrefix << "Enqueue: " << e);
@@ -113,6 +150,7 @@ void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) {
}
void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) {
+ sys::Mutex::ScopedLock l(lock);
TxDequeueEvent e;
decodeStr(data, e);
QPID_LOG(trace, logPrefix << "Dequeue: " << e);
@@ -132,9 +170,6 @@ bool TxReplicator::DequeueState::addRecord(
const ReplicationIdSet& rids)
{
if (rids.contains(m.getReplicationId())) {
- // FIXME aconway 2013-07-24:
- // - Do we need to acquire before creating a DR?
- // - Are the parameters to DeliveryRecord ok?
DeliveryRecord dr(cursor, m.getSequence(), m.getReplicationId(), queue,
string() /*tag*/,
boost::shared_ptr<Consumer>(),
@@ -142,7 +177,7 @@ bool TxReplicator::DequeueState::addRecord(
false /*accepted*/,
false /*credit.isWindowMode()*/,
0 /*credit*/);
- // Fake record ids, unique within this transaction.
+ // Generate record ids, unique within this transaction.
dr.setId(nextId++);
records.push_back(dr);
recordIds += dr.getId();
@@ -163,30 +198,40 @@ boost::shared_ptr<TxAccept> TxReplicator::DequeueState::makeAccept() {
return make_shared<TxAccept>(cref(recordIds), ref(records));
}
-void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock&) {
+void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock& l) {
QPID_LOG(trace, logPrefix << "Prepare");
txBuffer->enlist(dequeueState.makeAccept());
context = store->begin();
- txBuffer->prepare(context.get());
- // FIXME aconway 2013-07-26: notify the primary of prepare outcome.
+ if (txBuffer->prepare(context.get())) {
+ QPID_LOG(trace, logPrefix << "Prepared OK");
+ QPID_LOG(critical, logPrefix << "FIXME Prepared OK " << haBroker.getSystemId());
+ QPID_LOG(critical, logPrefix << "FIXME Prepared ok "<<
+ encodeStr(TxPrepareOkEvent(haBroker.getSystemId())));
+ sendMessage(TxPrepareOkEvent(haBroker.getSystemId()).message(queue->getName()), l);
+ } else {
+ QPID_LOG(trace, logPrefix << "Prepare failed");
+ sendMessage(TxPrepareFailEvent(haBroker.getSystemId()).message(queue->getName()), l);
+ }
}
-void TxReplicator::commit(const string&, sys::Mutex::ScopedLock&) {
+void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) {
QPID_LOG(trace, logPrefix << "Commit");
if (context.get()) store->commit(*context);
txBuffer->commit();
- end();
+ end(l);
}
-void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock&) {
+void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) {
QPID_LOG(trace, logPrefix << "Rollback");
if (context.get()) store->abort(*context);
txBuffer->rollback();
- end();
+ end(l);
}
-void TxReplicator::end(){
- // FIXME aconway 2013-07-26: destroying the txqueue (auto-delete?) will
- // destroy this via QueueReplicator::destroy
+void TxReplicator::end(sys::Mutex::ScopedLock&) {
+ // Destroy the tx-queue, which will destroy this via QueueReplicator destroy.
+ // FIXME aconway 2013-08-01: what about connection & user ID for destroy?
+ haBroker.getBroker().getQueues().destroy(queue->getName());
}
+
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/TxReplicator.h b/cpp/src/qpid/ha/TxReplicator.h
index c4df8c13f9..41555ef094 100644
--- a/cpp/src/qpid/ha/TxReplicator.h
+++ b/cpp/src/qpid/ha/TxReplicator.h
@@ -58,12 +58,14 @@ class TxReplicator : public QueueReplicator {
static std::string getTxId(const std::string& queue);
TxReplicator(HaBroker&, const QueuePtr& txQueue, const LinkPtr& link);
+ ~TxReplicator();
std::string getType() const;
protected:
void deliver(const broker::Message&);
+ void destroy();
private:
@@ -72,21 +74,21 @@ class TxReplicator : public QueueReplicator {
typedef qpid::sys::unordered_map<std::string, DispatchFunction> DispatchMap;
typedef qpid::sys::unordered_map<std::string, ReplicationIdSet> DequeueMap;
+ void sendMessage(const broker::Message&, sys::Mutex::ScopedLock&);
void enqueue(const std::string& data, sys::Mutex::ScopedLock&);
void dequeue(const std::string& data, sys::Mutex::ScopedLock&);
void prepare(const std::string& data, sys::Mutex::ScopedLock&);
void commit(const std::string& data, sys::Mutex::ScopedLock&);
void rollback(const std::string& data, sys::Mutex::ScopedLock&);
- void end();
+ void end(sys::Mutex::ScopedLock&);
std::string logPrefix;
- const std::string id;
TxEnqueueEvent enq; // Enqueue data for next deliver.
boost::shared_ptr<broker::TxBuffer> txBuffer;
- broker::Broker& broker;
broker::MessageStore* store;
boost::shared_ptr<BrokerReplicator> brokerReplicator;
std::auto_ptr<broker::TransactionContext> context;
+ framing::ChannelId channel; // Channel to send prepare-complete.
// Class to process dequeues and create DeliveryRecords to populate a
// TxAccept.
@@ -103,7 +105,8 @@ class TxReplicator : public QueueReplicator {
typedef framing::SequenceSet IdSet;
typedef qpid::sys::unordered_map<std::string, ReplicationIdSet> EventMap;
- bool addRecord(const broker::Message& m, const boost::shared_ptr<broker::Queue>&,
+ bool addRecord(const broker::Message& m,
+ const boost::shared_ptr<broker::Queue>&,
const ReplicationIdSet& );
void addRecords(const DequeueMap::value_type& entry);
diff --git a/cpp/src/qpid/ha/types.cpp b/cpp/src/qpid/ha/types.cpp
index 10d6bd4e3b..500a35051a 100644
--- a/cpp/src/qpid/ha/types.cpp
+++ b/cpp/src/qpid/ha/types.cpp
@@ -87,9 +87,11 @@ istream& operator>>(istream& i, EnumBase& e) {
return i;
}
-ostream& operator<<(ostream& o, const IdSet& ids) {
+ostream& operator<<(ostream& o, const UuidSet& ids) {
ostream_iterator<qpid::types::Uuid> out(o, " ");
+ o << "{ ";
copy(ids.begin(), ids.end(), out);
+ o << "}";
return o;
}
diff --git a/cpp/src/qpid/ha/types.h b/cpp/src/qpid/ha/types.h
index 3af095d470..91e43137d2 100644
--- a/cpp/src/qpid/ha/types.h
+++ b/cpp/src/qpid/ha/types.h
@@ -117,9 +117,9 @@ extern const char* TRANSACTION_REPLICATOR_PREFIX;
bool startsWith(const std::string& name, const std::string& prefix);
/** Define IdSet type, not a typedef so we can overload operator << */
-class IdSet : public std::set<types::Uuid> {};
+class UuidSet : public std::set<types::Uuid> {};
-std::ostream& operator<<(std::ostream& o, const IdSet& ids);
+std::ostream& operator<<(std::ostream& o, const UuidSet& ids);
// Use type names to distinguish Positions from Replication Ids
typedef framing::SequenceNumber QueuePosition;
@@ -139,5 +139,8 @@ struct LogMessageId {
};
std::ostream& operator<<(std::ostream&, const LogMessageId&);
+/** Return short version of human-readable UUID. */
+inline std::string shortStr(const types::Uuid& uuid) { return uuid.str().substr(0,8); }
+
}} // qpid::ha
#endif /*!QPID_HA_ENUM_H*/
diff --git a/cpp/src/qpid/types/Uuid.cpp b/cpp/src/qpid/types/Uuid.cpp
index 1d6fbf430a..875e5925a9 100644
--- a/cpp/src/qpid/types/Uuid.cpp
+++ b/cpp/src/qpid/types/Uuid.cpp
@@ -52,6 +52,11 @@ Uuid::Uuid(const unsigned char* uuid)
::memcpy(bytes, uuid, Uuid::SIZE);
}
+Uuid::Uuid(const char* uuid)
+{
+ ::memcpy(bytes, uuid, Uuid::SIZE);
+}
+
Uuid& Uuid::operator=(const Uuid& other)
{
if (this == &other) return *this;
diff --git a/cpp/src/tests/ha_test.py b/cpp/src/tests/ha_test.py
index 602a62ca17..cceb9795eb 100755
--- a/cpp/src/tests/ha_test.py
+++ b/cpp/src/tests/ha_test.py
@@ -265,16 +265,19 @@ acl allow all all
class HaCluster(object):
_cluster_count = 0
- def __init__(self, test, n, promote=True, wait=True, args=[], **kwargs):
+ def __init__(self, test, n, promote=True, wait=True, args=[], s_args=[], **kwargs):
"""Start a cluster of n brokers.
@test: The test being run
@n: start n brokers
@promote: promote self[0] to primary
@wait: wait for primary active and backups ready. Ignored if promote=False
+ @args: args for all brokers in the cluster.
+ @s_args: args for specific brokers: s_args[i] for broker i.
"""
self.test = test
self.args = args
+ self.s_args = s_args
self.kwargs = kwargs
self._ports = [HaPort(test) for i in xrange(n)]
self._set_url()
@@ -294,9 +297,12 @@ class HaCluster(object):
self.broker_id += 1
return name
- def _ha_broker(self, ha_port, name):
+ def _ha_broker(self, i, name):
+ args = self.args
+ if i < len(self.s_args): args += self.s_args[i]
+ ha_port = self._ports[i]
b = HaBroker(ha_port.test, ha_port, brokers_url=self.url, name=name,
- args=self.args, **self.kwargs)
+ args=args, **self.kwargs)
b.ready()
return b
@@ -308,7 +314,7 @@ class HaCluster(object):
self._ports.append(HaPort(self.test))
self._set_url()
self._update_urls()
- b = self._ha_broker(self._ports[i], self.next_name())
+ b = self._ha_broker(i, self.next_name())
self._brokers.append(b)
return b
@@ -334,7 +340,7 @@ class HaCluster(object):
a separate log file: foo.n.log"""
if self._ports[i].stopped: raise Exception("Restart after final kill: %s"%(self))
b = self._brokers[i]
- self._brokers[i] = self._ha_broker(self._ports[i], b.name)
+ self._brokers[i] = self._ha_broker(i, b.name)
self._brokers[i].ready()
def bounce(self, i, promote_next=True):
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index 55715639a4..e97614d785 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -1381,7 +1381,18 @@ class TransactionTests(BrokerTest):
self.assertEqual(open_read(cluster[0].store_log), expect)
self.assertEqual(open_read(cluster[1].store_log), expect)
-# FIXME aconway 2013-07-23: test with partial acknowledgement.
+ def test_tx_backup_fail(self):
+ # FIXME aconway 2013-07-31: check exception types, reduce timeout.
+ cluster = HaCluster(
+ self, 2, test_store=True, s_args=[[],["--test-store-throw=bang"]])
+ c = cluster[0].connect()
+ tx = c.session(transactional=True)
+ s = tx.sender("q;{create:always,node:{durable:true}}")
+ for m in ["foo","bang","bar"]: s.send(Message(m, durable=True))
+ self.assertRaises(Exception, tx.commit)
+ for b in cluster: b.assert_browse_backup("q", [])
+ self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n")
+ self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n")
if __name__ == "__main__":
outdir = "ha_tests.tmp"
diff --git a/cpp/src/tests/test_store.cpp b/cpp/src/tests/test_store.cpp
index fc44889f33..e299161c68 100644
--- a/cpp/src/tests/test_store.cpp
+++ b/cpp/src/tests/test_store.cpp
@@ -63,12 +63,18 @@ struct TestStoreOptions : public Options {
string name;
string dump;
string events;
+ vector<string> throwMsg; // Throw exception if message content matches.
TestStoreOptions() : Options("Test Store Options") {
addOptions()
- ("test-store-name", optValue(name, "NAME"), "Name of test store instance.")
- ("test-store-dump", optValue(dump, "FILE"), "File to dump enqueued messages.")
- ("test-store-events", optValue(events, "FILE"), "File to log events, 1 line per event.")
+ ("test-store-name", optValue(name, "NAME"),
+ "Name of test store instance.")
+ ("test-store-dump", optValue(dump, "FILE"),
+ "File to dump enqueued messages.")
+ ("test-store-events", optValue(events, "FILE"),
+ "File to log events, 1 line per event.")
+ ("test-store-throw", optValue(throwMsg, "CONTENT"),
+ "Throw exception if message content matches.")
;
}
};
@@ -91,7 +97,8 @@ class TestStore : public NullMessageStore {
{
QPID_LOG(info, "TestStore name=" << name
<< " dump=" << options.dump
- << " events=" << options.events);
+ << " events=" << options.events
+ << " throw messages =" << options.throwMsg.size());
if (!options.dump.empty())
dump.reset(new ofstream(options.dump.c_str()));
@@ -148,7 +155,8 @@ class TestStore : public NullMessageStore {
const PersistableQueue& queue)
{
QPID_LOG(debug, "TestStore enqueue " << queue.getName());
- qpid::broker::amqp_0_10::MessageTransfer* msg = dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get());
+ qpid::broker::amqp_0_10::MessageTransfer* msg =
+ dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get());
assert(msg);
ostringstream o;
@@ -170,7 +178,11 @@ class TestStore : public NullMessageStore {
string data = msg->getFrames().getContent();
size_t i = string::npos;
size_t j = string::npos;
- if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0
+ const vector<string>& throwMsg(options.throwMsg);
+ if (find(throwMsg.begin(), throwMsg.end(), data) != throwMsg.end()) {
+ throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data));
+ }
+ else if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0
&& (i = data.find(name+"[")) != string::npos
&& (j = data.find("]", i)) != string::npos)
{