summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-08-01 20:27:26 +0000
committerAlan Conway <aconway@apache.org>2013-08-01 20:27:26 +0000
commit1ac985d9b3a50d8c7691e534945a4829bd57017a (patch)
tree7179cb6fa40a59d1390f295a613de64cc242814a
parent9bf9a96c336635b415235dcaaf9524d64f1504f0 (diff)
downloadqpid-python-1ac985d9b3a50d8c7691e534945a4829bd57017a.tar.gz
QPID-4327: HA TX transactions: basic replication.
On primary a PrimaryTxObserver observes a transaction's TxBuffer and generates transaction events on a tx-replication-queue. On the backup a TxReplicator receives the events and constructs a TxBuffer equivalent to the one in the primary. Unfinished: - Primary does not wait for backups to prepare() before committing. - All connected backups are assumed to be in the transaction, there are race conditions around brokers joining/leavinv where this assumption is invalid. - Need more tests. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1509423 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/include/qpid/framing/BufferTypes.h106
-rw-r--r--cpp/src/CMakeLists.txt24
-rw-r--r--cpp/src/qpid/broker/Broker.cpp2
-rw-r--r--cpp/src/qpid/broker/Broker.h3
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.cpp2
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.h2
-rw-r--r--cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--cpp/src/qpid/broker/TxAccept.cpp2
-rw-r--r--cpp/src/qpid/ha/Backup.h2
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp16
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.h6
-rw-r--r--cpp/src/qpid/ha/Event.cpp (renamed from cpp/src/qpid/ha/makeMessage.cpp)43
-rw-r--r--cpp/src/qpid/ha/Event.h146
-rw-r--r--cpp/src/qpid/ha/FailoverExchange.cpp4
-rw-r--r--cpp/src/qpid/ha/HaBroker.cpp2
-rw-r--r--cpp/src/qpid/ha/HaBroker.h1
-rw-r--r--cpp/src/qpid/ha/Membership.cpp8
-rw-r--r--cpp/src/qpid/ha/Membership.h3
-rw-r--r--cpp/src/qpid/ha/Primary.cpp41
-rw-r--r--cpp/src/qpid/ha/Primary.h23
-rw-r--r--cpp/src/qpid/ha/PrimaryTxObserver.cpp104
-rw-r--r--cpp/src/qpid/ha/PrimaryTxObserver.h77
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp81
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.h33
-rw-r--r--cpp/src/qpid/ha/RemoteBackup.cpp3
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp42
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.h10
-rw-r--r--cpp/src/qpid/ha/StatusCheck.cpp12
-rw-r--r--cpp/src/qpid/ha/TxReplicator.cpp192
-rw-r--r--cpp/src/qpid/ha/TxReplicator.h123
-rw-r--r--cpp/src/qpid/ha/makeMessage.h66
-rw-r--r--cpp/src/qpid/ha/types.cpp8
-rw-r--r--cpp/src/qpid/ha/types.h9
-rw-r--r--cpp/src/tests/BrokerFixture.h70
-rw-r--r--cpp/src/tests/CMakeLists.txt19
-rw-r--r--cpp/src/tests/MessagingFixture.h15
-rw-r--r--cpp/src/tests/TransactionObserverTest.cpp1
-rw-r--r--cpp/src/tests/brokertest.py15
-rw-r--r--cpp/src/tests/cluster_test.cpp1231
-rwxr-xr-xcpp/src/tests/ha_test.py8
-rwxr-xr-xcpp/src/tests/ha_tests.py95
-rw-r--r--cpp/src/tests/test_env.sh.in4
-rw-r--r--cpp/src/tests/test_store.cpp101
43 files changed, 2483 insertions, 274 deletions
diff --git a/cpp/include/qpid/framing/BufferTypes.h b/cpp/include/qpid/framing/BufferTypes.h
new file mode 100644
index 0000000000..4199755852
--- /dev/null
+++ b/cpp/include/qpid/framing/BufferTypes.h
@@ -0,0 +1,106 @@
+#ifndef QPID_FRAMING_BUFFERTYPES_H
+#define QPID_FRAMING_BUFFERTYPES_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**@file
+ * Using templates with framing::Buffer is difficultg becase of the many
+ * different put/get function names. Here we define a set of types
+ * corresponding the basic types of Buffer but presenting a uniform
+ * encode/decode/encodedSize interface.
+ *
+ * It also provides some convenience templates for the common case of
+ * encoding a single encodable value as a string, e.g.
+ *
+ * LongString ls("hello");
+ * std::string encoded = encodeStr(ls);
+ * LongString ls2 = decodeStr<LongString>(encoded);
+ * LongString ls3;
+ * decodeStr(encoded, ls3);
+ */
+
+namespace qpid {
+namespace framing {
+
+// Templates to help define types
+template <class ValueType> struct BufferTypeTraits {
+ typedef void (Buffer::*Put)(const ValueType&);
+ typedef void (Buffer::*Get)(ValueType&);
+};
+
+template <class ValueType,
+ typename BufferTypeTraits<ValueType>::Put PutFn,
+ typename BufferTypeTraits<ValueType>::Get GetFn>
+struct EncodeDecodeTemplate {
+ EncodeDecodeTemplate(const ValueType& s) : value(s) {}
+ operator ValueType() const { return value; }
+
+ ValueType value;
+ void encode(framing::Buffer& buf) const { (buf.*PutFn)(value); }
+ void decode(framing::Buffer& buf) { (buf.*GetFn)(value); }
+};
+
+template <size_t Size,
+ typename BufferTypeTraits<std::string>::Put PutFn,
+ typename BufferTypeTraits<std::string>::Get GetFn
+ >
+struct StringTypeTemplate : public EncodeDecodeTemplate<std::string, PutFn, GetFn> {
+ typedef EncodeDecodeTemplate<std::string, PutFn, GetFn> Base;
+ StringTypeTemplate(const std::string& s) : Base(s) {}
+ size_t encodedSize() const { return Size + Base::value.size(); }
+};
+
+
+// Convenience tempates for encoding/decoding values to/from a std::string.
+
+/** Encode value as a string. */
+template <class T> std::string encodeStr(const T& value) {
+ std::string encoded(value.encodedSize(), '\0');
+ framing::Buffer buffer(&encoded[0], encoded.size());
+ value.encode(buffer);
+ return encoded;
+}
+
+/** Decode value from a string. */
+template <class T> void decodeStr(const std::string& encoded, T& value) {
+ framing::Buffer b(const_cast<char*>(&encoded[0]), encoded.size());
+ value.decode(b);
+}
+
+/** Decode value from a string. */
+template <class T> T decodeStr(const std::string& encoded) {
+ T value;
+ decodeStr(encoded, value);
+ return value;
+}
+
+// The types
+
+typedef StringTypeTemplate<4, &Buffer::putLongString, &Buffer::getLongString> LongString;
+typedef StringTypeTemplate<2, &Buffer::putMediumString, &Buffer::getMediumString> MediumString;
+typedef StringTypeTemplate<1, &Buffer::putShortString, &Buffer::getShortString> ShortString;
+
+// TODO aconway 2013-07-26: Add integer types.
+
+}} // namespace qpid::framing
+
+#endif /*!QPID_FRAMING_BUFFERTYPES_H*/
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index 3b242d627f..9156d91ff6 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -355,7 +355,7 @@ endif (CMAKE_SYSTEM_NAME STREQUAL Windows)
# where Boost 1.45 is supported, or we can just accept some versions using
# the Additional_versions variable.
if (NOT DEFINED Boost_ADDITIONAL_VERSIONS)
- set (Boost_ADDITIONAL_VERSIONS
+ set (Boost_ADDITIONAL_VERSIONS
"1.45" "1.45.0" "1.46" "1.46.0" "1.47" "1.47.0"
"1.48" "1.48.0" "1.49" "1.49.0" "1.50" "1.50.0"
"1.51" "1.51.0" "1.52" "1.52.0" "1.53" "1.53.0")
@@ -743,6 +743,8 @@ if (BUILD_HA)
qpid/ha/BrokerReplicator.h
qpid/ha/ConnectionObserver.cpp
qpid/ha/ConnectionObserver.h
+ qpid/ha/Event.cpp
+ qpid/ha/Event.h
qpid/ha/FailoverExchange.cpp
qpid/ha/FailoverExchange.h
qpid/ha/HaBroker.cpp
@@ -751,8 +753,6 @@ if (BUILD_HA)
qpid/ha/hash.h
qpid/ha/IdSetter.h
qpid/ha/QueueSnapshot.h
- qpid/ha/makeMessage.cpp
- qpid/ha/makeMessage.h
qpid/ha/Membership.cpp
qpid/ha/Membership.h
qpid/ha/Primary.cpp
@@ -772,7 +772,11 @@ if (BUILD_HA)
qpid/ha/StandAlone.h
qpid/ha/StatusCheck.cpp
qpid/ha/StatusCheck.h
+ qpid/ha/PrimaryTxObserver.cpp
+ qpid/ha/PrimaryTxObserver.h
qpid/ha/types.cpp
+ qpid/ha/TxReplicator.cpp
+ qpid/ha/TxReplicator.h
qpid/ha/types.h
)
@@ -798,7 +802,7 @@ include (amqp.cmake)
# Check for syslog capabilities not present on all systems
check_symbol_exists (LOG_AUTHPRIV "sys/syslog.h" HAVE_LOG_AUTHPRIV)
check_symbol_exists (LOG_FTP "sys/syslog.h" HAVE_LOG_FTP)
-
+
# Set default Memory Status module (Null implementation)
set (qpid_memstat_module
qpid/sys/MemStat.cpp
@@ -884,7 +888,7 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows)
# On Linux override memory status module
set (qpid_memstat_module
qpid/sys/posix/MemStat.cpp
- )
+ )
endif (CMAKE_SYSTEM_NAME STREQUAL Linux)
if (CMAKE_SYSTEM_NAME STREQUAL SunOS)
@@ -1070,7 +1074,7 @@ target_link_libraries (qpidcommon qpidtypes
${qpidcommon_platform_LIBS}
${qpidcommon_sasl_lib})
set_target_properties (qpidcommon PROPERTIES
- VERSION ${qpidcommon_version}
+ VERSION ${qpidcommon_version}
SOVERSION ${qpidcommon_version_major}
${qpidcommon_LINK_FLAGS})
install (TARGETS qpidcommon
@@ -1087,7 +1091,7 @@ set(qpidtypes_SOURCES
add_msvc_version (qpidtypes library dll)
add_library(qpidtypes SHARED ${qpidtypes_SOURCES})
target_link_libraries(qpidtypes ${qpidtypes_platform_LIBS})
-set_target_properties (qpidtypes PROPERTIES
+set_target_properties (qpidtypes PROPERTIES
VERSION ${qpidtypes_version}
SOVERSION ${qpidtypes_version_major})
install(TARGETS qpidtypes
@@ -1329,9 +1333,9 @@ set (qpidbroker_SOURCES
add_msvc_version (qpidbroker library dll)
add_library (qpidbroker SHARED ${qpidbroker_SOURCES})
target_link_libraries (qpidbroker qpidcommon ${qpidbroker_platform_LIBS})
-set_target_properties (qpidbroker PROPERTIES
+set_target_properties (qpidbroker PROPERTIES
VERSION ${qpidbroker_version}
- SOVERSION ${qpidbroker_version_major}
+ SOVERSION ${qpidbroker_version_major}
COMPILE_DEFINITIONS _IN_QPID_BROKER)
if (MSVC)
set_target_properties (qpidbroker PROPERTIES COMPILE_FLAGS /wd4290)
@@ -1444,7 +1448,7 @@ endif (NOT WIN32)
add_library (qmf2 SHARED ${qmf2_SOURCES})
target_link_libraries (qmf2 qpidmessaging qpidtypes qpidclient qpidcommon)
set_target_properties (qmf2 PROPERTIES
- VERSION ${qmf2_version}
+ VERSION ${qmf2_version}
SOVERSION ${qmf2_version_major})
install (TARGETS qmf2 OPTIONAL
DESTINATION ${QPID_INSTALL_LIBDIR}
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index d420e28d95..7c9e82022f 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -400,7 +400,7 @@ boost::intrusive_ptr<Broker> Broker::create(const Options& opts)
return boost::intrusive_ptr<Broker>(new Broker(opts));
}
-void Broker::setStore (boost::shared_ptr<MessageStore>& _store)
+void Broker::setStore (const boost::shared_ptr<MessageStore>& _store)
{
store.reset(new MessageStoreModule (_store));
setStore();
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index f0b0c83f61..3e22fb491b 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -226,7 +226,8 @@ class Broker : public sys::Runnable, public Plugin::Target,
/** Shut down the broker */
QPID_BROKER_EXTERN virtual void shutdown();
- QPID_BROKER_EXTERN void setStore (boost::shared_ptr<MessageStore>& store);
+ QPID_BROKER_EXTERN void setStore (const boost::shared_ptr<MessageStore>& store);
+ bool hasStore() const { return store.get(); }
MessageStore& getStore() { return *store; }
void setAcl (AclModule* _acl) {acl = _acl;}
AclModule* getAcl() { return acl; }
diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp
index f19b31fa76..edfe6e7819 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.cpp
+++ b/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -33,7 +33,7 @@ using std::string;
namespace qpid {
namespace broker {
-MessageStoreModule::MessageStoreModule(boost::shared_ptr<MessageStore>& _store)
+MessageStoreModule::MessageStoreModule(const boost::shared_ptr<MessageStore>& _store)
: store(_store) {}
MessageStoreModule::~MessageStoreModule()
diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h
index 82308db84c..a71e8cc9b6 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/cpp/src/qpid/broker/MessageStoreModule.h
@@ -38,7 +38,7 @@ class MessageStoreModule : public MessageStore
{
boost::shared_ptr<MessageStore> store;
public:
- MessageStoreModule(boost::shared_ptr<MessageStore>& store);
+ MessageStoreModule(const boost::shared_ptr<MessageStore>& store);
bool init(const Options* options);
std::auto_ptr<TransactionContext> begin();
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 49aff3ea97..227acd1b56 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -277,6 +277,7 @@ void Queue::deliver(Message msg, TxBuffer* txn)
void Queue::deliverTo(Message msg, TxBuffer* txn)
{
if (accept(msg)) {
+ interceptors.record(msg);
if (txn) {
TxOp::shared_ptr op(new TxPublish(msg, shared_from_this()));
txn->enlist(op);
@@ -842,7 +843,6 @@ bool Queue::isEmpty(const Mutex::ScopedLock&) const
*/
bool Queue::enqueue(TransactionContext* ctxt, Message& msg)
{
- interceptors.record(msg);
ScopedUse u(barrier);
if (!u.acquired) return false;
diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp
index 0343cc00ae..ee30dff957 100644
--- a/cpp/src/qpid/broker/TxAccept.cpp
+++ b/cpp/src/qpid/broker/TxAccept.cpp
@@ -20,9 +20,9 @@
*/
#include "qpid/broker/TxAccept.h"
#include "qpid/broker/TransactionObserver.h"
+#include "qpid/broker/Queue.h"
#include "qpid/log/Statement.h"
-
using std::bind1st;
using std::bind2nd;
using std::mem_fun_ref;
diff --git a/cpp/src/qpid/ha/Backup.h b/cpp/src/qpid/ha/Backup.h
index 4943ca5e2e..88194158ce 100644
--- a/cpp/src/qpid/ha/Backup.h
+++ b/cpp/src/qpid/ha/Backup.h
@@ -59,6 +59,8 @@ class Backup : public Role
Role* promote();
+ boost::shared_ptr<BrokerReplicator> getBrokerReplicator() { return replicator; }
+
private:
void stop(sys::Mutex::ScopedLock&);
Role* recover(sys::Mutex::ScopedLock&);
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index 17b00185ef..f882cbcbf1 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -21,6 +21,7 @@
#include "BrokerReplicator.h"
#include "HaBroker.h"
#include "QueueReplicator.h"
+#include "TxReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/ConnectionObserver.h"
@@ -648,11 +649,16 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicationTest.getLevel(argsMap)) return;
string name(values[NAME].asString());
+
+ if (TxReplicator::isTxQueue(name)) return; // Can't join a transaction in progress.
+
if (!queueTracker.get())
throw Exception(QPID_MSG("Unexpected queue response: " << values));
if (!queueTracker->response(name)) return; // Response is out-of-date
+
QPID_LOG(debug, logPrefix << "Queue response: " << name);
boost::shared_ptr<Queue> queue = queues.find(name);
+
if (queue) { // Already exists
bool uuidOk = (getHaUuid(queue->getSettings().original) == getHaUuid(argsMap));
if (!uuidOk) QPID_LOG(debug, logPrefix << "UUID mismatch for queue: " << name);
@@ -660,6 +666,7 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
QPID_LOG(debug, logPrefix << "Queue response replacing queue: " << name);
deleteQueue(name);
}
+
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
boost::shared_ptr<QueueReplicator> qr = replicateQueue(
@@ -770,8 +777,13 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator(
const boost::shared_ptr<Queue>& queue)
{
if (replicationTest.getLevel(*queue) == ALL) {
- boost::shared_ptr<QueueReplicator> qr(
- new QueueReplicator(haBroker, queue, link));
+ boost::shared_ptr<QueueReplicator> qr;
+ if (TxReplicator::isTxQueue(queue->getName())){
+ qr.reset(new TxReplicator(haBroker, queue, link));
+ }
+ else {
+ qr.reset(new QueueReplicator(haBroker, queue, link));
+ }
qr->activate();
return qr;
}
diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h
index f93e25cb81..c36aa352f3 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/cpp/src/qpid/ha/BrokerReplicator.h
@@ -71,6 +71,8 @@ class BrokerReplicator : public broker::Exchange,
public boost::enable_shared_from_this<BrokerReplicator>
{
public:
+ typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
+
BrokerReplicator(HaBroker&, const boost::shared_ptr<broker::Link>&);
~BrokerReplicator();
@@ -84,8 +86,9 @@ class BrokerReplicator : public broker::Exchange,
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
void shutdown();
+ QueueReplicatorPtr findQueueReplicator(const std::string& qname);
+
private:
- typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult;
typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult;
@@ -114,7 +117,6 @@ class BrokerReplicator : public broker::Exchange,
void doResponseBind(types::Variant::Map& values);
void doResponseHaBroker(types::Variant::Map& values);
- QueueReplicatorPtr findQueueReplicator(const std::string& qname);
QueueReplicatorPtr startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
QueueReplicatorPtr replicateQueue(
diff --git a/cpp/src/qpid/ha/makeMessage.cpp b/cpp/src/qpid/ha/Event.cpp
index 5b063a23e7..fdd8bc85cc 100644
--- a/cpp/src/qpid/ha/makeMessage.cpp
+++ b/cpp/src/qpid/ha/Event.cpp
@@ -18,25 +18,44 @@
* under the License.
*
*/
-#include "makeMessage.h"
+#include "Event.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace ha {
-broker::Message makeMessage(const framing::Buffer& buffer,
- const std::string& destination)
-{
- using namespace framing;
- using broker::amqp_0_10::MessageTransfer;
+using namespace std;
+using namespace framing;
+using namespace broker::amqp_0_10;
- boost::intrusive_ptr<MessageTransfer> transfer(
- new qpid::broker::amqp_0_10::MessageTransfer());
- AMQFrame method((MessageTransferBody(ProtocolVersion(), destination, 0, 0)));
+namespace {
+const string QPID_HA(QPID_HA_PREFIX);
+}
+
+bool isEventKey(const std::string& key) {
+ const std::string& prefix = QPID_HA;
+ bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0;
+ return ret;
+}
+
+const string DequeueEvent::KEY(QPID_HA+"de");
+const string IdEvent::KEY(QPID_HA+"id");
+const string TxEnqueueEvent::KEY(QPID_HA+"txen");
+const string TxDequeueEvent::KEY(QPID_HA+"txde");
+const string TxPrepareEvent::KEY(QPID_HA+"txpr");
+const string TxCommitEvent::KEY(QPID_HA+"txcm");
+const string TxRollbackEvent::KEY(QPID_HA+"txrb");
+
+broker::Message makeMessage(const string& data, const string& key) {
+ boost::intrusive_ptr<MessageTransfer> transfer(new MessageTransfer());
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), key, 0, 0)));
AMQFrame header((AMQHeaderBody()));
AMQFrame content((AMQContentBody()));
+ Buffer buffer(const_cast<char*>(&data[0]), data.size());
// AMQContentBody::decode is missing a const declaration, so cast it here.
content.castBody<AMQContentBody>()->decode(
const_cast<Buffer&>(buffer), buffer.getSize());
@@ -51,12 +70,8 @@ broker::Message makeMessage(const framing::Buffer& buffer,
transfer->getFrames().append(method);
transfer->getFrames().append(header);
transfer->getFrames().append(content);
+ transfer->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(key);
return broker::Message(transfer, 0);
}
-broker::Message makeMessage(const std::string& content, const std::string& destination) {
- framing::Buffer buffer(const_cast<char*>(&content[0]), content.size());
- return makeMessage(buffer, destination);
-}
-
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/Event.h b/cpp/src/qpid/ha/Event.h
new file mode 100644
index 0000000000..08174bfc9d
--- /dev/null
+++ b/cpp/src/qpid/ha/Event.h
@@ -0,0 +1,146 @@
+#ifndef QPID_HA_EVENT_H
+#define QPID_HA_EVENT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "qpid/broker/Message.h"
+#include "qpid/framing/BufferTypes.h"
+
+/**@file Defines event messages used to pass transaction information from
+ * primary observers to backup replicators.
+ */
+
+namespace qpid {
+namespace ha {
+
+broker::Message makeMessage(const std::string& content, const std::string& destination);
+
+
+/** Test if a string is an event key */
+bool isEventKey(const std::string& key);
+
+/** Base class for encodable events */
+struct Event {
+ virtual ~Event() {}
+ virtual void encode(framing::Buffer& buffer) const = 0;
+ virtual void decode(framing::Buffer& buffer) = 0;
+ virtual size_t encodedSize() const = 0;
+ virtual std::string key() const = 0; // Routing key
+ virtual void print(std::ostream& o) const = 0;
+ broker::Message message() const { return makeMessage(framing::encodeStr(*this), key()); }
+};
+
+
+inline std::ostream& operator<<(std::ostream& o, const Event& e) {
+ o << "<" << e.key() << ":";
+ e.print(o);
+ return o << ">";
+}
+
+/** Event base template */
+template <class Derived> struct EventBase : public Event {
+ std::string key() const { return Derived::KEY; }
+};
+
+//////////////// Specific event type
+
+//// QueueReplicator events
+
+struct DequeueEvent : public EventBase<DequeueEvent> {
+ static const std::string KEY;
+ ReplicationIdSet ids;
+
+ DequeueEvent(ReplicationIdSet ids_=ReplicationIdSet()) : ids(ids_) {}
+ void encode(framing::Buffer& b) const { b.put(ids); }
+ void decode(framing::Buffer& b) { b.get(ids); }
+ virtual size_t encodedSize() const { return ids.encodedSize(); }
+ void print(std::ostream& o) const { o << ids; }
+};
+
+struct IdEvent : public EventBase<IdEvent> {
+ static const std::string KEY;
+ ReplicationId id;
+
+ IdEvent(ReplicationId id_=0) : id(id_) {}
+ void encode(framing::Buffer& b) const { b.put(id); }
+ void decode(framing::Buffer& b) { b.get(id); }
+ virtual size_t encodedSize() const { return id.encodedSize(); }
+ void print(std::ostream& o) const { o << id; }
+};
+
+//// Transaction events
+
+struct TxEnqueueEvent : public EventBase<TxEnqueueEvent> {
+ static const std::string KEY;
+ framing::LongString queue;
+ ReplicationId id;
+
+ TxEnqueueEvent(std::string q=std::string(), ReplicationId i=ReplicationId())
+ : queue(q), id(i) {}
+ void encode(framing::Buffer& b) const { b.put(queue); b.put(id); }
+ void decode(framing::Buffer& b) { b.get(queue); b.get(id); }
+ virtual size_t encodedSize() const { return queue.encodedSize()+id.encodedSize(); }
+ void print(std::ostream& o) const { o << queue.value << " " << id; }
+};
+
+struct TxDequeueEvent : public EventBase<TxDequeueEvent> {
+ static const std::string KEY;
+ framing::LongString queue;
+ ReplicationId id;
+
+ TxDequeueEvent(std::string q=std::string(), ReplicationId r=0) :
+ queue(q), id(r) {}
+ void encode(framing::Buffer& b) const { b.put(queue);b.put(id); }
+ void decode(framing::Buffer& b) { b.get(queue);b.get(id); }
+ virtual size_t encodedSize() const { return queue.encodedSize()+id.encodedSize(); }
+ void print(std::ostream& o) const { o << queue.value << " " << id; }
+};
+
+struct TxPrepareEvent : public EventBase<TxPrepareEvent> {
+ static const std::string KEY;
+ void encode(framing::Buffer&) const {}
+ void decode(framing::Buffer&) {}
+ virtual size_t encodedSize() const { return 0; }
+ void print(std::ostream&) const {}
+};
+
+struct TxCommitEvent : public EventBase<TxCommitEvent> {
+ static const std::string KEY;
+ void encode(framing::Buffer&) const {}
+ void decode(framing::Buffer&) {}
+ virtual size_t encodedSize() const { return 0; }
+ void print(std::ostream&) const {}
+};
+
+struct TxRollbackEvent : public EventBase<TxRollbackEvent> {
+ static const std::string KEY;
+ void encode(framing::Buffer&) const {}
+ void decode(framing::Buffer&) {}
+ virtual size_t encodedSize() const { return 0; }
+ void print(std::ostream&) const {}
+};
+
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_EVENT_H*/
diff --git a/cpp/src/qpid/ha/FailoverExchange.cpp b/cpp/src/qpid/ha/FailoverExchange.cpp
index 556c7458b6..46cc345ab0 100644
--- a/cpp/src/qpid/ha/FailoverExchange.cpp
+++ b/cpp/src/qpid/ha/FailoverExchange.cpp
@@ -19,7 +19,7 @@
*
*/
#include "FailoverExchange.h"
-#include "makeMessage.h"
+#include "Event.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/DeliverableMessage.h"
@@ -117,7 +117,7 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue, sys::Mutex::Sc
if (urls.empty()) return;
framing::Array array = vectorToUrlArray(urls);
const ProtocolVersion v;
- broker::Message message(makeMessage(Buffer(), typeName));
+ broker::Message message(makeMessage(std::string(), typeName));
MessageTransfer& transfer = MessageTransfer::get(message);
MessageProperties* props =
transfer.getFrames().getHeaders()->get<framing::MessageProperties>(true);
diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp
index e8c7c5c7d8..5896a5568a 100644
--- a/cpp/src/qpid/ha/HaBroker.cpp
+++ b/cpp/src/qpid/ha/HaBroker.cpp
@@ -75,7 +75,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
// otherwise there's a window for a client to connect before we get to
// initialize()
if (settings.cluster) {
- QPID_LOG(debug, "Broker startup, rejecting client connections.");
+ QPID_LOG(debug, "Backup starting, rejecting client connections.");
shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
observer->setObserver(excluder, "Backup: ");
broker.getConnectionObservers().add(observer);
diff --git a/cpp/src/qpid/ha/HaBroker.h b/cpp/src/qpid/ha/HaBroker.h
index 8e5d30acfb..6084db3fc3 100644
--- a/cpp/src/qpid/ha/HaBroker.h
+++ b/cpp/src/qpid/ha/HaBroker.h
@@ -84,6 +84,7 @@ class HaBroker : public management::Manageable
broker::Broker& getBroker() { return broker; }
const Settings& getSettings() const { return settings; }
+ boost::shared_ptr<Role> getRole() const {return role; }
/** Shut down the broker because of a critical error. */
void shutdown(const std::string& message);
diff --git a/cpp/src/qpid/ha/Membership.cpp b/cpp/src/qpid/ha/Membership.cpp
index 411bad3841..b53291ba89 100644
--- a/cpp/src/qpid/ha/Membership.cpp
+++ b/cpp/src/qpid/ha/Membership.cpp
@@ -107,6 +107,14 @@ BrokerInfo::Set Membership::otherBackups() const {
return result;
}
+BrokerInfo::Set Membership::getBrokers() const {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo::Set result;
+ transform(brokers.begin(), brokers.end(), inserter(result, result.begin()),
+ bind(&BrokerInfo::Map::value_type::second, _1));
+ return result;
+}
+
bool Membership::get(const types::Uuid& id, BrokerInfo& result) const {
Mutex::ScopedLock l(lock);
BrokerInfo::Map::const_iterator i = brokers.find(id);
diff --git a/cpp/src/qpid/ha/Membership.h b/cpp/src/qpid/ha/Membership.h
index f442586a71..828f9e8403 100644
--- a/cpp/src/qpid/ha/Membership.h
+++ b/cpp/src/qpid/ha/Membership.h
@@ -69,6 +69,9 @@ class Membership
/** Return IDs of all READY backups other than self */
BrokerInfo::Set otherBackups() const;
+ /** Return IDs of all brokers */
+ BrokerInfo::Set getBrokers() const;
+
void assign(const types::Variant::List&);
types::Variant::List asList() const;
diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp
index bae651a3fc..5fd7814d62 100644
--- a/cpp/src/qpid/ha/Primary.cpp
+++ b/cpp/src/qpid/ha/Primary.cpp
@@ -27,6 +27,7 @@
#include "RemoteBackup.h"
#include "ConnectionObserver.h"
#include "QueueReplicator.h"
+#include "PrimaryTxObserver.h"
#include "qpid/assert.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/BrokerObserver.h"
@@ -34,16 +35,19 @@
#include "qpid/broker/Queue.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
-#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
+#include "qpid/types/Uuid.h"
#include "qpid/sys/Timer.h"
#include <boost/bind.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace ha {
using sys::Mutex;
using boost::shared_ptr;
+using boost::make_shared;
using namespace std;
using namespace framing;
@@ -67,7 +71,10 @@ class PrimaryBrokerObserver : public broker::BrokerObserver
void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); }
void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); }
void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); }
- private:
+ void startTx(const shared_ptr<broker::TxBuffer>& tx) { primary.startTx(tx); }
+ void startDtx(const shared_ptr<broker::DtxBuffer>& dtx) { primary.startDtx(dtx); }
+
+ private:
Primary& primary;
};
@@ -208,6 +215,26 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) {
if (backup) checkReady(backup);
}
+void Primary::addReplica(ReplicatingSubscription& rs) {
+ sys::Mutex::ScopedLock l(lock);
+ replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs;
+}
+
+void Primary::skip(
+ const types::Uuid& backup,
+ const boost::shared_ptr<broker::Queue>& queue,
+ const ReplicationIdSet& ids)
+{
+ sys::Mutex::ScopedLock l(lock);
+ ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue));
+ if (i != replicas.end()) i->second->addSkip(ids);
+}
+
+void Primary::removeReplica(const ReplicatingSubscription& rs) {
+ sys::Mutex::ScopedLock l(lock);
+ replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue()));
+}
+
// NOTE: Called with queue registry lock held.
void Primary::queueCreate(const QueuePtr& q) {
// Set replication argument.
@@ -361,4 +388,14 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards)
backup->startCatchup();
}
+void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) {
+ QPID_LOG(trace, logPrefix << "Started TX transaction");
+ tx->setObserver(make_shared<PrimaryTxObserver>(boost::ref(haBroker)));
+}
+
+void Primary::startDtx(const boost::shared_ptr<broker::DtxBuffer>& dtx) {
+ QPID_LOG(trace, logPrefix << "Started DTX transaction");
+ dtx->setObserver(make_shared<PrimaryTxObserver>(boost::ref(haBroker)));
+}
+
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/Primary.h b/cpp/src/qpid/ha/Primary.h
index 031ad3aab9..a34f8f5a30 100644
--- a/cpp/src/qpid/ha/Primary.h
+++ b/cpp/src/qpid/ha/Primary.h
@@ -31,6 +31,7 @@
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
#include <string>
+#include <boost/functional/hash.hpp>
namespace qpid {
@@ -39,6 +40,8 @@ class Queue;
class Connection;
class ConnectionObserver;
class BrokerObserver;
+class TxBuffer;
+class DtxBuffer;
}
namespace sys {
@@ -75,13 +78,21 @@ class Primary : public Role
void setBrokerUrl(const Url&) {}
void readyReplica(const ReplicatingSubscription&);
- void removeReplica(const std::string& q);
+ void addReplica(ReplicatingSubscription&);
+ void removeReplica(const ReplicatingSubscription&);
+
+ /** Skip replication of ids to queue on backup. */
+ void skip(const types::Uuid& backup,
+ const boost::shared_ptr<broker::Queue>& queue,
+ const ReplicationIdSet& ids);
// Called via BrokerObserver
void queueCreate(const QueuePtr&);
void queueDestroy(const QueuePtr&);
void exchangeCreate(const ExchangePtr&);
void exchangeDestroy(const ExchangePtr&);
+ void startTx(const boost::shared_ptr<broker::TxBuffer>&);
+ void startDtx(const boost::shared_ptr<broker::DtxBuffer>&);
// Called via ConnectionObserver
void opened(broker::Connection& connection);
@@ -93,11 +104,15 @@ class Primary : public Role
void timeoutExpectedBackups();
private:
- typedef qpid::sys::unordered_map<
+ typedef sys::unordered_map<
types::Uuid, RemoteBackupPtr, types::Uuid::Hasher > BackupMap;
typedef std::set<RemoteBackupPtr > BackupSet;
+ typedef std::pair<types::Uuid, boost::shared_ptr<broker::Queue> > UuidQueue;
+ typedef sys::unordered_map<UuidQueue, ReplicatingSubscription*,
+ boost::hash<UuidQueue> > ReplicaMap;
+
RemoteBackupPtr backupConnect(const BrokerInfo&, broker::Connection&, sys::Mutex::ScopedLock&);
void backupDisconnect(RemoteBackupPtr, sys::Mutex::ScopedLock&);
@@ -105,8 +120,9 @@ class Primary : public Role
void checkReady();
void checkReady(RemoteBackupPtr);
void setCatchupQueues(const RemoteBackupPtr&, bool createGuards);
+ void deduplicate();
- sys::Mutex lock;
+ mutable sys::Mutex lock;
HaBroker& haBroker;
Membership& membership;
std::string logPrefix;
@@ -126,6 +142,7 @@ class Primary : public Role
boost::shared_ptr<broker::ConnectionObserver> connectionObserver;
boost::shared_ptr<broker::BrokerObserver> brokerObserver;
boost::intrusive_ptr<sys::TimerTask> timerTask;
+ ReplicaMap replicas;
};
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
new file mode 100644
index 0000000000..8a8364ac22
--- /dev/null
+++ b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Event.h"
+#include "HaBroker.h"
+#include "Primary.h"
+#include "PrimaryTxObserver.h"
+#include "QueueGuard.h"
+#include "RemoteBackup.h"
+#include "ReplicatingSubscription.h"
+
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include <boost/lexical_cast.hpp>
+
+namespace qpid {
+namespace ha {
+
+using namespace std;
+using namespace boost;
+using namespace broker;
+
+PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
+ haBroker(hb), broker(hb.getBroker()),
+ id(true) // FIXME aconway 2013-07-11: is UUID an appropriate TX ID?
+{
+ logPrefix = "Primary transaction "+id.str().substr(0,8)+": ";
+ QPID_LOG(trace, logPrefix << "started");
+ pair<shared_ptr<Queue>, bool> result =
+ broker.getQueues().declare(
+ TRANSACTION_REPLICATOR_PREFIX+id.str(),
+ QueueSettings(/*durable*/false, /*autodelete*/true));
+ assert(result.second);
+ txQueue = result.first;
+}
+
+void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m)
+{
+ QPID_LOG(trace, logPrefix << "enqueue: " << LogMessageId(*q, m));
+ enqueues[q] += m.getReplicationId();
+ txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message());
+ txQueue->deliver(m);
+}
+
+void PrimaryTxObserver::dequeue(
+ const QueuePtr& q, QueuePosition pos, ReplicationId id)
+{
+ QPID_LOG(trace, logPrefix << "dequeue: " << LogMessageId(*q, pos, id));
+ txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
+}
+
+void PrimaryTxObserver::deduplicate() {
+ shared_ptr<Primary> primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()));
+ assert(primary);
+ // FIXME aconway 2013-07-29: need to verify which backups are *in* the transaction.
+ // Use cluster membership for now
+ BrokerInfo::Set brokers = haBroker.getMembership().getBrokers();
+ types::Uuid selfId = haBroker.getMembership().getSelf();
+ // Tell replicating subscriptions to skip IDs in the transaction.
+ for (BrokerInfo::Set::iterator b = brokers.begin(); b != brokers.end(); ++b) {
+ if (b->getSystemId() == selfId) continue;
+ for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q)
+ primary->skip(b->getSystemId(), q->first, q->second);
+ }
+}
+
+bool PrimaryTxObserver::prepare() {
+ // FIXME aconway 2013-07-23: need to delay completion of prepare till all
+ // backups have prepared.
+ QPID_LOG(trace, logPrefix << "prepare");
+ deduplicate();
+ txQueue->deliver(TxPrepareEvent().message());
+ return true;
+}
+
+void PrimaryTxObserver::commit() {
+ QPID_LOG(trace, logPrefix << "commit");
+ txQueue->deliver(TxCommitEvent().message());
+}
+
+void PrimaryTxObserver::rollback() {
+ QPID_LOG(trace, logPrefix << "rollback");
+ txQueue->deliver(TxRollbackEvent().message());
+}
+
+}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.h b/cpp/src/qpid/ha/PrimaryTxObserver.h
new file mode 100644
index 0000000000..3681c6b750
--- /dev/null
+++ b/cpp/src/qpid/ha/PrimaryTxObserver.h
@@ -0,0 +1,77 @@
+#ifndef QPID_HA_PRIMARYTXOBSERVER_H
+#define QPID_HA_PRIMARYTXOBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+
+#include "qpid/broker/TransactionObserver.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/sys/unordered_map.h"
+#include <boost/functional/hash.hpp>
+namespace qpid {
+
+namespace broker {
+class Broker;
+}
+
+namespace ha {
+class HaBroker;
+
+/**
+ * Observe events in the lifecycle of a transaction.
+ *
+ * The observer is called by TxBuffer for each transactional event.
+ * It puts the events on a special tx-queue.
+ * A TxReplicator on the backup replicates the tx-queue and creates
+ * a TxBuffer on the backup equivalent to the one on the primary.
+ *
+ * THREAD UNSAFE: called sequentially in the context of a transaction.
+ */
+class PrimaryTxObserver : public broker::TransactionObserver {
+ public:
+ PrimaryTxObserver(HaBroker&);
+
+ void enqueue(const QueuePtr&, const broker::Message&);
+ void dequeue(const QueuePtr& queue, QueuePosition, ReplicationId);
+ bool prepare();
+ void commit();
+ void rollback();
+
+ private:
+ typedef qpid::sys::unordered_map<
+ QueuePtr, ReplicationIdSet, boost::hash<QueuePtr> > QueueIdsMap;
+
+ void deduplicate();
+
+ std::string logPrefix;
+ HaBroker& haBroker;
+ broker::Broker& broker;
+ framing::Uuid id;
+ QueuePtr txQueue;
+ QueueIdsMap enqueues;
+};
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_PRIMARYTXOBSERVER_H*/
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index d99602fdda..28e9dc4120 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -19,12 +19,13 @@
*
*/
-#include "makeMessage.h"
+#include "Event.h"
#include "HaBroker.h"
#include "QueueReplicator.h"
#include "QueueSnapshots.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
+#include "types.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
@@ -38,36 +39,32 @@
#include "qpid/Msg.h"
#include "qpid/assert.h"
#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
-namespace {
-const std::string QPID_REPLICATOR_("qpid.replicator-");
-const std::string TYPE_NAME("qpid.queue-replicator");
-const std::string QPID_HA("qpid.ha-");
-}
namespace qpid {
namespace ha {
using namespace broker;
using namespace framing;
using namespace std;
+using namespace boost;
+using std::exception;
using sys::Mutex;
-const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA+"dequeue");
-const std::string QueueReplicator::ID_EVENT_KEY(QPID_HA+"id");
const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
-std::string QueueReplicator::replicatorName(const std::string& queueName) {
- return QPID_REPLICATOR_ + queueName;
+namespace {
+const string QPID_HA(QPID_HA_PREFIX);
+const std::string TYPE_NAME(QPID_HA+"queue-replicator");
}
-bool QueueReplicator::isReplicatorName(const std::string& name) {
- return name.compare(0, QPID_REPLICATOR_.size(), QPID_REPLICATOR_) == 0;
+
+std::string QueueReplicator::replicatorName(const std::string& queueName) {
+ return QUEUE_REPLICATOR_PREFIX + queueName;
}
-bool QueueReplicator::isEventKey(const std::string key) {
- const std::string& prefix = QPID_HA;
- bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0;
- return ret;
+bool QueueReplicator::isReplicatorName(const std::string& name) {
+ return startsWith(name, QUEUE_REPLICATOR_PREFIX);
}
class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
@@ -109,12 +106,12 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
haBroker(hb),
+ brokerInfo(hb.getBrokerInfo()),
logPrefix("Backup of "+q->getName()+": "),
- queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false),
+ queue(q), link(l), subscribed(false),
settings(hb.getSettings()), destroyed(false),
nextId(0), maxId(0)
{
- QPID_LOG(debug, logPrefix << "Created");
args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
@@ -122,12 +119,18 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
args.setString(QPID_REPLICATE, printable(NONE).str());
setArgs(args);
if (q->isAutoDelete()) q->markInUse();
+
+ dispatch[DequeueEvent::KEY] =
+ boost::bind(&QueueReplicator::dequeueEvent, this, _1, _2);
+ dispatch[IdEvent::KEY] =
+ boost::bind(&QueueReplicator::idEvent, this, _1, _2);
}
// This must be called immediately after the constructor.
// It has to be separate so we can call shared_from_this().
void QueueReplicator::activate() {
Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, logPrefix << "Created");
if (!queue) return; // Already destroyed
// Enable callback to route()
@@ -224,44 +227,57 @@ template <class T> T decodeContent(Message& m) {
}
}
-void QueueReplicator::dequeue(const ReplicationIdSet& dequeues, Mutex::ScopedLock&) {
- QPID_LOG(trace, logPrefix << "Dequeue " << dequeues);
+void QueueReplicator::dequeueEvent(const string& data, Mutex::ScopedLock&) {
+ DequeueEvent e;
+ decodeStr(data, e);
+ QPID_LOG(trace, logPrefix << "Dequeue " << e.ids);
//TODO: should be able to optimise the following
- for (ReplicationIdSet::iterator i = dequeues.begin(); i != dequeues.end(); ++i) {
+ for (ReplicationIdSet::iterator i = e.ids.begin(); i != e.ids.end(); ++i) {
PositionMap::iterator j = positions.find(*i);
if (j != positions.end()) queue->dequeueMessageAt(j->second);
}
}
// Called in connection thread of the queues bridge to primary.
-void QueueReplicator::route(Deliverable& msg)
+void QueueReplicator::route(Deliverable& deliverable)
{
try {
Mutex::ScopedLock l(lock);
if (destroyed) return;
- const std::string& key = msg.getMessage().getRoutingKey();
- if (!isEventKey(key)) { // Replicated message
+ broker::Message& message(deliverable.getMessage());
+ string key(message.getRoutingKey());
+ if (!isEventKey(message.getRoutingKey())) {
ReplicationId id = nextId++;
maxId = std::max(maxId, id);
- msg.getMessage().setReplicationId(id);
- msg.deliverTo(queue);
+ message.setReplicationId(id);
+ deliver(message);
QueuePosition position = queue->getPosition();
positions[id] = position;
QPID_LOG(trace, logPrefix << "Enqueued " << LogMessageId(*queue,position,id));
}
- else if (key == DEQUEUE_EVENT_KEY) {
- dequeue(decodeContent<ReplicationIdSet>(msg.getMessage()), l);
+ else {
+ DispatchMap::iterator i = dispatch.find(key);
+ if (i == dispatch.end()) {
+ QPID_LOG(info, logPrefix << "Ignoring unknown event: " << key);
+ }
+ else {
+ (i->second)(message.getContent(), l);
+ }
}
- else if (key == ID_EVENT_KEY) {
- nextId = decodeContent<ReplicationId>(msg.getMessage());
- }
- // Ignore unknown event keys, may be introduced in later versions.
}
catch (const std::exception& e) {
haBroker.shutdown(QPID_MSG(logPrefix << "Replication failed: " << e.what()));
}
}
+void QueueReplicator::deliver(const broker::Message& m) {
+ queue->deliver(m);
+}
+
+void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) {
+ nextId = decodeStr<IdEvent>(data).id;
+}
+
ReplicationId QueueReplicator::getMaxId() {
Mutex::ScopedLock l(lock);
return maxId;
@@ -273,4 +289,5 @@ bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const
bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }
std::string QueueReplicator::getType() const { return TYPE_NAME; }
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h
index 811ddba256..90f38ce7e1 100644
--- a/cpp/src/qpid/ha/QueueReplicator.h
+++ b/cpp/src/qpid/ha/QueueReplicator.h
@@ -26,6 +26,7 @@
#include "hash.h"
#include "qpid/broker/Exchange.h"
#include <boost/enable_shared_from_this.hpp>
+#include <boost/function.hpp>
#include <iosfwd>
namespace qpid {
@@ -56,23 +57,19 @@ class QueueReplicator : public broker::Exchange,
public boost::enable_shared_from_this<QueueReplicator>
{
public:
- static const std::string DEQUEUE_EVENT_KEY;
- static const std::string ID_EVENT_KEY;
static const std::string QPID_SYNC_FREQUENCY;
+ static const std::string REPLICATOR_PREFIX;
static std::string replicatorName(const std::string& queueName);
static bool isReplicatorName(const std::string&);
- /** Test if a string is an event key */
- static bool isEventKey(const std::string key);
-
QueueReplicator(HaBroker&,
boost::shared_ptr<broker::Queue> q,
boost::shared_ptr<broker::Link> l);
~QueueReplicator();
- void activate(); // Must be called immediately after constructor.
+ void activate(); // Must be called immediately after constructor.
std::string getType() const;
bool bind(boost::shared_ptr<broker::Queue
@@ -89,24 +86,36 @@ class QueueReplicator : public broker::Exchange,
ReplicationId getMaxId();
- private:
- typedef qpid::sys::unordered_map<ReplicationId, QueuePosition, TrivialHasher<int32_t> > PositionMap;
+ protected:
+ typedef boost::function<void(const std::string&, sys::Mutex::ScopedLock&)> DispatchFn;
+ typedef qpid::sys::unordered_map<std::string, DispatchFn> DispatchMap;
+
+ virtual void deliver(const broker::Message&);
+
+ sys::Mutex lock;
+ HaBroker& haBroker;
+ const BrokerInfo brokerInfo;
+ DispatchMap dispatch;
+ private:
+ typedef qpid::sys::unordered_map<
+ ReplicationId, QueuePosition, TrivialHasher<int32_t> > PositionMap;
class ErrorListener;
class QueueObserver;
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
void destroy(); // Called when the queue is destroyed.
- void dequeue(const ReplicationIdSet&, sys::Mutex::ScopedLock&);
- HaBroker& haBroker;
+ // Dispatch functions
+ void dequeueEvent(const std::string& data, sys::Mutex::ScopedLock&);
+ void idEvent(const std::string& data, sys::Mutex::ScopedLock&);
+
std::string logPrefix;
std::string bridgeName;
- sys::Mutex lock;
boost::shared_ptr<broker::Queue> queue;
boost::shared_ptr<broker::Link> link;
boost::shared_ptr<broker::Bridge> bridge;
- BrokerInfo brokerInfo;
+
bool subscribed;
const Settings& settings;
bool destroyed;
diff --git a/cpp/src/qpid/ha/RemoteBackup.cpp b/cpp/src/qpid/ha/RemoteBackup.cpp
index e55d415972..776a584bc8 100644
--- a/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -20,6 +20,7 @@
*/
#include "RemoteBackup.h"
#include "QueueGuard.h"
+#include "TxReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
@@ -65,6 +66,8 @@ bool RemoteBackup::isReady() {
}
void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) {
+ // Ignore transaction queues for purposes of catch-up calculation
+ if (TxReplicator::isTxQueue(q->getName())) return;
if (replicationTest.getLevel(*q) == ALL) {
QPID_LOG(debug, logPrefix << "Catch-up queue"
<< (createGuard ? " and guard" : "") << ": " << q->getName());
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 2001ec5332..9f464f8066 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -19,7 +19,7 @@
*
*/
-#include "makeMessage.h"
+#include "Event.h"
#include "IdSetter.h"
#include "QueueGuard.h"
#include "QueueReplicator.h"
@@ -36,6 +36,7 @@
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Uuid.h"
+#include <boost/pointer_cast.hpp>
#include <sstream>
@@ -45,6 +46,7 @@ namespace ha {
using namespace framing;
using namespace broker;
using namespace std;
+using namespace boost;
using sys::Mutex;
using broker::amqp_0_10::MessageTransfer;
@@ -111,7 +113,8 @@ ReplicatingSubscription::ReplicatingSubscription(
) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag,
resumeId, resumeTtl, arguments),
position(0), ready(false), cancelled(false),
- haBroker(hb)
+ haBroker(hb),
+ primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()))
{
try {
FieldTable ft;
@@ -137,8 +140,6 @@ ReplicatingSubscription::ReplicatingSubscription(
}
// If there's already a guard (we are in failover) use it, else create one.
- boost::shared_ptr<Primary> primary =
- boost::dynamic_pointer_cast<Primary>(haBroker.getRole());
if (primary) guard = primary->getGuard(queue, info);
if (!guard) guard.reset(new QueueGuard(*queue, info));
@@ -163,7 +164,6 @@ ReplicatingSubscription::ReplicatingSubscription(
sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued()
dequeues += initDequeues; // Messages on backup that are not on primary.
skip = backupIds - initDequeues; // Messages already on the backup.
-
// Queue front is moving but we know this subscriptions will start at a
// position >= front so if front is safe then position must be.
position = front;
@@ -191,6 +191,7 @@ ReplicatingSubscription::~ReplicatingSubscription() {}
//
void ReplicatingSubscription::initialize() {
try {
+ if (primary) primary->addReplica(*this);
Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
// Send initial dequeues to the backup.
// There must be a shared_ptr(this) when sending.
@@ -218,9 +219,8 @@ bool ReplicatingSubscription::deliver(
try {
bool result = false;
if (skip.contains(id)) {
+ QPID_LOG(trace, logPrefix << "Skip " << LogMessageId(*getQueue(), m));
skip -= id;
- QPID_LOG(trace, logPrefix << "On backup, skip " <<
- LogMessageId(*getQueue(), m));
guard->complete(id); // This will never be acknowledged.
notify();
result = true;
@@ -240,17 +240,12 @@ bool ReplicatingSubscription::deliver(
}
}
-/**
- *@param position: must be <= last position seen by subscription.
- */
void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
if (!ready && isGuarded(l) && unready.empty()) {
ready = true;
sys::Mutex::ScopedUnlock u(lock);
// Notify Primary that a subscription is ready.
QPID_LOG(debug, logPrefix << "Caught up");
- boost::shared_ptr<Primary> primary =
- boost::dynamic_pointer_cast<Primary>(haBroker.getRole());
if (primary) primary->readyReplica(*this);
}
}
@@ -264,6 +259,7 @@ void ReplicatingSubscription::cancel()
cancelled = true;
}
QPID_LOG(debug, logPrefix << "Cancelled");
+ if (primary) primary->removeReplica(*this);
getQueue()->removeObserver(observer);
guard->cancel();
ConsumerImpl::cancel();
@@ -289,9 +285,7 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l)
{
if (dequeues.empty()) return;
QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
- string buffer = encodeStr(dequeues);
- dequeues.clear();
- sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
+ sendEvent(DequeueEvent(dequeues), l);
}
// Called after the message has been removed
@@ -311,23 +305,16 @@ void ReplicatingSubscription::dequeued(ReplicationId id)
// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendIdEvent(ReplicationId pos, Mutex::ScopedLock& l)
{
- sendEvent(QueueReplicator::ID_EVENT_KEY, encodeStr(pos), l);
+ sendEvent(IdEvent(pos), l);
}
-void ReplicatingSubscription::sendEvent(const std::string& key,
- const std::string& buffer,
- Mutex::ScopedLock&)
+void ReplicatingSubscription::sendEvent(const Event& event, Mutex::ScopedLock&)
{
Mutex::ScopedUnlock u(lock);
- broker::Message message = makeMessage(buffer);
- MessageTransfer& transfer = MessageTransfer::get(message);
- DeliveryProperties* props =
- transfer.getFrames().getHeaders()->get<DeliveryProperties>(true);
- props->setRoutingKey(key);
// Send the event directly to the base consumer implementation. The dummy
// consumer prevents acknowledgements being handled, which is what we want
// for events
- ConsumerImpl::deliver(QueueCursor(), message, boost::shared_ptr<Consumer>());
+ ConsumerImpl::deliver(QueueCursor(), event.message(), boost::shared_ptr<Consumer>());
}
// Called in subscription's connection thread.
@@ -346,4 +333,9 @@ bool ReplicatingSubscription::doDispatch()
}
}
+void ReplicatingSubscription::addSkip(const ReplicationIdSet& ids) {
+ Mutex::ScopedLock l(lock);
+ skip += ids;
+}
+
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.h b/cpp/src/qpid/ha/ReplicatingSubscription.h
index c202089e91..8a4a48bb4e 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -44,6 +44,8 @@ class Buffer;
namespace ha {
class QueueGuard;
class HaBroker;
+class Event;
+class Primary;
/**
* A susbcription that replicates to a remote backup.
@@ -118,6 +120,9 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
BrokerInfo getBrokerInfo() const { return info; }
+ /** Skip replicating enqueue of of ids. */
+ void addSkip(const ReplicationIdSet& ids);
+
protected:
bool doDispatch();
@@ -128,7 +133,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
std::string logPrefix;
QueuePosition position;
ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event.
- ReplicationIdSet skip; // Messages already on backup will be skipped.
+ ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues.
ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged.
bool ready;
bool cancelled;
@@ -136,12 +141,13 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
boost::shared_ptr<QueueGuard> guard;
HaBroker& haBroker;
boost::shared_ptr<QueueObserver> observer;
+ boost::shared_ptr<Primary> primary;
bool isGuarded(sys::Mutex::ScopedLock&);
void dequeued(ReplicationId);
void sendDequeueEvent(sys::Mutex::ScopedLock&);
void sendIdEvent(ReplicationId, sys::Mutex::ScopedLock&);
- void sendEvent(const std::string& key, const std::string& data, sys::Mutex::ScopedLock&);
+ void sendEvent(const Event&, sys::Mutex::ScopedLock&);
void checkReady(sys::Mutex::ScopedLock&);
friend class Factory;
};
diff --git a/cpp/src/qpid/ha/StatusCheck.cpp b/cpp/src/qpid/ha/StatusCheck.cpp
index d73f2cf8b5..11f65aa574 100644
--- a/cpp/src/qpid/ha/StatusCheck.cpp
+++ b/cpp/src/qpid/ha/StatusCheck.cpp
@@ -97,16 +97,10 @@ void StatusCheckThread::run() {
QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status);
}
} catch(const exception& error) {
- QPID_LOG(info, statusCheck.logPrefix << "Checking status of " << url << ": " << error.what());
- }
- try { c.close(); }
- catch(const exception&) {
- QPID_LOG(warning, statusCheck.logPrefix << "Error closing status check connection to " << url);
- }
- try { c.close(); }
- catch(const exception&) {
- QPID_LOG(warning, "Error closing status check connection to " << url);
+ QPID_LOG(info, statusCheck.logPrefix << "Error checking status of " << url
+ << ": " << error.what());
}
+ try { c.close(); } catch(...) {}
delete this;
}
diff --git a/cpp/src/qpid/ha/TxReplicator.cpp b/cpp/src/qpid/ha/TxReplicator.cpp
new file mode 100644
index 0000000000..31c68dfe45
--- /dev/null
+++ b/cpp/src/qpid/ha/TxReplicator.cpp
@@ -0,0 +1,192 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include "TxReplicator.h"
+#include "Role.h"
+#include "Backup.h"
+#include "BrokerReplicator.h"
+#include "Event.h"
+#include "HaBroker.h"
+#include "types.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/TxBuffer.h"
+#include "qpid/broker/TxAccept.h"
+#include "qpid/framing/BufferTypes.h"
+#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+#include <boost/make_shared.hpp>
+
+namespace qpid {
+namespace ha {
+
+using namespace std;
+using namespace boost;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+namespace {
+const string QPID_HA(QPID_HA_PREFIX);
+const string TYPE_NAME(QPID_HA+"tx-queue-replicator");
+const string PREFIX(TRANSACTION_REPLICATOR_PREFIX);
+} // namespace
+
+
+bool TxReplicator::isTxQueue(const string& q) {
+ return startsWith(q, PREFIX);
+}
+
+string TxReplicator::getTxId(const string& q) {
+ assert(isTxQueue(q));
+ return q.substr(PREFIX.size());
+}
+
+string TxReplicator::getType() const { return TYPE_NAME; }
+
+TxReplicator::TxReplicator(
+ HaBroker& hb,
+ const boost::shared_ptr<broker::Queue>& txQueue,
+ const boost::shared_ptr<broker::Link>& link) :
+ QueueReplicator(hb, txQueue, link),
+ id(getTxId(txQueue->getName())),
+ txBuffer(new broker::TxBuffer),
+ broker(hb.getBroker()),
+ store(broker.hasStore() ? &broker.getStore() : 0),
+ dequeueState(hb.getBroker().getQueues())
+{
+ logPrefix = "Backup of transaction "+id+": ";
+
+ if (!store) throw Exception(QPID_MSG(logPrefix << "No message store loaded."));
+ boost::shared_ptr<Backup> backup = dynamic_pointer_cast<Backup>(hb.getRole());
+ if (!backup) throw Exception(QPID_MSG(logPrefix << "Broker is not in backup mode."));
+ brokerReplicator = backup->getBrokerReplicator();
+
+ // Dispatch transaction events.
+ dispatch[TxEnqueueEvent::KEY] =
+ boost::bind(&TxReplicator::enqueue, this, _1, _2);
+ dispatch[TxDequeueEvent::KEY] =
+ boost::bind(&TxReplicator::dequeue, this, _1, _2);
+ dispatch[TxPrepareEvent::KEY] =
+ boost::bind(&TxReplicator::prepare, this, _1, _2);
+ dispatch[TxCommitEvent::KEY] =
+ boost::bind(&TxReplicator::commit, this, _1, _2);
+ dispatch[TxRollbackEvent::KEY] =
+ boost::bind(&TxReplicator::rollback, this, _1, _2);
+}
+
+void TxReplicator::deliver(const broker::Message& m_) {
+ // Deliver message to the target queue, not the tx-queue.
+ broker::Message m(m_);
+ m.setReplicationId(enq.id); // Use replicated id.
+ boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(enq.queue);
+ QPID_LOG(trace, logPrefix << "Deliver " << LogMessageId(*queue, m));
+ DeliverableMessage dm(m, txBuffer.get());
+ dm.deliverTo(queue);
+}
+
+void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) {
+ TxEnqueueEvent e;
+ decodeStr(data, e);
+ QPID_LOG(trace, logPrefix << "Enqueue: " << e);
+ enq = e;
+}
+
+void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) {
+ TxDequeueEvent e;
+ decodeStr(data, e);
+ QPID_LOG(trace, logPrefix << "Dequeue: " << e);
+ // NOTE: Backup does not see transactional dequeues until the transaction is
+ // prepared, then they are all receieved before the prepare event.
+ // We collect the events here so we can do a single scan of the queue in prepare.
+ dequeueState.add(e);
+}
+
+void TxReplicator::DequeueState::add(const TxDequeueEvent& event) {
+ events[event.queue] += event.id;
+}
+
+// Use this function as a seek() predicate to find the dequeued messages.
+bool TxReplicator::DequeueState::addRecord(
+ const broker::Message& m, const boost::shared_ptr<Queue>& queue,
+ const ReplicationIdSet& rids)
+{
+ if (rids.contains(m.getReplicationId())) {
+ // FIXME aconway 2013-07-24:
+ // - Do we need to acquire before creating a DR?
+ // - Are the parameters to DeliveryRecord ok?
+ DeliveryRecord dr(cursor, m.getSequence(), m.getReplicationId(), queue,
+ string() /*tag*/,
+ boost::shared_ptr<Consumer>(),
+ true /*acquired*/,
+ false /*accepted*/,
+ false /*credit.isWindowMode()*/,
+ 0 /*credit*/);
+ // Fake record ids, unique within this transaction.
+ dr.setId(nextId++);
+ records.push_back(dr);
+ recordIds += dr.getId();
+ }
+ return false;
+}
+
+void TxReplicator::DequeueState::addRecords(const EventMap::value_type& entry) {
+ // Process all the dequeues for a single queue, in one pass of seek()
+ boost::shared_ptr<broker::Queue> q = queues.get(entry.first);
+ q->seek(cursor, boost::bind(&TxReplicator::DequeueState::addRecord,
+ this, _1, q, entry.second));
+}
+
+boost::shared_ptr<TxAccept> TxReplicator::DequeueState::makeAccept() {
+ for_each(events.begin(), events.end(),
+ boost::bind(&TxReplicator::DequeueState::addRecords, this, _1));
+ return make_shared<TxAccept>(cref(recordIds), ref(records));
+}
+
+void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock&) {
+ QPID_LOG(trace, logPrefix << "Prepare");
+ txBuffer->enlist(dequeueState.makeAccept());
+ context = store->begin();
+ txBuffer->prepare(context.get());
+ // FIXME aconway 2013-07-26: notify the primary of prepare outcome.
+}
+
+void TxReplicator::commit(const string&, sys::Mutex::ScopedLock&) {
+ QPID_LOG(trace, logPrefix << "Commit");
+ if (context.get()) store->commit(*context);
+ txBuffer->commit();
+ end();
+}
+
+void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock&) {
+ QPID_LOG(trace, logPrefix << "Rollback");
+ if (context.get()) store->abort(*context);
+ txBuffer->rollback();
+ end();
+}
+
+void TxReplicator::end(){
+ // FIXME aconway 2013-07-26: destroying the txqueue (auto-delete?) will
+ // destroy this via QueueReplicator::destroy
+}
+}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/TxReplicator.h b/cpp/src/qpid/ha/TxReplicator.h
new file mode 100644
index 0000000000..c4df8c13f9
--- /dev/null
+++ b/cpp/src/qpid/ha/TxReplicator.h
@@ -0,0 +1,123 @@
+#ifndef QPID_HA_TRANSACTIONREPLICATOR_H
+#define QPID_HA_TRANSACTIONREPLICATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "QueueReplicator.h"
+#include "Event.h"
+#include "qpid/broker/DeliveryRecord.h"
+#include "qpid/broker/TransactionalStore.h"
+#include "qpid/sys/Mutex.h"
+
+namespace qpid {
+
+namespace broker {
+class TxBuffer;
+class TxAccept;
+class DtxBuffer;
+class Broker;
+class MessageStore;
+}
+
+namespace ha {
+class BrokerReplicator;
+
+/**
+ * Exchange created on a backup broker to replicate a transaction on the primary.
+ *
+ * Subscribes to a tx-queue like a normal queue but puts replicated messages and
+ * transaction events into a local TxBuffer.
+ *
+ * THREAD SAFE: Called in different connection threads.
+ */
+class TxReplicator : public QueueReplicator {
+ public:
+ typedef boost::shared_ptr<broker::Queue> QueuePtr;
+ typedef boost::shared_ptr<broker::Link> LinkPtr;
+
+ static bool isTxQueue(const std::string& queue);
+ static std::string getTxId(const std::string& queue);
+
+ TxReplicator(HaBroker&, const QueuePtr& txQueue, const LinkPtr& link);
+
+ std::string getType() const;
+
+ protected:
+
+ void deliver(const broker::Message&);
+
+ private:
+
+ typedef void (TxReplicator::*DispatchFunction)(
+ const std::string&, sys::Mutex::ScopedLock&);
+ typedef qpid::sys::unordered_map<std::string, DispatchFunction> DispatchMap;
+ typedef qpid::sys::unordered_map<std::string, ReplicationIdSet> DequeueMap;
+
+ void enqueue(const std::string& data, sys::Mutex::ScopedLock&);
+ void dequeue(const std::string& data, sys::Mutex::ScopedLock&);
+ void prepare(const std::string& data, sys::Mutex::ScopedLock&);
+ void commit(const std::string& data, sys::Mutex::ScopedLock&);
+ void rollback(const std::string& data, sys::Mutex::ScopedLock&);
+ void end();
+
+ std::string logPrefix;
+ const std::string id;
+ TxEnqueueEvent enq; // Enqueue data for next deliver.
+ boost::shared_ptr<broker::TxBuffer> txBuffer;
+ broker::Broker& broker;
+ broker::MessageStore* store;
+ boost::shared_ptr<BrokerReplicator> brokerReplicator;
+ std::auto_ptr<broker::TransactionContext> context;
+
+ // Class to process dequeues and create DeliveryRecords to populate a
+ // TxAccept.
+ class DequeueState {
+ public:
+ DequeueState(broker::QueueRegistry& qr) : queues(qr) {}
+ void add(const TxDequeueEvent&);
+ boost::shared_ptr<broker::TxAccept> makeAccept();
+
+ private:
+ // Delivery record IDs are command IDs from the session.
+ // On a backup we will just fake these Ids.
+ typedef framing::SequenceNumber Id;
+ typedef framing::SequenceSet IdSet;
+ typedef qpid::sys::unordered_map<std::string, ReplicationIdSet> EventMap;
+
+ bool addRecord(const broker::Message& m, const boost::shared_ptr<broker::Queue>&,
+ const ReplicationIdSet& );
+ void addRecords(const DequeueMap::value_type& entry);
+
+ broker::QueueRegistry& queues;
+ EventMap events;
+ broker::DeliveryRecords records;
+ broker::QueueCursor cursor;
+ framing::SequenceNumber nextId;
+ IdSet recordIds;
+ };
+ DequeueState dequeueState;
+};
+
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_TRANSACTIONREPLICATOR_H*/
diff --git a/cpp/src/qpid/ha/makeMessage.h b/cpp/src/qpid/ha/makeMessage.h
deleted file mode 100644
index 4427cdd948..0000000000
--- a/cpp/src/qpid/ha/makeMessage.h
+++ /dev/null
@@ -1,66 +0,0 @@
-#ifndef QPID_HA_MAKEMESSAGE_H
-#define QPID_HA_MAKEMESSAGE_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/broker/Message.h"
-#include "qpid/framing/Buffer.h"
-#include <string>
-
-/** Utilities for creating messages used by HA internally. */
-
-namespace qpid {
-namespace framing {
-class Buffer;
-}
-namespace ha {
-
-/**
- * Create internal messages used by HA components.
- */
-broker::Message makeMessage(
- const framing::Buffer& content,
- const std::string& destination=std::string()
-);
-
-broker::Message makeMessage(const std::string& content,
- const std::string& destination=std::string());
-
-/** Encode value as a string. */
-template <class T> std::string encodeStr(const T& value) {
- std::string encoded(value.encodedSize(), '\0');
- framing::Buffer buffer(&encoded[0], encoded.size());
- value.encode(buffer);
- return encoded;
-}
-
-/** Decode value from a string. */
-template <class T> T decodeStr(const std::string& encoded) {
- framing::Buffer buffer(const_cast<char*>(&encoded[0]), encoded.size());
- T value;
- value.decode(buffer);
- return value;
-}
-
-}} // namespace qpid::ha
-
-#endif /*!QPID_HA_MAKEMESSAGE_H*/
diff --git a/cpp/src/qpid/ha/types.cpp b/cpp/src/qpid/ha/types.cpp
index 2246355339..10d6bd4e3b 100644
--- a/cpp/src/qpid/ha/types.cpp
+++ b/cpp/src/qpid/ha/types.cpp
@@ -37,6 +37,14 @@ using namespace std;
const string QPID_REPLICATE("qpid.replicate");
const string QPID_HA_UUID("qpid.ha-uuid");
+const char* QPID_HA_PREFIX = "qpid.ha-";
+const char* QUEUE_REPLICATOR_PREFIX = "qpid.ha-q:";
+const char* TRANSACTION_REPLICATOR_PREFIX = "qpid.ha-tx:";
+
+bool startsWith(const string& name, const string& prefix) {
+ return name.compare(0, prefix.size(), prefix) == 0;
+}
+
string EnumBase::str() const {
assert(value < count);
return names[value];
diff --git a/cpp/src/qpid/ha/types.h b/cpp/src/qpid/ha/types.h
index 9a7e97c66d..3af095d470 100644
--- a/cpp/src/qpid/ha/types.h
+++ b/cpp/src/qpid/ha/types.h
@@ -105,10 +105,17 @@ inline bool isPrimary(BrokerStatus s) {
inline bool isBackup(BrokerStatus s) { return !isPrimary(s); }
-// String constants.
+// String constants, defined as char* to avoid initialization order problems.
extern const std::string QPID_REPLICATE;
extern const std::string QPID_HA_UUID;
+// Strings used as prefixes, defined as char* to avoid link order problems.
+extern const char* QPID_HA_PREFIX;
+extern const char* QUEUE_REPLICATOR_PREFIX;
+extern const char* TRANSACTION_REPLICATOR_PREFIX;
+
+bool startsWith(const std::string& name, const std::string& prefix);
+
/** Define IdSet type, not a typedef so we can overload operator << */
class IdSet : public std::set<types::Uuid> {};
diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h
index bc23867ee1..b7b0f9d34b 100644
--- a/cpp/src/tests/BrokerFixture.h
+++ b/cpp/src/tests/BrokerFixture.h
@@ -42,11 +42,45 @@ namespace tests {
struct BrokerFixture : private boost::noncopyable {
typedef qpid::broker::Broker Broker;
typedef boost::intrusive_ptr<Broker> BrokerPtr;
+ typedef std::vector<std::string> Args;
BrokerPtr broker;
+ uint16_t port;
qpid::sys::Thread brokerThread;
- BrokerFixture(Broker::Options opts=Broker::Options(), bool enableMgmt=false) {
+ BrokerFixture(const Args& args=Args(), const Broker::Options& opts=Broker::Options(),
+ bool isExternalPort_=false, uint16_t externalPort_=0)
+ {
+ init(args, opts, isExternalPort_, externalPort_);
+ }
+
+ BrokerFixture(const Broker::Options& opts,
+ bool isExternalPort_=false, uint16_t externalPort_=0)
+ {
+ init(Args(), opts, isExternalPort_, externalPort_);
+ }
+
+ void shutdownBroker() {
+ if (broker) {
+ broker->shutdown();
+ brokerThread.join();
+ broker = BrokerPtr();
+ }
+ }
+
+ ~BrokerFixture() { shutdownBroker(); }
+
+ /** Open a connection to the broker. */
+ void open(qpid::client::Connection& c) {
+ c.open("localhost", getPort());
+ }
+
+ uint16_t getPort() { return port; }
+
+ private:
+ void init(const Args& args, Broker::Options opts,
+ bool isExternalPort=false, uint16_t externalPort=0)
+ {
// Keep the tests quiet unless logging env. vars have been set by user.
if (!::getenv("QPID_LOG_ENABLE") && !::getenv("QPID_TRACE")) {
qpid::log::Options logOpts;
@@ -55,38 +89,28 @@ struct BrokerFixture : private boost::noncopyable {
logOpts.selectors.push_back("error+");
qpid::log::Logger::instance().configure(logOpts);
}
+ // Default options, may be over-ridden when we parse args.
opts.port=0;
opts.listenInterfaces.push_back("127.0.0.1");
- // Management doesn't play well with multiple in-process brokers.
- opts.enableMgmt=enableMgmt;
opts.workerThreads=1;
opts.dataDir="";
opts.auth=false;
+
+ // Argument parsing
+ std::vector<const char*> argv(args.size());
+ std::transform(args.begin(), args.end(), argv.begin(),
+ boost::bind(&std::string::c_str, _1));
+ Plugin::addOptions(opts);
+ opts.parse(argv.size(), &argv[0]);
broker = Broker::create(opts);
// TODO aconway 2007-12-05: At one point BrokerFixture
// tests could hang in Connection ctor if the following
// line is removed. This may not be an issue anymore.
broker->accept();
- broker->getPort(qpid::broker::Broker::TCP_TRANSPORT);
+ if (isExternalPort) port = externalPort;
+ else port = broker->getPort(qpid::broker::Broker::TCP_TRANSPORT);
brokerThread = qpid::sys::Thread(*broker);
};
-
- void shutdownBroker() {
- if (broker) {
- broker->shutdown();
- brokerThread.join();
- broker = BrokerPtr();
- }
- }
-
- ~BrokerFixture() { shutdownBroker(); }
-
- /** Open a connection to the broker. */
- void open(qpid::client::Connection& c) {
- c.open("localhost", broker->getPort(qpid::broker::Broker::TCP_TRANSPORT));
- }
-
- uint16_t getPort() { return broker->getPort(qpid::broker::Broker::TCP_TRANSPORT); }
};
/** Connection that opens in its constructor */
@@ -125,8 +149,8 @@ template <class ConnectionType, class SessionType=qpid::client::Session>
struct SessionFixtureT : BrokerFixture, ClientT<ConnectionType,SessionType> {
SessionFixtureT(Broker::Options opts=Broker::Options()) :
- BrokerFixture(opts),
- ClientT<ConnectionType,SessionType>(broker->getPort(qpid::broker::Broker::TCP_TRANSPORT))
+ BrokerFixture(BrokerFixture::Args(), opts),
+ ClientT<ConnectionType,SessionType>(getPort())
{}
};
diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt
index a1d29c23a3..94d965eb7a 100644
--- a/cpp/src/tests/CMakeLists.txt
+++ b/cpp/src/tests/CMakeLists.txt
@@ -379,24 +379,7 @@ add_test (queue_redirect ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_queue_redirect
add_library(test_store MODULE test_store.cpp)
target_link_libraries (test_store qpidbroker qpidcommon)
-set_target_properties (test_store PROPERTIES
- COMPILE_DEFINITIONS _IN_QPID_BROKER
- PREFIX "")
+set_target_properties (test_store PROPERTIES PREFIX "" COMPILE_DEFINITIONS _IN_QPID_BROKER)
add_library (dlclose_noop MODULE dlclose_noop.c)
-#libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir)
-
-#CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers)
-#
-## Longer running stability tests, not run by default check: target.
-## Not run under valgrind, too slow
-#LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest
-#EXTRA_DIST+=$(LONG_TESTS) run_perftest
-#check-long:
-# $(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND=
-
-#
-# legacystore
-#
-# add_subdirectory(legacystore)
diff --git a/cpp/src/tests/MessagingFixture.h b/cpp/src/tests/MessagingFixture.h
index 2312a87e9d..c18869a7c3 100644
--- a/cpp/src/tests/MessagingFixture.h
+++ b/cpp/src/tests/MessagingFixture.h
@@ -115,6 +115,7 @@ struct MessagingFixture : public BrokerFixture
(boost::format("amqp:tcp:localhost:%1%") % (port)).str());
connection.open();
return connection;
+
}
/** Open a connection to the broker. */
@@ -231,9 +232,10 @@ inline void receive(messaging::Receiver& receiver, uint count = 1, uint start =
class MethodInvoker
{
public:
- MethodInvoker(messaging::Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
- sender(session.createSender("qmf.default.direct/broker")),
- receiver(session.createReceiver(replyTo)) {}
+ MethodInvoker(messaging::Session session) :
+ replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
+ sender(session.createSender("qmf.default.direct/broker")),
+ receiver(session.createReceiver(replyTo)) {}
void createExchange(const std::string& name, const std::string& type, bool durable=false)
{
@@ -292,11 +294,14 @@ class MethodInvoker
methodRequest("delete", params);
}
- void methodRequest(const std::string& method, const Variant::Map& inParams, Variant::Map* outParams = 0)
+ void methodRequest(
+ const std::string& method,
+ const Variant::Map& inParams, Variant::Map* outParams = 0,
+ const std::string& objectName="org.apache.qpid.broker:broker:amqp-broker")
{
Variant::Map content;
Variant::Map objectId;
- objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
+ objectId["_object_name"] = objectName;;
content["_object_id"] = objectId;
content["_method_name"] = method;
content["_arguments"] = inParams;
diff --git a/cpp/src/tests/TransactionObserverTest.cpp b/cpp/src/tests/TransactionObserverTest.cpp
index c284417e25..f570837ccf 100644
--- a/cpp/src/tests/TransactionObserverTest.cpp
+++ b/cpp/src/tests/TransactionObserverTest.cpp
@@ -89,6 +89,7 @@ Session simpleTxTransaction(MessagingFixture& fix) {
// Transaction with 1 enqueue and 1 dequeue.
Session txSession = fix.connection.createTransactionalSession();
BOOST_CHECK_EQUAL("foo", txSession.createReceiver("q1").fetch().getContent());
+ txSession.acknowledge();
txSession.createSender("q2;{create:always}").send(msg("bar"));
return txSession;
}
diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py
index 286beb0258..b07a5b5d11 100644
--- a/cpp/src/tests/brokertest.py
+++ b/cpp/src/tests/brokertest.py
@@ -244,14 +244,10 @@ class Broker(Popen):
def __str__(self): return "Broker<%s %s :%d>"%(self.log, self.pname, self.port())
- def find_log(self):
- self.log = "%03d:%s.log" % (Broker._log_count, self.name)
- Broker._log_count += 1
-
def get_log(self):
return os.path.abspath(self.log)
- def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False):
+ def __init__(self, test, args=[], test_store=False, name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False):
"""Start a broker daemon. name determines the data-dir and log
file names."""
@@ -273,11 +269,18 @@ class Broker(Popen):
else:
self.name = "broker%d" % Broker._broker_count
Broker._broker_count += 1
- self.find_log()
+
+ self.log = "%03d:%s.log" % (Broker._log_count, self.name)
+ self.store_log = "%03d:%s.store.log" % (Broker._log_count, self.name)
+ Broker._log_count += 1
+
cmd += ["--log-to-file", self.log]
cmd += ["--log-to-stderr=no"]
cmd += ["--log-enable=%s"%(log_level or "info+") ]
+ if test_store: cmd += ["--load-module", BrokerTest.test_store_lib,
+ "--test-store-events", self.store_log]
+
self.datadir = self.name
cmd += ["--data-dir", self.datadir]
if show_cmd: print cmd
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
new file mode 100644
index 0000000000..62bbf513bd
--- /dev/null
+++ b/cpp/src/tests/cluster_test.cpp
@@ -0,0 +1,1231 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "test_tools.h"
+#include "unit_test.h"
+#include "ForkedBroker.h"
+#include "BrokerFixture.h"
+#include "ClusterFixture.h"
+
+#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/client/ConnectionAccess.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/FailoverListener.h"
+#include "qpid/client/FailoverManager.h"
+#include "qpid/client/QueueOptions.h"
+#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/UpdateClient.h"
+#include "qpid/framing/AMQBody.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/enum.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Logger.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Thread.h"
+
+#include <boost/bind.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/assign.hpp>
+
+#include <string>
+#include <iostream>
+#include <fstream>
+#include <iterator>
+#include <vector>
+#include <set>
+#include <algorithm>
+#include <iterator>
+
+
+using namespace std;
+using namespace qpid;
+using namespace qpid::cluster;
+using namespace qpid::framing;
+using namespace qpid::client;
+using namespace boost::assign;
+using broker::Broker;
+using boost::shared_ptr;
+
+namespace qpid {
+namespace tests {
+
+QPID_AUTO_TEST_SUITE(cluster_test)
+
+bool durableFlag = std::getenv("STORE_LIB") != 0;
+
+void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) {
+ ostringstream clusterLib;
+ clusterLib << getLibPath("CLUSTER_LIB");
+ args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str();
+ if (durableFlag)
+ args += "--load-module", getLibPath("STORE_LIB"), "TMP_DATA_DIR";
+ else
+ args += "--no-data-dir";
+}
+
+ClusterFixture::Args prepareArgs(const bool durableFlag = false) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ return args;
+}
+
+// Timeout for tests that wait for messages
+const sys::Duration TIMEOUT=2*sys::TIME_SEC;
+
+
+ostream& operator<<(ostream& o, const cpg_name* n) {
+ return o << Cpg::str(*n);
+}
+
+ostream& operator<<(ostream& o, const cpg_address& a) {
+ return o << "(" << a.nodeid <<","<<a.pid<<","<<a.reason<<")";
+}
+
+template <class T>
+ostream& operator<<(ostream& o, const pair<T*, int>& array) {
+ o << "{ ";
+ ostream_iterator<cpg_address> i(o, " ");
+ copy(array.first, array.first+array.second, i);
+ o << "}";
+ return o;
+}
+
+template <class C> set<int> makeSet(const C& c) {
+ set<int> s;
+ copy(c.begin(), c.end(), inserter(s, s.begin()));
+ return s;
+}
+
+class Sender {
+ public:
+ Sender(boost::shared_ptr<ConnectionImpl> ci, uint16_t ch) : connection(ci), channel(ch) {}
+ void send(const AMQBody& body, bool firstSeg, bool lastSeg, bool firstFrame, bool lastFrame) {
+ AMQFrame f(body);
+ f.setChannel(channel);
+ f.setFirstSegment(firstSeg);
+ f.setLastSegment(lastSeg);
+ f.setFirstFrame(firstFrame);
+ f.setLastFrame(lastFrame);
+ connection->expand(f.encodedSize(), false);
+ connection->handle(f);
+ }
+
+ private:
+ boost::shared_ptr<ConnectionImpl> connection;
+ uint16_t channel;
+};
+
+int64_t getMsgSequence(const Message& m) {
+ return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence");
+}
+
+Message ttlMessage(const string& data, const string& key, uint64_t ttl, bool durable = false) {
+ Message m(data, key);
+ m.getDeliveryProperties().setTtl(ttl);
+ if (durable) m.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+ return m;
+}
+
+Message makeMessage(const string& data, const string& key, bool durable = false) {
+ Message m(data, key);
+ if (durable) m.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+ return m;
+}
+
+vector<string> browse(Client& c, const string& q, int n) {
+ SubscriptionSettings browseSettings(
+ FlowControl::messageCredit(n),
+ ACCEPT_MODE_NONE,
+ ACQUIRE_MODE_NOT_ACQUIRED,
+ 0 // No auto-ack.
+ );
+ LocalQueue lq;
+ c.subs.subscribe(lq, q, browseSettings);
+ c.session.messageFlush(q);
+ vector<string> result;
+ for (int i = 0; i < n; ++i) {
+ Message m;
+ if (!lq.get(m, TIMEOUT))
+ break;
+ result.push_back(m.getData());
+ }
+ c.subs.getSubscription(q).cancel();
+ return result;
+}
+
+ConnectionSettings aclSettings(int port, const std::string& id) {
+ ConnectionSettings settings;
+ settings.port = port;
+ settings.mechanism = "PLAIN";
+ settings.username = id;
+ settings.password = id;
+ return settings;
+}
+
+// An illegal frame body
+struct PoisonPill : public AMQBody {
+ virtual uint8_t type() const { return 0xFF; }
+ virtual void encode(Buffer& ) const {}
+ virtual void decode(Buffer& , uint32_t=0) {}
+ virtual uint32_t encodedSize() const { return 0; }
+
+ virtual void print(std::ostream&) const {};
+ virtual void accept(AMQBodyConstVisitor&) const {};
+
+ virtual AMQMethodBody* getMethod() { return 0; }
+ virtual const AMQMethodBody* getMethod() const { return 0; }
+
+ /** Match if same type and same class/method ID for methods */
+ static bool match(const AMQBody& , const AMQBody& ) { return false; }
+ virtual boost::intrusive_ptr<AMQBody> clone() const { return new PoisonPill; }
+};
+
+QPID_AUTO_TEST_CASE(testBadClientData) {
+ // Ensure that bad data on a client connection closes the
+ // connection but does not stop the broker.
+ ClusterFixture::Args args;
+ prepareArgs(args, false);
+ args += "--log-enable=critical"; // Supress expected errors
+ ClusterFixture cluster(2, args, -1);
+ Client c0(cluster[0]);
+ Client c1(cluster[1]);
+ boost::shared_ptr<client::ConnectionImpl> ci =
+ client::ConnectionAccess::getImpl(c0.connection);
+ AMQFrame poison(boost::intrusive_ptr<AMQBody>(new PoisonPill));
+ ci->expand(poison.encodedSize(), false);
+ ci->handle(poison);
+ {
+ ScopedSuppressLogging sl;
+ BOOST_CHECK_THROW(c0.session.queueQuery("q0"), Exception);
+ }
+ Client c00(cluster[0]);
+ BOOST_CHECK_EQUAL(c00.session.queueQuery("q00").getQueue(), "");
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getQueue(), "");
+}
+
+QPID_AUTO_TEST_CASE(testAcl) {
+ ofstream policyFile("cluster_test.acl");
+ policyFile << "acl allow foo@QPID create queue name=foo" << endl
+ << "acl allow foo@QPID create queue name=foo2" << endl
+ << "acl deny foo@QPID create queue name=bar" << endl
+ << "acl allow all all" << endl;
+ policyFile.close();
+ char cwd[1024];
+ BOOST_CHECK(::getcwd(cwd, sizeof(cwd)));
+ ostringstream aclLib;
+ aclLib << getLibPath("ACL_LIB");
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ args += "--log-enable=critical"; // Supress expected errors
+ args += "--acl-file", string(cwd) + "/cluster_test.acl",
+ "--cluster-mechanism", "PLAIN",
+ "--cluster-username", "cluster",
+ "--cluster-password", "cluster",
+ "--load-module", aclLib.str();
+ ClusterFixture cluster(2, args, -1);
+
+ Client c0(aclSettings(cluster[0], "c0"), "c0");
+ Client c1(aclSettings(cluster[1], "c1"), "c1");
+ Client foo(aclSettings(cluster[1], "foo"), "foo");
+
+ foo.session.queueDeclare("foo", arg::durable=durableFlag);
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo");
+
+ {
+ ScopedSuppressLogging sl;
+ BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::UnauthorizedAccessException);
+ }
+ BOOST_CHECK(c0.session.queueQuery("bar").getQueue().empty());
+ BOOST_CHECK(c1.session.queueQuery("bar").getQueue().empty());
+
+ cluster.add();
+ Client c2(aclSettings(cluster[2], "c2"), "c2");
+ {
+ ScopedSuppressLogging sl;
+ BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::UnauthorizedAccessException);
+ }
+ BOOST_CHECK(c2.session.queueQuery("bar").getQueue().empty());
+}
+
+QPID_AUTO_TEST_CASE(testMessageTimeToLive) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(2, args, -1);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+ c0.session.queueDeclare("p", arg::durable=durableFlag);
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200, durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("b", "q", durableFlag));
+ c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 100000, durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("y", "p", durableFlag));
+ cluster.add();
+ Client c2(cluster[1], "c2");
+
+ BOOST_CHECK_EQUAL(browse(c0, "p", 1), list_of<string>("x"));
+ BOOST_CHECK_EQUAL(browse(c1, "p", 1), list_of<string>("x"));
+ BOOST_CHECK_EQUAL(browse(c2, "p", 1), list_of<string>("x"));
+
+ sys::usleep(200*1000);
+ BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<string>("b"));
+ BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<string>("b"));
+ BOOST_CHECK_EQUAL(browse(c2, "q", 1), list_of<string>("b"));
+}
+
+QPID_AUTO_TEST_CASE(testSequenceOptions) {
+ // Make sure the exchange qpid.msg_sequence property is properly replicated.
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+ FieldTable ftargs;
+ ftargs.setInt("qpid.msg_sequence", 1);
+ c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag);
+ c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=ftargs);
+ c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k");
+ c0.session.messageTransfer(arg::content=makeMessage("1", "k", durableFlag), arg::destination="ex");
+ c0.session.messageTransfer(arg::content=makeMessage("2", "k", durableFlag), arg::destination="ex");
+ BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT)));
+ BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT)));
+
+ cluster.add();
+ Client c1(cluster[1]);
+ c1.session.messageTransfer(arg::content=makeMessage("3", "k", durableFlag), arg::destination="ex");
+ BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT)));
+}
+
+QPID_AUTO_TEST_CASE(testTxTransaction) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("A", "q", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("B", "q", durableFlag));
+
+ // Start a transaction that will commit.
+ Session commitSession = c0.connection.newSession("commit");
+ SubscriptionManager commitSubs(commitSession);
+ commitSession.txSelect();
+ commitSession.messageTransfer(arg::content=makeMessage("a", "q", durableFlag));
+ commitSession.messageTransfer(arg::content=makeMessage("b", "q", durableFlag));
+ BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A");
+
+ // Start a transaction that will roll back.
+ Session rollbackSession = c0.connection.newSession("rollback");
+ SubscriptionManager rollbackSubs(rollbackSession);
+ rollbackSession.txSelect();
+ rollbackSession.messageTransfer(arg::content=makeMessage("1", "q", durableFlag));
+ Message rollbackMessage = rollbackSubs.get("q", TIMEOUT);
+ BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B");
+
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+ // Add new member mid transaction.
+ cluster.add();
+ Client c1(cluster[1], "c1");
+
+ // More transactional work
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+ rollbackSession.messageTransfer(arg::content=makeMessage("2", "q", durableFlag));
+ commitSession.messageTransfer(arg::content=makeMessage("c", "q", durableFlag));
+ rollbackSession.messageTransfer(arg::content=makeMessage("3", "q", durableFlag));
+
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+
+ // Commit/roll back.
+ commitSession.txCommit();
+ rollbackSession.txRollback();
+ rollbackSession.messageRelease(rollbackMessage.getId());
+
+ // Verify queue status: just the comitted messages and dequeues should remain.
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u);
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "B");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "a");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "b");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "c");
+
+ commitSession.close();
+ rollbackSession.close();
+}
+
+QPID_AUTO_TEST_CASE(testUnacked) {
+ // Verify replication of unacknowledged messages.
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+
+ Message m;
+
+ // Create unacked message: acquired but not accepted.
+ SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0);
+ c0.session.queueDeclare("q1", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("11","q1", durableFlag));
+ LocalQueue q1;
+ c0.subs.subscribe(q1, "q1", manualAccept);
+ BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue
+
+ // Create unacked message: not acquired, accepted or completeed.
+ SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0);
+ c0.session.queueDeclare("q2", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("21","q2", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("22","q2", durableFlag));
+ LocalQueue q2;
+ c0.subs.subscribe(q2, "q2", manualAcquire);
+ m = q2.get(TIMEOUT); // Not acquired or accepted, still on queue
+ BOOST_CHECK_EQUAL(m.getData(), "21");
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed
+ c0.subs.getSubscription("q2").acquire(m); // Acquire manually
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed
+ BOOST_CHECK_EQUAL(q2.get(TIMEOUT).getData(), "22"); // Not acquired or accepted, still on queue
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired.
+
+ // Create empty credit record: acquire and accept but don't complete.
+ SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION);
+ c0.session.queueDeclare("q3", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("31", "q3", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("32", "q3", durableFlag));
+ LocalQueue q3;
+ c0.subs.subscribe(q3, "q3", manualComplete);
+ Message m31=q3.get(TIMEOUT);
+ BOOST_CHECK_EQUAL(m31.getData(), "31"); // Automatically acquired & accepted but not completed.
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 1u);
+
+ // Add new member while there are unacked messages.
+ cluster.add();
+ Client c1(cluster[1], "c1");
+
+ // Check queue counts
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 1u);
+
+ // Complete the empty credit message, should unblock the message behind it.
+ BOOST_CHECK_THROW(q3.get(0), Exception);
+ c0.session.markCompleted(SequenceSet(m31.getId()), true);
+ BOOST_CHECK_EQUAL(q3.get(TIMEOUT).getData(), "32");
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 0u);
+
+ // Close the original session - unacked messages should be requeued.
+ c0.session.close();
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u);
+
+ BOOST_CHECK_EQUAL(c1.subs.get("q1", TIMEOUT).getData(), "11");
+ BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "21");
+ BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "22");
+}
+
+// FIXME aconway 2009-06-17: test for unimplemented feature, enable when implemented.
+void testUpdateTxState() {
+ // Verify that we update transaction state correctly to new members.
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+
+ // Do work in a transaction.
+ c0.session.txSelect();
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("1","q", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("2","q", durableFlag));
+ Message m;
+ BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "1");
+
+ // New member, TX not comitted, c1 should see nothing.
+ cluster.add();
+ Client c1(cluster[1], "c1");
+ BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
+
+ // After commit c1 shoudl see results of tx.
+ c0.session.txCommit();
+ BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "2");
+
+ // Another transaction with both members active.
+ c0.session.messageTransfer(arg::content=makeMessage("3","q", durableFlag));
+ BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
+ c0.session.txCommit();
+ BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "3");
+}
+
+QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
+ // Verify that we update a partially recieved message to a new member.
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel());
+
+ // Send first 2 frames of message.
+ MessageTransferBody transfer(
+ ProtocolVersion(), string(), // default exchange.
+ framing::message::ACCEPT_MODE_NONE,
+ framing::message::ACQUIRE_MODE_PRE_ACQUIRED);
+ sender.send(transfer, true, false, true, true);
+ AMQHeaderBody header;
+ header.get<DeliveryProperties>(true)->setRoutingKey("q");
+ if (durableFlag)
+ header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_PERSISTENT);
+ else
+ header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_NON_PERSISTENT);
+ sender.send(header, false, false, true, true);
+
+ // No reliable way to ensure the partial message has arrived
+ // before we start the new broker, so we sleep.
+ sys::usleep(2500);
+ cluster.add();
+
+ // Send final 2 frames of message.
+ sender.send(AMQContentBody("ab"), false, true, true, false);
+ sender.send(AMQContentBody("cd"), false, true, false, true);
+
+ // Verify message is enqued correctly on second member.
+ Message m;
+ Client c1(cluster[1], "c1");
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "abcd");
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());
+}
+
+QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+ set<int> kb0 = knownBrokerPorts(c0.connection, 1);
+ BOOST_CHECK_EQUAL(kb0.size(), 1u);
+ BOOST_CHECK_EQUAL(kb0, makeSet(cluster));
+
+ cluster.add();
+ Client c1(cluster[1], "c1");
+ set<int> kb1 = knownBrokerPorts(c1.connection, 2);
+ kb0 = knownBrokerPorts(c0.connection, 2);
+ BOOST_CHECK_EQUAL(kb1.size(), 2u);
+ BOOST_CHECK_EQUAL(kb1, makeSet(cluster));
+ BOOST_CHECK_EQUAL(kb1,kb0);
+
+ cluster.add();
+ Client c2(cluster[2], "c2");
+ set<int> kb2 = knownBrokerPorts(c2.connection, 3);
+ kb1 = knownBrokerPorts(c1.connection, 3);
+ kb0 = knownBrokerPorts(c0.connection, 3);
+ BOOST_CHECK_EQUAL(kb2.size(), 3u);
+ BOOST_CHECK_EQUAL(kb2, makeSet(cluster));
+ BOOST_CHECK_EQUAL(kb2,kb0);
+ BOOST_CHECK_EQUAL(kb2,kb1);
+
+ cluster.killWithSilencer(1,c1.connection,9);
+ kb0 = knownBrokerPorts(c0.connection, 2);
+ kb2 = knownBrokerPorts(c2.connection, 2);
+ BOOST_CHECK_EQUAL(kb0.size(), 2u);
+ BOOST_CHECK_EQUAL(kb0, kb2);
+}
+
+QPID_AUTO_TEST_CASE(testUpdateConsumers) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare("p", arg::durable=durableFlag);
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.subs.subscribe(c0.lq, "q", FlowControl::zero());
+ LocalQueue lp;
+ c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1));
+ c0.session.sync();
+
+ // Start new members
+ cluster.add(); // Local
+ Client c1(cluster[1], "c1");
+ cluster.add();
+ Client c2(cluster[2], "c2");
+
+ // Transfer messages
+ c0.session.messageTransfer(arg::content=makeMessage("aaa", "q", durableFlag));
+
+ c0.session.messageTransfer(arg::content=makeMessage("bbb", "p", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("ccc", "p", durableFlag));
+
+ // Activate the subscription, ensure message removed on all queues.
+ c0.subs.setFlowControl("q", FlowControl::unlimited());
+ Message m;
+ BOOST_CHECK(c0.lq.get(m, TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "aaa");
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u);
+
+ // Check second subscription's flow control: gets first message, not second.
+ BOOST_CHECK(lp.get(m, TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "bbb");
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u);
+
+ BOOST_CHECK(c0.subs.get(m, "p", TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "ccc");
+
+ // Kill the subscribing member, ensure further messages are not removed.
+ cluster.killWithSilencer(0,c0.connection,9);
+ BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u);
+ for (int i = 0; i < 10; ++i) {
+ c1.session.messageTransfer(arg::content=makeMessage("xxx", "q", durableFlag));
+ BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT));
+ BOOST_REQUIRE_EQUAL(m.getData(), "xxx");
+ }
+}
+
+// Test that message data and delivery properties are updated properly.
+QPID_AUTO_TEST_CASE(testUpdateMessages) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+
+ // Create messages with different delivery properties
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.exchangeBind(arg::exchange="amq.fanout", arg::queue="q");
+ c0.session.messageTransfer(arg::content=makeMessage("foo","q", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("bar","q", durableFlag),
+ arg::destination="amq.fanout");
+
+ while (c0.session.queueQuery("q").getMessageCount() != 2)
+ sys::usleep(1000); // Wait for message to show up on broker 0.
+
+ // Add a new broker, it will catch up.
+ cluster.add();
+
+ // Do some work post-add
+ c0.session.queueDeclare("p", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("pfoo","p", durableFlag));
+
+ // Do some work post-join
+ BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u);
+ c0.session.messageTransfer(arg::content=makeMessage("pbar","p", durableFlag));
+
+ // Verify new brokers have state.
+ Message m;
+
+ Client c1(cluster[1], "c1");
+
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "foo");
+ BOOST_CHECK(m.getDeliveryProperties().hasExchange());
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "");
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "bar");
+ BOOST_CHECK(m.getDeliveryProperties().hasExchange());
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "amq.fanout");
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+
+ // Add another broker, don't wait for join - should be stalled till ready.
+ cluster.add();
+ Client c2(cluster[2], "c2");
+ BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "pfoo");
+ BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "pbar");
+ BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u);
+}
+
+QPID_AUTO_TEST_CASE(testWiringReplication) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(3, args, -1);
+ Client c0(cluster[0]);
+ BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
+ BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty());
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.exchangeDeclare("ex", arg::type="direct");
+ c0.session.close();
+ c0.connection.close();
+ // Verify all brokers get wiring update.
+ for (size_t i = 0; i < cluster.size(); ++i) {
+ BOOST_MESSAGE("i == "<< i);
+ Client c(cluster[i]);
+ BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue());
+ BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType());
+ }
+}
+
+QPID_AUTO_TEST_CASE(testMessageEnqueue) {
+ // Enqueue on one broker, dequeue on another.
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(2, args, -1);
+ Client c0(cluster[0]);
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag));
+ c0.session.close();
+ Client c1(cluster[1]);
+ Message msg;
+ BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT));
+ BOOST_CHECK_EQUAL(string("foo"), msg.getData());
+ BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT));
+ BOOST_CHECK_EQUAL(string("bar"), msg.getData());
+}
+
+QPID_AUTO_TEST_CASE(testMessageDequeue) {
+ // Enqueue on one broker, dequeue on two others.
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(3, args, -1);
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag));
+
+ Message msg;
+
+ // Dequeue on 2 others, ensure correct order.
+ Client c1(cluster[1], "c1");
+ BOOST_CHECK(c1.subs.get(msg, "q"));
+ BOOST_CHECK_EQUAL("foo", msg.getData());
+
+ Client c2(cluster[2], "c2");
+ BOOST_CHECK(c1.subs.get(msg, "q"));
+ BOOST_CHECK_EQUAL("bar", msg.getData());
+
+ // Queue should be empty on all cluster members.
+ BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount());
+ BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount());
+ BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
+}
+
+QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(3, args, -1);
+ Client c0(cluster[0]);
+ BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers.
+
+ // First start a subscription.
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2));
+
+ // Now send messages
+ Client c1(cluster[1]);
+ c1.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag));
+ c1.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag));
+
+ // Check they arrived
+ Message m;
+ BOOST_CHECK(c0.lq.get(m, TIMEOUT));
+ BOOST_CHECK_EQUAL("foo", m.getData());
+ BOOST_CHECK(c0.lq.get(m, TIMEOUT));
+ BOOST_CHECK_EQUAL("bar", m.getData());
+
+ // Queue should be empty on all cluster members.
+ Client c2(cluster[2]);
+ BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount());
+ BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount());
+ BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
+}
+
+QPID_AUTO_TEST_CASE(queueDurabilityPropagationToNewbie)
+{
+ /*
+ Start with a single broker.
+ Set up two queues: one durable, and one not.
+ Add a new broker to the cluster.
+ Make sure it has one durable and one non-durable queue.
+ */
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0]);
+ c0.session.queueDeclare("durable_queue", arg::durable=true);
+ c0.session.queueDeclare("non_durable_queue", arg::durable=false);
+ cluster.add();
+ Client c1(cluster[1]);
+ QueueQueryResult durable_query = c1.session.queueQuery ( "durable_queue" );
+ QueueQueryResult non_durable_query = c1.session.queueQuery ( "non_durable_queue" );
+ BOOST_CHECK_EQUAL(durable_query.getQueue(), std::string("durable_queue"));
+ BOOST_CHECK_EQUAL(non_durable_query.getQueue(), std::string("non_durable_queue"));
+
+ BOOST_CHECK_EQUAL ( durable_query.getDurable(), true );
+ BOOST_CHECK_EQUAL ( non_durable_query.getDurable(), false );
+}
+
+
+QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover)
+{
+
+ struct Sender : FailoverManager::Command
+ {
+ std::string queue;
+ std::string content;
+
+ Sender(const std::string& q, const std::string& c) : queue(q), content(c) {}
+
+ void execute(AsyncSession& session, bool)
+ {
+ session.messageTransfer(arg::content=makeMessage(content, queue, durableFlag));
+ }
+ };
+
+ struct Receiver : FailoverManager::Command, MessageListener, qpid::sys::Runnable
+ {
+ FailoverManager& mgr;
+ std::string queue;
+ std::string expectedContent;
+ qpid::client::Subscription subscription;
+ qpid::sys::Monitor lock;
+ bool ready, failed;
+
+ Receiver(FailoverManager& m, const std::string& q, const std::string& c) : mgr(m), queue(q), expectedContent(c), ready(false), failed(false) {}
+
+ void received(Message& message)
+ {
+ BOOST_CHECK_EQUAL(expectedContent, message.getData());
+ subscription.cancel();
+ }
+
+ void execute(AsyncSession& session, bool)
+ {
+ session.queueDeclare(arg::queue=queue, arg::durable=durableFlag);
+ SubscriptionManager subs(session);
+ subscription = subs.subscribe(*this, queue);
+ session.sync();
+ setReady();
+ subs.run();
+ //cleanup:
+ session.queueDelete(arg::queue=queue);
+ }
+
+ void run()
+ {
+ try {
+ mgr.execute(*this);
+ }
+ catch (const std::exception& e) {
+ BOOST_MESSAGE("Exception in mgr.execute: " << e.what());
+ failed = true;
+ }
+ }
+
+ void waitForReady()
+ {
+ qpid::sys::Monitor::ScopedLock l(lock);
+ while (!ready) {
+ lock.wait();
+ }
+ }
+
+ void setReady()
+ {
+ qpid::sys::Monitor::ScopedLock l(lock);
+ ready = true;
+ lock.notify();
+ }
+ };
+
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(2, args, -1);
+ ConnectionSettings settings;
+ settings.port = cluster[1];
+ settings.heartbeat = 1;
+ FailoverManager fmgr(settings);
+ Sender sender("my-queue", "my-data");
+ Receiver receiver(fmgr, "my-queue", "my-data");
+ qpid::sys::Thread runner(receiver);
+ receiver.waitForReady();
+ {
+ ScopedSuppressLogging allQuiet; // suppress connection closed messages
+ cluster.kill(1);
+ //sleep for 2 secs to allow the heartbeat task to fire on the now dead connection:
+ ::usleep(2*1000*1000);
+ }
+ fmgr.execute(sender);
+ runner.join();
+ BOOST_CHECK(!receiver.failed);
+ fmgr.close();
+}
+
+QPID_AUTO_TEST_CASE(testPolicyUpdate) {
+ //tests that the policys internal state is accurate on newly
+ //joined nodes
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ {
+ ScopedSuppressLogging allQuiet;
+ QueueOptions options;
+ options.setSizePolicy(REJECT, 0, 2);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+ c1.session.messageTransfer(arg::content=makeMessage("one", "q", durableFlag));
+ cluster.add();
+ Client c2(cluster[1], "c2");
+ c2.session.messageTransfer(arg::content=makeMessage("two", "q", durableFlag));
+
+ BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=makeMessage("three", "q", durableFlag)), framing::ResourceLimitExceededException);
+
+ Message received;
+ BOOST_CHECK(c1.subs.get(received, "q"));
+ BOOST_CHECK_EQUAL(received.getData(), std::string("one"));
+ BOOST_CHECK(c1.subs.get(received, "q"));
+ BOOST_CHECK_EQUAL(received.getData(), std::string("two"));
+ BOOST_CHECK(!c1.subs.get(received, "q"));
+ }
+}
+
+QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) {
+ //tests that exclusive queues are accurately replicated on newly
+ //joined nodes
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ {
+ ScopedSuppressLogging allQuiet;
+ c1.session.queueDeclare("q", arg::exclusive=true, arg::autoDelete=true, arg::alternateExchange="amq.fanout");
+ cluster.add();
+ Client c2(cluster[1], "c2");
+ QueueQueryResult result = c2.session.queueQuery("q");
+ BOOST_CHECK_EQUAL(result.getQueue(), std::string("q"));
+ BOOST_CHECK(result.getExclusive());
+ BOOST_CHECK(result.getAutoDelete());
+ BOOST_CHECK(!result.getDurable());
+ BOOST_CHECK_EQUAL(result.getAlternateExchange(), std::string("amq.fanout"));
+ BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::exclusive=true, arg::passive=true), framing::ResourceLockedException);
+ c1.session.close();
+ c1.connection.close();
+ c2.session = c2.connection.newSession();
+ BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException);
+ }
+}
+
+/**
+ * Subscribes to specified queue and acquires up to the specified
+ * number of message but does not accept or release them. These
+ * message are therefore 'locked' by the clients session.
+ */
+Subscription lockMessages(Client& client, const std::string& queue, int count)
+{
+ LocalQueue q;
+ SubscriptionSettings settings(FlowControl::messageCredit(count));
+ settings.autoAck = 0;
+ Subscription sub = client.subs.subscribe(q, queue, settings);
+ client.session.messageFlush(sub.getName());
+ return sub;
+}
+
+/**
+ * check that the specified queue contains the expected set of
+ * messages (matched on content) for all nodes in the cluster
+ */
+void checkQueue(ClusterFixture& cluster, const std::string& queue, const std::vector<std::string>& messages)
+{
+ for (size_t i = 0; i < cluster.size(); i++) {
+ Client client(cluster[i], (boost::format("%1%_%2%") % "c" % (i+1)).str());
+ BOOST_CHECK_EQUAL(browse(client, queue, messages.size()), messages);
+ client.close();
+ }
+}
+
+void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m",
+ const std::string& lvqKey="")
+{
+ for (int i = 0; i < count; i++) {
+ Message message = makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag);
+ if (!lvqKey.empty()) message.getHeaders().setString(QueueOptions::strLVQMatchProperty, lvqKey);
+ client.session.messageTransfer(arg::content=message);
+ }
+}
+
+QPID_AUTO_TEST_CASE(testRingQueueUpdate) {
+ //tests that ring queues are accurately replicated on newly
+ //joined nodes
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ {
+ ScopedSuppressLogging allQuiet;
+ QueueOptions options;
+ options.setSizePolicy(RING, 0, 5);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+ send(c1, "q", 5);
+ lockMessages(c1, "q", 1);
+ //add new node
+ cluster.add();
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+ //send one more message
+ send(c1, "q", 1, 6);
+ //release locked message
+ c1.close();
+ //check state of queue on both nodes
+ checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+ }
+}
+
+QPID_AUTO_TEST_CASE(testRingQueueUpdate2) {
+ //tests that ring queues are accurately replicated on newly joined
+ //nodes; just like testRingQueueUpdate, but new node joins after
+ //the sixth message has been sent.
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ {
+ ScopedSuppressLogging allQuiet;
+ QueueOptions options;
+ options.setSizePolicy(RING, 0, 5);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+ send(c1, "q", 5);
+ lockMessages(c1, "q", 1);
+ //send sixth message
+ send(c1, "q", 1, 6);
+ //add new node
+ cluster.add();
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+ //release locked message
+ c1.close();
+ //check state of queue on both nodes
+ checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+ }
+}
+
+QPID_AUTO_TEST_CASE(testLvqUpdate) {
+ //tests that lvqs are accurately replicated on newly joined nodes
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ {
+ ScopedSuppressLogging allQuiet;
+ QueueOptions options;
+ options.setOrdering(LVQ);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+
+ send(c1, "q", 5, 1, "a", "a");
+ send(c1, "q", 2, 1, "b", "b");
+ send(c1, "q", 1, 1, "c", "c");
+ send(c1, "q", 1, 3, "b", "b");
+
+ //add new node
+ cluster.add();
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+
+ //check state of queue on both nodes
+ checkQueue(cluster, "q", list_of<string>("a_5")("b_3")("c_1"));
+ }
+}
+
+
+QPID_AUTO_TEST_CASE(testBrowsedLvqUpdate) {
+ //tests that lvqs are accurately replicated on newly joined nodes
+ //if the lvq state has been affected by browsers
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ {
+ ScopedSuppressLogging allQuiet;
+ QueueOptions options;
+ options.setOrdering(LVQ);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+
+ send(c1, "q", 1, 1, "a", "a");
+ send(c1, "q", 2, 1, "b", "b");
+ send(c1, "q", 1, 1, "c", "c");
+ checkQueue(cluster, "q", list_of<string>("a_1")("b_2")("c_1"));
+ send(c1, "q", 4, 2, "a", "a");
+ send(c1, "q", 1, 3, "b", "b");
+
+ //add new node
+ cluster.add();
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+
+ //check state of queue on both nodes
+ checkQueue(cluster, "q", list_of<string>("a_1")("b_2")("c_1")("a_5")("b_3"));
+ }
+}
+
+QPID_AUTO_TEST_CASE(testRelease) {
+ //tests that releasing a messages that was unacked when one node
+ //joined works correctly
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ {
+ ScopedSuppressLogging allQuiet;
+ c1.session.queueDeclare("q", arg::durable=durableFlag);
+ for (int i = 0; i < 5; i++) {
+ c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag));
+ }
+ //receive but don't ack a message
+ LocalQueue lq;
+ SubscriptionSettings lqSettings(FlowControl::messageCredit(1));
+ lqSettings.autoAck = 0;
+ Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings);
+ c1.session.messageFlush("q");
+ Message received;
+ BOOST_CHECK(lq.get(received));
+ BOOST_CHECK_EQUAL(received.getData(), std::string("m_1"));
+
+ //add new node
+ cluster.add();
+
+ lqSub.release(lqSub.getUnaccepted());
+
+ //check state of queue on both nodes
+ vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5");
+ Client c3(cluster[0], "c3");
+ BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected);
+ Client c2(cluster[1], "c2");
+ BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected);
+ }
+}
+
+
+// Browse for 1 message with byte credit, return true if a message was
+// received false if not.
+bool browseByteCredit(Client& c, const string& q, int n, Message& m) {
+ SubscriptionSettings browseSettings(
+ FlowControl(1, n, false), // 1 message, n bytes credit, no window
+ ACCEPT_MODE_NONE,
+ ACQUIRE_MODE_NOT_ACQUIRED,
+ 0 // No auto-ack.
+ );
+ LocalQueue lq;
+ Subscription s = c.subs.subscribe(lq, q, browseSettings);
+ c.session.messageFlush(arg::destination=q, arg::sync=true);
+ c.session.sync();
+ c.subs.getSubscription(q).cancel();
+ return lq.get(m, 0); // No timeout, flush should push message thru.
+}
+
+// Ensure cluster update preserves exact message size, use byte credt as test.
+QPID_AUTO_TEST_CASE(testExactByteCredit) {
+ ClusterFixture cluster(1, prepareArgs(), -1);
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare("q");
+ c0.session.messageTransfer(arg::content=Message("MyMessage", "q"));
+ cluster.add();
+
+ int size=36; // Size of message on broker: headers+body
+ Client c1(cluster[1], "c1");
+ Message m;
+
+ // Ensure we get the message with exact credit.
+ BOOST_CHECK(browseByteCredit(c0, "q", size, m));
+ BOOST_CHECK(browseByteCredit(c1, "q", size, m));
+ // and not with one byte less.
+ BOOST_CHECK(!browseByteCredit(c0, "q", size-1, m));
+ BOOST_CHECK(!browseByteCredit(c1, "q", size-1, m));
+}
+
+// Test that consumer positions are updated correctly.
+// Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=541927
+//
+QPID_AUTO_TEST_CASE(testUpdateConsumerPosition) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ SubscriptionSettings settings;
+ settings.autoAck = 0;
+ // Set the acquire mode to 'not-acquired' the consumer moves along the queue
+ // but does not acquire (remove) messages.
+ settings.acquireMode = ACQUIRE_MODE_NOT_ACQUIRED;
+ Subscription s = c0.subs.subscribe(c0.lq, "q", settings);
+ c0.session.messageTransfer(arg::content=makeMessage("1", "q", durableFlag));
+ BOOST_CHECK_EQUAL("1", c0.lq.get(TIMEOUT).getData());
+
+ // Add another member, send/receive another message and acquire
+ // the messages. With the bug, this creates an inconsistency
+ // because the browse position was not updated to the new member.
+ cluster.add();
+ c0.session.messageTransfer(arg::content=makeMessage("2", "q", durableFlag));
+ BOOST_CHECK_EQUAL("2", c0.lq.get(TIMEOUT).getData());
+ s.acquire(s.getUnacquired());
+ s.accept(s.getUnaccepted());
+
+ // In the bug we now have 0 messages on cluster[0] and 1 message on cluster[1]
+ // Subscribing on cluster[1] provokes an error that shuts down cluster[0]
+ Client c1(cluster[1], "c1");
+ Subscription s1 = c1.subs.subscribe(c1.lq, "q"); // Default auto-ack=1
+ Message m;
+ BOOST_CHECK(!c1.lq.get(m, TIMEOUT/10));
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+}
+
+QPID_AUTO_TEST_CASE(testFairsharePriorityDelivery) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+
+ FieldTable arguments;
+ arguments.setInt("x-qpid-priorities", 10);
+ arguments.setInt("x-qpid-fairshare", 5);
+ c0.session.queueDeclare("q", arg::durable=durableFlag, arg::arguments=arguments);
+
+ //send messages of different priorities
+ for (int i = 0; i < 20; i++) {
+ Message msg = makeMessage((boost::format("msg-%1%") % i).str(), "q", durableFlag);
+ msg.getDeliveryProperties().setPriority(i % 2 ? 9 : 5);
+ c0.session.messageTransfer(arg::content=msg);
+ }
+
+ //pull off a couple of the messages (first four should be the top priority messages
+ for (int i = 0; i < 4; i++) {
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % ((i*2)+1)).str(), c0.subs.get("q", TIMEOUT).getData());
+ }
+
+ // Add another member
+ cluster.add();
+ Client c1(cluster[1], "c1");
+
+ //pull off some more messages
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 9).str(), c0.subs.get("q", TIMEOUT).getData());
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 0).str(), c1.subs.get("q", TIMEOUT).getData());
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 2).str(), c0.subs.get("q", TIMEOUT).getData());
+
+ //check queue has same content on both nodes
+ BOOST_CHECK_EQUAL(browse(c0, "q", 12), browse(c1, "q", 12));
+}
+
+QPID_AUTO_TEST_SUITE_END()
+}} // namespace qpid::tests
diff --git a/cpp/src/tests/ha_test.py b/cpp/src/tests/ha_test.py
index f2fc50054f..602a62ca17 100755
--- a/cpp/src/tests/ha_test.py
+++ b/cpp/src/tests/ha_test.py
@@ -107,7 +107,7 @@ class HaBroker(Broker):
ha_port = ha_port or HaPort(test)
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
- "--log-enable=debug+:ha::",
+ "--log-enable=trace+:ha::", # FIXME aconway 2013-07-29: debug
# Non-standard settings for faster tests.
"--link-maintenance-interval=0.1",
# Heartbeat and negotiate time are needed so that a broker wont
@@ -221,6 +221,12 @@ acl allow all all
def wait_backup(self, address): self.wait_address(address)
+ def browse(self, queue, timeout=0, transform=lambda m: m.content):
+ c = self.connect_admin()
+ try:
+ return browse(c.session(), queue, timeout, transform)
+ finally: c.close()
+
def assert_browse(self, queue, expected, **kwargs):
"""Verify queue contents by browsing."""
bs = self.connect().session()
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index de5dfb4b10..55715639a4 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -1287,50 +1287,101 @@ class StoreTests(BrokerTest):
cluster[0].assert_browse("q2", ["hello", "end"])
cluster[1].assert_browse_backup("q2", ["hello", "end"])
+def open_read(name):
+ with open(name) as f: return f.read()
+
class TransactionTests(BrokerTest):
+ load_store=["--load-module", BrokerTest.test_store_lib]
+
def tx_simple_setup(self, broker):
- """Start a transaction: receive 'foo' from 'a' and send 'bar' to 'b'"""
+ """Start a transaction, remove messages from queue a, add messages to queue b"""
c = broker.connect()
- c.session().sender("a;{create:always}").send("foo")
+ # Send messages to a, no transaction.
+ sa = c.session().sender("a;{create:always,node:{durable:true}}")
+ tx_msgs = ["x","y","z"]
+ for m in tx_msgs: sa.send(Message(content=m, durable=True))
+
+ # Receive messages from a, in transaction.
tx = c.session(transactional=True)
- self.assertEqual("foo", tx.receiver("a").fetch(1).content)
- tx.acknowledge();
- tx.sender("b;{create:always}").send("bar")
+ txr = tx.receiver("a")
+ tx_msgs2 = [txr.fetch(1).content for i in xrange(3)]
+ self.assertEqual(tx_msgs, tx_msgs2)
+
+ # Send messages to b, transactional, mixed with non-transactional.
+ sb = c.session().sender("b;{create:always,node:{durable:true}}")
+ txs = tx.sender("b")
+ msgs = [str(i) for i in xrange(3)]
+ for tx_m,m in zip(tx_msgs2, msgs):
+ txs.send(tx_m);
+ sb.send(m)
return tx
def test_tx_simple_commit(self):
- cluster = HaCluster(self, 2, args=["--log-enable=trace+:ha::"])
+ cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster[0])
+ tx.sync()
+
+ # NOTE: backup does not process transactional dequeues until prepare
+ cluster[1].assert_browse_backup("a", ["x","y","z"])
+ cluster[1].assert_browse_backup("b", ['0', '1', '2'])
+
+ tx.acknowledge()
tx.commit()
+ tx.sync()
+
for b in cluster:
b.assert_browse_backup("a", [], msg=b)
- b.assert_browse_backup("b", ["bar"], msg=b)
+ b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
+
+ # Check for expected actions on the store
+ expect = """<enqueue a x>
+<enqueue a y>
+<enqueue a z>
+<begin tx 1>
+<dequeue a x tx=1>
+<dequeue a y tx=1>
+<dequeue a z tx=1>
+<commit tx=1>
+"""
+ self.assertEqual(expect, open_read(cluster[0].store_log))
+ self.assertEqual(expect, open_read(cluster[1].store_log))
def test_tx_simple_rollback(self):
- cluster = HaCluster(self, 2)
+ cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster[0])
+ tx.acknowledge()
tx.rollback()
for b in cluster:
- b.assert_browse_backup("a", ["foo"], msg=b)
- b.assert_browse_backup("b", [], msg=b)
+ b.assert_browse_backup("a", ["x","y","z"], msg=b)
+ b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
+ # Check for expected actions on the store
+ expect = """<enqueue a x>
+<enqueue a y>
+<enqueue a z>
+"""
+ self.assertEqual(open_read(cluster[0].store_log), expect)
+ self.assertEqual(open_read(cluster[1].store_log), expect)
def test_tx_simple_failover(self):
- cluster = HaCluster(self, 2)
+ cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster[0])
+ tx.acknowledge()
cluster.bounce(0) # Should cause roll-back
+ cluster[0].wait_status("ready")
for b in cluster:
- b.assert_browse_backup("a", ["foo"], msg=b)
- b.assert_browse_backup("b", [], msg=b)
-
- def test_tx_simple_join(self):
- cluster = HaCluster(self, 2)
- tx = self.tx_simple_setup(cluster[0])
- cluster.bounce(1) # Should catch up with tx
- tx.commit()
- for b in cluster:
- b.assert_browse_backup("a", [], msg=b)
- b.assert_browse_backup("b", ["bar"], msg=b)
+ b.assert_browse_backup("a", ["x","y","z"], msg=b)
+ b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
+
+ # Check for expected actions on the store
+ expect = """<enqueue a x>
+<enqueue a y>
+<enqueue a z>
+"""
+ self.assertEqual(open_read(cluster[0].store_log), expect)
+ self.assertEqual(open_read(cluster[1].store_log), expect)
+
+# FIXME aconway 2013-07-23: test with partial acknowledgement.
if __name__ == "__main__":
outdir = "ha_tests.tmp"
diff --git a/cpp/src/tests/test_env.sh.in b/cpp/src/tests/test_env.sh.in
index f20fb317aa..60cc458f75 100644
--- a/cpp/src/tests/test_env.sh.in
+++ b/cpp/src/tests/test_env.sh.in
@@ -81,5 +81,5 @@ if [ ! -e "$HOME" ]; then
fi
# Options for boost test framework
-export BOOST_TEST_SHOW_PROGRESS=yes
-export BOOST_TEST_CATCH_SYSTEM_ERRORS=no
+test -z "$BOOST_TEST_SHOW_PROGRESS" && export BOOST_TEST_SHOW_PROGRESS=yes
+test -z "$BOOST_TEST_CATCH_SYSTEM_ERRORS" && export BOOST_TEST_CATCH_SYSTEM_ERRORS=no
diff --git a/cpp/src/tests/test_store.cpp b/cpp/src/tests/test_store.cpp
index eac4deda2d..fc44889f33 100644
--- a/cpp/src/tests/test_store.cpp
+++ b/cpp/src/tests/test_store.cpp
@@ -40,14 +40,19 @@
#include "qpid/sys/Thread.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
+#include "qpid/RefCounted.h"
+#include "qpid/Msg.h"
#include <boost/cast.hpp>
#include <boost/lexical_cast.hpp>
#include <memory>
+#include <ostream>
#include <fstream>
+#include <sstream>
-using namespace qpid;
-using namespace broker;
using namespace std;
+using namespace boost;
+using namespace qpid;
+using namespace qpid::broker;
using namespace qpid::sys;
namespace qpid {
@@ -57,11 +62,13 @@ struct TestStoreOptions : public Options {
string name;
string dump;
+ string events;
TestStoreOptions() : Options("Test Store Options") {
addOptions()
("test-store-name", optValue(name, "NAME"), "Name of test store instance.")
("test-store-dump", optValue(dump, "FILE"), "File to dump enqueued messages.")
+ ("test-store-events", optValue(events, "FILE"), "File to log events, 1 line per event.")
;
}
};
@@ -82,24 +89,74 @@ class TestStore : public NullMessageStore {
TestStore(const TestStoreOptions& opts, Broker& broker_)
: options(opts), name(opts.name), broker(broker_)
{
- QPID_LOG(info, "TestStore name=" << name << " dump=" << options.dump);
- if (!options.dump.empty())
+ QPID_LOG(info, "TestStore name=" << name
+ << " dump=" << options.dump
+ << " events=" << options.events);
+
+ if (!options.dump.empty())
dump.reset(new ofstream(options.dump.c_str()));
+ if (!options.events.empty())
+ events.reset(new ofstream(options.events.c_str()));
}
~TestStore() {
for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1));
}
- virtual bool isNull() const { return false; }
-
- void enqueue(TransactionContext* ,
+ // Dummy transaction context.
+ struct TxContext : public TPCTransactionContext {
+ static int nextId;
+ string id;
+ TxContext() : id(lexical_cast<string>(nextId++)) {}
+ TxContext(string xid) : id(xid) {}
+ };
+
+ static string getId(const TransactionContext& tx) {
+ const TxContext* tc = dynamic_cast<const TxContext*>(&tx);
+ assert(tc);
+ return tc->id;
+ }
+
+
+ bool isNull() const { return false; }
+
+ void log(const string& msg) {
+ QPID_LOG(info, "test_store: " << msg);
+ if (events.get()) *events << msg << endl << std::flush;
+ }
+
+ auto_ptr<TransactionContext> begin() {
+ auto_ptr<TxContext> tx(new TxContext());
+ log(Msg() << "<begin tx " << tx->id << ">");
+ return auto_ptr<TransactionContext>(tx);
+ }
+
+ auto_ptr<TPCTransactionContext> begin(const std::string& xid) {
+ auto_ptr<TxContext> tx(new TxContext(xid));
+ log(Msg() << "<begin tx " << tx->id << ">");
+ return auto_ptr<TPCTransactionContext>(tx);
+ }
+
+ string getContent(const intrusive_ptr<PersistableMessage>& msg) {
+ intrusive_ptr<broker::Message::Encoding> enc(
+ dynamic_pointer_cast<broker::Message::Encoding>(msg));
+ return enc->getContent();
+ }
+
+ void enqueue(TransactionContext* tx,
const boost::intrusive_ptr<PersistableMessage>& pmsg,
- const PersistableQueue& )
+ const PersistableQueue& queue)
{
+ QPID_LOG(debug, "TestStore enqueue " << queue.getName());
qpid::broker::amqp_0_10::MessageTransfer* msg = dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get());
assert(msg);
+ ostringstream o;
+ o << "<enqueue " << queue.getName() << " " << getContent(msg);
+ if (tx) o << " tx=" << getId(*tx);
+ o << ">";
+ log(o.str());
+
// Dump the message if there is a dump file.
if (dump.get()) {
msg->getFrames().getMethod()->print(*dump);
@@ -144,6 +201,31 @@ class TestStore : public NullMessageStore {
msg->enqueueComplete();
}
+ void dequeue(TransactionContext* tx,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
+ {
+ QPID_LOG(debug, "TestStore dequeue " << queue.getName());
+ ostringstream o;
+ o<< "<dequeue " << queue.getName() << " " << getContent(msg);
+ if (tx) o << " tx=" << getId(*tx);
+ o << ">";
+ log(o.str());
+ }
+
+ void prepare(TPCTransactionContext& txn) {
+ log(Msg() << "<prepare tx=" << getId(txn) << ">");
+ }
+
+ void commit(TransactionContext& txn) {
+ log(Msg() << "<commit tx=" << getId(txn) << ">");
+ }
+
+ void abort(TransactionContext& txn) {
+ log(Msg() << "<abort tx=" << getId(txn) << ">");
+ }
+
+
private:
static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC;
TestStoreOptions options;
@@ -151,8 +233,11 @@ class TestStore : public NullMessageStore {
Broker& broker;
vector<Thread> threads;
std::auto_ptr<ofstream> dump;
+ std::auto_ptr<ofstream> events;
};
+int TestStore::TxContext::nextId(1);
+
const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: ";
const string TestStore::EXCEPTION = "exception";
const string TestStore::EXIT_PROCESS = "exit_process";