summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-11-12 16:58:52 +0000
committerAlan Conway <aconway@apache.org>2013-11-12 16:58:52 +0000
commit0630ea05003e6c530b9dde889e8296b12e67e41b (patch)
treed3c441f0071f398f9ad31f872a95363109bf9212
parentc9b6567bbd2167284d357f4021954e8e5f976b67 (diff)
downloadqpid-python-0630ea05003e6c530b9dde889e8296b12e67e41b.tar.gz
QPID-5275: HA transactions failing in qpid-cluster-benchmark
The test was failing due to incorrect handling of the transaction lifecycle: - Failing to handle the automatic rollback of the empty TX at session close. - Deleting the tx-q before all backups were finished with it. The fixes include - Make tx-q auto-delete, deleted only when the TxReplicators cancel their subscriptions. - Use markInUse/releaseFromUse on the primary to keep the tx-q until the primary is done. - Count TxReplicators for auto-delete (unlike normal QueueReplicators) - Improved error handling and log messages - Handle *incoming* exceptions on a federation link by passing to ErrorListener - QueueReplicator catches incoming not-found and resource-deleted exceptions - close the backup bridge, handle race between subscribe and delete. - Simplify QueueSnapshots, remove need for snapshot map. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1541146 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/CMakeLists.txt2
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.h1
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp9
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h30
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h2
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp13
-rw-r--r--qpid/cpp/src/qpid/ha/Event.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/Event.h14
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp170
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.h18
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp50
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h5
-rw-r--r--qpid/cpp/src/qpid/ha/QueueSnapshots.h32
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp95
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h146
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp45
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h50
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.cpp58
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.h4
-rw-r--r--qpid/cpp/src/tests/brokertest.py22
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py63
26 files changed, 552 insertions, 299 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 1da108b727..248798977f 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -623,6 +623,8 @@ if (BUILD_HA)
qpid/ha/StandAlone.h
qpid/ha/StatusCheck.cpp
qpid/ha/StatusCheck.h
+ qpid/ha/TxReplicatingSubscription.cpp
+ qpid/ha/TxReplicatingSubscription.h
qpid/ha/PrimaryTxObserver.cpp
qpid/ha/PrimaryTxObserver.h
qpid/ha/types.cpp
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 6b34898158..b7fbb1a9aa 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -442,6 +442,12 @@ void Bridge::executionException(
if (errorListener) errorListener->executionException(code, msg);
}
+void Bridge::incomingExecutionException(
+ framing::execution::ErrorCode code, const std::string& msg)
+{
+ if (errorListener) errorListener->incomingExecutionException(code, msg);
+}
+
void Bridge::detach() {
detached = true;
if (errorListener) errorListener->detach();
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index 604a8473f3..3eac8f2af2 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -111,6 +111,7 @@ class Bridge : public PersistableConfig,
void connectionException(framing::connection::CloseCode code, const std::string& msg);
void channelException(framing::session::DetachCode, const std::string& msg);
void executionException(framing::execution::ErrorCode, const std::string& msg);
+ void incomingExecutionException(framing::execution::ErrorCode, const std::string& msg);
void detach();
void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; }
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index fe1cac8aab..fc200adbea 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -273,6 +273,7 @@ class DetachedCallback : public SessionHandler::ErrorListener {
void connectionException(framing::connection::CloseCode, const std::string&) {}
void channelException(framing::session::DetachCode, const std::string&) {}
void executionException(framing::execution::ErrorCode, const std::string&) {}
+ void incomingExecutionException(framing::execution::ErrorCode, const std::string& ) {}
void detach() {}
private:
const std::string name;
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 4d5ea3b5b6..8d7acda673 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -206,9 +206,9 @@ void SemanticState::rollback()
{
if (!txBuffer)
throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
+ StartTxOnExit e(*this);
session.rollbackTx(); // Just to update statistics
txBuffer->rollback();
- startTx(); // Start a new TX automatically.
}
void SemanticState::selectDtx()
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 4375a3f0f1..8fb796add7 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -125,6 +125,7 @@ class SemanticState : private boost::noncopyable {
SessionContext& getSession();
const SessionContext& getSession() const;
+ SessionState& getSessionState() { return session; }
const boost::shared_ptr<ConsumerImpl> find(const std::string& destination) const;
bool find(const std::string& destination, boost::shared_ptr<ConsumerImpl>&) const;
@@ -200,8 +201,9 @@ class SemanticStateConsumerImpl : public Consumer, public sys::OutputTask,
protected:
mutable qpid::sys::Mutex lock;
SemanticState* const parent;
- private:
const boost::shared_ptr<Queue> queue;
+
+ private:
const bool ackExpected;
const bool acquire;
bool blocked;
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index 9a8110d54f..5b1a6aa267 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -536,16 +536,17 @@ void SessionAdapter::ExecutionHandlerImpl::result(const SequenceNumber& /*comman
//TODO: but currently never used client->server
}
-void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t /*errorCode*/,
+void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t errorCode,
const SequenceNumber& /*commandId*/,
uint8_t /*classCode*/,
uint8_t /*commandCode*/,
uint8_t /*fieldIndex*/,
- const std::string& /*description*/,
+ const std::string& description,
const framing::FieldTable& /*errorInfo*/)
{
- //TODO: again, not really used client->server but may be important
- //for inter-broker links
+ broker::SessionHandler* s = state.getSessionState().getHandler();
+ if (s) s->incomingExecutionException(
+ framing::execution::ErrorCode(errorCode), description);
}
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index ff7693e93a..a8f5734af7 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -68,6 +68,13 @@ void SessionHandler::executionException(
errorListener->executionException(code, msg);
}
+void SessionHandler::incomingExecutionException(
+ framing::execution::ErrorCode code, const std::string& msg)
+{
+ if (errorListener)
+ errorListener->incomingExecutionException(code, msg);
+}
+
amqp_0_10::Connection& SessionHandler::getConnection() { return connection; }
const amqp_0_10::Connection& SessionHandler::getConnection() const { return connection; }
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index 3ee1538ccd..cb81084014 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -46,12 +46,23 @@ class SessionHandler : public qpid::amqp_0_10::SessionHandler {
class ErrorListener {
public:
virtual ~ErrorListener() {}
+
+ /** Called when there is an outgoing connection-exception */
virtual void connectionException(
framing::connection::CloseCode code, const std::string& msg) = 0;
+ /** Called when there is an outgoing channel-exception */
virtual void channelException(
framing::session::DetachCode, const std::string& msg) = 0;
+ /** Called when there is an outgoing execution-exception */
virtual void executionException(
framing::execution::ErrorCode, const std::string& msg) = 0;
+
+ /** Called when there is an incoming execution-exception.
+ * Useful for inter-broker bridges.
+ */
+ virtual void incomingExecutionException(
+ framing::execution::ErrorCode, const std::string& msg) = 0;
+
/** Called when it is safe to delete the ErrorListener. */
virtual void detach() = 0;
};
@@ -77,15 +88,18 @@ class SessionHandler : public qpid::amqp_0_10::SessionHandler {
void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; }
+ // Called by SessionAdapter
+ void incomingExecutionException(framing::execution::ErrorCode, const std::string& msg);
+
protected:
- virtual void setState(const std::string& sessionName, bool force);
- virtual qpid::SessionState* getState();
- virtual framing::FrameHandler* getInHandler();
- virtual void connectionException(framing::connection::CloseCode code, const std::string& msg);
- virtual void channelException(framing::session::DetachCode, const std::string& msg);
- virtual void executionException(framing::execution::ErrorCode, const std::string& msg);
- virtual void detaching();
- virtual void readyToSend();
+ void setState(const std::string& sessionName, bool force);
+ qpid::SessionState* getState();
+ framing::FrameHandler* getInHandler();
+ void connectionException(framing::connection::CloseCode code, const std::string& msg);
+ void channelException(framing::session::DetachCode, const std::string& msg);
+ void executionException(framing::execution::ErrorCode, const std::string& msg);
+ void detaching();
+ void readyToSend();
private:
struct SetChannelProxy : public framing::AMQP_ClientProxy { // Proxy that sets the channel.
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 23dc3b897d..ca6d6bf530 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -82,6 +82,8 @@ class SessionState : public qpid::SessionState,
void attach(SessionHandler& handler);
void disableOutput();
+ SessionHandler* getHandler() { return handler; }
+
/** @pre isAttached() */
framing::AMQP_ClientProxy& getProxy();
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index a59c874594..d27d5e84b3 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -188,6 +188,12 @@ class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener {
void executionException(framing::execution::ErrorCode, const std::string& msg) {
QPID_LOG(error, logPrefix << "Execution error: " << msg);
}
+
+ void incomingExecutionException(
+ framing::execution::ErrorCode, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Incoming execution error: " << msg);
+ }
+
void detach() {
QPID_LOG(debug, logPrefix << "Session detached.");
}
@@ -453,7 +459,7 @@ void BrokerReplicator::route(Deliverable& msg) {
if (msg.getMessage().getPropertyAsString(QMF_CONTENT) == EVENT) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
- QPID_LOG(trace, "Broker replicator event: " << map);
+ QPID_LOG(debug, "Broker replicator event: " << map);
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
std::string key = (schema[PACKAGE_NAME].asString() +
@@ -465,7 +471,7 @@ void BrokerReplicator::route(Deliverable& msg) {
} else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
- QPID_LOG(trace, "Broker replicator response: " << map);
+ QPID_LOG(debug, "Broker replicator response: " << map);
string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString();
Variant::Map& values = map[VALUES].asMap();
framing::FieldTable args;
@@ -758,7 +764,7 @@ const string REPLICATE_DEFAULT="replicateDefault";
// Received the ha-broker configuration object for the primary broker.
void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
try {
- QPID_LOG(trace, logPrefix << "HA Broker response: " << values);
+ QPID_LOG(debug, logPrefix << "HA Broker response: " << values);
ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
ReplicateLevel primary = replicationTest.getLevel(values[REPLICATE_DEFAULT].asString());
if (mine != primary)
@@ -882,6 +888,7 @@ string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR;
void BrokerReplicator::disconnectedExchange(boost::shared_ptr<Exchange> ex) {
boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
+ // FIXME aconway 2013-11-01: move logic with releaseFromUse to QueueReplicator
if (qr) {
qr->disconnect();
if (TxReplicator::isTxQueue(qr->getQueue()->getName())) {
diff --git a/qpid/cpp/src/qpid/ha/Event.cpp b/qpid/cpp/src/qpid/ha/Event.cpp
index 8265a6edd3..ff336d0b2b 100644
--- a/qpid/cpp/src/qpid/ha/Event.cpp
+++ b/qpid/cpp/src/qpid/ha/Event.cpp
@@ -51,7 +51,7 @@ const string TxCommitEvent::KEY(QPID_HA+"txcom");
const string TxRollbackEvent::KEY(QPID_HA+"txrb");
const string TxPrepareOkEvent::KEY(QPID_HA+"txok");
const string TxPrepareFailEvent::KEY(QPID_HA+"txno");
-const string TxMembersEvent::KEY(QPID_HA+"txmem");
+const string TxBackupsEvent::KEY(QPID_HA+"txmem");
broker::Message makeMessage(
const string& data, const string& destination, const string& routingKey)
diff --git a/qpid/cpp/src/qpid/ha/Event.h b/qpid/cpp/src/qpid/ha/Event.h
index f292499e7b..7b96e36f64 100644
--- a/qpid/cpp/src/qpid/ha/Event.h
+++ b/qpid/cpp/src/qpid/ha/Event.h
@@ -178,14 +178,14 @@ struct TxPrepareFailEvent : public EventBase<TxPrepareFailEvent> {
void print(std::ostream& o) const { o << broker; }
};
-struct TxMembersEvent : public EventBase<TxMembersEvent> {
+struct TxBackupsEvent : public EventBase<TxBackupsEvent> {
static const std::string KEY;
- UuidSet members;
- TxMembersEvent(const UuidSet& s=UuidSet()) : members(s) {}
- void encode(framing::Buffer& b) const { b.put(members); }
- void decode(framing::Buffer& b) { b.get(members); }
- size_t encodedSize() const { return members.encodedSize(); }
- void print(std::ostream& o) const { o << members; }
+ UuidSet backups;
+ TxBackupsEvent(const UuidSet& s=UuidSet()) : backups(s) {}
+ void encode(framing::Buffer& b) const { b.put(backups); }
+ void decode(framing::Buffer& b) { b.get(backups); }
+ size_t encodedSize() const { return backups.encodedSize(); }
+ void print(std::ostream& o) const { o << backups; }
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index 416bb329a6..a32334bcf9 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -30,6 +30,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
+#include "qpid/framing/reply_exceptions.h"
#include <boost/lexical_cast.hpp>
#include <algorithm>
@@ -40,8 +41,9 @@ class FieldTable;
namespace ha {
using namespace std;
-using namespace qpid::broker;
-using namespace qpid::framing;
+using namespace sys;
+using namespace broker;
+using namespace framing;
using types::Uuid;
// Exchange to receive prepare OK events.
@@ -51,6 +53,7 @@ class PrimaryTxObserver::Exchange : public broker::Exchange {
broker::Exchange(tx_->getExchangeName()),
tx(tx_)
{
+ args.setString(QPID_REPLICATE, printable(NONE).str()); // Set replication arg.
dispatch[TxPrepareOkEvent::KEY] =
boost::bind(&PrimaryTxObserver::txPrepareOkEvent, tx, _1);
dispatch[TxPrepareFailEvent::KEY] =
@@ -72,7 +75,7 @@ class PrimaryTxObserver::Exchange : public broker::Exchange {
private:
static const string TYPE_NAME;
typedef boost::function<void(const std::string&)> DispatchFn;
- typedef qpid::sys::unordered_map<std::string, DispatchFn> DispatchMap;
+ typedef unordered_map<std::string, DispatchFn> DispatchMap;
DispatchMap dispatch;
boost::shared_ptr<PrimaryTxObserver> tx;
@@ -83,50 +86,62 @@ const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"prim
PrimaryTxObserver::PrimaryTxObserver(
Primary& p, HaBroker& hb, const boost::intrusive_ptr<broker::TxBuffer>& tx
) :
+ state(SENDING),
primary(p), haBroker(hb), broker(hb.getBroker()),
replicationTest(hb.getSettings().replicateDefault.get()),
txBuffer(tx),
id(true),
- exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
- complete(false)
+ exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str())
{
logPrefix = "Primary transaction "+shortStr(id)+": ";
// The brokers known at this point are the ones that will be included
// in the transaction. Brokers that join later are not included.
//
- BrokerInfo::Set backups(haBroker.getMembership().otherBackups());
- std::transform(backups.begin(), backups.end(), inserter(members, members.begin()),
+ BrokerInfo::Set backups_(haBroker.getMembership().otherBackups());
+ std::transform(backups_.begin(), backups_.end(), inserter(backups, backups.begin()),
boost::bind(&BrokerInfo::getSystemId, _1));
+ // Delay completion of TX untill all backups have responded to prepare.
+ incomplete = backups;
+ for (size_t i = 0; i < incomplete.size(); ++i)
+ txBuffer->startCompleter();
+
QPID_LOG(debug, logPrefix << "Started TX " << id);
- QPID_LOG(debug, logPrefix << "Members: " << members);
- unprepared = unfinished = members;
+ QPID_LOG(debug, logPrefix << "Backups: " << backups);
+}
+void PrimaryTxObserver::initialize() {
+ boost::shared_ptr<Exchange> ex(new Exchange(shared_from_this()));
+ broker.getExchanges().registerExchange(ex);
pair<QueuePtr, bool> result =
broker.getQueues().declare(
exchangeName, QueueSettings(/*durable*/false, /*autodelete*/true));
- assert(result.second);
+ if (!result.second)
+ throw InvalidArgumentException(
+ QPID_MSG(logPrefix << "TX replication queue already exists."));
txQueue = result.first;
- txQueue->deliver(TxMembersEvent(members).message());
+ txQueue->markInUse(true); // Prevent auto-delete till we are done.
+ txQueue->deliver(TxBackupsEvent(backups).message());
+
}
+
PrimaryTxObserver::~PrimaryTxObserver() {
QPID_LOG(debug, logPrefix << "Ended");
}
-void PrimaryTxObserver::initialize() {
- boost::shared_ptr<Exchange> ex(new Exchange(shared_from_this()));
- FieldTable args = ex->getArgs();
- args.setString(QPID_REPLICATE, printable(NONE).str()); // Set replication arg.
- broker.getExchanges().registerExchange(ex);
+void PrimaryTxObserver::checkState(State expect, const std::string& msg) {
+ if (state != expect)
+ throw IllegalStateException(QPID_MSG(logPrefix << "Illegal state: " << msg));
}
void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m)
{
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m));
+ checkState(SENDING, "Too late for enqueue");
enqueues[q] += m.getReplicationId();
txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message());
txQueue->deliver(m);
@@ -136,7 +151,8 @@ void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m)
void PrimaryTxObserver::dequeue(
const QueuePtr& q, QueuePosition pos, ReplicationId id)
{
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
+ checkState(SENDING, "Too late for dequeue");
if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id));
txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
@@ -163,14 +179,14 @@ struct Skip {
} // namespace
bool PrimaryTxObserver::prepare() {
- QPID_LOG(debug, logPrefix << "Prepare " << members);
+ QPID_LOG(debug, logPrefix << "Prepare " << backups);
vector<Skip> skips;
{
- sys::Mutex::ScopedLock l(lock);
- for (size_t i = 0; i < members.size(); ++i) txBuffer->startCompleter();
-
+ Mutex::ScopedLock l(lock);
+ checkState(SENDING, "Too late for prepare");
+ state = PREPARING;
// Tell replicating subscriptions to skip IDs in the transaction.
- for (UuidSet::iterator b = members.begin(); b != members.end(); ++b)
+ for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b)
for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q)
skips.push_back(Skip(*b, q->first, q->second));
}
@@ -183,69 +199,91 @@ bool PrimaryTxObserver::prepare() {
void PrimaryTxObserver::commit() {
QPID_LOG(debug, logPrefix << "Commit");
- sys::Mutex::ScopedLock l(lock);
- txQueue->deliver(TxCommitEvent().message());
- complete = true;
- end(l);
+ Mutex::ScopedLock l(lock);
+ checkState(PREPARING, "Cannot commit, not preparing");
+ if (incomplete.size() == 0) {
+ txQueue->deliver(TxCommitEvent().message());
+ end(l);
+ } else {
+ txQueue->deliver(TxRollbackEvent().message());
+ end(l);
+ throw PreconditionFailedException(
+ QPID_MSG(logPrefix << "Cannot commit, " << incomplete.size()
+ << " incomplete backups"));
+ }
}
void PrimaryTxObserver::rollback() {
QPID_LOG(debug, logPrefix << "Rollback");
- sys::Mutex::ScopedLock l(lock);
- txQueue->deliver(TxRollbackEvent().message());
- complete = true;
- end(l);
-}
-
-void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) {
- // Don't destroy the tx-queue until the transaction is complete and there
- // are no connected subscriptions.
- if (txBuffer && complete && unfinished.empty()) {
- txBuffer = 0; // Break pointer cycle.
- try {
- haBroker.getBroker().deleteQueue(txQueue->getName(), haBroker.getUserId(), string());
- } catch (const std::exception& e) {
- QPID_LOG(error, logPrefix << "Deleting transaction queue: " << e.what());
- }
- try {
- broker.getExchanges().destroy(getExchangeName());
- } catch (const std::exception& e) {
- QPID_LOG(error, logPrefix << "Deleting transaction exchange: " << e.what());
- }
+ Mutex::ScopedLock l(lock);
+ if (state != ENDED) {
+ txQueue->deliver(TxRollbackEvent().message());
+ end(l);
}
}
+void PrimaryTxObserver::end(Mutex::ScopedLock&) {
+ if (state == ENDED) return;
+ state = ENDED;
+ // If there are no outstanding completions, break pointer cycle here.
+ // Otherwise break it in cancel() when the remaining completions are done.
+ if (incomplete.empty()) txBuffer = 0;
+ txQueue->releaseFromUse(true); // txQueue will auto-delete
+ txQueue.reset();
+ try {
+ broker.getExchanges().destroy(getExchangeName());
+ } catch (const std::exception& e) {
+ QPID_LOG(error, logPrefix << "Deleting transaction exchange: " << e.what());
+ }
+}
+
+bool PrimaryTxObserver::completed(const Uuid& id, Mutex::ScopedLock&) {
+ if (incomplete.erase(id)) {
+ txBuffer->finishCompleter();
+ return true;
+ }
+ return false;
+}
+
+bool PrimaryTxObserver::error(const Uuid& id, const char* msg, Mutex::ScopedLock& l)
+{
+ if (incomplete.find(id) != incomplete.end()) {
+ // Note: setError before completed since completed may trigger completion.
+ txBuffer->setError(QPID_MSG(logPrefix << msg << id));
+ completed(id, l);
+ return true;
+ }
+ return false;
+}
+
void PrimaryTxObserver::txPrepareOkEvent(const string& data) {
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
types::Uuid backup = decodeStr<TxPrepareOkEvent>(data).broker;
- if (unprepared.erase(backup)) {
+ if (completed(backup, l)) {
QPID_LOG(debug, logPrefix << "Backup prepared ok: " << backup);
- txBuffer->finishCompleter();
+ } else {
+ QPID_LOG(error, logPrefix << "Unexpected prepare-ok response from " << backup);
}
}
void PrimaryTxObserver::txPrepareFailEvent(const string& data) {
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
types::Uuid backup = decodeStr<TxPrepareFailEvent>(data).broker;
- if (unprepared.erase(backup)) {
- QPID_LOG(error, logPrefix << "Prepare failed on backup: " << backup);
- txBuffer->setError(
- QPID_MSG(logPrefix << "Prepare failed on backup: " << backup));
- txBuffer->finishCompleter();
+ if (error(backup, "Prepare failed on backup: ", l)) {
+ QPID_LOG(error, logPrefix << "Prepare failed on backup " << backup);
+ } else {
+ QPID_LOG(error, logPrefix << "Unexpected prepare-fail response from " << backup);
}
}
void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) {
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
types::Uuid backup = rs.getBrokerInfo().getSystemId();
- if (unprepared.erase(backup) ){
- complete = true; // Cancelled before prepared.
- txBuffer->setError(
- QPID_MSG(logPrefix << "Backup disconnected: " << rs.getBrokerInfo()));
- txBuffer->finishCompleter();
- }
- unfinished.erase(backup);
- end(l);
+ QPID_LOG(debug, logPrefix << "Backup disconnected: " << backup);
+ // Normally the backup should be completed before it is cancelled.
+ if (completed(backup, l)) error(backup, "Unexpected disconnect:", l);
+ // Break the pointer cycle if backups have completed and we are done with txBuffer.
+ if (state == ENDED && incomplete.empty()) txBuffer = 0;
}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
index cd6c88ad41..31b2b84b0a 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
@@ -90,13 +90,21 @@ class PrimaryTxObserver : public broker::TransactionObserver,
typedef qpid::sys::unordered_map<
QueuePtr, ReplicationIdSet, Hasher<QueuePtr> > QueueIdsMap;
- void membership(const BrokerInfo::Map&);
+ enum State {
+ SENDING, ///< Sending TX messages and acks
+ PREPARING, ///< Prepare sent, waiting for response
+ ENDED ///< Commit or rollback sent, local transaction ended.
+ };
+
+ void checkState(State expect, const std::string& msg);
void end(sys::Mutex::ScopedLock&);
void txPrepareOkEvent(const std::string& data);
void txPrepareFailEvent(const std::string& data);
-
+ bool completed(const types::Uuid& id, sys::Mutex::ScopedLock&);
+ bool error(const types::Uuid& id, const char* msg, sys::Mutex::ScopedLock& l);
sys::Monitor lock;
+ State state;
std::string logPrefix;
Primary& primary;
HaBroker& haBroker;
@@ -110,10 +118,8 @@ class PrimaryTxObserver : public broker::TransactionObserver,
std::string exchangeName;
QueuePtr txQueue;
QueueIdsMap enqueues;
- bool complete;
- UuidSet members; // All members of transaction.
- UuidSet unprepared; // Members that have not yet responded to prepare.
- UuidSet unfinished; // Members that have not yet disconnected.
+ UuidSet backups; // All backups of transaction.
+ UuidSet incomplete; // Incomplete backups (not yet responded to prepare)
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 8037559c3d..cc6c8a3f30 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -46,18 +46,13 @@ namespace qpid {
namespace ha {
using namespace broker;
using namespace framing;
+using namespace framing::execution;
using namespace std;
using std::exception;
using sys::Mutex;
const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
-namespace {
-const string QPID_HA(QPID_HA_PREFIX);
-const std::string TYPE_NAME(QPID_HA+"queue-replicator");
-}
-
-
std::string QueueReplicator::replicatorName(const std::string& queueName) {
return QUEUE_REPLICATOR_PREFIX + queueName;
}
@@ -68,20 +63,21 @@ bool QueueReplicator::isReplicatorName(const std::string& name) {
class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
public:
- ErrorListener(const std::string& prefix) : logPrefix(prefix) {}
- void connectionException(framing::connection::CloseCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Connection error: " << msg);
- }
- void channelException(framing::session::DetachCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Channel error: " << msg);
- }
- void executionException(framing::execution::ErrorCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Execution error: " << msg);
+ ErrorListener(const boost::shared_ptr<QueueReplicator>& qr)
+ : queueReplicator(qr), logPrefix(qr->logPrefix) {}
+
+ void connectionException(framing::connection::CloseCode, const std::string&) {}
+ void channelException(framing::session::DetachCode, const std::string&) {}
+ void executionException(framing::execution::ErrorCode, const std::string&) {}
+
+ void incomingExecutionException(ErrorCode e, const std::string& msg) {
+ queueReplicator->incomingExecutionException(e, msg);
}
void detach() {
QPID_LOG(debug, logPrefix << "Session detached");
}
private:
+ boost::shared_ptr<QueueReplicator> queueReplicator;
std::string logPrefix;
};
@@ -128,6 +124,8 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
boost::bind(&QueueReplicator::idEvent, this, _1, _2);
}
+QueueReplicator::~QueueReplicator() {}
+
// This must be called immediately after the constructor.
// It has to be separate so we can call shared_from_this().
void QueueReplicator::activate() {
@@ -161,7 +159,7 @@ void QueueReplicator::activate() {
);
bridge = result.first;
bridge->setErrorListener(
- boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix)));
+ boost::shared_ptr<ErrorListener>(new ErrorListener(shared_from_this())));
// Enable callback to destroy()
queue->addObserver(
@@ -173,8 +171,6 @@ void QueueReplicator::disconnect() {
sessionHandler = 0;
}
-QueueReplicator::~QueueReplicator() {}
-
// Called from Queue::destroyed()
void QueueReplicator::destroy() {
boost::shared_ptr<Bridge> bridge2; // To call outside of lock
@@ -200,7 +196,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
AMQP_ServerProxy peer(sessionHandler->out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
FieldTable arguments;
- arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
+ arguments.setString(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, getType());
arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable());
arguments.setString(ReplicatingSubscription::QPID_ID_SET,
@@ -289,12 +285,26 @@ ReplicationId QueueReplicator::getMaxId() {
return maxId;
}
+void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) {
+ if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) {
+ // If the queue is destroyed at the same time we are subscribing, we may
+ // get a not-found or resource-deleted exception before the
+ // BrokerReplicator gets the queue-delete event. Shut down the bridge by
+ // calling destroy(), we can let the BrokerReplicator delete the queue
+ // when the queue-delete arrives.
+ QPID_LOG(debug, logPrefix << "Deleted on primary: " << msg);
+ destroy();
+ }
+ else
+ QPID_LOG(error, logPrefix << "Incoming execution exception: " << msg);
+}
+
// Unused Exchange methods.
bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }
bool QueueReplicator::hasBindings() { return false; }
-std::string QueueReplicator::getType() const { return TYPE_NAME; }
+std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; }
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index cbb36757f6..6fd140fde3 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -118,6 +118,9 @@ class QueueReplicator : public broker::Exchange,
void dequeueEvent(const std::string& data, sys::Mutex::ScopedLock&);
void idEvent(const std::string& data, sys::Mutex::ScopedLock&);
+ void incomingExecutionException(framing::execution::ErrorCode e,
+ const std::string& msg);
+
std::string logPrefix;
std::string bridgeName;
@@ -127,6 +130,8 @@ class QueueReplicator : public broker::Exchange,
ReplicationIdSet idSet; // Set of replicationIds on the queue.
ReplicationId nextId; // ID for next message to arrive.
ReplicationId maxId; // Max ID used so far.
+
+ friend class ErrorListener;
};
diff --git a/qpid/cpp/src/qpid/ha/QueueSnapshots.h b/qpid/cpp/src/qpid/ha/QueueSnapshots.h
index d067b983d1..e49fe5c7e7 100644
--- a/qpid/cpp/src/qpid/ha/QueueSnapshots.h
+++ b/qpid/cpp/src/qpid/ha/QueueSnapshots.h
@@ -32,6 +32,7 @@
#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
+#include <boost/make_shared.hpp>
namespace qpid {
namespace ha {
@@ -44,35 +45,28 @@ class QueueSnapshots : public broker::BrokerObserver
{
public:
boost::shared_ptr<QueueSnapshot> get(const boost::shared_ptr<broker::Queue>& q) const {
- sys::Mutex::ScopedLock l(lock);
- SnapshotMap::const_iterator i = snapshots.find(q);
- return i != snapshots.end() ? i->second : boost::shared_ptr<QueueSnapshot>();
+ boost::shared_ptr<QueueSnapshot> qs;
+ q->eachObserver(
+ boost::bind(QueueSnapshots::saveQueueSnapshot, _1, boost::ref(qs)));
+ return qs;
}
// BrokerObserver overrides.
void queueCreate(const boost::shared_ptr<broker::Queue>& q) {
- sys::Mutex::ScopedLock l(lock);
- boost::shared_ptr<QueueSnapshot> observer(new QueueSnapshot);
- snapshots[q] = observer;
- q->addObserver(observer);
+ q->addObserver(boost::make_shared<QueueSnapshot>());
}
void queueDestroy(const boost::shared_ptr<broker::Queue>& q) {
- sys::Mutex::ScopedLock l(lock);
- SnapshotMap::iterator i = snapshots.find(q);
- if (i != snapshots.end()) {
- q->removeObserver(i->second);
- snapshots.erase(i);
- }
+ q->removeObserver(get(q));
}
private:
- typedef qpid::sys::unordered_map<boost::shared_ptr<broker::Queue>,
- boost::shared_ptr<QueueSnapshot>,
- Hasher<boost::shared_ptr<broker::Queue> >
- > SnapshotMap;
- SnapshotMap snapshots;
- mutable sys::Mutex lock;
+ static void saveQueueSnapshot(
+ const boost::shared_ptr<broker::QueueObserver>& observer,
+ boost::shared_ptr<QueueSnapshot>& out)
+ {
+ if (!out) out = boost::dynamic_pointer_cast<QueueSnapshot>(observer);
+ }
};
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index cdfe9dd888..d0b93da85f 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -25,15 +25,16 @@
#include "QueueReplicator.h"
#include "QueueSnapshots.h"
#include "ReplicatingSubscription.h"
+#include "TxReplicatingSubscription.h"
#include "Primary.h"
#include "HaBroker.h"
#include "qpid/assert.h"
#include "qpid/broker/Queue.h"
-#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Uuid.h"
#include <sstream>
@@ -47,22 +48,12 @@ using namespace broker;
using namespace std;
using sys::Mutex;
using broker::amqp_0_10::MessageTransfer;
-
-const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription");
-const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info");
-const string ReplicatingSubscription::QPID_ID_SET("qpid.ha-info");
-
-class ReplicatingSubscription::QueueObserver : public broker::QueueObserver {
- public:
- QueueObserver(ReplicatingSubscription& rs_) : rs(rs_) {}
- void enqueued(const broker::Message&) {}
- void dequeued(const broker::Message& m) { rs.dequeued(m.getReplicationId()); }
- void acquired(const broker::Message&) {}
- void requeued(const broker::Message&) {}
- private:
- ReplicatingSubscription& rs;
-};
-
+namespace { const string QPID_HA(QPID_HA_PREFIX); }
+const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION(QPID_HA+"repsub");
+const string ReplicatingSubscription::QPID_BROKER_INFO(QPID_HA+"info");
+const string ReplicatingSubscription::QPID_ID_SET(QPID_HA+"ids");
+const string ReplicatingSubscription::QPID_QUEUE_REPLICATOR(QPID_HA+"qrep");
+const string ReplicatingSubscription::QPID_TX_REPLICATOR(QPID_HA+"txrep");
/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
@@ -79,13 +70,20 @@ ReplicatingSubscription::Factory::create(
const framing::FieldTable& arguments
) {
boost::shared_ptr<ReplicatingSubscription> rs;
- if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) {
+ std::string type = arguments.getAsString(QPID_REPLICATING_SUBSCRIPTION);
+ if (type == QPID_QUEUE_REPLICATOR) {
rs.reset(new ReplicatingSubscription(
haBroker,
parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments));
- rs->initialize();
}
+ else if (type == QPID_TX_REPLICATOR) {
+ rs.reset(new TxReplicatingSubscription(
+ haBroker,
+ parent, name, queue, ack, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments));
+ }
+ if (rs) rs->initialize();
return rs;
}
@@ -100,7 +98,7 @@ ReplicatingSubscription::ReplicatingSubscription(
HaBroker& hb,
SemanticState* parent,
const string& name,
- Queue::shared_ptr queue,
+ Queue::shared_ptr queue_,
bool ack,
bool /*acquire*/,
bool exclusive,
@@ -108,16 +106,22 @@ ReplicatingSubscription::ReplicatingSubscription(
const string& resumeId,
uint64_t resumeTtl,
const framing::FieldTable& arguments
-) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag,
+) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag,
resumeId, resumeTtl, arguments),
position(0), ready(false), cancelled(false),
haBroker(hb),
primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()))
-{
+{}
+
+// Called in subscription's connection thread when the subscription is created.
+// Separate from ctor because we need to use shared_from_this
+//
+void ReplicatingSubscription::initialize() {
try {
FieldTable ft;
- if (!arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
- throw Exception("Replicating subscription does not have broker info: " + tag);
+ if (!getArguments().getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
+ throw InvalidArgumentException(
+ logPrefix+"Can't subscribe, no broker info: "+getTag());
info.assign(ft);
// Set a log prefix message that identifies the remote broker.
@@ -147,10 +151,17 @@ ReplicatingSubscription::ReplicatingSubscription(
// However we must attach the observer _before_ we snapshot for
// initial dequeues to be sure we don't miss any dequeues
// between the snapshot and attaching the observer.
- observer.reset(new QueueObserver(*this));
- queue->addObserver(observer);
- ReplicationIdSet primaryIds = haBroker.getQueueSnapshots()->get(queue)->snapshot();
- std::string backupStr = arguments.getAsString(ReplicatingSubscription::QPID_ID_SET);
+ queue->addObserver(
+ boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
+ boost::shared_ptr<QueueSnapshot> snapshot = haBroker.getQueueSnapshots()->get(queue);
+ // There may be no snapshot if the queue is being deleted concurrently.
+ if (!snapshot) {
+ queue->removeObserver(
+ boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
+ throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted");
+ }
+ ReplicationIdSet primaryIds = snapshot->snapshot();
+ std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET);
ReplicationIdSet backupIds;
if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr);
@@ -172,23 +183,7 @@ ReplicatingSubscription::ReplicatingSubscription(
<< ", on backup " << skip);
checkReady(l);
}
- }
- catch (const std::exception& e) {
- QPID_LOG(error, logPrefix << "Creation error: " << e.what()
- << ": arguments=" << getArguments());
- throw;
- }
-}
-ReplicatingSubscription::~ReplicatingSubscription() {}
-
-
-// Called in subscription's connection thread when the subscription is created.
-// Called separate from ctor because sending events requires
-// shared_from_this
-//
-void ReplicatingSubscription::initialize() {
- try {
if (primary) primary->addReplica(*this);
Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
// Send initial dequeues to the backup.
@@ -196,12 +191,14 @@ void ReplicatingSubscription::initialize() {
sendDequeueEvent(l);
}
catch (const std::exception& e) {
- QPID_LOG(error, logPrefix << "Initialization error: " << e.what()
- << ": arguments=" << getArguments());
+ QPID_LOG(error, logPrefix << "Subscribe failed: " << e.what());
throw;
}
}
+ReplicatingSubscription::~ReplicatingSubscription() {}
+
+
// True if the next position for the ReplicatingSubscription is a guarded position.
bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) {
return position+1 >= guard->getFirst();
@@ -258,7 +255,8 @@ void ReplicatingSubscription::cancel()
}
QPID_LOG(debug, logPrefix << "Cancelled");
if (primary) primary->removeReplica(*this);
- getQueue()->removeObserver(observer);
+ getQueue()->removeObserver(
+ boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
guard->cancel();
ConsumerImpl::cancel();
}
@@ -289,8 +287,9 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l)
// Called after the message has been removed
// from the deque and under the messageLock in the queue. Called in
// arbitrary connection threads.
-void ReplicatingSubscription::dequeued(ReplicationId id)
+void ReplicatingSubscription::dequeued(const broker::Message& m)
{
+ ReplicationId id = m.getReplicationId();
QPID_LOG(trace, logPrefix << "Dequeued ID " << id);
{
Mutex::ScopedLock l(lock);
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 71993bcb12..0df6f0b411 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -25,6 +25,8 @@
#include "BrokerInfo.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/ConsumerFactory.h"
+#include "qpid/broker/QueueObserver.h"
+#include <boost/enable_shared_from_this.hpp>
#include <iosfwd>
namespace qpid {
@@ -65,81 +67,91 @@ class Primary;
*
* ReplicatingSubscription makes calls on QueueGuard, but not vice-versa.
*/
-class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
+class ReplicatingSubscription :
+ public broker::SemanticState::ConsumerImpl,
+ public broker::QueueObserver
{
- public:
- typedef broker::SemanticState::ConsumerImpl ConsumerImpl;
-
- class Factory : public broker::ConsumerFactory {
- public:
- Factory(HaBroker& hb) : haBroker(hb) {}
-
- HaBroker& getHaBroker() const { return haBroker; }
-
- boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
- broker::SemanticState* parent,
- const std::string& name, boost::shared_ptr<broker::Queue> ,
- bool ack, bool acquire, bool exclusive, const std::string& tag,
- const std::string& resumeId, uint64_t resumeTtl,
- const framing::FieldTable& arguments);
- private:
- HaBroker& haBroker;
- };
-
- // Argument names for consume command.
- static const std::string QPID_REPLICATING_SUBSCRIPTION;
- static const std::string QPID_BROKER_INFO;
- static const std::string QPID_ID_SET;
-
- ReplicatingSubscription(HaBroker& haBroker,
+public:
+typedef broker::SemanticState::ConsumerImpl ConsumerImpl;
+
+class Factory : public broker::ConsumerFactory {
+public:
+Factory(HaBroker& hb) : haBroker(hb) {}
+
+HaBroker& getHaBroker() const { return haBroker; }
+
+boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
+broker::SemanticState* parent,
+ const std::string& name, boost::shared_ptr<broker::Queue> ,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl,
+ const framing::FieldTable& arguments);
+private:
+HaBroker& haBroker;
+};
+
+// Argument names for consume command.
+static const std::string QPID_REPLICATING_SUBSCRIPTION;
+static const std::string QPID_BROKER_INFO;
+static const std::string QPID_ID_SET;
+// Replicator types: argument values for QPID_REPLICATING_SUBSCRIPTION argument.
+static const std::string QPID_QUEUE_REPLICATOR;
+static const std::string QPID_TX_REPLICATOR;
+
+ReplicatingSubscription(HaBroker& haBroker,
broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,
bool ack, bool acquire, bool exclusive, const std::string& tag,
const std::string& resumeId, uint64_t resumeTtl,
const framing::FieldTable& arguments);
- ~ReplicatingSubscription();
-
-
- // Consumer overrides.
- bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg);
- void cancel();
- void acknowledged(const broker::DeliveryRecord&);
- bool browseAcquired() const { return true; }
- // Hide the "queue deleted" error for a ReplicatingSubscription when a
- // queue is deleted, this is normal and not an error.
- bool hideDeletedError() { return true; }
- // Not counted for auto deletion and immediate message purposes.
- bool isCounted() { return false; }
-
- /** Initialization that must be done separately from construction
- * because it requires a shared_ptr to this to exist.
- */
- void initialize();
-
- BrokerInfo getBrokerInfo() const { return info; }
-
- /** Skip replicating enqueue of of ids. */
- void addSkip(const ReplicationIdSet& ids);
-
- protected:
- bool doDispatch();
-
- private:
- class QueueObserver;
- friend class QueueObserver;
-
- std::string logPrefix;
- QueuePosition position;
- ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event.
- ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues.
- ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged.
- bool ready;
- bool cancelled;
- BrokerInfo info;
- boost::shared_ptr<QueueGuard> guard;
+~ReplicatingSubscription();
+
+
+// Consumer overrides.
+bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg);
+void cancel();
+void acknowledged(const broker::DeliveryRecord&);
+bool browseAcquired() const { return true; }
+// Hide the "queue deleted" error for a ReplicatingSubscription when a
+// queue is deleted, this is normal and not an error.
+bool hideDeletedError() { return true; }
+
+// QueueObserver overrides
+void enqueued(const broker::Message&) {}
+void dequeued(const broker::Message&);
+void acquired(const broker::Message&) {}
+void requeued(const broker::Message&) {}
+
+/** A ReplicatingSubscription is a passive observer, not counted for auto
+ * deletion and immediate message purposes.
+ */
+bool isCounted() { return false; }
+
+/** Initialization that must be done separately from construction
+ * because it requires a shared_ptr to this to exist.
+ */
+void initialize();
+
+BrokerInfo getBrokerInfo() const { return info; }
+
+/** Skip replicating enqueue of of ids. */
+void addSkip(const ReplicationIdSet& ids);
+
+protected:
+bool doDispatch();
+
+private:
+std::string logPrefix;
+QueuePosition position;
+ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event.
+ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues.
+ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged.
+bool ready;
+bool cancelled;
+BrokerInfo info;
+boost::shared_ptr<QueueGuard> guard;
HaBroker& haBroker;
- boost::shared_ptr<QueueObserver> observer;
boost::shared_ptr<Primary> primary;
bool isGuarded(sys::Mutex::ScopedLock&);
diff --git a/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp
new file mode 100644
index 0000000000..15b33fe89d
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "TxReplicatingSubscription.h"
+
+namespace qpid {
+namespace ha {
+using namespace std;
+using namespace broker;
+
+TxReplicatingSubscription::TxReplicatingSubscription(
+ HaBroker& hb,
+ SemanticState* parent,
+ const string& name,
+ boost::shared_ptr<Queue> queue,
+ bool ack,
+ bool acquire,
+ bool exclusive,
+ const string& tag,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const framing::FieldTable& arguments
+) : ReplicatingSubscription(hb, parent, name, queue, ack, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments)
+{}
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h
new file mode 100644
index 0000000000..a363d262a0
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h
@@ -0,0 +1,50 @@
+#ifndef QPID_HA_TXREPLICATINGSUBSCRIPTION_H
+#define QPID_HA_TXREPLICATINGSUBSCRIPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ReplicatingSubscription.h"
+
+namespace qpid {
+namespace ha {
+
+/**
+ * Replicating subscription for a TX queue.
+ */
+class TxReplicatingSubscription : public ReplicatingSubscription
+{
+ public:
+ TxReplicatingSubscription(HaBroker& haBroker,
+ broker::SemanticState* parent,
+ const std::string& name, boost::shared_ptr<broker::Queue> ,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl,
+ const framing::FieldTable& arguments);
+
+ /** A TxReplicatingSubscription is counted for auto-delete so we can clean
+ * up the TX queue when all backups are done.
+ */
+ bool isCounted() { return true; }
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_TXREPLICATINGSUBSCRIPTION_H*/
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
index 63301a92f5..95afdb9759 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
@@ -26,6 +26,7 @@
#include "BrokerReplicator.h"
#include "Event.h"
#include "HaBroker.h"
+#include "ReplicatingSubscription.h"
#include "types.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
@@ -53,10 +54,7 @@ using qpid::broker::amqp_0_10::MessageTransfer;
using qpid::types::Uuid;
namespace {
-const string QPID_HA(QPID_HA_PREFIX);
-const string TYPE_NAME(QPID_HA+"tx-replicator");
const string PREFIX(TRANSACTION_REPLICATOR_PREFIX);
-
} // namespace
@@ -70,17 +68,16 @@ string TxReplicator::getTxId(const string& q) {
return q.substr(PREFIX.size());
}
-string TxReplicator::getType() const { return TYPE_NAME; }
+string TxReplicator::getType() const { return ReplicatingSubscription::QPID_TX_REPLICATOR; }
TxReplicator::TxReplicator(
HaBroker& hb,
const boost::shared_ptr<broker::Queue>& txQueue,
const boost::shared_ptr<broker::Link>& link) :
QueueReplicator(hb, txQueue, link),
- txBuffer(new broker::TxBuffer),
store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0),
channel(link->nextChannel()),
- complete(false), ignore(false),
+ ended(false),
dequeueState(hb.getBroker().getQueues())
{
string id(getTxId(txQueue->getName()));
@@ -100,8 +97,8 @@ TxReplicator::TxReplicator(
boost::bind(&TxReplicator::commit, this, _1, _2);
dispatch[TxRollbackEvent::KEY] =
boost::bind(&TxReplicator::rollback, this, _1, _2);
- dispatch[TxMembersEvent::KEY] =
- boost::bind(&TxReplicator::members, this, _1, _2);
+ dispatch[TxBackupsEvent::KEY] =
+ boost::bind(&TxReplicator::backups, this, _1, _2);
}
TxReplicator::~TxReplicator() {
@@ -121,11 +118,12 @@ void TxReplicator::sendMessage(const broker::Message& msg, sys::Mutex::ScopedLoc
}
void TxReplicator::route(broker::Deliverable& deliverable) {
- if (!ignore) QueueReplicator::route(deliverable);
+ QueueReplicator::route(deliverable);
}
void TxReplicator::deliver(const broker::Message& m_) {
sys::Mutex::ScopedLock l(lock);
+ if (!txBuffer) return;
// Deliver message to the target queue, not the tx-queue.
broker::Message m(m_);
m.setReplicationId(enq.id); // Use replicated id.
@@ -138,6 +136,7 @@ void TxReplicator::deliver(const broker::Message& m_) {
void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) {
sys::Mutex::ScopedLock l(lock);
+ if (!txBuffer) return;
TxEnqueueEvent e;
decodeStr(data, e);
QPID_LOG(trace, logPrefix << "Enqueue: " << e);
@@ -146,6 +145,7 @@ void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) {
void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) {
sys::Mutex::ScopedLock l(lock);
+ if (!txBuffer) return;
TxDequeueEvent e;
decodeStr(data, e);
QPID_LOG(trace, logPrefix << "Dequeue: " << e);
@@ -195,18 +195,20 @@ boost::shared_ptr<TxAccept> TxReplicator::DequeueState::makeAccept() {
}
void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock& l) {
+ if (!txBuffer) return;
txBuffer->enlist(dequeueState.makeAccept());
context = store->begin();
if (txBuffer->prepare(context.get())) {
- QPID_LOG(debug, logPrefix << "Prepared OK");
+ QPID_LOG(debug, logPrefix << "Local prepare OK");
sendMessage(TxPrepareOkEvent(haBroker.getSystemId()).message(queue->getName()), l);
} else {
- QPID_LOG(debug, logPrefix << "Prepare failed");
+ QPID_LOG(debug, logPrefix << "Local prepare failed");
sendMessage(TxPrepareFailEvent(haBroker.getSystemId()).message(queue->getName()), l);
}
}
void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) {
+ if (!txBuffer) return;
QPID_LOG(debug, logPrefix << "Commit");
if (context.get()) store->commit(*context);
txBuffer->commit();
@@ -214,34 +216,44 @@ void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) {
}
void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) {
+ if (!txBuffer) return;
QPID_LOG(debug, logPrefix << "Rollback");
if (context.get()) store->abort(*context);
txBuffer->rollback();
end(l);
}
-void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) {
- TxMembersEvent e;
+void TxReplicator::backups(const string& data, sys::Mutex::ScopedLock& l) {
+ TxBackupsEvent e;
decodeStr(data, e);
- QPID_LOG(debug, logPrefix << "Members: " << e.members);
- if (!e.members.count(haBroker.getMembership().getSelf().getSystemId())) {
+ if (!e.backups.count(haBroker.getMembership().getSelf().getSystemId())) {
QPID_LOG(info, logPrefix << "Not participating in transaction");
- ignore = true;
+ end(l);
+ } else {
+ QPID_LOG(debug, logPrefix << "Backups: " << e.backups);
+ txBuffer = new broker::TxBuffer;
}
}
void TxReplicator::end(sys::Mutex::ScopedLock&) {
- complete = true;
- if (!getQueue()) return; // Already destroyed
- // Destroy will cancel the subscription to the primary tx-queue which
- // informs the primary that we have completed the transaction.
- destroy();
+ ended = true;
+ txBuffer.reset();
+ // QueueReplicator::destroy cancels subscription to the primary tx-queue
+ // which allows the primary to clean up resources.
+ sys::Mutex::ScopedUnlock u(lock);
+ QueueReplicator::destroy();
}
+// Called when the tx queue is deleted.
void TxReplicator::destroy() {
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (!ended) {
+ QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback.");
+ rollback(string(), l);
+ }
+ }
QueueReplicator::destroy();
- sys::Mutex::ScopedLock l(lock);
- if (!ignore && !complete) rollback(string(), l);
}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h
index bd80443e50..9d80ecb8d3 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.h
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.h
@@ -84,7 +84,7 @@ class TxReplicator : public QueueReplicator {
void prepare(const std::string& data, sys::Mutex::ScopedLock&);
void commit(const std::string& data, sys::Mutex::ScopedLock&);
void rollback(const std::string& data, sys::Mutex::ScopedLock&);
- void members(const std::string& data, sys::Mutex::ScopedLock&);
+ void backups(const std::string& data, sys::Mutex::ScopedLock&);
void end(sys::Mutex::ScopedLock&);
std::string logPrefix;
@@ -93,7 +93,7 @@ class TxReplicator : public QueueReplicator {
broker::MessageStore* store;
std::auto_ptr<broker::TransactionContext> context;
framing::ChannelId channel; // Channel to send prepare-complete.
- bool complete, ignore;
+ bool empty, ended;
// Class to process dequeues and create DeliveryRecords to populate a
// TxAccept.
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index d1f020a945..28e7f6b182 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -381,6 +381,16 @@ class Broker(Popen):
"Broker %s not responding: (%s)%s"%(
self.name,e,error_line(self.log, 5)))
+ def assert_log_clean(self, ignore=None):
+ log = open(self.get_log())
+ try:
+ error = re.compile("] error|] critical")
+ if ignore: ignore = re.compile(ignore)
+ else: ignore = re.compile("\000") # Won't match anything
+ for line in log.readlines():
+ assert not error.search(line) or ignore.search(line), "Errors in log file %s: %s"%(log, line)
+ finally: log.close()
+
def browse(session, queue, timeout=0, transform=lambda m: m.content):
"""Return a list with the contents of each message on queue."""
r = session.receiver("%s;{mode:browse}"%(queue))
@@ -549,7 +559,11 @@ class NumberedSender(Thread):
self.condition.release()
self.write_message(self.sent)
self.sent += 1
- except Exception: self.error = RethrownException(self.sender.pname)
+ except Exception, e:
+ self.error = RethrownException(
+ "%s: (%s)%s"%(self.sender.pname,e,
+ error_line(self.sender.outfile("err"))))
+
def notify_received(self, count):
"""Called by receiver to enable flow control. count = messages received so far."""
@@ -612,8 +626,10 @@ class NumberedReceiver(Thread):
if self.sender:
self.sender.notify_received(self.received)
m = self.read_message()
- except Exception:
- self.error = RethrownException(self.receiver.pname)
+ except Exception, e:
+ self.error = RethrownException(
+ "%s: (%s)%s"%(self.receiver.pname,e,
+ error_line(self.receiver.outfile("err"))))
def check(self):
"""Raise an exception if there has been an error"""
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 79024d48e3..138868f64e 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -30,19 +30,10 @@ from qpidtoollibs import BrokerAgent, EventHelper
log = getLogger(__name__)
-def grep(filename, regexp):
- for line in open(filename).readlines():
- if (regexp.search(line)): return True
- return False
class HaBrokerTest(BrokerTest):
"""Base class for HA broker tests"""
- def assert_log_no_errors(self, broker):
- log = broker.get_log()
- if grep(log, re.compile("] error|] critical")):
- self.fail("Errors in log file %s"%(log))
-
class ReplicationTests(HaBrokerTest):
"""Correctness tests for HA replication."""
@@ -838,7 +829,7 @@ acl deny all all
# It is possible for the backup to attempt to subscribe after the queue
# is deleted. This is not an error, but is logged as an error on the primary.
# The backup does not log this as an error so we only check the backup log for errors.
- self.assert_log_no_errors(cluster[1])
+ cluster[1].assert_log_clean()
def test_missed_recreate(self):
"""If a queue or exchange is destroyed and one with the same name re-created
@@ -1003,6 +994,32 @@ class LongTests(HaBrokerTest):
dead = filter(lambda b: not b.is_running(), brokers)
if dead: raise Exception("Brokers not running: %s"%dead)
+ def test_tx_send_receive(self):
+ brokers = HaCluster(self, 3)
+ sender = self.popen(
+ ["qpid-send",
+ "--broker", brokers[0].host_port(),
+ "--address", "q;{create:always}",
+ "--messages=1000",
+ "--tx=10"
+ ])
+ receiver = self.popen(
+ ["qpid-receive",
+ "--broker", brokers[0].host_port(),
+ "--address", "q;{create:always}",
+ "--messages=990",
+ "--timeout=10",
+ "--tx=10"
+ ])
+ self.assertEqual(sender.wait(), 0)
+ self.assertEqual(receiver.wait(), 0)
+ expect = [long(i) for i in range(991, 1001)]
+ sn = lambda m: m.properties["sn"]
+ brokers[0].assert_browse("q", expect, transform=sn)
+ brokers[1].assert_browse_backup("q", expect, transform=sn)
+ brokers[2].assert_browse_backup("q", expect, transform=sn)
+
+
def test_qmf_order(self):
"""QPID 4402: HA QMF events can be out of order.
This test mimics the test described in the JIRA. Two threads repeatedly
@@ -1352,12 +1369,14 @@ class TransactionTests(HaBrokerTest):
def assert_tx_clean(self, b):
"""Verify that there are no transaction artifacts
(exchanges, queues, subscriptions) on b."""
- queues=[]
- def txq(): queues = b.agent().tx_queues(); return not queues
- assert retry(txq), "%s: unexpected %s"%(b,queues)
- subs=[]
- def txs(): subs = self.tx_subscriptions(b); return not subs
- assert retry(txs), "%s: unexpected %s"%(b,subs)
+ class FunctionCache: # Call a function and cache the result.
+ def __init__(self, f): self.f, self.value = f, None
+ def __call__(self): self.value = self.f(); return self.value
+
+ txq= FunctionCache(b.agent().tx_queues)
+ assert retry(lambda: not txq()), "%s: unexpected %s"%(b, txq.value)
+ txsub = FunctionCache(lambda: self.tx_subscriptions(b))
+ assert retry(lambda: not txsub()), "%s: unexpected %s"%(b, txsub.value)
# TODO aconway 2013-10-15: TX exchanges don't show up in management.
def assert_simple_commit_outcome(self, b, tx_queues):
@@ -1462,18 +1481,22 @@ class TransactionTests(HaBrokerTest):
self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster])
cluster[1].kill(final=False)
s.send("b")
- self.assert_commit_raises(tx)
- for b in [cluster[0],cluster[2]]: self.assert_tx_clean(b)
+ tx.commit()
+ tx.close()
+ for b in [cluster[0],cluster[2]]:
+ self.assert_tx_clean(b)
+ b.assert_browse_backup("q", ["a","b"], msg=b)
# Joining
tx = cluster[0].connect().session(transactional=True)
s = tx.sender("q;{create:always}")
s.send("foo")
- cluster.restart(1)
+ cluster.restart(1) # Not a part of the current transaction.
tx.commit()
tx.close()
for b in cluster: self.assert_tx_clean(b)
# The new member is not in the tx but receives the results normal replication.
- for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b)
+ for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b)
+ # FIXME aconway 2013-11-07: assert_log_clean
def test_tx_block_threads(self):
"""Verify that TXs blocked in commit don't deadlock."""