From 1ac985d9b3a50d8c7691e534945a4829bd57017a Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 1 Aug 2013 20:27:26 +0000 Subject: QPID-4327: HA TX transactions: basic replication. On primary a PrimaryTxObserver observes a transaction's TxBuffer and generates transaction events on a tx-replication-queue. On the backup a TxReplicator receives the events and constructs a TxBuffer equivalent to the one in the primary. Unfinished: - Primary does not wait for backups to prepare() before committing. - All connected backups are assumed to be in the transaction, there are race conditions around brokers joining/leavinv where this assumption is invalid. - Need more tests. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1509423 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/include/qpid/framing/BufferTypes.h | 106 +++ cpp/src/CMakeLists.txt | 24 +- cpp/src/qpid/broker/Broker.cpp | 2 +- cpp/src/qpid/broker/Broker.h | 3 +- cpp/src/qpid/broker/MessageStoreModule.cpp | 2 +- cpp/src/qpid/broker/MessageStoreModule.h | 2 +- cpp/src/qpid/broker/Queue.cpp | 2 +- cpp/src/qpid/broker/TxAccept.cpp | 2 +- cpp/src/qpid/ha/Backup.h | 2 + cpp/src/qpid/ha/BrokerReplicator.cpp | 16 +- cpp/src/qpid/ha/BrokerReplicator.h | 6 +- cpp/src/qpid/ha/Event.cpp | 77 ++ cpp/src/qpid/ha/Event.h | 146 ++++ cpp/src/qpid/ha/FailoverExchange.cpp | 4 +- cpp/src/qpid/ha/HaBroker.cpp | 2 +- cpp/src/qpid/ha/HaBroker.h | 1 + cpp/src/qpid/ha/Membership.cpp | 8 + cpp/src/qpid/ha/Membership.h | 3 + cpp/src/qpid/ha/Primary.cpp | 41 +- cpp/src/qpid/ha/Primary.h | 23 +- cpp/src/qpid/ha/PrimaryTxObserver.cpp | 104 +++ cpp/src/qpid/ha/PrimaryTxObserver.h | 77 ++ cpp/src/qpid/ha/QueueReplicator.cpp | 81 +- cpp/src/qpid/ha/QueueReplicator.h | 33 +- cpp/src/qpid/ha/RemoteBackup.cpp | 3 + cpp/src/qpid/ha/ReplicatingSubscription.cpp | 42 +- cpp/src/qpid/ha/ReplicatingSubscription.h | 10 +- cpp/src/qpid/ha/StatusCheck.cpp | 12 +- cpp/src/qpid/ha/TxReplicator.cpp | 192 +++++ cpp/src/qpid/ha/TxReplicator.h | 123 +++ cpp/src/qpid/ha/makeMessage.cpp | 62 -- cpp/src/qpid/ha/makeMessage.h | 66 -- cpp/src/qpid/ha/types.cpp | 8 + cpp/src/qpid/ha/types.h | 9 +- cpp/src/tests/BrokerFixture.h | 70 +- cpp/src/tests/CMakeLists.txt | 19 +- cpp/src/tests/MessagingFixture.h | 15 +- cpp/src/tests/TransactionObserverTest.cpp | 1 + cpp/src/tests/brokertest.py | 15 +- cpp/src/tests/cluster_test.cpp | 1231 +++++++++++++++++++++++++++ cpp/src/tests/ha_test.py | 8 +- cpp/src/tests/ha_tests.py | 95 ++- cpp/src/tests/test_env.sh.in | 4 +- cpp/src/tests/test_store.cpp | 101 ++- 44 files changed, 2531 insertions(+), 322 deletions(-) create mode 100644 cpp/include/qpid/framing/BufferTypes.h create mode 100644 cpp/src/qpid/ha/Event.cpp create mode 100644 cpp/src/qpid/ha/Event.h create mode 100644 cpp/src/qpid/ha/PrimaryTxObserver.cpp create mode 100644 cpp/src/qpid/ha/PrimaryTxObserver.h create mode 100644 cpp/src/qpid/ha/TxReplicator.cpp create mode 100644 cpp/src/qpid/ha/TxReplicator.h delete mode 100644 cpp/src/qpid/ha/makeMessage.cpp delete mode 100644 cpp/src/qpid/ha/makeMessage.h create mode 100644 cpp/src/tests/cluster_test.cpp diff --git a/cpp/include/qpid/framing/BufferTypes.h b/cpp/include/qpid/framing/BufferTypes.h new file mode 100644 index 0000000000..4199755852 --- /dev/null +++ b/cpp/include/qpid/framing/BufferTypes.h @@ -0,0 +1,106 @@ +#ifndef QPID_FRAMING_BUFFERTYPES_H +#define QPID_FRAMING_BUFFERTYPES_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/**@file + * Using templates with framing::Buffer is difficultg becase of the many + * different put/get function names. Here we define a set of types + * corresponding the basic types of Buffer but presenting a uniform + * encode/decode/encodedSize interface. + * + * It also provides some convenience templates for the common case of + * encoding a single encodable value as a string, e.g. + * + * LongString ls("hello"); + * std::string encoded = encodeStr(ls); + * LongString ls2 = decodeStr(encoded); + * LongString ls3; + * decodeStr(encoded, ls3); + */ + +namespace qpid { +namespace framing { + +// Templates to help define types +template struct BufferTypeTraits { + typedef void (Buffer::*Put)(const ValueType&); + typedef void (Buffer::*Get)(ValueType&); +}; + +template ::Put PutFn, + typename BufferTypeTraits::Get GetFn> +struct EncodeDecodeTemplate { + EncodeDecodeTemplate(const ValueType& s) : value(s) {} + operator ValueType() const { return value; } + + ValueType value; + void encode(framing::Buffer& buf) const { (buf.*PutFn)(value); } + void decode(framing::Buffer& buf) { (buf.*GetFn)(value); } +}; + +template ::Put PutFn, + typename BufferTypeTraits::Get GetFn + > +struct StringTypeTemplate : public EncodeDecodeTemplate { + typedef EncodeDecodeTemplate Base; + StringTypeTemplate(const std::string& s) : Base(s) {} + size_t encodedSize() const { return Size + Base::value.size(); } +}; + + +// Convenience tempates for encoding/decoding values to/from a std::string. + +/** Encode value as a string. */ +template std::string encodeStr(const T& value) { + std::string encoded(value.encodedSize(), '\0'); + framing::Buffer buffer(&encoded[0], encoded.size()); + value.encode(buffer); + return encoded; +} + +/** Decode value from a string. */ +template void decodeStr(const std::string& encoded, T& value) { + framing::Buffer b(const_cast(&encoded[0]), encoded.size()); + value.decode(b); +} + +/** Decode value from a string. */ +template T decodeStr(const std::string& encoded) { + T value; + decodeStr(encoded, value); + return value; +} + +// The types + +typedef StringTypeTemplate<4, &Buffer::putLongString, &Buffer::getLongString> LongString; +typedef StringTypeTemplate<2, &Buffer::putMediumString, &Buffer::getMediumString> MediumString; +typedef StringTypeTemplate<1, &Buffer::putShortString, &Buffer::getShortString> ShortString; + +// TODO aconway 2013-07-26: Add integer types. + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_BUFFERTYPES_H*/ diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 3b242d627f..9156d91ff6 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -355,7 +355,7 @@ endif (CMAKE_SYSTEM_NAME STREQUAL Windows) # where Boost 1.45 is supported, or we can just accept some versions using # the Additional_versions variable. if (NOT DEFINED Boost_ADDITIONAL_VERSIONS) - set (Boost_ADDITIONAL_VERSIONS + set (Boost_ADDITIONAL_VERSIONS "1.45" "1.45.0" "1.46" "1.46.0" "1.47" "1.47.0" "1.48" "1.48.0" "1.49" "1.49.0" "1.50" "1.50.0" "1.51" "1.51.0" "1.52" "1.52.0" "1.53" "1.53.0") @@ -743,6 +743,8 @@ if (BUILD_HA) qpid/ha/BrokerReplicator.h qpid/ha/ConnectionObserver.cpp qpid/ha/ConnectionObserver.h + qpid/ha/Event.cpp + qpid/ha/Event.h qpid/ha/FailoverExchange.cpp qpid/ha/FailoverExchange.h qpid/ha/HaBroker.cpp @@ -751,8 +753,6 @@ if (BUILD_HA) qpid/ha/hash.h qpid/ha/IdSetter.h qpid/ha/QueueSnapshot.h - qpid/ha/makeMessage.cpp - qpid/ha/makeMessage.h qpid/ha/Membership.cpp qpid/ha/Membership.h qpid/ha/Primary.cpp @@ -772,7 +772,11 @@ if (BUILD_HA) qpid/ha/StandAlone.h qpid/ha/StatusCheck.cpp qpid/ha/StatusCheck.h + qpid/ha/PrimaryTxObserver.cpp + qpid/ha/PrimaryTxObserver.h qpid/ha/types.cpp + qpid/ha/TxReplicator.cpp + qpid/ha/TxReplicator.h qpid/ha/types.h ) @@ -798,7 +802,7 @@ include (amqp.cmake) # Check for syslog capabilities not present on all systems check_symbol_exists (LOG_AUTHPRIV "sys/syslog.h" HAVE_LOG_AUTHPRIV) check_symbol_exists (LOG_FTP "sys/syslog.h" HAVE_LOG_FTP) - + # Set default Memory Status module (Null implementation) set (qpid_memstat_module qpid/sys/MemStat.cpp @@ -884,7 +888,7 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows) # On Linux override memory status module set (qpid_memstat_module qpid/sys/posix/MemStat.cpp - ) + ) endif (CMAKE_SYSTEM_NAME STREQUAL Linux) if (CMAKE_SYSTEM_NAME STREQUAL SunOS) @@ -1070,7 +1074,7 @@ target_link_libraries (qpidcommon qpidtypes ${qpidcommon_platform_LIBS} ${qpidcommon_sasl_lib}) set_target_properties (qpidcommon PROPERTIES - VERSION ${qpidcommon_version} + VERSION ${qpidcommon_version} SOVERSION ${qpidcommon_version_major} ${qpidcommon_LINK_FLAGS}) install (TARGETS qpidcommon @@ -1087,7 +1091,7 @@ set(qpidtypes_SOURCES add_msvc_version (qpidtypes library dll) add_library(qpidtypes SHARED ${qpidtypes_SOURCES}) target_link_libraries(qpidtypes ${qpidtypes_platform_LIBS}) -set_target_properties (qpidtypes PROPERTIES +set_target_properties (qpidtypes PROPERTIES VERSION ${qpidtypes_version} SOVERSION ${qpidtypes_version_major}) install(TARGETS qpidtypes @@ -1329,9 +1333,9 @@ set (qpidbroker_SOURCES add_msvc_version (qpidbroker library dll) add_library (qpidbroker SHARED ${qpidbroker_SOURCES}) target_link_libraries (qpidbroker qpidcommon ${qpidbroker_platform_LIBS}) -set_target_properties (qpidbroker PROPERTIES +set_target_properties (qpidbroker PROPERTIES VERSION ${qpidbroker_version} - SOVERSION ${qpidbroker_version_major} + SOVERSION ${qpidbroker_version_major} COMPILE_DEFINITIONS _IN_QPID_BROKER) if (MSVC) set_target_properties (qpidbroker PROPERTIES COMPILE_FLAGS /wd4290) @@ -1444,7 +1448,7 @@ endif (NOT WIN32) add_library (qmf2 SHARED ${qmf2_SOURCES}) target_link_libraries (qmf2 qpidmessaging qpidtypes qpidclient qpidcommon) set_target_properties (qmf2 PROPERTIES - VERSION ${qmf2_version} + VERSION ${qmf2_version} SOVERSION ${qmf2_version_major}) install (TARGETS qmf2 OPTIONAL DESTINATION ${QPID_INSTALL_LIBDIR} diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index d420e28d95..7c9e82022f 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -400,7 +400,7 @@ boost::intrusive_ptr Broker::create(const Options& opts) return boost::intrusive_ptr(new Broker(opts)); } -void Broker::setStore (boost::shared_ptr& _store) +void Broker::setStore (const boost::shared_ptr& _store) { store.reset(new MessageStoreModule (_store)); setStore(); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index f0b0c83f61..3e22fb491b 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -226,7 +226,8 @@ class Broker : public sys::Runnable, public Plugin::Target, /** Shut down the broker */ QPID_BROKER_EXTERN virtual void shutdown(); - QPID_BROKER_EXTERN void setStore (boost::shared_ptr& store); + QPID_BROKER_EXTERN void setStore (const boost::shared_ptr& store); + bool hasStore() const { return store.get(); } MessageStore& getStore() { return *store; } void setAcl (AclModule* _acl) {acl = _acl;} AclModule* getAcl() { return acl; } diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp index f19b31fa76..edfe6e7819 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -33,7 +33,7 @@ using std::string; namespace qpid { namespace broker { -MessageStoreModule::MessageStoreModule(boost::shared_ptr& _store) +MessageStoreModule::MessageStoreModule(const boost::shared_ptr& _store) : store(_store) {} MessageStoreModule::~MessageStoreModule() diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h index 82308db84c..a71e8cc9b6 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.h +++ b/cpp/src/qpid/broker/MessageStoreModule.h @@ -38,7 +38,7 @@ class MessageStoreModule : public MessageStore { boost::shared_ptr store; public: - MessageStoreModule(boost::shared_ptr& store); + MessageStoreModule(const boost::shared_ptr& store); bool init(const Options* options); std::auto_ptr begin(); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 49aff3ea97..227acd1b56 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -277,6 +277,7 @@ void Queue::deliver(Message msg, TxBuffer* txn) void Queue::deliverTo(Message msg, TxBuffer* txn) { if (accept(msg)) { + interceptors.record(msg); if (txn) { TxOp::shared_ptr op(new TxPublish(msg, shared_from_this())); txn->enlist(op); @@ -842,7 +843,6 @@ bool Queue::isEmpty(const Mutex::ScopedLock&) const */ bool Queue::enqueue(TransactionContext* ctxt, Message& msg) { - interceptors.record(msg); ScopedUse u(barrier); if (!u.acquired) return false; diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp index 0343cc00ae..ee30dff957 100644 --- a/cpp/src/qpid/broker/TxAccept.cpp +++ b/cpp/src/qpid/broker/TxAccept.cpp @@ -20,9 +20,9 @@ */ #include "qpid/broker/TxAccept.h" #include "qpid/broker/TransactionObserver.h" +#include "qpid/broker/Queue.h" #include "qpid/log/Statement.h" - using std::bind1st; using std::bind2nd; using std::mem_fun_ref; diff --git a/cpp/src/qpid/ha/Backup.h b/cpp/src/qpid/ha/Backup.h index 4943ca5e2e..88194158ce 100644 --- a/cpp/src/qpid/ha/Backup.h +++ b/cpp/src/qpid/ha/Backup.h @@ -59,6 +59,8 @@ class Backup : public Role Role* promote(); + boost::shared_ptr getBrokerReplicator() { return replicator; } + private: void stop(sys::Mutex::ScopedLock&); Role* recover(sys::Mutex::ScopedLock&); diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index 17b00185ef..f882cbcbf1 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -21,6 +21,7 @@ #include "BrokerReplicator.h" #include "HaBroker.h" #include "QueueReplicator.h" +#include "TxReplicator.h" #include "qpid/broker/Broker.h" #include "qpid/broker/amqp_0_10/Connection.h" #include "qpid/broker/ConnectionObserver.h" @@ -648,11 +649,16 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); if (!replicationTest.getLevel(argsMap)) return; string name(values[NAME].asString()); + + if (TxReplicator::isTxQueue(name)) return; // Can't join a transaction in progress. + if (!queueTracker.get()) throw Exception(QPID_MSG("Unexpected queue response: " << values)); if (!queueTracker->response(name)) return; // Response is out-of-date + QPID_LOG(debug, logPrefix << "Queue response: " << name); boost::shared_ptr queue = queues.find(name); + if (queue) { // Already exists bool uuidOk = (getHaUuid(queue->getSettings().original) == getHaUuid(argsMap)); if (!uuidOk) QPID_LOG(debug, logPrefix << "UUID mismatch for queue: " << name); @@ -660,6 +666,7 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { QPID_LOG(debug, logPrefix << "Queue response replacing queue: " << name); deleteQueue(name); } + framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); boost::shared_ptr qr = replicateQueue( @@ -770,8 +777,13 @@ boost::shared_ptr BrokerReplicator::startQueueReplicator( const boost::shared_ptr& queue) { if (replicationTest.getLevel(*queue) == ALL) { - boost::shared_ptr qr( - new QueueReplicator(haBroker, queue, link)); + boost::shared_ptr qr; + if (TxReplicator::isTxQueue(queue->getName())){ + qr.reset(new TxReplicator(haBroker, queue, link)); + } + else { + qr.reset(new QueueReplicator(haBroker, queue, link)); + } qr->activate(); return qr; } diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h index f93e25cb81..c36aa352f3 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.h +++ b/cpp/src/qpid/ha/BrokerReplicator.h @@ -71,6 +71,8 @@ class BrokerReplicator : public broker::Exchange, public boost::enable_shared_from_this { public: + typedef boost::shared_ptr QueueReplicatorPtr; + BrokerReplicator(HaBroker&, const boost::shared_ptr&); ~BrokerReplicator(); @@ -84,8 +86,9 @@ class BrokerReplicator : public broker::Exchange, bool isBound(boost::shared_ptr, const std::string* const, const framing::FieldTable* const); void shutdown(); + QueueReplicatorPtr findQueueReplicator(const std::string& qname); + private: - typedef boost::shared_ptr QueueReplicatorPtr; typedef std::pair, bool> CreateQueueResult; typedef std::pair, bool> CreateExchangeResult; @@ -114,7 +117,6 @@ class BrokerReplicator : public broker::Exchange, void doResponseBind(types::Variant::Map& values); void doResponseHaBroker(types::Variant::Map& values); - QueueReplicatorPtr findQueueReplicator(const std::string& qname); QueueReplicatorPtr startQueueReplicator(const boost::shared_ptr&); QueueReplicatorPtr replicateQueue( diff --git a/cpp/src/qpid/ha/Event.cpp b/cpp/src/qpid/ha/Event.cpp new file mode 100644 index 0000000000..fdd8bc85cc --- /dev/null +++ b/cpp/src/qpid/ha/Event.cpp @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Event.h" +#include "qpid/broker/amqp_0_10/MessageTransfer.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/DeliveryProperties.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace ha { + +using namespace std; +using namespace framing; +using namespace broker::amqp_0_10; + +namespace { +const string QPID_HA(QPID_HA_PREFIX); +} + +bool isEventKey(const std::string& key) { + const std::string& prefix = QPID_HA; + bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0; + return ret; +} + +const string DequeueEvent::KEY(QPID_HA+"de"); +const string IdEvent::KEY(QPID_HA+"id"); +const string TxEnqueueEvent::KEY(QPID_HA+"txen"); +const string TxDequeueEvent::KEY(QPID_HA+"txde"); +const string TxPrepareEvent::KEY(QPID_HA+"txpr"); +const string TxCommitEvent::KEY(QPID_HA+"txcm"); +const string TxRollbackEvent::KEY(QPID_HA+"txrb"); + +broker::Message makeMessage(const string& data, const string& key) { + boost::intrusive_ptr transfer(new MessageTransfer()); + AMQFrame method((MessageTransferBody(ProtocolVersion(), key, 0, 0))); + AMQFrame header((AMQHeaderBody())); + AMQFrame content((AMQContentBody())); + Buffer buffer(const_cast(&data[0]), data.size()); + // AMQContentBody::decode is missing a const declaration, so cast it here. + content.castBody()->decode( + const_cast(buffer), buffer.getSize()); + header.setBof(false); + header.setEof(false); + header.setBos(true); + header.setEos(true); + content.setBof(false); + content.setEof(true); + content.setBos(true); + content.setEos(true); + transfer->getFrames().append(method); + transfer->getFrames().append(header); + transfer->getFrames().append(content); + transfer->getFrames().getHeaders()->get(true)->setRoutingKey(key); + return broker::Message(transfer, 0); +} + +}} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/Event.h b/cpp/src/qpid/ha/Event.h new file mode 100644 index 0000000000..08174bfc9d --- /dev/null +++ b/cpp/src/qpid/ha/Event.h @@ -0,0 +1,146 @@ +#ifndef QPID_HA_EVENT_H +#define QPID_HA_EVENT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "qpid/broker/Message.h" +#include "qpid/framing/BufferTypes.h" + +/**@file Defines event messages used to pass transaction information from + * primary observers to backup replicators. + */ + +namespace qpid { +namespace ha { + +broker::Message makeMessage(const std::string& content, const std::string& destination); + + +/** Test if a string is an event key */ +bool isEventKey(const std::string& key); + +/** Base class for encodable events */ +struct Event { + virtual ~Event() {} + virtual void encode(framing::Buffer& buffer) const = 0; + virtual void decode(framing::Buffer& buffer) = 0; + virtual size_t encodedSize() const = 0; + virtual std::string key() const = 0; // Routing key + virtual void print(std::ostream& o) const = 0; + broker::Message message() const { return makeMessage(framing::encodeStr(*this), key()); } +}; + + +inline std::ostream& operator<<(std::ostream& o, const Event& e) { + o << "<" << e.key() << ":"; + e.print(o); + return o << ">"; +} + +/** Event base template */ +template struct EventBase : public Event { + std::string key() const { return Derived::KEY; } +}; + +//////////////// Specific event type + +//// QueueReplicator events + +struct DequeueEvent : public EventBase { + static const std::string KEY; + ReplicationIdSet ids; + + DequeueEvent(ReplicationIdSet ids_=ReplicationIdSet()) : ids(ids_) {} + void encode(framing::Buffer& b) const { b.put(ids); } + void decode(framing::Buffer& b) { b.get(ids); } + virtual size_t encodedSize() const { return ids.encodedSize(); } + void print(std::ostream& o) const { o << ids; } +}; + +struct IdEvent : public EventBase { + static const std::string KEY; + ReplicationId id; + + IdEvent(ReplicationId id_=0) : id(id_) {} + void encode(framing::Buffer& b) const { b.put(id); } + void decode(framing::Buffer& b) { b.get(id); } + virtual size_t encodedSize() const { return id.encodedSize(); } + void print(std::ostream& o) const { o << id; } +}; + +//// Transaction events + +struct TxEnqueueEvent : public EventBase { + static const std::string KEY; + framing::LongString queue; + ReplicationId id; + + TxEnqueueEvent(std::string q=std::string(), ReplicationId i=ReplicationId()) + : queue(q), id(i) {} + void encode(framing::Buffer& b) const { b.put(queue); b.put(id); } + void decode(framing::Buffer& b) { b.get(queue); b.get(id); } + virtual size_t encodedSize() const { return queue.encodedSize()+id.encodedSize(); } + void print(std::ostream& o) const { o << queue.value << " " << id; } +}; + +struct TxDequeueEvent : public EventBase { + static const std::string KEY; + framing::LongString queue; + ReplicationId id; + + TxDequeueEvent(std::string q=std::string(), ReplicationId r=0) : + queue(q), id(r) {} + void encode(framing::Buffer& b) const { b.put(queue);b.put(id); } + void decode(framing::Buffer& b) { b.get(queue);b.get(id); } + virtual size_t encodedSize() const { return queue.encodedSize()+id.encodedSize(); } + void print(std::ostream& o) const { o << queue.value << " " << id; } +}; + +struct TxPrepareEvent : public EventBase { + static const std::string KEY; + void encode(framing::Buffer&) const {} + void decode(framing::Buffer&) {} + virtual size_t encodedSize() const { return 0; } + void print(std::ostream&) const {} +}; + +struct TxCommitEvent : public EventBase { + static const std::string KEY; + void encode(framing::Buffer&) const {} + void decode(framing::Buffer&) {} + virtual size_t encodedSize() const { return 0; } + void print(std::ostream&) const {} +}; + +struct TxRollbackEvent : public EventBase { + static const std::string KEY; + void encode(framing::Buffer&) const {} + void decode(framing::Buffer&) {} + virtual size_t encodedSize() const { return 0; } + void print(std::ostream&) const {} +}; + + +}} // namespace qpid::ha + +#endif /*!QPID_HA_EVENT_H*/ diff --git a/cpp/src/qpid/ha/FailoverExchange.cpp b/cpp/src/qpid/ha/FailoverExchange.cpp index 556c7458b6..46cc345ab0 100644 --- a/cpp/src/qpid/ha/FailoverExchange.cpp +++ b/cpp/src/qpid/ha/FailoverExchange.cpp @@ -19,7 +19,7 @@ * */ #include "FailoverExchange.h" -#include "makeMessage.h" +#include "Event.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/broker/Message.h" #include "qpid/broker/DeliverableMessage.h" @@ -117,7 +117,7 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue, sys::Mutex::Sc if (urls.empty()) return; framing::Array array = vectorToUrlArray(urls); const ProtocolVersion v; - broker::Message message(makeMessage(Buffer(), typeName)); + broker::Message message(makeMessage(std::string(), typeName)); MessageTransfer& transfer = MessageTransfer::get(message); MessageProperties* props = transfer.getFrames().getHeaders()->get(true); diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp index e8c7c5c7d8..5896a5568a 100644 --- a/cpp/src/qpid/ha/HaBroker.cpp +++ b/cpp/src/qpid/ha/HaBroker.cpp @@ -75,7 +75,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) // otherwise there's a window for a client to connect before we get to // initialize() if (settings.cluster) { - QPID_LOG(debug, "Broker startup, rejecting client connections."); + QPID_LOG(debug, "Backup starting, rejecting client connections."); shared_ptr excluder(new BackupConnectionExcluder); observer->setObserver(excluder, "Backup: "); broker.getConnectionObservers().add(observer); diff --git a/cpp/src/qpid/ha/HaBroker.h b/cpp/src/qpid/ha/HaBroker.h index 8e5d30acfb..6084db3fc3 100644 --- a/cpp/src/qpid/ha/HaBroker.h +++ b/cpp/src/qpid/ha/HaBroker.h @@ -84,6 +84,7 @@ class HaBroker : public management::Manageable broker::Broker& getBroker() { return broker; } const Settings& getSettings() const { return settings; } + boost::shared_ptr getRole() const {return role; } /** Shut down the broker because of a critical error. */ void shutdown(const std::string& message); diff --git a/cpp/src/qpid/ha/Membership.cpp b/cpp/src/qpid/ha/Membership.cpp index 411bad3841..b53291ba89 100644 --- a/cpp/src/qpid/ha/Membership.cpp +++ b/cpp/src/qpid/ha/Membership.cpp @@ -107,6 +107,14 @@ BrokerInfo::Set Membership::otherBackups() const { return result; } +BrokerInfo::Set Membership::getBrokers() const { + Mutex::ScopedLock l(lock); + BrokerInfo::Set result; + transform(brokers.begin(), brokers.end(), inserter(result, result.begin()), + bind(&BrokerInfo::Map::value_type::second, _1)); + return result; +} + bool Membership::get(const types::Uuid& id, BrokerInfo& result) const { Mutex::ScopedLock l(lock); BrokerInfo::Map::const_iterator i = brokers.find(id); diff --git a/cpp/src/qpid/ha/Membership.h b/cpp/src/qpid/ha/Membership.h index f442586a71..828f9e8403 100644 --- a/cpp/src/qpid/ha/Membership.h +++ b/cpp/src/qpid/ha/Membership.h @@ -69,6 +69,9 @@ class Membership /** Return IDs of all READY backups other than self */ BrokerInfo::Set otherBackups() const; + /** Return IDs of all brokers */ + BrokerInfo::Set getBrokers() const; + void assign(const types::Variant::List&); types::Variant::List asList() const; diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp index bae651a3fc..5fd7814d62 100644 --- a/cpp/src/qpid/ha/Primary.cpp +++ b/cpp/src/qpid/ha/Primary.cpp @@ -27,6 +27,7 @@ #include "RemoteBackup.h" #include "ConnectionObserver.h" #include "QueueReplicator.h" +#include "PrimaryTxObserver.h" #include "qpid/assert.h" #include "qpid/broker/Broker.h" #include "qpid/broker/BrokerObserver.h" @@ -34,16 +35,19 @@ #include "qpid/broker/Queue.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" -#include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" +#include "qpid/types/Uuid.h" #include "qpid/sys/Timer.h" #include +#include +#include namespace qpid { namespace ha { using sys::Mutex; using boost::shared_ptr; +using boost::make_shared; using namespace std; using namespace framing; @@ -67,7 +71,10 @@ class PrimaryBrokerObserver : public broker::BrokerObserver void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); } void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); } void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); } - private: + void startTx(const shared_ptr& tx) { primary.startTx(tx); } + void startDtx(const shared_ptr& dtx) { primary.startDtx(dtx); } + + private: Primary& primary; }; @@ -208,6 +215,26 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) { if (backup) checkReady(backup); } +void Primary::addReplica(ReplicatingSubscription& rs) { + sys::Mutex::ScopedLock l(lock); + replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs; +} + +void Primary::skip( + const types::Uuid& backup, + const boost::shared_ptr& queue, + const ReplicationIdSet& ids) +{ + sys::Mutex::ScopedLock l(lock); + ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue)); + if (i != replicas.end()) i->second->addSkip(ids); +} + +void Primary::removeReplica(const ReplicatingSubscription& rs) { + sys::Mutex::ScopedLock l(lock); + replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())); +} + // NOTE: Called with queue registry lock held. void Primary::queueCreate(const QueuePtr& q) { // Set replication argument. @@ -361,4 +388,14 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards) backup->startCatchup(); } +void Primary::startTx(const boost::shared_ptr& tx) { + QPID_LOG(trace, logPrefix << "Started TX transaction"); + tx->setObserver(make_shared(boost::ref(haBroker))); +} + +void Primary::startDtx(const boost::shared_ptr& dtx) { + QPID_LOG(trace, logPrefix << "Started DTX transaction"); + dtx->setObserver(make_shared(boost::ref(haBroker))); +} + }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/Primary.h b/cpp/src/qpid/ha/Primary.h index 031ad3aab9..a34f8f5a30 100644 --- a/cpp/src/qpid/ha/Primary.h +++ b/cpp/src/qpid/ha/Primary.h @@ -31,6 +31,7 @@ #include #include #include +#include namespace qpid { @@ -39,6 +40,8 @@ class Queue; class Connection; class ConnectionObserver; class BrokerObserver; +class TxBuffer; +class DtxBuffer; } namespace sys { @@ -75,13 +78,21 @@ class Primary : public Role void setBrokerUrl(const Url&) {} void readyReplica(const ReplicatingSubscription&); - void removeReplica(const std::string& q); + void addReplica(ReplicatingSubscription&); + void removeReplica(const ReplicatingSubscription&); + + /** Skip replication of ids to queue on backup. */ + void skip(const types::Uuid& backup, + const boost::shared_ptr& queue, + const ReplicationIdSet& ids); // Called via BrokerObserver void queueCreate(const QueuePtr&); void queueDestroy(const QueuePtr&); void exchangeCreate(const ExchangePtr&); void exchangeDestroy(const ExchangePtr&); + void startTx(const boost::shared_ptr&); + void startDtx(const boost::shared_ptr&); // Called via ConnectionObserver void opened(broker::Connection& connection); @@ -93,11 +104,15 @@ class Primary : public Role void timeoutExpectedBackups(); private: - typedef qpid::sys::unordered_map< + typedef sys::unordered_map< types::Uuid, RemoteBackupPtr, types::Uuid::Hasher > BackupMap; typedef std::set BackupSet; + typedef std::pair > UuidQueue; + typedef sys::unordered_map > ReplicaMap; + RemoteBackupPtr backupConnect(const BrokerInfo&, broker::Connection&, sys::Mutex::ScopedLock&); void backupDisconnect(RemoteBackupPtr, sys::Mutex::ScopedLock&); @@ -105,8 +120,9 @@ class Primary : public Role void checkReady(); void checkReady(RemoteBackupPtr); void setCatchupQueues(const RemoteBackupPtr&, bool createGuards); + void deduplicate(); - sys::Mutex lock; + mutable sys::Mutex lock; HaBroker& haBroker; Membership& membership; std::string logPrefix; @@ -126,6 +142,7 @@ class Primary : public Role boost::shared_ptr connectionObserver; boost::shared_ptr brokerObserver; boost::intrusive_ptr timerTask; + ReplicaMap replicas; }; }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/cpp/src/qpid/ha/PrimaryTxObserver.cpp new file mode 100644 index 0000000000..8a8364ac22 --- /dev/null +++ b/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Event.h" +#include "HaBroker.h" +#include "Primary.h" +#include "PrimaryTxObserver.h" +#include "QueueGuard.h" +#include "RemoteBackup.h" +#include "ReplicatingSubscription.h" + +#include "qpid/broker/Broker.h" +#include "qpid/broker/Queue.h" +#include + +namespace qpid { +namespace ha { + +using namespace std; +using namespace boost; +using namespace broker; + +PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : + haBroker(hb), broker(hb.getBroker()), + id(true) // FIXME aconway 2013-07-11: is UUID an appropriate TX ID? +{ + logPrefix = "Primary transaction "+id.str().substr(0,8)+": "; + QPID_LOG(trace, logPrefix << "started"); + pair, bool> result = + broker.getQueues().declare( + TRANSACTION_REPLICATOR_PREFIX+id.str(), + QueueSettings(/*durable*/false, /*autodelete*/true)); + assert(result.second); + txQueue = result.first; +} + +void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m) +{ + QPID_LOG(trace, logPrefix << "enqueue: " << LogMessageId(*q, m)); + enqueues[q] += m.getReplicationId(); + txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message()); + txQueue->deliver(m); +} + +void PrimaryTxObserver::dequeue( + const QueuePtr& q, QueuePosition pos, ReplicationId id) +{ + QPID_LOG(trace, logPrefix << "dequeue: " << LogMessageId(*q, pos, id)); + txQueue->deliver(TxDequeueEvent(q->getName(), id).message()); +} + +void PrimaryTxObserver::deduplicate() { + shared_ptr primary(boost::dynamic_pointer_cast(haBroker.getRole())); + assert(primary); + // FIXME aconway 2013-07-29: need to verify which backups are *in* the transaction. + // Use cluster membership for now + BrokerInfo::Set brokers = haBroker.getMembership().getBrokers(); + types::Uuid selfId = haBroker.getMembership().getSelf(); + // Tell replicating subscriptions to skip IDs in the transaction. + for (BrokerInfo::Set::iterator b = brokers.begin(); b != brokers.end(); ++b) { + if (b->getSystemId() == selfId) continue; + for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q) + primary->skip(b->getSystemId(), q->first, q->second); + } +} + +bool PrimaryTxObserver::prepare() { + // FIXME aconway 2013-07-23: need to delay completion of prepare till all + // backups have prepared. + QPID_LOG(trace, logPrefix << "prepare"); + deduplicate(); + txQueue->deliver(TxPrepareEvent().message()); + return true; +} + +void PrimaryTxObserver::commit() { + QPID_LOG(trace, logPrefix << "commit"); + txQueue->deliver(TxCommitEvent().message()); +} + +void PrimaryTxObserver::rollback() { + QPID_LOG(trace, logPrefix << "rollback"); + txQueue->deliver(TxRollbackEvent().message()); +} + +}} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.h b/cpp/src/qpid/ha/PrimaryTxObserver.h new file mode 100644 index 0000000000..3681c6b750 --- /dev/null +++ b/cpp/src/qpid/ha/PrimaryTxObserver.h @@ -0,0 +1,77 @@ +#ifndef QPID_HA_PRIMARYTXOBSERVER_H +#define QPID_HA_PRIMARYTXOBSERVER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" + +#include "qpid/broker/TransactionObserver.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/Uuid.h" +#include "qpid/sys/unordered_map.h" +#include +namespace qpid { + +namespace broker { +class Broker; +} + +namespace ha { +class HaBroker; + +/** + * Observe events in the lifecycle of a transaction. + * + * The observer is called by TxBuffer for each transactional event. + * It puts the events on a special tx-queue. + * A TxReplicator on the backup replicates the tx-queue and creates + * a TxBuffer on the backup equivalent to the one on the primary. + * + * THREAD UNSAFE: called sequentially in the context of a transaction. + */ +class PrimaryTxObserver : public broker::TransactionObserver { + public: + PrimaryTxObserver(HaBroker&); + + void enqueue(const QueuePtr&, const broker::Message&); + void dequeue(const QueuePtr& queue, QueuePosition, ReplicationId); + bool prepare(); + void commit(); + void rollback(); + + private: + typedef qpid::sys::unordered_map< + QueuePtr, ReplicationIdSet, boost::hash > QueueIdsMap; + + void deduplicate(); + + std::string logPrefix; + HaBroker& haBroker; + broker::Broker& broker; + framing::Uuid id; + QueuePtr txQueue; + QueueIdsMap enqueues; +}; + +}} // namespace qpid::ha + +#endif /*!QPID_HA_PRIMARYTXOBSERVER_H*/ diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index d99602fdda..28e9dc4120 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -19,12 +19,13 @@ * */ -#include "makeMessage.h" +#include "Event.h" #include "HaBroker.h" #include "QueueReplicator.h" #include "QueueSnapshots.h" #include "ReplicatingSubscription.h" #include "Settings.h" +#include "types.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Link.h" @@ -38,36 +39,32 @@ #include "qpid/Msg.h" #include "qpid/assert.h" #include +#include -namespace { -const std::string QPID_REPLICATOR_("qpid.replicator-"); -const std::string TYPE_NAME("qpid.queue-replicator"); -const std::string QPID_HA("qpid.ha-"); -} namespace qpid { namespace ha { using namespace broker; using namespace framing; using namespace std; +using namespace boost; +using std::exception; using sys::Mutex; -const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA+"dequeue"); -const std::string QueueReplicator::ID_EVENT_KEY(QPID_HA+"id"); const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency"); -std::string QueueReplicator::replicatorName(const std::string& queueName) { - return QPID_REPLICATOR_ + queueName; +namespace { +const string QPID_HA(QPID_HA_PREFIX); +const std::string TYPE_NAME(QPID_HA+"queue-replicator"); } -bool QueueReplicator::isReplicatorName(const std::string& name) { - return name.compare(0, QPID_REPLICATOR_.size(), QPID_REPLICATOR_) == 0; + +std::string QueueReplicator::replicatorName(const std::string& queueName) { + return QUEUE_REPLICATOR_PREFIX + queueName; } -bool QueueReplicator::isEventKey(const std::string key) { - const std::string& prefix = QPID_HA; - bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0; - return ret; +bool QueueReplicator::isReplicatorName(const std::string& name) { + return startsWith(name, QUEUE_REPLICATOR_PREFIX); } class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { @@ -109,12 +106,12 @@ QueueReplicator::QueueReplicator(HaBroker& hb, boost::shared_ptr l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), haBroker(hb), + brokerInfo(hb.getBrokerInfo()), logPrefix("Backup of "+q->getName()+": "), - queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false), + queue(q), link(l), subscribed(false), settings(hb.getSettings()), destroyed(false), nextId(0), maxId(0) { - QPID_LOG(debug, logPrefix << "Created"); args.setString(QPID_REPLICATE, printable(NONE).str()); Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); @@ -122,12 +119,18 @@ QueueReplicator::QueueReplicator(HaBroker& hb, args.setString(QPID_REPLICATE, printable(NONE).str()); setArgs(args); if (q->isAutoDelete()) q->markInUse(); + + dispatch[DequeueEvent::KEY] = + boost::bind(&QueueReplicator::dequeueEvent, this, _1, _2); + dispatch[IdEvent::KEY] = + boost::bind(&QueueReplicator::idEvent, this, _1, _2); } // This must be called immediately after the constructor. // It has to be separate so we can call shared_from_this(). void QueueReplicator::activate() { Mutex::ScopedLock l(lock); + QPID_LOG(debug, logPrefix << "Created"); if (!queue) return; // Already destroyed // Enable callback to route() @@ -224,44 +227,57 @@ template T decodeContent(Message& m) { } } -void QueueReplicator::dequeue(const ReplicationIdSet& dequeues, Mutex::ScopedLock&) { - QPID_LOG(trace, logPrefix << "Dequeue " << dequeues); +void QueueReplicator::dequeueEvent(const string& data, Mutex::ScopedLock&) { + DequeueEvent e; + decodeStr(data, e); + QPID_LOG(trace, logPrefix << "Dequeue " << e.ids); //TODO: should be able to optimise the following - for (ReplicationIdSet::iterator i = dequeues.begin(); i != dequeues.end(); ++i) { + for (ReplicationIdSet::iterator i = e.ids.begin(); i != e.ids.end(); ++i) { PositionMap::iterator j = positions.find(*i); if (j != positions.end()) queue->dequeueMessageAt(j->second); } } // Called in connection thread of the queues bridge to primary. -void QueueReplicator::route(Deliverable& msg) +void QueueReplicator::route(Deliverable& deliverable) { try { Mutex::ScopedLock l(lock); if (destroyed) return; - const std::string& key = msg.getMessage().getRoutingKey(); - if (!isEventKey(key)) { // Replicated message + broker::Message& message(deliverable.getMessage()); + string key(message.getRoutingKey()); + if (!isEventKey(message.getRoutingKey())) { ReplicationId id = nextId++; maxId = std::max(maxId, id); - msg.getMessage().setReplicationId(id); - msg.deliverTo(queue); + message.setReplicationId(id); + deliver(message); QueuePosition position = queue->getPosition(); positions[id] = position; QPID_LOG(trace, logPrefix << "Enqueued " << LogMessageId(*queue,position,id)); } - else if (key == DEQUEUE_EVENT_KEY) { - dequeue(decodeContent(msg.getMessage()), l); + else { + DispatchMap::iterator i = dispatch.find(key); + if (i == dispatch.end()) { + QPID_LOG(info, logPrefix << "Ignoring unknown event: " << key); + } + else { + (i->second)(message.getContent(), l); + } } - else if (key == ID_EVENT_KEY) { - nextId = decodeContent(msg.getMessage()); - } - // Ignore unknown event keys, may be introduced in later versions. } catch (const std::exception& e) { haBroker.shutdown(QPID_MSG(logPrefix << "Replication failed: " << e.what())); } } +void QueueReplicator::deliver(const broker::Message& m) { + queue->deliver(m); +} + +void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) { + nextId = decodeStr(data).id; +} + ReplicationId QueueReplicator::getMaxId() { Mutex::ScopedLock l(lock); return maxId; @@ -273,4 +289,5 @@ bool QueueReplicator::unbind(boost::shared_ptr, const std::string&, const bool QueueReplicator::isBound(boost::shared_ptr, const std::string* const, const FieldTable* const) { return false; } std::string QueueReplicator::getType() const { return TYPE_NAME; } + }} // namespace qpid::broker diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h index 811ddba256..90f38ce7e1 100644 --- a/cpp/src/qpid/ha/QueueReplicator.h +++ b/cpp/src/qpid/ha/QueueReplicator.h @@ -26,6 +26,7 @@ #include "hash.h" #include "qpid/broker/Exchange.h" #include +#include #include namespace qpid { @@ -56,23 +57,19 @@ class QueueReplicator : public broker::Exchange, public boost::enable_shared_from_this { public: - static const std::string DEQUEUE_EVENT_KEY; - static const std::string ID_EVENT_KEY; static const std::string QPID_SYNC_FREQUENCY; + static const std::string REPLICATOR_PREFIX; static std::string replicatorName(const std::string& queueName); static bool isReplicatorName(const std::string&); - /** Test if a string is an event key */ - static bool isEventKey(const std::string key); - QueueReplicator(HaBroker&, boost::shared_ptr q, boost::shared_ptr l); ~QueueReplicator(); - void activate(); // Must be called immediately after constructor. + void activate(); // Must be called immediately after constructor. std::string getType() const; bool bind(boost::shared_ptr > PositionMap; + protected: + typedef boost::function DispatchFn; + typedef qpid::sys::unordered_map DispatchMap; + + virtual void deliver(const broker::Message&); + + sys::Mutex lock; + HaBroker& haBroker; + const BrokerInfo brokerInfo; + DispatchMap dispatch; + private: + typedef qpid::sys::unordered_map< + ReplicationId, QueuePosition, TrivialHasher > PositionMap; class ErrorListener; class QueueObserver; void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler); void destroy(); // Called when the queue is destroyed. - void dequeue(const ReplicationIdSet&, sys::Mutex::ScopedLock&); - HaBroker& haBroker; + // Dispatch functions + void dequeueEvent(const std::string& data, sys::Mutex::ScopedLock&); + void idEvent(const std::string& data, sys::Mutex::ScopedLock&); + std::string logPrefix; std::string bridgeName; - sys::Mutex lock; boost::shared_ptr queue; boost::shared_ptr link; boost::shared_ptr bridge; - BrokerInfo brokerInfo; + bool subscribed; const Settings& settings; bool destroyed; diff --git a/cpp/src/qpid/ha/RemoteBackup.cpp b/cpp/src/qpid/ha/RemoteBackup.cpp index e55d415972..776a584bc8 100644 --- a/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/cpp/src/qpid/ha/RemoteBackup.cpp @@ -20,6 +20,7 @@ */ #include "RemoteBackup.h" #include "QueueGuard.h" +#include "TxReplicator.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" #include "qpid/broker/Queue.h" @@ -65,6 +66,8 @@ bool RemoteBackup::isReady() { } void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) { + // Ignore transaction queues for purposes of catch-up calculation + if (TxReplicator::isTxQueue(q->getName())) return; if (replicationTest.getLevel(*q) == ALL) { QPID_LOG(debug, logPrefix << "Catch-up queue" << (createGuard ? " and guard" : "") << ": " << q->getName()); diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 2001ec5332..9f464f8066 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -19,7 +19,7 @@ * */ -#include "makeMessage.h" +#include "Event.h" #include "IdSetter.h" #include "QueueGuard.h" #include "QueueReplicator.h" @@ -36,6 +36,7 @@ #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include "qpid/types/Uuid.h" +#include #include @@ -45,6 +46,7 @@ namespace ha { using namespace framing; using namespace broker; using namespace std; +using namespace boost; using sys::Mutex; using broker::amqp_0_10::MessageTransfer; @@ -111,7 +113,8 @@ ReplicatingSubscription::ReplicatingSubscription( ) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag, resumeId, resumeTtl, arguments), position(0), ready(false), cancelled(false), - haBroker(hb) + haBroker(hb), + primary(boost::dynamic_pointer_cast(haBroker.getRole())) { try { FieldTable ft; @@ -137,8 +140,6 @@ ReplicatingSubscription::ReplicatingSubscription( } // If there's already a guard (we are in failover) use it, else create one. - boost::shared_ptr primary = - boost::dynamic_pointer_cast(haBroker.getRole()); if (primary) guard = primary->getGuard(queue, info); if (!guard) guard.reset(new QueueGuard(*queue, info)); @@ -163,7 +164,6 @@ ReplicatingSubscription::ReplicatingSubscription( sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued() dequeues += initDequeues; // Messages on backup that are not on primary. skip = backupIds - initDequeues; // Messages already on the backup. - // Queue front is moving but we know this subscriptions will start at a // position >= front so if front is safe then position must be. position = front; @@ -191,6 +191,7 @@ ReplicatingSubscription::~ReplicatingSubscription() {} // void ReplicatingSubscription::initialize() { try { + if (primary) primary->addReplica(*this); Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently. // Send initial dequeues to the backup. // There must be a shared_ptr(this) when sending. @@ -218,9 +219,8 @@ bool ReplicatingSubscription::deliver( try { bool result = false; if (skip.contains(id)) { + QPID_LOG(trace, logPrefix << "Skip " << LogMessageId(*getQueue(), m)); skip -= id; - QPID_LOG(trace, logPrefix << "On backup, skip " << - LogMessageId(*getQueue(), m)); guard->complete(id); // This will never be acknowledged. notify(); result = true; @@ -240,17 +240,12 @@ bool ReplicatingSubscription::deliver( } } -/** - *@param position: must be <= last position seen by subscription. - */ void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) { if (!ready && isGuarded(l) && unready.empty()) { ready = true; sys::Mutex::ScopedUnlock u(lock); // Notify Primary that a subscription is ready. QPID_LOG(debug, logPrefix << "Caught up"); - boost::shared_ptr primary = - boost::dynamic_pointer_cast(haBroker.getRole()); if (primary) primary->readyReplica(*this); } } @@ -264,6 +259,7 @@ void ReplicatingSubscription::cancel() cancelled = true; } QPID_LOG(debug, logPrefix << "Cancelled"); + if (primary) primary->removeReplica(*this); getQueue()->removeObserver(observer); guard->cancel(); ConsumerImpl::cancel(); @@ -289,9 +285,7 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l) { if (dequeues.empty()) return; QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); - string buffer = encodeStr(dequeues); - dequeues.clear(); - sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l); + sendEvent(DequeueEvent(dequeues), l); } // Called after the message has been removed @@ -311,23 +305,16 @@ void ReplicatingSubscription::dequeued(ReplicationId id) // Called with lock held. Called in subscription's connection thread. void ReplicatingSubscription::sendIdEvent(ReplicationId pos, Mutex::ScopedLock& l) { - sendEvent(QueueReplicator::ID_EVENT_KEY, encodeStr(pos), l); + sendEvent(IdEvent(pos), l); } -void ReplicatingSubscription::sendEvent(const std::string& key, - const std::string& buffer, - Mutex::ScopedLock&) +void ReplicatingSubscription::sendEvent(const Event& event, Mutex::ScopedLock&) { Mutex::ScopedUnlock u(lock); - broker::Message message = makeMessage(buffer); - MessageTransfer& transfer = MessageTransfer::get(message); - DeliveryProperties* props = - transfer.getFrames().getHeaders()->get(true); - props->setRoutingKey(key); // Send the event directly to the base consumer implementation. The dummy // consumer prevents acknowledgements being handled, which is what we want // for events - ConsumerImpl::deliver(QueueCursor(), message, boost::shared_ptr()); + ConsumerImpl::deliver(QueueCursor(), event.message(), boost::shared_ptr()); } // Called in subscription's connection thread. @@ -346,4 +333,9 @@ bool ReplicatingSubscription::doDispatch() } } +void ReplicatingSubscription::addSkip(const ReplicationIdSet& ids) { + Mutex::ScopedLock l(lock); + skip += ids; +} + }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.h b/cpp/src/qpid/ha/ReplicatingSubscription.h index c202089e91..8a4a48bb4e 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -44,6 +44,8 @@ class Buffer; namespace ha { class QueueGuard; class HaBroker; +class Event; +class Primary; /** * A susbcription that replicates to a remote backup. @@ -118,6 +120,9 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl BrokerInfo getBrokerInfo() const { return info; } + /** Skip replicating enqueue of of ids. */ + void addSkip(const ReplicationIdSet& ids); + protected: bool doDispatch(); @@ -128,7 +133,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl std::string logPrefix; QueuePosition position; ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event. - ReplicationIdSet skip; // Messages already on backup will be skipped. + ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues. ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged. bool ready; bool cancelled; @@ -136,12 +141,13 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl boost::shared_ptr guard; HaBroker& haBroker; boost::shared_ptr observer; + boost::shared_ptr primary; bool isGuarded(sys::Mutex::ScopedLock&); void dequeued(ReplicationId); void sendDequeueEvent(sys::Mutex::ScopedLock&); void sendIdEvent(ReplicationId, sys::Mutex::ScopedLock&); - void sendEvent(const std::string& key, const std::string& data, sys::Mutex::ScopedLock&); + void sendEvent(const Event&, sys::Mutex::ScopedLock&); void checkReady(sys::Mutex::ScopedLock&); friend class Factory; }; diff --git a/cpp/src/qpid/ha/StatusCheck.cpp b/cpp/src/qpid/ha/StatusCheck.cpp index d73f2cf8b5..11f65aa574 100644 --- a/cpp/src/qpid/ha/StatusCheck.cpp +++ b/cpp/src/qpid/ha/StatusCheck.cpp @@ -97,16 +97,10 @@ void StatusCheckThread::run() { QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status); } } catch(const exception& error) { - QPID_LOG(info, statusCheck.logPrefix << "Checking status of " << url << ": " << error.what()); - } - try { c.close(); } - catch(const exception&) { - QPID_LOG(warning, statusCheck.logPrefix << "Error closing status check connection to " << url); - } - try { c.close(); } - catch(const exception&) { - QPID_LOG(warning, "Error closing status check connection to " << url); + QPID_LOG(info, statusCheck.logPrefix << "Error checking status of " << url + << ": " << error.what()); } + try { c.close(); } catch(...) {} delete this; } diff --git a/cpp/src/qpid/ha/TxReplicator.cpp b/cpp/src/qpid/ha/TxReplicator.cpp new file mode 100644 index 0000000000..31c68dfe45 --- /dev/null +++ b/cpp/src/qpid/ha/TxReplicator.cpp @@ -0,0 +1,192 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include "TxReplicator.h" +#include "Role.h" +#include "Backup.h" +#include "BrokerReplicator.h" +#include "Event.h" +#include "HaBroker.h" +#include "types.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/TxBuffer.h" +#include "qpid/broker/TxAccept.h" +#include "qpid/framing/BufferTypes.h" +#include "qpid/log/Statement.h" +#include +#include +#include + +namespace qpid { +namespace ha { + +using namespace std; +using namespace boost; +using namespace qpid::broker; +using namespace qpid::framing; + +namespace { +const string QPID_HA(QPID_HA_PREFIX); +const string TYPE_NAME(QPID_HA+"tx-queue-replicator"); +const string PREFIX(TRANSACTION_REPLICATOR_PREFIX); +} // namespace + + +bool TxReplicator::isTxQueue(const string& q) { + return startsWith(q, PREFIX); +} + +string TxReplicator::getTxId(const string& q) { + assert(isTxQueue(q)); + return q.substr(PREFIX.size()); +} + +string TxReplicator::getType() const { return TYPE_NAME; } + +TxReplicator::TxReplicator( + HaBroker& hb, + const boost::shared_ptr& txQueue, + const boost::shared_ptr& link) : + QueueReplicator(hb, txQueue, link), + id(getTxId(txQueue->getName())), + txBuffer(new broker::TxBuffer), + broker(hb.getBroker()), + store(broker.hasStore() ? &broker.getStore() : 0), + dequeueState(hb.getBroker().getQueues()) +{ + logPrefix = "Backup of transaction "+id+": "; + + if (!store) throw Exception(QPID_MSG(logPrefix << "No message store loaded.")); + boost::shared_ptr backup = dynamic_pointer_cast(hb.getRole()); + if (!backup) throw Exception(QPID_MSG(logPrefix << "Broker is not in backup mode.")); + brokerReplicator = backup->getBrokerReplicator(); + + // Dispatch transaction events. + dispatch[TxEnqueueEvent::KEY] = + boost::bind(&TxReplicator::enqueue, this, _1, _2); + dispatch[TxDequeueEvent::KEY] = + boost::bind(&TxReplicator::dequeue, this, _1, _2); + dispatch[TxPrepareEvent::KEY] = + boost::bind(&TxReplicator::prepare, this, _1, _2); + dispatch[TxCommitEvent::KEY] = + boost::bind(&TxReplicator::commit, this, _1, _2); + dispatch[TxRollbackEvent::KEY] = + boost::bind(&TxReplicator::rollback, this, _1, _2); +} + +void TxReplicator::deliver(const broker::Message& m_) { + // Deliver message to the target queue, not the tx-queue. + broker::Message m(m_); + m.setReplicationId(enq.id); // Use replicated id. + boost::shared_ptr queue = broker.getQueues().get(enq.queue); + QPID_LOG(trace, logPrefix << "Deliver " << LogMessageId(*queue, m)); + DeliverableMessage dm(m, txBuffer.get()); + dm.deliverTo(queue); +} + +void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) { + TxEnqueueEvent e; + decodeStr(data, e); + QPID_LOG(trace, logPrefix << "Enqueue: " << e); + enq = e; +} + +void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) { + TxDequeueEvent e; + decodeStr(data, e); + QPID_LOG(trace, logPrefix << "Dequeue: " << e); + // NOTE: Backup does not see transactional dequeues until the transaction is + // prepared, then they are all receieved before the prepare event. + // We collect the events here so we can do a single scan of the queue in prepare. + dequeueState.add(e); +} + +void TxReplicator::DequeueState::add(const TxDequeueEvent& event) { + events[event.queue] += event.id; +} + +// Use this function as a seek() predicate to find the dequeued messages. +bool TxReplicator::DequeueState::addRecord( + const broker::Message& m, const boost::shared_ptr& queue, + const ReplicationIdSet& rids) +{ + if (rids.contains(m.getReplicationId())) { + // FIXME aconway 2013-07-24: + // - Do we need to acquire before creating a DR? + // - Are the parameters to DeliveryRecord ok? + DeliveryRecord dr(cursor, m.getSequence(), m.getReplicationId(), queue, + string() /*tag*/, + boost::shared_ptr(), + true /*acquired*/, + false /*accepted*/, + false /*credit.isWindowMode()*/, + 0 /*credit*/); + // Fake record ids, unique within this transaction. + dr.setId(nextId++); + records.push_back(dr); + recordIds += dr.getId(); + } + return false; +} + +void TxReplicator::DequeueState::addRecords(const EventMap::value_type& entry) { + // Process all the dequeues for a single queue, in one pass of seek() + boost::shared_ptr q = queues.get(entry.first); + q->seek(cursor, boost::bind(&TxReplicator::DequeueState::addRecord, + this, _1, q, entry.second)); +} + +boost::shared_ptr TxReplicator::DequeueState::makeAccept() { + for_each(events.begin(), events.end(), + boost::bind(&TxReplicator::DequeueState::addRecords, this, _1)); + return make_shared(cref(recordIds), ref(records)); +} + +void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock&) { + QPID_LOG(trace, logPrefix << "Prepare"); + txBuffer->enlist(dequeueState.makeAccept()); + context = store->begin(); + txBuffer->prepare(context.get()); + // FIXME aconway 2013-07-26: notify the primary of prepare outcome. +} + +void TxReplicator::commit(const string&, sys::Mutex::ScopedLock&) { + QPID_LOG(trace, logPrefix << "Commit"); + if (context.get()) store->commit(*context); + txBuffer->commit(); + end(); +} + +void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock&) { + QPID_LOG(trace, logPrefix << "Rollback"); + if (context.get()) store->abort(*context); + txBuffer->rollback(); + end(); +} + +void TxReplicator::end(){ + // FIXME aconway 2013-07-26: destroying the txqueue (auto-delete?) will + // destroy this via QueueReplicator::destroy +} +}} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/TxReplicator.h b/cpp/src/qpid/ha/TxReplicator.h new file mode 100644 index 0000000000..c4df8c13f9 --- /dev/null +++ b/cpp/src/qpid/ha/TxReplicator.h @@ -0,0 +1,123 @@ +#ifndef QPID_HA_TRANSACTIONREPLICATOR_H +#define QPID_HA_TRANSACTIONREPLICATOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "QueueReplicator.h" +#include "Event.h" +#include "qpid/broker/DeliveryRecord.h" +#include "qpid/broker/TransactionalStore.h" +#include "qpid/sys/Mutex.h" + +namespace qpid { + +namespace broker { +class TxBuffer; +class TxAccept; +class DtxBuffer; +class Broker; +class MessageStore; +} + +namespace ha { +class BrokerReplicator; + +/** + * Exchange created on a backup broker to replicate a transaction on the primary. + * + * Subscribes to a tx-queue like a normal queue but puts replicated messages and + * transaction events into a local TxBuffer. + * + * THREAD SAFE: Called in different connection threads. + */ +class TxReplicator : public QueueReplicator { + public: + typedef boost::shared_ptr QueuePtr; + typedef boost::shared_ptr LinkPtr; + + static bool isTxQueue(const std::string& queue); + static std::string getTxId(const std::string& queue); + + TxReplicator(HaBroker&, const QueuePtr& txQueue, const LinkPtr& link); + + std::string getType() const; + + protected: + + void deliver(const broker::Message&); + + private: + + typedef void (TxReplicator::*DispatchFunction)( + const std::string&, sys::Mutex::ScopedLock&); + typedef qpid::sys::unordered_map DispatchMap; + typedef qpid::sys::unordered_map DequeueMap; + + void enqueue(const std::string& data, sys::Mutex::ScopedLock&); + void dequeue(const std::string& data, sys::Mutex::ScopedLock&); + void prepare(const std::string& data, sys::Mutex::ScopedLock&); + void commit(const std::string& data, sys::Mutex::ScopedLock&); + void rollback(const std::string& data, sys::Mutex::ScopedLock&); + void end(); + + std::string logPrefix; + const std::string id; + TxEnqueueEvent enq; // Enqueue data for next deliver. + boost::shared_ptr txBuffer; + broker::Broker& broker; + broker::MessageStore* store; + boost::shared_ptr brokerReplicator; + std::auto_ptr context; + + // Class to process dequeues and create DeliveryRecords to populate a + // TxAccept. + class DequeueState { + public: + DequeueState(broker::QueueRegistry& qr) : queues(qr) {} + void add(const TxDequeueEvent&); + boost::shared_ptr makeAccept(); + + private: + // Delivery record IDs are command IDs from the session. + // On a backup we will just fake these Ids. + typedef framing::SequenceNumber Id; + typedef framing::SequenceSet IdSet; + typedef qpid::sys::unordered_map EventMap; + + bool addRecord(const broker::Message& m, const boost::shared_ptr&, + const ReplicationIdSet& ); + void addRecords(const DequeueMap::value_type& entry); + + broker::QueueRegistry& queues; + EventMap events; + broker::DeliveryRecords records; + broker::QueueCursor cursor; + framing::SequenceNumber nextId; + IdSet recordIds; + }; + DequeueState dequeueState; +}; + + +}} // namespace qpid::ha + +#endif /*!QPID_HA_TRANSACTIONREPLICATOR_H*/ diff --git a/cpp/src/qpid/ha/makeMessage.cpp b/cpp/src/qpid/ha/makeMessage.cpp deleted file mode 100644 index 5b063a23e7..0000000000 --- a/cpp/src/qpid/ha/makeMessage.cpp +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "makeMessage.h" -#include "qpid/broker/amqp_0_10/MessageTransfer.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/MessageTransferBody.h" - -namespace qpid { -namespace ha { - -broker::Message makeMessage(const framing::Buffer& buffer, - const std::string& destination) -{ - using namespace framing; - using broker::amqp_0_10::MessageTransfer; - - boost::intrusive_ptr transfer( - new qpid::broker::amqp_0_10::MessageTransfer()); - AMQFrame method((MessageTransferBody(ProtocolVersion(), destination, 0, 0))); - AMQFrame header((AMQHeaderBody())); - AMQFrame content((AMQContentBody())); - // AMQContentBody::decode is missing a const declaration, so cast it here. - content.castBody()->decode( - const_cast(buffer), buffer.getSize()); - header.setBof(false); - header.setEof(false); - header.setBos(true); - header.setEos(true); - content.setBof(false); - content.setEof(true); - content.setBos(true); - content.setEos(true); - transfer->getFrames().append(method); - transfer->getFrames().append(header); - transfer->getFrames().append(content); - return broker::Message(transfer, 0); -} - -broker::Message makeMessage(const std::string& content, const std::string& destination) { - framing::Buffer buffer(const_cast(&content[0]), content.size()); - return makeMessage(buffer, destination); -} - -}} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/makeMessage.h b/cpp/src/qpid/ha/makeMessage.h deleted file mode 100644 index 4427cdd948..0000000000 --- a/cpp/src/qpid/ha/makeMessage.h +++ /dev/null @@ -1,66 +0,0 @@ -#ifndef QPID_HA_MAKEMESSAGE_H -#define QPID_HA_MAKEMESSAGE_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/broker/Message.h" -#include "qpid/framing/Buffer.h" -#include - -/** Utilities for creating messages used by HA internally. */ - -namespace qpid { -namespace framing { -class Buffer; -} -namespace ha { - -/** - * Create internal messages used by HA components. - */ -broker::Message makeMessage( - const framing::Buffer& content, - const std::string& destination=std::string() -); - -broker::Message makeMessage(const std::string& content, - const std::string& destination=std::string()); - -/** Encode value as a string. */ -template std::string encodeStr(const T& value) { - std::string encoded(value.encodedSize(), '\0'); - framing::Buffer buffer(&encoded[0], encoded.size()); - value.encode(buffer); - return encoded; -} - -/** Decode value from a string. */ -template T decodeStr(const std::string& encoded) { - framing::Buffer buffer(const_cast(&encoded[0]), encoded.size()); - T value; - value.decode(buffer); - return value; -} - -}} // namespace qpid::ha - -#endif /*!QPID_HA_MAKEMESSAGE_H*/ diff --git a/cpp/src/qpid/ha/types.cpp b/cpp/src/qpid/ha/types.cpp index 2246355339..10d6bd4e3b 100644 --- a/cpp/src/qpid/ha/types.cpp +++ b/cpp/src/qpid/ha/types.cpp @@ -37,6 +37,14 @@ using namespace std; const string QPID_REPLICATE("qpid.replicate"); const string QPID_HA_UUID("qpid.ha-uuid"); +const char* QPID_HA_PREFIX = "qpid.ha-"; +const char* QUEUE_REPLICATOR_PREFIX = "qpid.ha-q:"; +const char* TRANSACTION_REPLICATOR_PREFIX = "qpid.ha-tx:"; + +bool startsWith(const string& name, const string& prefix) { + return name.compare(0, prefix.size(), prefix) == 0; +} + string EnumBase::str() const { assert(value < count); return names[value]; diff --git a/cpp/src/qpid/ha/types.h b/cpp/src/qpid/ha/types.h index 9a7e97c66d..3af095d470 100644 --- a/cpp/src/qpid/ha/types.h +++ b/cpp/src/qpid/ha/types.h @@ -105,10 +105,17 @@ inline bool isPrimary(BrokerStatus s) { inline bool isBackup(BrokerStatus s) { return !isPrimary(s); } -// String constants. +// String constants, defined as char* to avoid initialization order problems. extern const std::string QPID_REPLICATE; extern const std::string QPID_HA_UUID; +// Strings used as prefixes, defined as char* to avoid link order problems. +extern const char* QPID_HA_PREFIX; +extern const char* QUEUE_REPLICATOR_PREFIX; +extern const char* TRANSACTION_REPLICATOR_PREFIX; + +bool startsWith(const std::string& name, const std::string& prefix); + /** Define IdSet type, not a typedef so we can overload operator << */ class IdSet : public std::set {}; diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h index bc23867ee1..b7b0f9d34b 100644 --- a/cpp/src/tests/BrokerFixture.h +++ b/cpp/src/tests/BrokerFixture.h @@ -42,11 +42,45 @@ namespace tests { struct BrokerFixture : private boost::noncopyable { typedef qpid::broker::Broker Broker; typedef boost::intrusive_ptr BrokerPtr; + typedef std::vector Args; BrokerPtr broker; + uint16_t port; qpid::sys::Thread brokerThread; - BrokerFixture(Broker::Options opts=Broker::Options(), bool enableMgmt=false) { + BrokerFixture(const Args& args=Args(), const Broker::Options& opts=Broker::Options(), + bool isExternalPort_=false, uint16_t externalPort_=0) + { + init(args, opts, isExternalPort_, externalPort_); + } + + BrokerFixture(const Broker::Options& opts, + bool isExternalPort_=false, uint16_t externalPort_=0) + { + init(Args(), opts, isExternalPort_, externalPort_); + } + + void shutdownBroker() { + if (broker) { + broker->shutdown(); + brokerThread.join(); + broker = BrokerPtr(); + } + } + + ~BrokerFixture() { shutdownBroker(); } + + /** Open a connection to the broker. */ + void open(qpid::client::Connection& c) { + c.open("localhost", getPort()); + } + + uint16_t getPort() { return port; } + + private: + void init(const Args& args, Broker::Options opts, + bool isExternalPort=false, uint16_t externalPort=0) + { // Keep the tests quiet unless logging env. vars have been set by user. if (!::getenv("QPID_LOG_ENABLE") && !::getenv("QPID_TRACE")) { qpid::log::Options logOpts; @@ -55,38 +89,28 @@ struct BrokerFixture : private boost::noncopyable { logOpts.selectors.push_back("error+"); qpid::log::Logger::instance().configure(logOpts); } + // Default options, may be over-ridden when we parse args. opts.port=0; opts.listenInterfaces.push_back("127.0.0.1"); - // Management doesn't play well with multiple in-process brokers. - opts.enableMgmt=enableMgmt; opts.workerThreads=1; opts.dataDir=""; opts.auth=false; + + // Argument parsing + std::vector argv(args.size()); + std::transform(args.begin(), args.end(), argv.begin(), + boost::bind(&std::string::c_str, _1)); + Plugin::addOptions(opts); + opts.parse(argv.size(), &argv[0]); broker = Broker::create(opts); // TODO aconway 2007-12-05: At one point BrokerFixture // tests could hang in Connection ctor if the following // line is removed. This may not be an issue anymore. broker->accept(); - broker->getPort(qpid::broker::Broker::TCP_TRANSPORT); + if (isExternalPort) port = externalPort; + else port = broker->getPort(qpid::broker::Broker::TCP_TRANSPORT); brokerThread = qpid::sys::Thread(*broker); }; - - void shutdownBroker() { - if (broker) { - broker->shutdown(); - brokerThread.join(); - broker = BrokerPtr(); - } - } - - ~BrokerFixture() { shutdownBroker(); } - - /** Open a connection to the broker. */ - void open(qpid::client::Connection& c) { - c.open("localhost", broker->getPort(qpid::broker::Broker::TCP_TRANSPORT)); - } - - uint16_t getPort() { return broker->getPort(qpid::broker::Broker::TCP_TRANSPORT); } }; /** Connection that opens in its constructor */ @@ -125,8 +149,8 @@ template struct SessionFixtureT : BrokerFixture, ClientT { SessionFixtureT(Broker::Options opts=Broker::Options()) : - BrokerFixture(opts), - ClientT(broker->getPort(qpid::broker::Broker::TCP_TRANSPORT)) + BrokerFixture(BrokerFixture::Args(), opts), + ClientT(getPort()) {} }; diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index a1d29c23a3..94d965eb7a 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -379,24 +379,7 @@ add_test (queue_redirect ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_queue_redirect add_library(test_store MODULE test_store.cpp) target_link_libraries (test_store qpidbroker qpidcommon) -set_target_properties (test_store PROPERTIES - COMPILE_DEFINITIONS _IN_QPID_BROKER - PREFIX "") +set_target_properties (test_store PROPERTIES PREFIX "" COMPILE_DEFINITIONS _IN_QPID_BROKER) add_library (dlclose_noop MODULE dlclose_noop.c) -#libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir) - -#CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers) -# -## Longer running stability tests, not run by default check: target. -## Not run under valgrind, too slow -#LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest -#EXTRA_DIST+=$(LONG_TESTS) run_perftest -#check-long: -# $(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND= - -# -# legacystore -# -# add_subdirectory(legacystore) diff --git a/cpp/src/tests/MessagingFixture.h b/cpp/src/tests/MessagingFixture.h index 2312a87e9d..c18869a7c3 100644 --- a/cpp/src/tests/MessagingFixture.h +++ b/cpp/src/tests/MessagingFixture.h @@ -115,6 +115,7 @@ struct MessagingFixture : public BrokerFixture (boost::format("amqp:tcp:localhost:%1%") % (port)).str()); connection.open(); return connection; + } /** Open a connection to the broker. */ @@ -231,9 +232,10 @@ inline void receive(messaging::Receiver& receiver, uint count = 1, uint start = class MethodInvoker { public: - MethodInvoker(messaging::Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"), - sender(session.createSender("qmf.default.direct/broker")), - receiver(session.createReceiver(replyTo)) {} + MethodInvoker(messaging::Session session) : + replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"), + sender(session.createSender("qmf.default.direct/broker")), + receiver(session.createReceiver(replyTo)) {} void createExchange(const std::string& name, const std::string& type, bool durable=false) { @@ -292,11 +294,14 @@ class MethodInvoker methodRequest("delete", params); } - void methodRequest(const std::string& method, const Variant::Map& inParams, Variant::Map* outParams = 0) + void methodRequest( + const std::string& method, + const Variant::Map& inParams, Variant::Map* outParams = 0, + const std::string& objectName="org.apache.qpid.broker:broker:amqp-broker") { Variant::Map content; Variant::Map objectId; - objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker"; + objectId["_object_name"] = objectName;; content["_object_id"] = objectId; content["_method_name"] = method; content["_arguments"] = inParams; diff --git a/cpp/src/tests/TransactionObserverTest.cpp b/cpp/src/tests/TransactionObserverTest.cpp index c284417e25..f570837ccf 100644 --- a/cpp/src/tests/TransactionObserverTest.cpp +++ b/cpp/src/tests/TransactionObserverTest.cpp @@ -89,6 +89,7 @@ Session simpleTxTransaction(MessagingFixture& fix) { // Transaction with 1 enqueue and 1 dequeue. Session txSession = fix.connection.createTransactionalSession(); BOOST_CHECK_EQUAL("foo", txSession.createReceiver("q1").fetch().getContent()); + txSession.acknowledge(); txSession.createSender("q2;{create:always}").send(msg("bar")); return txSession; } diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py index 286beb0258..b07a5b5d11 100644 --- a/cpp/src/tests/brokertest.py +++ b/cpp/src/tests/brokertest.py @@ -244,14 +244,10 @@ class Broker(Popen): def __str__(self): return "Broker<%s %s :%d>"%(self.log, self.pname, self.port()) - def find_log(self): - self.log = "%03d:%s.log" % (Broker._log_count, self.name) - Broker._log_count += 1 - def get_log(self): return os.path.abspath(self.log) - def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False): + def __init__(self, test, args=[], test_store=False, name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False): """Start a broker daemon. name determines the data-dir and log file names.""" @@ -273,11 +269,18 @@ class Broker(Popen): else: self.name = "broker%d" % Broker._broker_count Broker._broker_count += 1 - self.find_log() + + self.log = "%03d:%s.log" % (Broker._log_count, self.name) + self.store_log = "%03d:%s.store.log" % (Broker._log_count, self.name) + Broker._log_count += 1 + cmd += ["--log-to-file", self.log] cmd += ["--log-to-stderr=no"] cmd += ["--log-enable=%s"%(log_level or "info+") ] + if test_store: cmd += ["--load-module", BrokerTest.test_store_lib, + "--test-store-events", self.store_log] + self.datadir = self.name cmd += ["--data-dir", self.datadir] if show_cmd: print cmd diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp new file mode 100644 index 0000000000..62bbf513bd --- /dev/null +++ b/cpp/src/tests/cluster_test.cpp @@ -0,0 +1,1231 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "test_tools.h" +#include "unit_test.h" +#include "ForkedBroker.h" +#include "BrokerFixture.h" +#include "ClusterFixture.h" + +#include "qpid/client/Connection.h" +#include "qpid/client/ConnectionSettings.h" +#include "qpid/client/ConnectionAccess.h" +#include "qpid/client/Session.h" +#include "qpid/client/FailoverListener.h" +#include "qpid/client/FailoverManager.h" +#include "qpid/client/QueueOptions.h" +#include "qpid/cluster/Cluster.h" +#include "qpid/cluster/Cpg.h" +#include "qpid/cluster/UpdateClient.h" +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/Uuid.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/enum.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/log/Logger.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Thread.h" + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + + +using namespace std; +using namespace qpid; +using namespace qpid::cluster; +using namespace qpid::framing; +using namespace qpid::client; +using namespace boost::assign; +using broker::Broker; +using boost::shared_ptr; + +namespace qpid { +namespace tests { + +QPID_AUTO_TEST_SUITE(cluster_test) + +bool durableFlag = std::getenv("STORE_LIB") != 0; + +void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) { + ostringstream clusterLib; + clusterLib << getLibPath("CLUSTER_LIB"); + args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str(); + if (durableFlag) + args += "--load-module", getLibPath("STORE_LIB"), "TMP_DATA_DIR"; + else + args += "--no-data-dir"; +} + +ClusterFixture::Args prepareArgs(const bool durableFlag = false) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + return args; +} + +// Timeout for tests that wait for messages +const sys::Duration TIMEOUT=2*sys::TIME_SEC; + + +ostream& operator<<(ostream& o, const cpg_name* n) { + return o << Cpg::str(*n); +} + +ostream& operator<<(ostream& o, const cpg_address& a) { + return o << "(" << a.nodeid <<","< +ostream& operator<<(ostream& o, const pair& array) { + o << "{ "; + ostream_iterator i(o, " "); + copy(array.first, array.first+array.second, i); + o << "}"; + return o; +} + +template set makeSet(const C& c) { + set s; + copy(c.begin(), c.end(), inserter(s, s.begin())); + return s; +} + +class Sender { + public: + Sender(boost::shared_ptr ci, uint16_t ch) : connection(ci), channel(ch) {} + void send(const AMQBody& body, bool firstSeg, bool lastSeg, bool firstFrame, bool lastFrame) { + AMQFrame f(body); + f.setChannel(channel); + f.setFirstSegment(firstSeg); + f.setLastSegment(lastSeg); + f.setFirstFrame(firstFrame); + f.setLastFrame(lastFrame); + connection->expand(f.encodedSize(), false); + connection->handle(f); + } + + private: + boost::shared_ptr connection; + uint16_t channel; +}; + +int64_t getMsgSequence(const Message& m) { + return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence"); +} + +Message ttlMessage(const string& data, const string& key, uint64_t ttl, bool durable = false) { + Message m(data, key); + m.getDeliveryProperties().setTtl(ttl); + if (durable) m.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + return m; +} + +Message makeMessage(const string& data, const string& key, bool durable = false) { + Message m(data, key); + if (durable) m.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + return m; +} + +vector browse(Client& c, const string& q, int n) { + SubscriptionSettings browseSettings( + FlowControl::messageCredit(n), + ACCEPT_MODE_NONE, + ACQUIRE_MODE_NOT_ACQUIRED, + 0 // No auto-ack. + ); + LocalQueue lq; + c.subs.subscribe(lq, q, browseSettings); + c.session.messageFlush(q); + vector result; + for (int i = 0; i < n; ++i) { + Message m; + if (!lq.get(m, TIMEOUT)) + break; + result.push_back(m.getData()); + } + c.subs.getSubscription(q).cancel(); + return result; +} + +ConnectionSettings aclSettings(int port, const std::string& id) { + ConnectionSettings settings; + settings.port = port; + settings.mechanism = "PLAIN"; + settings.username = id; + settings.password = id; + return settings; +} + +// An illegal frame body +struct PoisonPill : public AMQBody { + virtual uint8_t type() const { return 0xFF; } + virtual void encode(Buffer& ) const {} + virtual void decode(Buffer& , uint32_t=0) {} + virtual uint32_t encodedSize() const { return 0; } + + virtual void print(std::ostream&) const {}; + virtual void accept(AMQBodyConstVisitor&) const {}; + + virtual AMQMethodBody* getMethod() { return 0; } + virtual const AMQMethodBody* getMethod() const { return 0; } + + /** Match if same type and same class/method ID for methods */ + static bool match(const AMQBody& , const AMQBody& ) { return false; } + virtual boost::intrusive_ptr clone() const { return new PoisonPill; } +}; + +QPID_AUTO_TEST_CASE(testBadClientData) { + // Ensure that bad data on a client connection closes the + // connection but does not stop the broker. + ClusterFixture::Args args; + prepareArgs(args, false); + args += "--log-enable=critical"; // Supress expected errors + ClusterFixture cluster(2, args, -1); + Client c0(cluster[0]); + Client c1(cluster[1]); + boost::shared_ptr ci = + client::ConnectionAccess::getImpl(c0.connection); + AMQFrame poison(boost::intrusive_ptr(new PoisonPill)); + ci->expand(poison.encodedSize(), false); + ci->handle(poison); + { + ScopedSuppressLogging sl; + BOOST_CHECK_THROW(c0.session.queueQuery("q0"), Exception); + } + Client c00(cluster[0]); + BOOST_CHECK_EQUAL(c00.session.queueQuery("q00").getQueue(), ""); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getQueue(), ""); +} + +QPID_AUTO_TEST_CASE(testAcl) { + ofstream policyFile("cluster_test.acl"); + policyFile << "acl allow foo@QPID create queue name=foo" << endl + << "acl allow foo@QPID create queue name=foo2" << endl + << "acl deny foo@QPID create queue name=bar" << endl + << "acl allow all all" << endl; + policyFile.close(); + char cwd[1024]; + BOOST_CHECK(::getcwd(cwd, sizeof(cwd))); + ostringstream aclLib; + aclLib << getLibPath("ACL_LIB"); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + args += "--log-enable=critical"; // Supress expected errors + args += "--acl-file", string(cwd) + "/cluster_test.acl", + "--cluster-mechanism", "PLAIN", + "--cluster-username", "cluster", + "--cluster-password", "cluster", + "--load-module", aclLib.str(); + ClusterFixture cluster(2, args, -1); + + Client c0(aclSettings(cluster[0], "c0"), "c0"); + Client c1(aclSettings(cluster[1], "c1"), "c1"); + Client foo(aclSettings(cluster[1], "foo"), "foo"); + + foo.session.queueDeclare("foo", arg::durable=durableFlag); + BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo"); + + { + ScopedSuppressLogging sl; + BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::UnauthorizedAccessException); + } + BOOST_CHECK(c0.session.queueQuery("bar").getQueue().empty()); + BOOST_CHECK(c1.session.queueQuery("bar").getQueue().empty()); + + cluster.add(); + Client c2(aclSettings(cluster[2], "c2"), "c2"); + { + ScopedSuppressLogging sl; + BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::UnauthorizedAccessException); + } + BOOST_CHECK(c2.session.queueQuery("bar").getQueue().empty()); +} + +QPID_AUTO_TEST_CASE(testMessageTimeToLive) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(2, args, -1); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + c0.session.queueDeclare("p", arg::durable=durableFlag); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200, durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("b", "q", durableFlag)); + c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 100000, durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("y", "p", durableFlag)); + cluster.add(); + Client c2(cluster[1], "c2"); + + BOOST_CHECK_EQUAL(browse(c0, "p", 1), list_of("x")); + BOOST_CHECK_EQUAL(browse(c1, "p", 1), list_of("x")); + BOOST_CHECK_EQUAL(browse(c2, "p", 1), list_of("x")); + + sys::usleep(200*1000); + BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of("b")); + BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of("b")); + BOOST_CHECK_EQUAL(browse(c2, "q", 1), list_of("b")); +} + +QPID_AUTO_TEST_CASE(testSequenceOptions) { + // Make sure the exchange qpid.msg_sequence property is properly replicated. + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + FieldTable ftargs; + ftargs.setInt("qpid.msg_sequence", 1); + c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag); + c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=ftargs); + c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k"); + c0.session.messageTransfer(arg::content=makeMessage("1", "k", durableFlag), arg::destination="ex"); + c0.session.messageTransfer(arg::content=makeMessage("2", "k", durableFlag), arg::destination="ex"); + BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT))); + BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT))); + + cluster.add(); + Client c1(cluster[1]); + c1.session.messageTransfer(arg::content=makeMessage("3", "k", durableFlag), arg::destination="ex"); + BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT))); +} + +QPID_AUTO_TEST_CASE(testTxTransaction) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("A", "q", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("B", "q", durableFlag)); + + // Start a transaction that will commit. + Session commitSession = c0.connection.newSession("commit"); + SubscriptionManager commitSubs(commitSession); + commitSession.txSelect(); + commitSession.messageTransfer(arg::content=makeMessage("a", "q", durableFlag)); + commitSession.messageTransfer(arg::content=makeMessage("b", "q", durableFlag)); + BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A"); + + // Start a transaction that will roll back. + Session rollbackSession = c0.connection.newSession("rollback"); + SubscriptionManager rollbackSubs(rollbackSession); + rollbackSession.txSelect(); + rollbackSession.messageTransfer(arg::content=makeMessage("1", "q", durableFlag)); + Message rollbackMessage = rollbackSubs.get("q", TIMEOUT); + BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B"); + + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); + // Add new member mid transaction. + cluster.add(); + Client c1(cluster[1], "c1"); + + // More transactional work + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); + rollbackSession.messageTransfer(arg::content=makeMessage("2", "q", durableFlag)); + commitSession.messageTransfer(arg::content=makeMessage("c", "q", durableFlag)); + rollbackSession.messageTransfer(arg::content=makeMessage("3", "q", durableFlag)); + + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); + + // Commit/roll back. + commitSession.txCommit(); + rollbackSession.txRollback(); + rollbackSession.messageRelease(rollbackMessage.getId()); + + // Verify queue status: just the comitted messages and dequeues should remain. + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "B"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "a"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "b"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "c"); + + commitSession.close(); + rollbackSession.close(); +} + +QPID_AUTO_TEST_CASE(testUnacked) { + // Verify replication of unacknowledged messages. + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + + Message m; + + // Create unacked message: acquired but not accepted. + SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0); + c0.session.queueDeclare("q1", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("11","q1", durableFlag)); + LocalQueue q1; + c0.subs.subscribe(q1, "q1", manualAccept); + BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted + BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue + + // Create unacked message: not acquired, accepted or completeed. + SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0); + c0.session.queueDeclare("q2", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("21","q2", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("22","q2", durableFlag)); + LocalQueue q2; + c0.subs.subscribe(q2, "q2", manualAcquire); + m = q2.get(TIMEOUT); // Not acquired or accepted, still on queue + BOOST_CHECK_EQUAL(m.getData(), "21"); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed + c0.subs.getSubscription("q2").acquire(m); // Acquire manually + BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed + BOOST_CHECK_EQUAL(q2.get(TIMEOUT).getData(), "22"); // Not acquired or accepted, still on queue + BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired. + + // Create empty credit record: acquire and accept but don't complete. + SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION); + c0.session.queueDeclare("q3", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("31", "q3", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("32", "q3", durableFlag)); + LocalQueue q3; + c0.subs.subscribe(q3, "q3", manualComplete); + Message m31=q3.get(TIMEOUT); + BOOST_CHECK_EQUAL(m31.getData(), "31"); // Automatically acquired & accepted but not completed. + BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 1u); + + // Add new member while there are unacked messages. + cluster.add(); + Client c1(cluster[1], "c1"); + + // Check queue counts + BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 0u); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 1u); + + // Complete the empty credit message, should unblock the message behind it. + BOOST_CHECK_THROW(q3.get(0), Exception); + c0.session.markCompleted(SequenceSet(m31.getId()), true); + BOOST_CHECK_EQUAL(q3.get(TIMEOUT).getData(), "32"); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 0u); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 0u); + + // Close the original session - unacked messages should be requeued. + c0.session.close(); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u); + + BOOST_CHECK_EQUAL(c1.subs.get("q1", TIMEOUT).getData(), "11"); + BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "21"); + BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "22"); +} + +// FIXME aconway 2009-06-17: test for unimplemented feature, enable when implemented. +void testUpdateTxState() { + // Verify that we update transaction state correctly to new members. + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + + // Do work in a transaction. + c0.session.txSelect(); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("1","q", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("2","q", durableFlag)); + Message m; + BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "1"); + + // New member, TX not comitted, c1 should see nothing. + cluster.add(); + Client c1(cluster[1], "c1"); + BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u); + + // After commit c1 shoudl see results of tx. + c0.session.txCommit(); + BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "2"); + + // Another transaction with both members active. + c0.session.messageTransfer(arg::content=makeMessage("3","q", durableFlag)); + BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u); + c0.session.txCommit(); + BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "3"); +} + +QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { + // Verify that we update a partially recieved message to a new member. + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + c0.session.queueDeclare("q", arg::durable=durableFlag); + Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel()); + + // Send first 2 frames of message. + MessageTransferBody transfer( + ProtocolVersion(), string(), // default exchange. + framing::message::ACCEPT_MODE_NONE, + framing::message::ACQUIRE_MODE_PRE_ACQUIRED); + sender.send(transfer, true, false, true, true); + AMQHeaderBody header; + header.get(true)->setRoutingKey("q"); + if (durableFlag) + header.get(true)->setDeliveryMode(DELIVERY_MODE_PERSISTENT); + else + header.get(true)->setDeliveryMode(DELIVERY_MODE_NON_PERSISTENT); + sender.send(header, false, false, true, true); + + // No reliable way to ensure the partial message has arrived + // before we start the new broker, so we sleep. + sys::usleep(2500); + cluster.add(); + + // Send final 2 frames of message. + sender.send(AMQContentBody("ab"), false, true, true, false); + sender.send(AMQContentBody("cd"), false, true, false, true); + + // Verify message is enqued correctly on second member. + Message m; + Client c1(cluster[1], "c1"); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "abcd"); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size()); +} + +QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + set kb0 = knownBrokerPorts(c0.connection, 1); + BOOST_CHECK_EQUAL(kb0.size(), 1u); + BOOST_CHECK_EQUAL(kb0, makeSet(cluster)); + + cluster.add(); + Client c1(cluster[1], "c1"); + set kb1 = knownBrokerPorts(c1.connection, 2); + kb0 = knownBrokerPorts(c0.connection, 2); + BOOST_CHECK_EQUAL(kb1.size(), 2u); + BOOST_CHECK_EQUAL(kb1, makeSet(cluster)); + BOOST_CHECK_EQUAL(kb1,kb0); + + cluster.add(); + Client c2(cluster[2], "c2"); + set kb2 = knownBrokerPorts(c2.connection, 3); + kb1 = knownBrokerPorts(c1.connection, 3); + kb0 = knownBrokerPorts(c0.connection, 3); + BOOST_CHECK_EQUAL(kb2.size(), 3u); + BOOST_CHECK_EQUAL(kb2, makeSet(cluster)); + BOOST_CHECK_EQUAL(kb2,kb0); + BOOST_CHECK_EQUAL(kb2,kb1); + + cluster.killWithSilencer(1,c1.connection,9); + kb0 = knownBrokerPorts(c0.connection, 2); + kb2 = knownBrokerPorts(c2.connection, 2); + BOOST_CHECK_EQUAL(kb0.size(), 2u); + BOOST_CHECK_EQUAL(kb0, kb2); +} + +QPID_AUTO_TEST_CASE(testUpdateConsumers) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + + Client c0(cluster[0], "c0"); + c0.session.queueDeclare("p", arg::durable=durableFlag); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.subs.subscribe(c0.lq, "q", FlowControl::zero()); + LocalQueue lp; + c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1)); + c0.session.sync(); + + // Start new members + cluster.add(); // Local + Client c1(cluster[1], "c1"); + cluster.add(); + Client c2(cluster[2], "c2"); + + // Transfer messages + c0.session.messageTransfer(arg::content=makeMessage("aaa", "q", durableFlag)); + + c0.session.messageTransfer(arg::content=makeMessage("bbb", "p", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("ccc", "p", durableFlag)); + + // Activate the subscription, ensure message removed on all queues. + c0.subs.setFlowControl("q", FlowControl::unlimited()); + Message m; + BOOST_CHECK(c0.lq.get(m, TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "aaa"); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); + BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u); + + // Check second subscription's flow control: gets first message, not second. + BOOST_CHECK(lp.get(m, TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "bbb"); + BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u); + + BOOST_CHECK(c0.subs.get(m, "p", TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "ccc"); + + // Kill the subscribing member, ensure further messages are not removed. + cluster.killWithSilencer(0,c0.connection,9); + BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u); + for (int i = 0; i < 10; ++i) { + c1.session.messageTransfer(arg::content=makeMessage("xxx", "q", durableFlag)); + BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT)); + BOOST_REQUIRE_EQUAL(m.getData(), "xxx"); + } +} + +// Test that message data and delivery properties are updated properly. +QPID_AUTO_TEST_CASE(testUpdateMessages) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + + // Create messages with different delivery properties + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.exchangeBind(arg::exchange="amq.fanout", arg::queue="q"); + c0.session.messageTransfer(arg::content=makeMessage("foo","q", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("bar","q", durableFlag), + arg::destination="amq.fanout"); + + while (c0.session.queueQuery("q").getMessageCount() != 2) + sys::usleep(1000); // Wait for message to show up on broker 0. + + // Add a new broker, it will catch up. + cluster.add(); + + // Do some work post-add + c0.session.queueDeclare("p", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("pfoo","p", durableFlag)); + + // Do some work post-join + BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u); + c0.session.messageTransfer(arg::content=makeMessage("pbar","p", durableFlag)); + + // Verify new brokers have state. + Message m; + + Client c1(cluster[1], "c1"); + + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "foo"); + BOOST_CHECK(m.getDeliveryProperties().hasExchange()); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), ""); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "bar"); + BOOST_CHECK(m.getDeliveryProperties().hasExchange()); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "amq.fanout"); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); + + // Add another broker, don't wait for join - should be stalled till ready. + cluster.add(); + Client c2(cluster[2], "c2"); + BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "pfoo"); + BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "pbar"); + BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u); +} + +QPID_AUTO_TEST_CASE(testWiringReplication) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(3, args, -1); + Client c0(cluster[0]); + BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty()); + BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty()); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.exchangeDeclare("ex", arg::type="direct"); + c0.session.close(); + c0.connection.close(); + // Verify all brokers get wiring update. + for (size_t i = 0; i < cluster.size(); ++i) { + BOOST_MESSAGE("i == "<< i); + Client c(cluster[i]); + BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue()); + BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType()); + } +} + +QPID_AUTO_TEST_CASE(testMessageEnqueue) { + // Enqueue on one broker, dequeue on another. + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(2, args, -1); + Client c0(cluster[0]); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag)); + c0.session.close(); + Client c1(cluster[1]); + Message msg; + BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT)); + BOOST_CHECK_EQUAL(string("foo"), msg.getData()); + BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT)); + BOOST_CHECK_EQUAL(string("bar"), msg.getData()); +} + +QPID_AUTO_TEST_CASE(testMessageDequeue) { + // Enqueue on one broker, dequeue on two others. + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(3, args, -1); + Client c0(cluster[0], "c0"); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag)); + + Message msg; + + // Dequeue on 2 others, ensure correct order. + Client c1(cluster[1], "c1"); + BOOST_CHECK(c1.subs.get(msg, "q")); + BOOST_CHECK_EQUAL("foo", msg.getData()); + + Client c2(cluster[2], "c2"); + BOOST_CHECK(c1.subs.get(msg, "q")); + BOOST_CHECK_EQUAL("bar", msg.getData()); + + // Queue should be empty on all cluster members. + BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount()); + BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount()); + BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); +} + +QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(3, args, -1); + Client c0(cluster[0]); + BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers. + + // First start a subscription. + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2)); + + // Now send messages + Client c1(cluster[1]); + c1.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag)); + c1.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag)); + + // Check they arrived + Message m; + BOOST_CHECK(c0.lq.get(m, TIMEOUT)); + BOOST_CHECK_EQUAL("foo", m.getData()); + BOOST_CHECK(c0.lq.get(m, TIMEOUT)); + BOOST_CHECK_EQUAL("bar", m.getData()); + + // Queue should be empty on all cluster members. + Client c2(cluster[2]); + BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount()); + BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount()); + BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); +} + +QPID_AUTO_TEST_CASE(queueDurabilityPropagationToNewbie) +{ + /* + Start with a single broker. + Set up two queues: one durable, and one not. + Add a new broker to the cluster. + Make sure it has one durable and one non-durable queue. + */ + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0]); + c0.session.queueDeclare("durable_queue", arg::durable=true); + c0.session.queueDeclare("non_durable_queue", arg::durable=false); + cluster.add(); + Client c1(cluster[1]); + QueueQueryResult durable_query = c1.session.queueQuery ( "durable_queue" ); + QueueQueryResult non_durable_query = c1.session.queueQuery ( "non_durable_queue" ); + BOOST_CHECK_EQUAL(durable_query.getQueue(), std::string("durable_queue")); + BOOST_CHECK_EQUAL(non_durable_query.getQueue(), std::string("non_durable_queue")); + + BOOST_CHECK_EQUAL ( durable_query.getDurable(), true ); + BOOST_CHECK_EQUAL ( non_durable_query.getDurable(), false ); +} + + +QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover) +{ + + struct Sender : FailoverManager::Command + { + std::string queue; + std::string content; + + Sender(const std::string& q, const std::string& c) : queue(q), content(c) {} + + void execute(AsyncSession& session, bool) + { + session.messageTransfer(arg::content=makeMessage(content, queue, durableFlag)); + } + }; + + struct Receiver : FailoverManager::Command, MessageListener, qpid::sys::Runnable + { + FailoverManager& mgr; + std::string queue; + std::string expectedContent; + qpid::client::Subscription subscription; + qpid::sys::Monitor lock; + bool ready, failed; + + Receiver(FailoverManager& m, const std::string& q, const std::string& c) : mgr(m), queue(q), expectedContent(c), ready(false), failed(false) {} + + void received(Message& message) + { + BOOST_CHECK_EQUAL(expectedContent, message.getData()); + subscription.cancel(); + } + + void execute(AsyncSession& session, bool) + { + session.queueDeclare(arg::queue=queue, arg::durable=durableFlag); + SubscriptionManager subs(session); + subscription = subs.subscribe(*this, queue); + session.sync(); + setReady(); + subs.run(); + //cleanup: + session.queueDelete(arg::queue=queue); + } + + void run() + { + try { + mgr.execute(*this); + } + catch (const std::exception& e) { + BOOST_MESSAGE("Exception in mgr.execute: " << e.what()); + failed = true; + } + } + + void waitForReady() + { + qpid::sys::Monitor::ScopedLock l(lock); + while (!ready) { + lock.wait(); + } + } + + void setReady() + { + qpid::sys::Monitor::ScopedLock l(lock); + ready = true; + lock.notify(); + } + }; + + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(2, args, -1); + ConnectionSettings settings; + settings.port = cluster[1]; + settings.heartbeat = 1; + FailoverManager fmgr(settings); + Sender sender("my-queue", "my-data"); + Receiver receiver(fmgr, "my-queue", "my-data"); + qpid::sys::Thread runner(receiver); + receiver.waitForReady(); + { + ScopedSuppressLogging allQuiet; // suppress connection closed messages + cluster.kill(1); + //sleep for 2 secs to allow the heartbeat task to fire on the now dead connection: + ::usleep(2*1000*1000); + } + fmgr.execute(sender); + runner.join(); + BOOST_CHECK(!receiver.failed); + fmgr.close(); +} + +QPID_AUTO_TEST_CASE(testPolicyUpdate) { + //tests that the policys internal state is accurate on newly + //joined nodes + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setSizePolicy(REJECT, 0, 2); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + c1.session.messageTransfer(arg::content=makeMessage("one", "q", durableFlag)); + cluster.add(); + Client c2(cluster[1], "c2"); + c2.session.messageTransfer(arg::content=makeMessage("two", "q", durableFlag)); + + BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=makeMessage("three", "q", durableFlag)), framing::ResourceLimitExceededException); + + Message received; + BOOST_CHECK(c1.subs.get(received, "q")); + BOOST_CHECK_EQUAL(received.getData(), std::string("one")); + BOOST_CHECK(c1.subs.get(received, "q")); + BOOST_CHECK_EQUAL(received.getData(), std::string("two")); + BOOST_CHECK(!c1.subs.get(received, "q")); + } +} + +QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) { + //tests that exclusive queues are accurately replicated on newly + //joined nodes + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + { + ScopedSuppressLogging allQuiet; + c1.session.queueDeclare("q", arg::exclusive=true, arg::autoDelete=true, arg::alternateExchange="amq.fanout"); + cluster.add(); + Client c2(cluster[1], "c2"); + QueueQueryResult result = c2.session.queueQuery("q"); + BOOST_CHECK_EQUAL(result.getQueue(), std::string("q")); + BOOST_CHECK(result.getExclusive()); + BOOST_CHECK(result.getAutoDelete()); + BOOST_CHECK(!result.getDurable()); + BOOST_CHECK_EQUAL(result.getAlternateExchange(), std::string("amq.fanout")); + BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::exclusive=true, arg::passive=true), framing::ResourceLockedException); + c1.session.close(); + c1.connection.close(); + c2.session = c2.connection.newSession(); + BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException); + } +} + +/** + * Subscribes to specified queue and acquires up to the specified + * number of message but does not accept or release them. These + * message are therefore 'locked' by the clients session. + */ +Subscription lockMessages(Client& client, const std::string& queue, int count) +{ + LocalQueue q; + SubscriptionSettings settings(FlowControl::messageCredit(count)); + settings.autoAck = 0; + Subscription sub = client.subs.subscribe(q, queue, settings); + client.session.messageFlush(sub.getName()); + return sub; +} + +/** + * check that the specified queue contains the expected set of + * messages (matched on content) for all nodes in the cluster + */ +void checkQueue(ClusterFixture& cluster, const std::string& queue, const std::vector& messages) +{ + for (size_t i = 0; i < cluster.size(); i++) { + Client client(cluster[i], (boost::format("%1%_%2%") % "c" % (i+1)).str()); + BOOST_CHECK_EQUAL(browse(client, queue, messages.size()), messages); + client.close(); + } +} + +void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m", + const std::string& lvqKey="") +{ + for (int i = 0; i < count; i++) { + Message message = makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag); + if (!lvqKey.empty()) message.getHeaders().setString(QueueOptions::strLVQMatchProperty, lvqKey); + client.session.messageTransfer(arg::content=message); + } +} + +QPID_AUTO_TEST_CASE(testRingQueueUpdate) { + //tests that ring queues are accurately replicated on newly + //joined nodes + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setSizePolicy(RING, 0, 5); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + send(c1, "q", 5); + lockMessages(c1, "q", 1); + //add new node + cluster.add(); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined + //send one more message + send(c1, "q", 1, 6); + //release locked message + c1.close(); + //check state of queue on both nodes + checkQueue(cluster, "q", list_of("m_2")("m_3")("m_4")("m_5")("m_6")); + } +} + +QPID_AUTO_TEST_CASE(testRingQueueUpdate2) { + //tests that ring queues are accurately replicated on newly joined + //nodes; just like testRingQueueUpdate, but new node joins after + //the sixth message has been sent. + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setSizePolicy(RING, 0, 5); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + send(c1, "q", 5); + lockMessages(c1, "q", 1); + //send sixth message + send(c1, "q", 1, 6); + //add new node + cluster.add(); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined + //release locked message + c1.close(); + //check state of queue on both nodes + checkQueue(cluster, "q", list_of("m_2")("m_3")("m_4")("m_5")("m_6")); + } +} + +QPID_AUTO_TEST_CASE(testLvqUpdate) { + //tests that lvqs are accurately replicated on newly joined nodes + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setOrdering(LVQ); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + + send(c1, "q", 5, 1, "a", "a"); + send(c1, "q", 2, 1, "b", "b"); + send(c1, "q", 1, 1, "c", "c"); + send(c1, "q", 1, 3, "b", "b"); + + //add new node + cluster.add(); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined + + //check state of queue on both nodes + checkQueue(cluster, "q", list_of("a_5")("b_3")("c_1")); + } +} + + +QPID_AUTO_TEST_CASE(testBrowsedLvqUpdate) { + //tests that lvqs are accurately replicated on newly joined nodes + //if the lvq state has been affected by browsers + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setOrdering(LVQ); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + + send(c1, "q", 1, 1, "a", "a"); + send(c1, "q", 2, 1, "b", "b"); + send(c1, "q", 1, 1, "c", "c"); + checkQueue(cluster, "q", list_of("a_1")("b_2")("c_1")); + send(c1, "q", 4, 2, "a", "a"); + send(c1, "q", 1, 3, "b", "b"); + + //add new node + cluster.add(); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined + + //check state of queue on both nodes + checkQueue(cluster, "q", list_of("a_1")("b_2")("c_1")("a_5")("b_3")); + } +} + +QPID_AUTO_TEST_CASE(testRelease) { + //tests that releasing a messages that was unacked when one node + //joined works correctly + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + { + ScopedSuppressLogging allQuiet; + c1.session.queueDeclare("q", arg::durable=durableFlag); + for (int i = 0; i < 5; i++) { + c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag)); + } + //receive but don't ack a message + LocalQueue lq; + SubscriptionSettings lqSettings(FlowControl::messageCredit(1)); + lqSettings.autoAck = 0; + Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings); + c1.session.messageFlush("q"); + Message received; + BOOST_CHECK(lq.get(received)); + BOOST_CHECK_EQUAL(received.getData(), std::string("m_1")); + + //add new node + cluster.add(); + + lqSub.release(lqSub.getUnaccepted()); + + //check state of queue on both nodes + vector expected = list_of("m_1")("m_2")("m_3")("m_4")("m_5"); + Client c3(cluster[0], "c3"); + BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected); + Client c2(cluster[1], "c2"); + BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected); + } +} + + +// Browse for 1 message with byte credit, return true if a message was +// received false if not. +bool browseByteCredit(Client& c, const string& q, int n, Message& m) { + SubscriptionSettings browseSettings( + FlowControl(1, n, false), // 1 message, n bytes credit, no window + ACCEPT_MODE_NONE, + ACQUIRE_MODE_NOT_ACQUIRED, + 0 // No auto-ack. + ); + LocalQueue lq; + Subscription s = c.subs.subscribe(lq, q, browseSettings); + c.session.messageFlush(arg::destination=q, arg::sync=true); + c.session.sync(); + c.subs.getSubscription(q).cancel(); + return lq.get(m, 0); // No timeout, flush should push message thru. +} + +// Ensure cluster update preserves exact message size, use byte credt as test. +QPID_AUTO_TEST_CASE(testExactByteCredit) { + ClusterFixture cluster(1, prepareArgs(), -1); + Client c0(cluster[0], "c0"); + c0.session.queueDeclare("q"); + c0.session.messageTransfer(arg::content=Message("MyMessage", "q")); + cluster.add(); + + int size=36; // Size of message on broker: headers+body + Client c1(cluster[1], "c1"); + Message m; + + // Ensure we get the message with exact credit. + BOOST_CHECK(browseByteCredit(c0, "q", size, m)); + BOOST_CHECK(browseByteCredit(c1, "q", size, m)); + // and not with one byte less. + BOOST_CHECK(!browseByteCredit(c0, "q", size-1, m)); + BOOST_CHECK(!browseByteCredit(c1, "q", size-1, m)); +} + +// Test that consumer positions are updated correctly. +// Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=541927 +// +QPID_AUTO_TEST_CASE(testUpdateConsumerPosition) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + + c0.session.queueDeclare("q", arg::durable=durableFlag); + SubscriptionSettings settings; + settings.autoAck = 0; + // Set the acquire mode to 'not-acquired' the consumer moves along the queue + // but does not acquire (remove) messages. + settings.acquireMode = ACQUIRE_MODE_NOT_ACQUIRED; + Subscription s = c0.subs.subscribe(c0.lq, "q", settings); + c0.session.messageTransfer(arg::content=makeMessage("1", "q", durableFlag)); + BOOST_CHECK_EQUAL("1", c0.lq.get(TIMEOUT).getData()); + + // Add another member, send/receive another message and acquire + // the messages. With the bug, this creates an inconsistency + // because the browse position was not updated to the new member. + cluster.add(); + c0.session.messageTransfer(arg::content=makeMessage("2", "q", durableFlag)); + BOOST_CHECK_EQUAL("2", c0.lq.get(TIMEOUT).getData()); + s.acquire(s.getUnacquired()); + s.accept(s.getUnaccepted()); + + // In the bug we now have 0 messages on cluster[0] and 1 message on cluster[1] + // Subscribing on cluster[1] provokes an error that shuts down cluster[0] + Client c1(cluster[1], "c1"); + Subscription s1 = c1.subs.subscribe(c1.lq, "q"); // Default auto-ack=1 + Message m; + BOOST_CHECK(!c1.lq.get(m, TIMEOUT/10)); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); +} + +QPID_AUTO_TEST_CASE(testFairsharePriorityDelivery) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + + FieldTable arguments; + arguments.setInt("x-qpid-priorities", 10); + arguments.setInt("x-qpid-fairshare", 5); + c0.session.queueDeclare("q", arg::durable=durableFlag, arg::arguments=arguments); + + //send messages of different priorities + for (int i = 0; i < 20; i++) { + Message msg = makeMessage((boost::format("msg-%1%") % i).str(), "q", durableFlag); + msg.getDeliveryProperties().setPriority(i % 2 ? 9 : 5); + c0.session.messageTransfer(arg::content=msg); + } + + //pull off a couple of the messages (first four should be the top priority messages + for (int i = 0; i < 4; i++) { + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % ((i*2)+1)).str(), c0.subs.get("q", TIMEOUT).getData()); + } + + // Add another member + cluster.add(); + Client c1(cluster[1], "c1"); + + //pull off some more messages + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 9).str(), c0.subs.get("q", TIMEOUT).getData()); + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 0).str(), c1.subs.get("q", TIMEOUT).getData()); + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 2).str(), c0.subs.get("q", TIMEOUT).getData()); + + //check queue has same content on both nodes + BOOST_CHECK_EQUAL(browse(c0, "q", 12), browse(c1, "q", 12)); +} + +QPID_AUTO_TEST_SUITE_END() +}} // namespace qpid::tests diff --git a/cpp/src/tests/ha_test.py b/cpp/src/tests/ha_test.py index f2fc50054f..602a62ca17 100755 --- a/cpp/src/tests/ha_test.py +++ b/cpp/src/tests/ha_test.py @@ -107,7 +107,7 @@ class HaBroker(Broker): ha_port = ha_port or HaPort(test) args = copy(args) args += ["--load-module", BrokerTest.ha_lib, - "--log-enable=debug+:ha::", + "--log-enable=trace+:ha::", # FIXME aconway 2013-07-29: debug # Non-standard settings for faster tests. "--link-maintenance-interval=0.1", # Heartbeat and negotiate time are needed so that a broker wont @@ -221,6 +221,12 @@ acl allow all all def wait_backup(self, address): self.wait_address(address) + def browse(self, queue, timeout=0, transform=lambda m: m.content): + c = self.connect_admin() + try: + return browse(c.session(), queue, timeout, transform) + finally: c.close() + def assert_browse(self, queue, expected, **kwargs): """Verify queue contents by browsing.""" bs = self.connect().session() diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index de5dfb4b10..55715639a4 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -1287,50 +1287,101 @@ class StoreTests(BrokerTest): cluster[0].assert_browse("q2", ["hello", "end"]) cluster[1].assert_browse_backup("q2", ["hello", "end"]) +def open_read(name): + with open(name) as f: return f.read() + class TransactionTests(BrokerTest): + load_store=["--load-module", BrokerTest.test_store_lib] + def tx_simple_setup(self, broker): - """Start a transaction: receive 'foo' from 'a' and send 'bar' to 'b'""" + """Start a transaction, remove messages from queue a, add messages to queue b""" c = broker.connect() - c.session().sender("a;{create:always}").send("foo") + # Send messages to a, no transaction. + sa = c.session().sender("a;{create:always,node:{durable:true}}") + tx_msgs = ["x","y","z"] + for m in tx_msgs: sa.send(Message(content=m, durable=True)) + + # Receive messages from a, in transaction. tx = c.session(transactional=True) - self.assertEqual("foo", tx.receiver("a").fetch(1).content) - tx.acknowledge(); - tx.sender("b;{create:always}").send("bar") + txr = tx.receiver("a") + tx_msgs2 = [txr.fetch(1).content for i in xrange(3)] + self.assertEqual(tx_msgs, tx_msgs2) + + # Send messages to b, transactional, mixed with non-transactional. + sb = c.session().sender("b;{create:always,node:{durable:true}}") + txs = tx.sender("b") + msgs = [str(i) for i in xrange(3)] + for tx_m,m in zip(tx_msgs2, msgs): + txs.send(tx_m); + sb.send(m) return tx def test_tx_simple_commit(self): - cluster = HaCluster(self, 2, args=["--log-enable=trace+:ha::"]) + cluster = HaCluster(self, 2, test_store=True) tx = self.tx_simple_setup(cluster[0]) + tx.sync() + + # NOTE: backup does not process transactional dequeues until prepare + cluster[1].assert_browse_backup("a", ["x","y","z"]) + cluster[1].assert_browse_backup("b", ['0', '1', '2']) + + tx.acknowledge() tx.commit() + tx.sync() + for b in cluster: b.assert_browse_backup("a", [], msg=b) - b.assert_browse_backup("b", ["bar"], msg=b) + b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b) + + # Check for expected actions on the store + expect = """ + + + + + + + +""" + self.assertEqual(expect, open_read(cluster[0].store_log)) + self.assertEqual(expect, open_read(cluster[1].store_log)) def test_tx_simple_rollback(self): - cluster = HaCluster(self, 2) + cluster = HaCluster(self, 2, test_store=True) tx = self.tx_simple_setup(cluster[0]) + tx.acknowledge() tx.rollback() for b in cluster: - b.assert_browse_backup("a", ["foo"], msg=b) - b.assert_browse_backup("b", [], msg=b) + b.assert_browse_backup("a", ["x","y","z"], msg=b) + b.assert_browse_backup("b", ['0', '1', '2'], msg=b) + # Check for expected actions on the store + expect = """ + + +""" + self.assertEqual(open_read(cluster[0].store_log), expect) + self.assertEqual(open_read(cluster[1].store_log), expect) def test_tx_simple_failover(self): - cluster = HaCluster(self, 2) + cluster = HaCluster(self, 2, test_store=True) tx = self.tx_simple_setup(cluster[0]) + tx.acknowledge() cluster.bounce(0) # Should cause roll-back + cluster[0].wait_status("ready") for b in cluster: - b.assert_browse_backup("a", ["foo"], msg=b) - b.assert_browse_backup("b", [], msg=b) - - def test_tx_simple_join(self): - cluster = HaCluster(self, 2) - tx = self.tx_simple_setup(cluster[0]) - cluster.bounce(1) # Should catch up with tx - tx.commit() - for b in cluster: - b.assert_browse_backup("a", [], msg=b) - b.assert_browse_backup("b", ["bar"], msg=b) + b.assert_browse_backup("a", ["x","y","z"], msg=b) + b.assert_browse_backup("b", ['0', '1', '2'], msg=b) + + # Check for expected actions on the store + expect = """ + + +""" + self.assertEqual(open_read(cluster[0].store_log), expect) + self.assertEqual(open_read(cluster[1].store_log), expect) + +# FIXME aconway 2013-07-23: test with partial acknowledgement. if __name__ == "__main__": outdir = "ha_tests.tmp" diff --git a/cpp/src/tests/test_env.sh.in b/cpp/src/tests/test_env.sh.in index f20fb317aa..60cc458f75 100644 --- a/cpp/src/tests/test_env.sh.in +++ b/cpp/src/tests/test_env.sh.in @@ -81,5 +81,5 @@ if [ ! -e "$HOME" ]; then fi # Options for boost test framework -export BOOST_TEST_SHOW_PROGRESS=yes -export BOOST_TEST_CATCH_SYSTEM_ERRORS=no +test -z "$BOOST_TEST_SHOW_PROGRESS" && export BOOST_TEST_SHOW_PROGRESS=yes +test -z "$BOOST_TEST_CATCH_SYSTEM_ERRORS" && export BOOST_TEST_CATCH_SYSTEM_ERRORS=no diff --git a/cpp/src/tests/test_store.cpp b/cpp/src/tests/test_store.cpp index eac4deda2d..fc44889f33 100644 --- a/cpp/src/tests/test_store.cpp +++ b/cpp/src/tests/test_store.cpp @@ -40,14 +40,19 @@ #include "qpid/sys/Thread.h" #include "qpid/Plugin.h" #include "qpid/Options.h" +#include "qpid/RefCounted.h" +#include "qpid/Msg.h" #include #include #include +#include #include +#include -using namespace qpid; -using namespace broker; using namespace std; +using namespace boost; +using namespace qpid; +using namespace qpid::broker; using namespace qpid::sys; namespace qpid { @@ -57,11 +62,13 @@ struct TestStoreOptions : public Options { string name; string dump; + string events; TestStoreOptions() : Options("Test Store Options") { addOptions() ("test-store-name", optValue(name, "NAME"), "Name of test store instance.") ("test-store-dump", optValue(dump, "FILE"), "File to dump enqueued messages.") + ("test-store-events", optValue(events, "FILE"), "File to log events, 1 line per event.") ; } }; @@ -82,24 +89,74 @@ class TestStore : public NullMessageStore { TestStore(const TestStoreOptions& opts, Broker& broker_) : options(opts), name(opts.name), broker(broker_) { - QPID_LOG(info, "TestStore name=" << name << " dump=" << options.dump); - if (!options.dump.empty()) + QPID_LOG(info, "TestStore name=" << name + << " dump=" << options.dump + << " events=" << options.events); + + if (!options.dump.empty()) dump.reset(new ofstream(options.dump.c_str())); + if (!options.events.empty()) + events.reset(new ofstream(options.events.c_str())); } ~TestStore() { for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1)); } - virtual bool isNull() const { return false; } - - void enqueue(TransactionContext* , + // Dummy transaction context. + struct TxContext : public TPCTransactionContext { + static int nextId; + string id; + TxContext() : id(lexical_cast(nextId++)) {} + TxContext(string xid) : id(xid) {} + }; + + static string getId(const TransactionContext& tx) { + const TxContext* tc = dynamic_cast(&tx); + assert(tc); + return tc->id; + } + + + bool isNull() const { return false; } + + void log(const string& msg) { + QPID_LOG(info, "test_store: " << msg); + if (events.get()) *events << msg << endl << std::flush; + } + + auto_ptr begin() { + auto_ptr tx(new TxContext()); + log(Msg() << "id << ">"); + return auto_ptr(tx); + } + + auto_ptr begin(const std::string& xid) { + auto_ptr tx(new TxContext(xid)); + log(Msg() << "id << ">"); + return auto_ptr(tx); + } + + string getContent(const intrusive_ptr& msg) { + intrusive_ptr enc( + dynamic_pointer_cast(msg)); + return enc->getContent(); + } + + void enqueue(TransactionContext* tx, const boost::intrusive_ptr& pmsg, - const PersistableQueue& ) + const PersistableQueue& queue) { + QPID_LOG(debug, "TestStore enqueue " << queue.getName()); qpid::broker::amqp_0_10::MessageTransfer* msg = dynamic_cast(pmsg.get()); assert(msg); + ostringstream o; + o << ""; + log(o.str()); + // Dump the message if there is a dump file. if (dump.get()) { msg->getFrames().getMethod()->print(*dump); @@ -144,6 +201,31 @@ class TestStore : public NullMessageStore { msg->enqueueComplete(); } + void dequeue(TransactionContext* tx, + const boost::intrusive_ptr& msg, + const PersistableQueue& queue) + { + QPID_LOG(debug, "TestStore dequeue " << queue.getName()); + ostringstream o; + o<< ""; + log(o.str()); + } + + void prepare(TPCTransactionContext& txn) { + log(Msg() << ""); + } + + void commit(TransactionContext& txn) { + log(Msg() << ""); + } + + void abort(TransactionContext& txn) { + log(Msg() << ""); + } + + private: static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC; TestStoreOptions options; @@ -151,8 +233,11 @@ class TestStore : public NullMessageStore { Broker& broker; vector threads; std::auto_ptr dump; + std::auto_ptr events; }; +int TestStore::TxContext::nextId(1); + const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: "; const string TestStore::EXCEPTION = "exception"; const string TestStore::EXIT_PROCESS = "exit_process"; -- cgit v1.2.1