summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-08-01 20:27:26 +0000
committerAlan Conway <aconway@apache.org>2013-08-01 20:27:26 +0000
commite6598e9f95d55b80f96dbcb1e12bc1fc38c66af1 (patch)
tree7179cb6fa40a59d1390f295a613de64cc242814a /cpp/src/qpid
parent0ffcd71ac9c9f3742aae6e251eafe031068bda31 (diff)
downloadqpid-python-e6598e9f95d55b80f96dbcb1e12bc1fc38c66af1.tar.gz
QPID-4327: HA TX transactions: basic replication.
On primary a PrimaryTxObserver observes a transaction's TxBuffer and generates transaction events on a tx-replication-queue. On the backup a TxReplicator receives the events and constructs a TxBuffer equivalent to the one in the primary. Unfinished: - Primary does not wait for backups to prepare() before committing. - All connected backups are assumed to be in the transaction, there are race conditions around brokers joining/leavinv where this assumption is invalid. - Need more tests. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1509423 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp2
-rw-r--r--cpp/src/qpid/broker/Broker.h3
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.cpp2
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.h2
-rw-r--r--cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--cpp/src/qpid/broker/TxAccept.cpp2
-rw-r--r--cpp/src/qpid/ha/Backup.h2
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp16
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.h6
-rw-r--r--cpp/src/qpid/ha/Event.cpp (renamed from cpp/src/qpid/ha/makeMessage.cpp)43
-rw-r--r--cpp/src/qpid/ha/Event.h146
-rw-r--r--cpp/src/qpid/ha/FailoverExchange.cpp4
-rw-r--r--cpp/src/qpid/ha/HaBroker.cpp2
-rw-r--r--cpp/src/qpid/ha/HaBroker.h1
-rw-r--r--cpp/src/qpid/ha/Membership.cpp8
-rw-r--r--cpp/src/qpid/ha/Membership.h3
-rw-r--r--cpp/src/qpid/ha/Primary.cpp41
-rw-r--r--cpp/src/qpid/ha/Primary.h23
-rw-r--r--cpp/src/qpid/ha/PrimaryTxObserver.cpp104
-rw-r--r--cpp/src/qpid/ha/PrimaryTxObserver.h77
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp81
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.h33
-rw-r--r--cpp/src/qpid/ha/RemoteBackup.cpp3
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp42
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.h10
-rw-r--r--cpp/src/qpid/ha/StatusCheck.cpp12
-rw-r--r--cpp/src/qpid/ha/TxReplicator.cpp192
-rw-r--r--cpp/src/qpid/ha/TxReplicator.h123
-rw-r--r--cpp/src/qpid/ha/makeMessage.h66
-rw-r--r--cpp/src/qpid/ha/types.cpp8
-rw-r--r--cpp/src/qpid/ha/types.h9
31 files changed, 889 insertions, 179 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index d420e28d95..7c9e82022f 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -400,7 +400,7 @@ boost::intrusive_ptr<Broker> Broker::create(const Options& opts)
return boost::intrusive_ptr<Broker>(new Broker(opts));
}
-void Broker::setStore (boost::shared_ptr<MessageStore>& _store)
+void Broker::setStore (const boost::shared_ptr<MessageStore>& _store)
{
store.reset(new MessageStoreModule (_store));
setStore();
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index f0b0c83f61..3e22fb491b 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -226,7 +226,8 @@ class Broker : public sys::Runnable, public Plugin::Target,
/** Shut down the broker */
QPID_BROKER_EXTERN virtual void shutdown();
- QPID_BROKER_EXTERN void setStore (boost::shared_ptr<MessageStore>& store);
+ QPID_BROKER_EXTERN void setStore (const boost::shared_ptr<MessageStore>& store);
+ bool hasStore() const { return store.get(); }
MessageStore& getStore() { return *store; }
void setAcl (AclModule* _acl) {acl = _acl;}
AclModule* getAcl() { return acl; }
diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp
index f19b31fa76..edfe6e7819 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.cpp
+++ b/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -33,7 +33,7 @@ using std::string;
namespace qpid {
namespace broker {
-MessageStoreModule::MessageStoreModule(boost::shared_ptr<MessageStore>& _store)
+MessageStoreModule::MessageStoreModule(const boost::shared_ptr<MessageStore>& _store)
: store(_store) {}
MessageStoreModule::~MessageStoreModule()
diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h
index 82308db84c..a71e8cc9b6 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/cpp/src/qpid/broker/MessageStoreModule.h
@@ -38,7 +38,7 @@ class MessageStoreModule : public MessageStore
{
boost::shared_ptr<MessageStore> store;
public:
- MessageStoreModule(boost::shared_ptr<MessageStore>& store);
+ MessageStoreModule(const boost::shared_ptr<MessageStore>& store);
bool init(const Options* options);
std::auto_ptr<TransactionContext> begin();
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 49aff3ea97..227acd1b56 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -277,6 +277,7 @@ void Queue::deliver(Message msg, TxBuffer* txn)
void Queue::deliverTo(Message msg, TxBuffer* txn)
{
if (accept(msg)) {
+ interceptors.record(msg);
if (txn) {
TxOp::shared_ptr op(new TxPublish(msg, shared_from_this()));
txn->enlist(op);
@@ -842,7 +843,6 @@ bool Queue::isEmpty(const Mutex::ScopedLock&) const
*/
bool Queue::enqueue(TransactionContext* ctxt, Message& msg)
{
- interceptors.record(msg);
ScopedUse u(barrier);
if (!u.acquired) return false;
diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp
index 0343cc00ae..ee30dff957 100644
--- a/cpp/src/qpid/broker/TxAccept.cpp
+++ b/cpp/src/qpid/broker/TxAccept.cpp
@@ -20,9 +20,9 @@
*/
#include "qpid/broker/TxAccept.h"
#include "qpid/broker/TransactionObserver.h"
+#include "qpid/broker/Queue.h"
#include "qpid/log/Statement.h"
-
using std::bind1st;
using std::bind2nd;
using std::mem_fun_ref;
diff --git a/cpp/src/qpid/ha/Backup.h b/cpp/src/qpid/ha/Backup.h
index 4943ca5e2e..88194158ce 100644
--- a/cpp/src/qpid/ha/Backup.h
+++ b/cpp/src/qpid/ha/Backup.h
@@ -59,6 +59,8 @@ class Backup : public Role
Role* promote();
+ boost::shared_ptr<BrokerReplicator> getBrokerReplicator() { return replicator; }
+
private:
void stop(sys::Mutex::ScopedLock&);
Role* recover(sys::Mutex::ScopedLock&);
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index 17b00185ef..f882cbcbf1 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -21,6 +21,7 @@
#include "BrokerReplicator.h"
#include "HaBroker.h"
#include "QueueReplicator.h"
+#include "TxReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/ConnectionObserver.h"
@@ -648,11 +649,16 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicationTest.getLevel(argsMap)) return;
string name(values[NAME].asString());
+
+ if (TxReplicator::isTxQueue(name)) return; // Can't join a transaction in progress.
+
if (!queueTracker.get())
throw Exception(QPID_MSG("Unexpected queue response: " << values));
if (!queueTracker->response(name)) return; // Response is out-of-date
+
QPID_LOG(debug, logPrefix << "Queue response: " << name);
boost::shared_ptr<Queue> queue = queues.find(name);
+
if (queue) { // Already exists
bool uuidOk = (getHaUuid(queue->getSettings().original) == getHaUuid(argsMap));
if (!uuidOk) QPID_LOG(debug, logPrefix << "UUID mismatch for queue: " << name);
@@ -660,6 +666,7 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
QPID_LOG(debug, logPrefix << "Queue response replacing queue: " << name);
deleteQueue(name);
}
+
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
boost::shared_ptr<QueueReplicator> qr = replicateQueue(
@@ -770,8 +777,13 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator(
const boost::shared_ptr<Queue>& queue)
{
if (replicationTest.getLevel(*queue) == ALL) {
- boost::shared_ptr<QueueReplicator> qr(
- new QueueReplicator(haBroker, queue, link));
+ boost::shared_ptr<QueueReplicator> qr;
+ if (TxReplicator::isTxQueue(queue->getName())){
+ qr.reset(new TxReplicator(haBroker, queue, link));
+ }
+ else {
+ qr.reset(new QueueReplicator(haBroker, queue, link));
+ }
qr->activate();
return qr;
}
diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h
index f93e25cb81..c36aa352f3 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/cpp/src/qpid/ha/BrokerReplicator.h
@@ -71,6 +71,8 @@ class BrokerReplicator : public broker::Exchange,
public boost::enable_shared_from_this<BrokerReplicator>
{
public:
+ typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
+
BrokerReplicator(HaBroker&, const boost::shared_ptr<broker::Link>&);
~BrokerReplicator();
@@ -84,8 +86,9 @@ class BrokerReplicator : public broker::Exchange,
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
void shutdown();
+ QueueReplicatorPtr findQueueReplicator(const std::string& qname);
+
private:
- typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult;
typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult;
@@ -114,7 +117,6 @@ class BrokerReplicator : public broker::Exchange,
void doResponseBind(types::Variant::Map& values);
void doResponseHaBroker(types::Variant::Map& values);
- QueueReplicatorPtr findQueueReplicator(const std::string& qname);
QueueReplicatorPtr startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
QueueReplicatorPtr replicateQueue(
diff --git a/cpp/src/qpid/ha/makeMessage.cpp b/cpp/src/qpid/ha/Event.cpp
index 5b063a23e7..fdd8bc85cc 100644
--- a/cpp/src/qpid/ha/makeMessage.cpp
+++ b/cpp/src/qpid/ha/Event.cpp
@@ -18,25 +18,44 @@
* under the License.
*
*/
-#include "makeMessage.h"
+#include "Event.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace ha {
-broker::Message makeMessage(const framing::Buffer& buffer,
- const std::string& destination)
-{
- using namespace framing;
- using broker::amqp_0_10::MessageTransfer;
+using namespace std;
+using namespace framing;
+using namespace broker::amqp_0_10;
- boost::intrusive_ptr<MessageTransfer> transfer(
- new qpid::broker::amqp_0_10::MessageTransfer());
- AMQFrame method((MessageTransferBody(ProtocolVersion(), destination, 0, 0)));
+namespace {
+const string QPID_HA(QPID_HA_PREFIX);
+}
+
+bool isEventKey(const std::string& key) {
+ const std::string& prefix = QPID_HA;
+ bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0;
+ return ret;
+}
+
+const string DequeueEvent::KEY(QPID_HA+"de");
+const string IdEvent::KEY(QPID_HA+"id");
+const string TxEnqueueEvent::KEY(QPID_HA+"txen");
+const string TxDequeueEvent::KEY(QPID_HA+"txde");
+const string TxPrepareEvent::KEY(QPID_HA+"txpr");
+const string TxCommitEvent::KEY(QPID_HA+"txcm");
+const string TxRollbackEvent::KEY(QPID_HA+"txrb");
+
+broker::Message makeMessage(const string& data, const string& key) {
+ boost::intrusive_ptr<MessageTransfer> transfer(new MessageTransfer());
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), key, 0, 0)));
AMQFrame header((AMQHeaderBody()));
AMQFrame content((AMQContentBody()));
+ Buffer buffer(const_cast<char*>(&data[0]), data.size());
// AMQContentBody::decode is missing a const declaration, so cast it here.
content.castBody<AMQContentBody>()->decode(
const_cast<Buffer&>(buffer), buffer.getSize());
@@ -51,12 +70,8 @@ broker::Message makeMessage(const framing::Buffer& buffer,
transfer->getFrames().append(method);
transfer->getFrames().append(header);
transfer->getFrames().append(content);
+ transfer->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(key);
return broker::Message(transfer, 0);
}
-broker::Message makeMessage(const std::string& content, const std::string& destination) {
- framing::Buffer buffer(const_cast<char*>(&content[0]), content.size());
- return makeMessage(buffer, destination);
-}
-
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/Event.h b/cpp/src/qpid/ha/Event.h
new file mode 100644
index 0000000000..08174bfc9d
--- /dev/null
+++ b/cpp/src/qpid/ha/Event.h
@@ -0,0 +1,146 @@
+#ifndef QPID_HA_EVENT_H
+#define QPID_HA_EVENT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "qpid/broker/Message.h"
+#include "qpid/framing/BufferTypes.h"
+
+/**@file Defines event messages used to pass transaction information from
+ * primary observers to backup replicators.
+ */
+
+namespace qpid {
+namespace ha {
+
+broker::Message makeMessage(const std::string& content, const std::string& destination);
+
+
+/** Test if a string is an event key */
+bool isEventKey(const std::string& key);
+
+/** Base class for encodable events */
+struct Event {
+ virtual ~Event() {}
+ virtual void encode(framing::Buffer& buffer) const = 0;
+ virtual void decode(framing::Buffer& buffer) = 0;
+ virtual size_t encodedSize() const = 0;
+ virtual std::string key() const = 0; // Routing key
+ virtual void print(std::ostream& o) const = 0;
+ broker::Message message() const { return makeMessage(framing::encodeStr(*this), key()); }
+};
+
+
+inline std::ostream& operator<<(std::ostream& o, const Event& e) {
+ o << "<" << e.key() << ":";
+ e.print(o);
+ return o << ">";
+}
+
+/** Event base template */
+template <class Derived> struct EventBase : public Event {
+ std::string key() const { return Derived::KEY; }
+};
+
+//////////////// Specific event type
+
+//// QueueReplicator events
+
+struct DequeueEvent : public EventBase<DequeueEvent> {
+ static const std::string KEY;
+ ReplicationIdSet ids;
+
+ DequeueEvent(ReplicationIdSet ids_=ReplicationIdSet()) : ids(ids_) {}
+ void encode(framing::Buffer& b) const { b.put(ids); }
+ void decode(framing::Buffer& b) { b.get(ids); }
+ virtual size_t encodedSize() const { return ids.encodedSize(); }
+ void print(std::ostream& o) const { o << ids; }
+};
+
+struct IdEvent : public EventBase<IdEvent> {
+ static const std::string KEY;
+ ReplicationId id;
+
+ IdEvent(ReplicationId id_=0) : id(id_) {}
+ void encode(framing::Buffer& b) const { b.put(id); }
+ void decode(framing::Buffer& b) { b.get(id); }
+ virtual size_t encodedSize() const { return id.encodedSize(); }
+ void print(std::ostream& o) const { o << id; }
+};
+
+//// Transaction events
+
+struct TxEnqueueEvent : public EventBase<TxEnqueueEvent> {
+ static const std::string KEY;
+ framing::LongString queue;
+ ReplicationId id;
+
+ TxEnqueueEvent(std::string q=std::string(), ReplicationId i=ReplicationId())
+ : queue(q), id(i) {}
+ void encode(framing::Buffer& b) const { b.put(queue); b.put(id); }
+ void decode(framing::Buffer& b) { b.get(queue); b.get(id); }
+ virtual size_t encodedSize() const { return queue.encodedSize()+id.encodedSize(); }
+ void print(std::ostream& o) const { o << queue.value << " " << id; }
+};
+
+struct TxDequeueEvent : public EventBase<TxDequeueEvent> {
+ static const std::string KEY;
+ framing::LongString queue;
+ ReplicationId id;
+
+ TxDequeueEvent(std::string q=std::string(), ReplicationId r=0) :
+ queue(q), id(r) {}
+ void encode(framing::Buffer& b) const { b.put(queue);b.put(id); }
+ void decode(framing::Buffer& b) { b.get(queue);b.get(id); }
+ virtual size_t encodedSize() const { return queue.encodedSize()+id.encodedSize(); }
+ void print(std::ostream& o) const { o << queue.value << " " << id; }
+};
+
+struct TxPrepareEvent : public EventBase<TxPrepareEvent> {
+ static const std::string KEY;
+ void encode(framing::Buffer&) const {}
+ void decode(framing::Buffer&) {}
+ virtual size_t encodedSize() const { return 0; }
+ void print(std::ostream&) const {}
+};
+
+struct TxCommitEvent : public EventBase<TxCommitEvent> {
+ static const std::string KEY;
+ void encode(framing::Buffer&) const {}
+ void decode(framing::Buffer&) {}
+ virtual size_t encodedSize() const { return 0; }
+ void print(std::ostream&) const {}
+};
+
+struct TxRollbackEvent : public EventBase<TxRollbackEvent> {
+ static const std::string KEY;
+ void encode(framing::Buffer&) const {}
+ void decode(framing::Buffer&) {}
+ virtual size_t encodedSize() const { return 0; }
+ void print(std::ostream&) const {}
+};
+
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_EVENT_H*/
diff --git a/cpp/src/qpid/ha/FailoverExchange.cpp b/cpp/src/qpid/ha/FailoverExchange.cpp
index 556c7458b6..46cc345ab0 100644
--- a/cpp/src/qpid/ha/FailoverExchange.cpp
+++ b/cpp/src/qpid/ha/FailoverExchange.cpp
@@ -19,7 +19,7 @@
*
*/
#include "FailoverExchange.h"
-#include "makeMessage.h"
+#include "Event.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/DeliverableMessage.h"
@@ -117,7 +117,7 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue, sys::Mutex::Sc
if (urls.empty()) return;
framing::Array array = vectorToUrlArray(urls);
const ProtocolVersion v;
- broker::Message message(makeMessage(Buffer(), typeName));
+ broker::Message message(makeMessage(std::string(), typeName));
MessageTransfer& transfer = MessageTransfer::get(message);
MessageProperties* props =
transfer.getFrames().getHeaders()->get<framing::MessageProperties>(true);
diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp
index e8c7c5c7d8..5896a5568a 100644
--- a/cpp/src/qpid/ha/HaBroker.cpp
+++ b/cpp/src/qpid/ha/HaBroker.cpp
@@ -75,7 +75,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
// otherwise there's a window for a client to connect before we get to
// initialize()
if (settings.cluster) {
- QPID_LOG(debug, "Broker startup, rejecting client connections.");
+ QPID_LOG(debug, "Backup starting, rejecting client connections.");
shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
observer->setObserver(excluder, "Backup: ");
broker.getConnectionObservers().add(observer);
diff --git a/cpp/src/qpid/ha/HaBroker.h b/cpp/src/qpid/ha/HaBroker.h
index 8e5d30acfb..6084db3fc3 100644
--- a/cpp/src/qpid/ha/HaBroker.h
+++ b/cpp/src/qpid/ha/HaBroker.h
@@ -84,6 +84,7 @@ class HaBroker : public management::Manageable
broker::Broker& getBroker() { return broker; }
const Settings& getSettings() const { return settings; }
+ boost::shared_ptr<Role> getRole() const {return role; }
/** Shut down the broker because of a critical error. */
void shutdown(const std::string& message);
diff --git a/cpp/src/qpid/ha/Membership.cpp b/cpp/src/qpid/ha/Membership.cpp
index 411bad3841..b53291ba89 100644
--- a/cpp/src/qpid/ha/Membership.cpp
+++ b/cpp/src/qpid/ha/Membership.cpp
@@ -107,6 +107,14 @@ BrokerInfo::Set Membership::otherBackups() const {
return result;
}
+BrokerInfo::Set Membership::getBrokers() const {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo::Set result;
+ transform(brokers.begin(), brokers.end(), inserter(result, result.begin()),
+ bind(&BrokerInfo::Map::value_type::second, _1));
+ return result;
+}
+
bool Membership::get(const types::Uuid& id, BrokerInfo& result) const {
Mutex::ScopedLock l(lock);
BrokerInfo::Map::const_iterator i = brokers.find(id);
diff --git a/cpp/src/qpid/ha/Membership.h b/cpp/src/qpid/ha/Membership.h
index f442586a71..828f9e8403 100644
--- a/cpp/src/qpid/ha/Membership.h
+++ b/cpp/src/qpid/ha/Membership.h
@@ -69,6 +69,9 @@ class Membership
/** Return IDs of all READY backups other than self */
BrokerInfo::Set otherBackups() const;
+ /** Return IDs of all brokers */
+ BrokerInfo::Set getBrokers() const;
+
void assign(const types::Variant::List&);
types::Variant::List asList() const;
diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp
index bae651a3fc..5fd7814d62 100644
--- a/cpp/src/qpid/ha/Primary.cpp
+++ b/cpp/src/qpid/ha/Primary.cpp
@@ -27,6 +27,7 @@
#include "RemoteBackup.h"
#include "ConnectionObserver.h"
#include "QueueReplicator.h"
+#include "PrimaryTxObserver.h"
#include "qpid/assert.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/BrokerObserver.h"
@@ -34,16 +35,19 @@
#include "qpid/broker/Queue.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
-#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
+#include "qpid/types/Uuid.h"
#include "qpid/sys/Timer.h"
#include <boost/bind.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace ha {
using sys::Mutex;
using boost::shared_ptr;
+using boost::make_shared;
using namespace std;
using namespace framing;
@@ -67,7 +71,10 @@ class PrimaryBrokerObserver : public broker::BrokerObserver
void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); }
void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); }
void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); }
- private:
+ void startTx(const shared_ptr<broker::TxBuffer>& tx) { primary.startTx(tx); }
+ void startDtx(const shared_ptr<broker::DtxBuffer>& dtx) { primary.startDtx(dtx); }
+
+ private:
Primary& primary;
};
@@ -208,6 +215,26 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) {
if (backup) checkReady(backup);
}
+void Primary::addReplica(ReplicatingSubscription& rs) {
+ sys::Mutex::ScopedLock l(lock);
+ replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs;
+}
+
+void Primary::skip(
+ const types::Uuid& backup,
+ const boost::shared_ptr<broker::Queue>& queue,
+ const ReplicationIdSet& ids)
+{
+ sys::Mutex::ScopedLock l(lock);
+ ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue));
+ if (i != replicas.end()) i->second->addSkip(ids);
+}
+
+void Primary::removeReplica(const ReplicatingSubscription& rs) {
+ sys::Mutex::ScopedLock l(lock);
+ replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue()));
+}
+
// NOTE: Called with queue registry lock held.
void Primary::queueCreate(const QueuePtr& q) {
// Set replication argument.
@@ -361,4 +388,14 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards)
backup->startCatchup();
}
+void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) {
+ QPID_LOG(trace, logPrefix << "Started TX transaction");
+ tx->setObserver(make_shared<PrimaryTxObserver>(boost::ref(haBroker)));
+}
+
+void Primary::startDtx(const boost::shared_ptr<broker::DtxBuffer>& dtx) {
+ QPID_LOG(trace, logPrefix << "Started DTX transaction");
+ dtx->setObserver(make_shared<PrimaryTxObserver>(boost::ref(haBroker)));
+}
+
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/Primary.h b/cpp/src/qpid/ha/Primary.h
index 031ad3aab9..a34f8f5a30 100644
--- a/cpp/src/qpid/ha/Primary.h
+++ b/cpp/src/qpid/ha/Primary.h
@@ -31,6 +31,7 @@
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
#include <string>
+#include <boost/functional/hash.hpp>
namespace qpid {
@@ -39,6 +40,8 @@ class Queue;
class Connection;
class ConnectionObserver;
class BrokerObserver;
+class TxBuffer;
+class DtxBuffer;
}
namespace sys {
@@ -75,13 +78,21 @@ class Primary : public Role
void setBrokerUrl(const Url&) {}
void readyReplica(const ReplicatingSubscription&);
- void removeReplica(const std::string& q);
+ void addReplica(ReplicatingSubscription&);
+ void removeReplica(const ReplicatingSubscription&);
+
+ /** Skip replication of ids to queue on backup. */
+ void skip(const types::Uuid& backup,
+ const boost::shared_ptr<broker::Queue>& queue,
+ const ReplicationIdSet& ids);
// Called via BrokerObserver
void queueCreate(const QueuePtr&);
void queueDestroy(const QueuePtr&);
void exchangeCreate(const ExchangePtr&);
void exchangeDestroy(const ExchangePtr&);
+ void startTx(const boost::shared_ptr<broker::TxBuffer>&);
+ void startDtx(const boost::shared_ptr<broker::DtxBuffer>&);
// Called via ConnectionObserver
void opened(broker::Connection& connection);
@@ -93,11 +104,15 @@ class Primary : public Role
void timeoutExpectedBackups();
private:
- typedef qpid::sys::unordered_map<
+ typedef sys::unordered_map<
types::Uuid, RemoteBackupPtr, types::Uuid::Hasher > BackupMap;
typedef std::set<RemoteBackupPtr > BackupSet;
+ typedef std::pair<types::Uuid, boost::shared_ptr<broker::Queue> > UuidQueue;
+ typedef sys::unordered_map<UuidQueue, ReplicatingSubscription*,
+ boost::hash<UuidQueue> > ReplicaMap;
+
RemoteBackupPtr backupConnect(const BrokerInfo&, broker::Connection&, sys::Mutex::ScopedLock&);
void backupDisconnect(RemoteBackupPtr, sys::Mutex::ScopedLock&);
@@ -105,8 +120,9 @@ class Primary : public Role
void checkReady();
void checkReady(RemoteBackupPtr);
void setCatchupQueues(const RemoteBackupPtr&, bool createGuards);
+ void deduplicate();
- sys::Mutex lock;
+ mutable sys::Mutex lock;
HaBroker& haBroker;
Membership& membership;
std::string logPrefix;
@@ -126,6 +142,7 @@ class Primary : public Role
boost::shared_ptr<broker::ConnectionObserver> connectionObserver;
boost::shared_ptr<broker::BrokerObserver> brokerObserver;
boost::intrusive_ptr<sys::TimerTask> timerTask;
+ ReplicaMap replicas;
};
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
new file mode 100644
index 0000000000..8a8364ac22
--- /dev/null
+++ b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Event.h"
+#include "HaBroker.h"
+#include "Primary.h"
+#include "PrimaryTxObserver.h"
+#include "QueueGuard.h"
+#include "RemoteBackup.h"
+#include "ReplicatingSubscription.h"
+
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include <boost/lexical_cast.hpp>
+
+namespace qpid {
+namespace ha {
+
+using namespace std;
+using namespace boost;
+using namespace broker;
+
+PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
+ haBroker(hb), broker(hb.getBroker()),
+ id(true) // FIXME aconway 2013-07-11: is UUID an appropriate TX ID?
+{
+ logPrefix = "Primary transaction "+id.str().substr(0,8)+": ";
+ QPID_LOG(trace, logPrefix << "started");
+ pair<shared_ptr<Queue>, bool> result =
+ broker.getQueues().declare(
+ TRANSACTION_REPLICATOR_PREFIX+id.str(),
+ QueueSettings(/*durable*/false, /*autodelete*/true));
+ assert(result.second);
+ txQueue = result.first;
+}
+
+void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m)
+{
+ QPID_LOG(trace, logPrefix << "enqueue: " << LogMessageId(*q, m));
+ enqueues[q] += m.getReplicationId();
+ txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message());
+ txQueue->deliver(m);
+}
+
+void PrimaryTxObserver::dequeue(
+ const QueuePtr& q, QueuePosition pos, ReplicationId id)
+{
+ QPID_LOG(trace, logPrefix << "dequeue: " << LogMessageId(*q, pos, id));
+ txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
+}
+
+void PrimaryTxObserver::deduplicate() {
+ shared_ptr<Primary> primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()));
+ assert(primary);
+ // FIXME aconway 2013-07-29: need to verify which backups are *in* the transaction.
+ // Use cluster membership for now
+ BrokerInfo::Set brokers = haBroker.getMembership().getBrokers();
+ types::Uuid selfId = haBroker.getMembership().getSelf();
+ // Tell replicating subscriptions to skip IDs in the transaction.
+ for (BrokerInfo::Set::iterator b = brokers.begin(); b != brokers.end(); ++b) {
+ if (b->getSystemId() == selfId) continue;
+ for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q)
+ primary->skip(b->getSystemId(), q->first, q->second);
+ }
+}
+
+bool PrimaryTxObserver::prepare() {
+ // FIXME aconway 2013-07-23: need to delay completion of prepare till all
+ // backups have prepared.
+ QPID_LOG(trace, logPrefix << "prepare");
+ deduplicate();
+ txQueue->deliver(TxPrepareEvent().message());
+ return true;
+}
+
+void PrimaryTxObserver::commit() {
+ QPID_LOG(trace, logPrefix << "commit");
+ txQueue->deliver(TxCommitEvent().message());
+}
+
+void PrimaryTxObserver::rollback() {
+ QPID_LOG(trace, logPrefix << "rollback");
+ txQueue->deliver(TxRollbackEvent().message());
+}
+
+}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.h b/cpp/src/qpid/ha/PrimaryTxObserver.h
new file mode 100644
index 0000000000..3681c6b750
--- /dev/null
+++ b/cpp/src/qpid/ha/PrimaryTxObserver.h
@@ -0,0 +1,77 @@
+#ifndef QPID_HA_PRIMARYTXOBSERVER_H
+#define QPID_HA_PRIMARYTXOBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+
+#include "qpid/broker/TransactionObserver.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/sys/unordered_map.h"
+#include <boost/functional/hash.hpp>
+namespace qpid {
+
+namespace broker {
+class Broker;
+}
+
+namespace ha {
+class HaBroker;
+
+/**
+ * Observe events in the lifecycle of a transaction.
+ *
+ * The observer is called by TxBuffer for each transactional event.
+ * It puts the events on a special tx-queue.
+ * A TxReplicator on the backup replicates the tx-queue and creates
+ * a TxBuffer on the backup equivalent to the one on the primary.
+ *
+ * THREAD UNSAFE: called sequentially in the context of a transaction.
+ */
+class PrimaryTxObserver : public broker::TransactionObserver {
+ public:
+ PrimaryTxObserver(HaBroker&);
+
+ void enqueue(const QueuePtr&, const broker::Message&);
+ void dequeue(const QueuePtr& queue, QueuePosition, ReplicationId);
+ bool prepare();
+ void commit();
+ void rollback();
+
+ private:
+ typedef qpid::sys::unordered_map<
+ QueuePtr, ReplicationIdSet, boost::hash<QueuePtr> > QueueIdsMap;
+
+ void deduplicate();
+
+ std::string logPrefix;
+ HaBroker& haBroker;
+ broker::Broker& broker;
+ framing::Uuid id;
+ QueuePtr txQueue;
+ QueueIdsMap enqueues;
+};
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_PRIMARYTXOBSERVER_H*/
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index d99602fdda..28e9dc4120 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -19,12 +19,13 @@
*
*/
-#include "makeMessage.h"
+#include "Event.h"
#include "HaBroker.h"
#include "QueueReplicator.h"
#include "QueueSnapshots.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
+#include "types.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
@@ -38,36 +39,32 @@
#include "qpid/Msg.h"
#include "qpid/assert.h"
#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
-namespace {
-const std::string QPID_REPLICATOR_("qpid.replicator-");
-const std::string TYPE_NAME("qpid.queue-replicator");
-const std::string QPID_HA("qpid.ha-");
-}
namespace qpid {
namespace ha {
using namespace broker;
using namespace framing;
using namespace std;
+using namespace boost;
+using std::exception;
using sys::Mutex;
-const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA+"dequeue");
-const std::string QueueReplicator::ID_EVENT_KEY(QPID_HA+"id");
const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
-std::string QueueReplicator::replicatorName(const std::string& queueName) {
- return QPID_REPLICATOR_ + queueName;
+namespace {
+const string QPID_HA(QPID_HA_PREFIX);
+const std::string TYPE_NAME(QPID_HA+"queue-replicator");
}
-bool QueueReplicator::isReplicatorName(const std::string& name) {
- return name.compare(0, QPID_REPLICATOR_.size(), QPID_REPLICATOR_) == 0;
+
+std::string QueueReplicator::replicatorName(const std::string& queueName) {
+ return QUEUE_REPLICATOR_PREFIX + queueName;
}
-bool QueueReplicator::isEventKey(const std::string key) {
- const std::string& prefix = QPID_HA;
- bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0;
- return ret;
+bool QueueReplicator::isReplicatorName(const std::string& name) {
+ return startsWith(name, QUEUE_REPLICATOR_PREFIX);
}
class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
@@ -109,12 +106,12 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
haBroker(hb),
+ brokerInfo(hb.getBrokerInfo()),
logPrefix("Backup of "+q->getName()+": "),
- queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false),
+ queue(q), link(l), subscribed(false),
settings(hb.getSettings()), destroyed(false),
nextId(0), maxId(0)
{
- QPID_LOG(debug, logPrefix << "Created");
args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
@@ -122,12 +119,18 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
args.setString(QPID_REPLICATE, printable(NONE).str());
setArgs(args);
if (q->isAutoDelete()) q->markInUse();
+
+ dispatch[DequeueEvent::KEY] =
+ boost::bind(&QueueReplicator::dequeueEvent, this, _1, _2);
+ dispatch[IdEvent::KEY] =
+ boost::bind(&QueueReplicator::idEvent, this, _1, _2);
}
// This must be called immediately after the constructor.
// It has to be separate so we can call shared_from_this().
void QueueReplicator::activate() {
Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, logPrefix << "Created");
if (!queue) return; // Already destroyed
// Enable callback to route()
@@ -224,44 +227,57 @@ template <class T> T decodeContent(Message& m) {
}
}
-void QueueReplicator::dequeue(const ReplicationIdSet& dequeues, Mutex::ScopedLock&) {
- QPID_LOG(trace, logPrefix << "Dequeue " << dequeues);
+void QueueReplicator::dequeueEvent(const string& data, Mutex::ScopedLock&) {
+ DequeueEvent e;
+ decodeStr(data, e);
+ QPID_LOG(trace, logPrefix << "Dequeue " << e.ids);
//TODO: should be able to optimise the following
- for (ReplicationIdSet::iterator i = dequeues.begin(); i != dequeues.end(); ++i) {
+ for (ReplicationIdSet::iterator i = e.ids.begin(); i != e.ids.end(); ++i) {
PositionMap::iterator j = positions.find(*i);
if (j != positions.end()) queue->dequeueMessageAt(j->second);
}
}
// Called in connection thread of the queues bridge to primary.
-void QueueReplicator::route(Deliverable& msg)
+void QueueReplicator::route(Deliverable& deliverable)
{
try {
Mutex::ScopedLock l(lock);
if (destroyed) return;
- const std::string& key = msg.getMessage().getRoutingKey();
- if (!isEventKey(key)) { // Replicated message
+ broker::Message& message(deliverable.getMessage());
+ string key(message.getRoutingKey());
+ if (!isEventKey(message.getRoutingKey())) {
ReplicationId id = nextId++;
maxId = std::max(maxId, id);
- msg.getMessage().setReplicationId(id);
- msg.deliverTo(queue);
+ message.setReplicationId(id);
+ deliver(message);
QueuePosition position = queue->getPosition();
positions[id] = position;
QPID_LOG(trace, logPrefix << "Enqueued " << LogMessageId(*queue,position,id));
}
- else if (key == DEQUEUE_EVENT_KEY) {
- dequeue(decodeContent<ReplicationIdSet>(msg.getMessage()), l);
+ else {
+ DispatchMap::iterator i = dispatch.find(key);
+ if (i == dispatch.end()) {
+ QPID_LOG(info, logPrefix << "Ignoring unknown event: " << key);
+ }
+ else {
+ (i->second)(message.getContent(), l);
+ }
}
- else if (key == ID_EVENT_KEY) {
- nextId = decodeContent<ReplicationId>(msg.getMessage());
- }
- // Ignore unknown event keys, may be introduced in later versions.
}
catch (const std::exception& e) {
haBroker.shutdown(QPID_MSG(logPrefix << "Replication failed: " << e.what()));
}
}
+void QueueReplicator::deliver(const broker::Message& m) {
+ queue->deliver(m);
+}
+
+void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) {
+ nextId = decodeStr<IdEvent>(data).id;
+}
+
ReplicationId QueueReplicator::getMaxId() {
Mutex::ScopedLock l(lock);
return maxId;
@@ -273,4 +289,5 @@ bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const
bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }
std::string QueueReplicator::getType() const { return TYPE_NAME; }
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h
index 811ddba256..90f38ce7e1 100644
--- a/cpp/src/qpid/ha/QueueReplicator.h
+++ b/cpp/src/qpid/ha/QueueReplicator.h
@@ -26,6 +26,7 @@
#include "hash.h"
#include "qpid/broker/Exchange.h"
#include <boost/enable_shared_from_this.hpp>
+#include <boost/function.hpp>
#include <iosfwd>
namespace qpid {
@@ -56,23 +57,19 @@ class QueueReplicator : public broker::Exchange,
public boost::enable_shared_from_this<QueueReplicator>
{
public:
- static const std::string DEQUEUE_EVENT_KEY;
- static const std::string ID_EVENT_KEY;
static const std::string QPID_SYNC_FREQUENCY;
+ static const std::string REPLICATOR_PREFIX;
static std::string replicatorName(const std::string& queueName);
static bool isReplicatorName(const std::string&);
- /** Test if a string is an event key */
- static bool isEventKey(const std::string key);
-
QueueReplicator(HaBroker&,
boost::shared_ptr<broker::Queue> q,
boost::shared_ptr<broker::Link> l);
~QueueReplicator();
- void activate(); // Must be called immediately after constructor.
+ void activate(); // Must be called immediately after constructor.
std::string getType() const;
bool bind(boost::shared_ptr<broker::Queue
@@ -89,24 +86,36 @@ class QueueReplicator : public broker::Exchange,
ReplicationId getMaxId();
- private:
- typedef qpid::sys::unordered_map<ReplicationId, QueuePosition, TrivialHasher<int32_t> > PositionMap;
+ protected:
+ typedef boost::function<void(const std::string&, sys::Mutex::ScopedLock&)> DispatchFn;
+ typedef qpid::sys::unordered_map<std::string, DispatchFn> DispatchMap;
+
+ virtual void deliver(const broker::Message&);
+
+ sys::Mutex lock;
+ HaBroker& haBroker;
+ const BrokerInfo brokerInfo;
+ DispatchMap dispatch;
+ private:
+ typedef qpid::sys::unordered_map<
+ ReplicationId, QueuePosition, TrivialHasher<int32_t> > PositionMap;
class ErrorListener;
class QueueObserver;
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
void destroy(); // Called when the queue is destroyed.
- void dequeue(const ReplicationIdSet&, sys::Mutex::ScopedLock&);
- HaBroker& haBroker;
+ // Dispatch functions
+ void dequeueEvent(const std::string& data, sys::Mutex::ScopedLock&);
+ void idEvent(const std::string& data, sys::Mutex::ScopedLock&);
+
std::string logPrefix;
std::string bridgeName;
- sys::Mutex lock;
boost::shared_ptr<broker::Queue> queue;
boost::shared_ptr<broker::Link> link;
boost::shared_ptr<broker::Bridge> bridge;
- BrokerInfo brokerInfo;
+
bool subscribed;
const Settings& settings;
bool destroyed;
diff --git a/cpp/src/qpid/ha/RemoteBackup.cpp b/cpp/src/qpid/ha/RemoteBackup.cpp
index e55d415972..776a584bc8 100644
--- a/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -20,6 +20,7 @@
*/
#include "RemoteBackup.h"
#include "QueueGuard.h"
+#include "TxReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
@@ -65,6 +66,8 @@ bool RemoteBackup::isReady() {
}
void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) {
+ // Ignore transaction queues for purposes of catch-up calculation
+ if (TxReplicator::isTxQueue(q->getName())) return;
if (replicationTest.getLevel(*q) == ALL) {
QPID_LOG(debug, logPrefix << "Catch-up queue"
<< (createGuard ? " and guard" : "") << ": " << q->getName());
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 2001ec5332..9f464f8066 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -19,7 +19,7 @@
*
*/
-#include "makeMessage.h"
+#include "Event.h"
#include "IdSetter.h"
#include "QueueGuard.h"
#include "QueueReplicator.h"
@@ -36,6 +36,7 @@
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Uuid.h"
+#include <boost/pointer_cast.hpp>
#include <sstream>
@@ -45,6 +46,7 @@ namespace ha {
using namespace framing;
using namespace broker;
using namespace std;
+using namespace boost;
using sys::Mutex;
using broker::amqp_0_10::MessageTransfer;
@@ -111,7 +113,8 @@ ReplicatingSubscription::ReplicatingSubscription(
) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag,
resumeId, resumeTtl, arguments),
position(0), ready(false), cancelled(false),
- haBroker(hb)
+ haBroker(hb),
+ primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()))
{
try {
FieldTable ft;
@@ -137,8 +140,6 @@ ReplicatingSubscription::ReplicatingSubscription(
}
// If there's already a guard (we are in failover) use it, else create one.
- boost::shared_ptr<Primary> primary =
- boost::dynamic_pointer_cast<Primary>(haBroker.getRole());
if (primary) guard = primary->getGuard(queue, info);
if (!guard) guard.reset(new QueueGuard(*queue, info));
@@ -163,7 +164,6 @@ ReplicatingSubscription::ReplicatingSubscription(
sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued()
dequeues += initDequeues; // Messages on backup that are not on primary.
skip = backupIds - initDequeues; // Messages already on the backup.
-
// Queue front is moving but we know this subscriptions will start at a
// position >= front so if front is safe then position must be.
position = front;
@@ -191,6 +191,7 @@ ReplicatingSubscription::~ReplicatingSubscription() {}
//
void ReplicatingSubscription::initialize() {
try {
+ if (primary) primary->addReplica(*this);
Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
// Send initial dequeues to the backup.
// There must be a shared_ptr(this) when sending.
@@ -218,9 +219,8 @@ bool ReplicatingSubscription::deliver(
try {
bool result = false;
if (skip.contains(id)) {
+ QPID_LOG(trace, logPrefix << "Skip " << LogMessageId(*getQueue(), m));
skip -= id;
- QPID_LOG(trace, logPrefix << "On backup, skip " <<
- LogMessageId(*getQueue(), m));
guard->complete(id); // This will never be acknowledged.
notify();
result = true;
@@ -240,17 +240,12 @@ bool ReplicatingSubscription::deliver(
}
}
-/**
- *@param position: must be <= last position seen by subscription.
- */
void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
if (!ready && isGuarded(l) && unready.empty()) {
ready = true;
sys::Mutex::ScopedUnlock u(lock);
// Notify Primary that a subscription is ready.
QPID_LOG(debug, logPrefix << "Caught up");
- boost::shared_ptr<Primary> primary =
- boost::dynamic_pointer_cast<Primary>(haBroker.getRole());
if (primary) primary->readyReplica(*this);
}
}
@@ -264,6 +259,7 @@ void ReplicatingSubscription::cancel()
cancelled = true;
}
QPID_LOG(debug, logPrefix << "Cancelled");
+ if (primary) primary->removeReplica(*this);
getQueue()->removeObserver(observer);
guard->cancel();
ConsumerImpl::cancel();
@@ -289,9 +285,7 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l)
{
if (dequeues.empty()) return;
QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
- string buffer = encodeStr(dequeues);
- dequeues.clear();
- sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
+ sendEvent(DequeueEvent(dequeues), l);
}
// Called after the message has been removed
@@ -311,23 +305,16 @@ void ReplicatingSubscription::dequeued(ReplicationId id)
// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendIdEvent(ReplicationId pos, Mutex::ScopedLock& l)
{
- sendEvent(QueueReplicator::ID_EVENT_KEY, encodeStr(pos), l);
+ sendEvent(IdEvent(pos), l);
}
-void ReplicatingSubscription::sendEvent(const std::string& key,
- const std::string& buffer,
- Mutex::ScopedLock&)
+void ReplicatingSubscription::sendEvent(const Event& event, Mutex::ScopedLock&)
{
Mutex::ScopedUnlock u(lock);
- broker::Message message = makeMessage(buffer);
- MessageTransfer& transfer = MessageTransfer::get(message);
- DeliveryProperties* props =
- transfer.getFrames().getHeaders()->get<DeliveryProperties>(true);
- props->setRoutingKey(key);
// Send the event directly to the base consumer implementation. The dummy
// consumer prevents acknowledgements being handled, which is what we want
// for events
- ConsumerImpl::deliver(QueueCursor(), message, boost::shared_ptr<Consumer>());
+ ConsumerImpl::deliver(QueueCursor(), event.message(), boost::shared_ptr<Consumer>());
}
// Called in subscription's connection thread.
@@ -346,4 +333,9 @@ bool ReplicatingSubscription::doDispatch()
}
}
+void ReplicatingSubscription::addSkip(const ReplicationIdSet& ids) {
+ Mutex::ScopedLock l(lock);
+ skip += ids;
+}
+
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.h b/cpp/src/qpid/ha/ReplicatingSubscription.h
index c202089e91..8a4a48bb4e 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -44,6 +44,8 @@ class Buffer;
namespace ha {
class QueueGuard;
class HaBroker;
+class Event;
+class Primary;
/**
* A susbcription that replicates to a remote backup.
@@ -118,6 +120,9 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
BrokerInfo getBrokerInfo() const { return info; }
+ /** Skip replicating enqueue of of ids. */
+ void addSkip(const ReplicationIdSet& ids);
+
protected:
bool doDispatch();
@@ -128,7 +133,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
std::string logPrefix;
QueuePosition position;
ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event.
- ReplicationIdSet skip; // Messages already on backup will be skipped.
+ ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues.
ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged.
bool ready;
bool cancelled;
@@ -136,12 +141,13 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
boost::shared_ptr<QueueGuard> guard;
HaBroker& haBroker;
boost::shared_ptr<QueueObserver> observer;
+ boost::shared_ptr<Primary> primary;
bool isGuarded(sys::Mutex::ScopedLock&);
void dequeued(ReplicationId);
void sendDequeueEvent(sys::Mutex::ScopedLock&);
void sendIdEvent(ReplicationId, sys::Mutex::ScopedLock&);
- void sendEvent(const std::string& key, const std::string& data, sys::Mutex::ScopedLock&);
+ void sendEvent(const Event&, sys::Mutex::ScopedLock&);
void checkReady(sys::Mutex::ScopedLock&);
friend class Factory;
};
diff --git a/cpp/src/qpid/ha/StatusCheck.cpp b/cpp/src/qpid/ha/StatusCheck.cpp
index d73f2cf8b5..11f65aa574 100644
--- a/cpp/src/qpid/ha/StatusCheck.cpp
+++ b/cpp/src/qpid/ha/StatusCheck.cpp
@@ -97,16 +97,10 @@ void StatusCheckThread::run() {
QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status);
}
} catch(const exception& error) {
- QPID_LOG(info, statusCheck.logPrefix << "Checking status of " << url << ": " << error.what());
- }
- try { c.close(); }
- catch(const exception&) {
- QPID_LOG(warning, statusCheck.logPrefix << "Error closing status check connection to " << url);
- }
- try { c.close(); }
- catch(const exception&) {
- QPID_LOG(warning, "Error closing status check connection to " << url);
+ QPID_LOG(info, statusCheck.logPrefix << "Error checking status of " << url
+ << ": " << error.what());
}
+ try { c.close(); } catch(...) {}
delete this;
}
diff --git a/cpp/src/qpid/ha/TxReplicator.cpp b/cpp/src/qpid/ha/TxReplicator.cpp
new file mode 100644
index 0000000000..31c68dfe45
--- /dev/null
+++ b/cpp/src/qpid/ha/TxReplicator.cpp
@@ -0,0 +1,192 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include "TxReplicator.h"
+#include "Role.h"
+#include "Backup.h"
+#include "BrokerReplicator.h"
+#include "Event.h"
+#include "HaBroker.h"
+#include "types.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/TxBuffer.h"
+#include "qpid/broker/TxAccept.h"
+#include "qpid/framing/BufferTypes.h"
+#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+#include <boost/make_shared.hpp>
+
+namespace qpid {
+namespace ha {
+
+using namespace std;
+using namespace boost;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+namespace {
+const string QPID_HA(QPID_HA_PREFIX);
+const string TYPE_NAME(QPID_HA+"tx-queue-replicator");
+const string PREFIX(TRANSACTION_REPLICATOR_PREFIX);
+} // namespace
+
+
+bool TxReplicator::isTxQueue(const string& q) {
+ return startsWith(q, PREFIX);
+}
+
+string TxReplicator::getTxId(const string& q) {
+ assert(isTxQueue(q));
+ return q.substr(PREFIX.size());
+}
+
+string TxReplicator::getType() const { return TYPE_NAME; }
+
+TxReplicator::TxReplicator(
+ HaBroker& hb,
+ const boost::shared_ptr<broker::Queue>& txQueue,
+ const boost::shared_ptr<broker::Link>& link) :
+ QueueReplicator(hb, txQueue, link),
+ id(getTxId(txQueue->getName())),
+ txBuffer(new broker::TxBuffer),
+ broker(hb.getBroker()),
+ store(broker.hasStore() ? &broker.getStore() : 0),
+ dequeueState(hb.getBroker().getQueues())
+{
+ logPrefix = "Backup of transaction "+id+": ";
+
+ if (!store) throw Exception(QPID_MSG(logPrefix << "No message store loaded."));
+ boost::shared_ptr<Backup> backup = dynamic_pointer_cast<Backup>(hb.getRole());
+ if (!backup) throw Exception(QPID_MSG(logPrefix << "Broker is not in backup mode."));
+ brokerReplicator = backup->getBrokerReplicator();
+
+ // Dispatch transaction events.
+ dispatch[TxEnqueueEvent::KEY] =
+ boost::bind(&TxReplicator::enqueue, this, _1, _2);
+ dispatch[TxDequeueEvent::KEY] =
+ boost::bind(&TxReplicator::dequeue, this, _1, _2);
+ dispatch[TxPrepareEvent::KEY] =
+ boost::bind(&TxReplicator::prepare, this, _1, _2);
+ dispatch[TxCommitEvent::KEY] =
+ boost::bind(&TxReplicator::commit, this, _1, _2);
+ dispatch[TxRollbackEvent::KEY] =
+ boost::bind(&TxReplicator::rollback, this, _1, _2);
+}
+
+void TxReplicator::deliver(const broker::Message& m_) {
+ // Deliver message to the target queue, not the tx-queue.
+ broker::Message m(m_);
+ m.setReplicationId(enq.id); // Use replicated id.
+ boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(enq.queue);
+ QPID_LOG(trace, logPrefix << "Deliver " << LogMessageId(*queue, m));
+ DeliverableMessage dm(m, txBuffer.get());
+ dm.deliverTo(queue);
+}
+
+void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) {
+ TxEnqueueEvent e;
+ decodeStr(data, e);
+ QPID_LOG(trace, logPrefix << "Enqueue: " << e);
+ enq = e;
+}
+
+void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) {
+ TxDequeueEvent e;
+ decodeStr(data, e);
+ QPID_LOG(trace, logPrefix << "Dequeue: " << e);
+ // NOTE: Backup does not see transactional dequeues until the transaction is
+ // prepared, then they are all receieved before the prepare event.
+ // We collect the events here so we can do a single scan of the queue in prepare.
+ dequeueState.add(e);
+}
+
+void TxReplicator::DequeueState::add(const TxDequeueEvent& event) {
+ events[event.queue] += event.id;
+}
+
+// Use this function as a seek() predicate to find the dequeued messages.
+bool TxReplicator::DequeueState::addRecord(
+ const broker::Message& m, const boost::shared_ptr<Queue>& queue,
+ const ReplicationIdSet& rids)
+{
+ if (rids.contains(m.getReplicationId())) {
+ // FIXME aconway 2013-07-24:
+ // - Do we need to acquire before creating a DR?
+ // - Are the parameters to DeliveryRecord ok?
+ DeliveryRecord dr(cursor, m.getSequence(), m.getReplicationId(), queue,
+ string() /*tag*/,
+ boost::shared_ptr<Consumer>(),
+ true /*acquired*/,
+ false /*accepted*/,
+ false /*credit.isWindowMode()*/,
+ 0 /*credit*/);
+ // Fake record ids, unique within this transaction.
+ dr.setId(nextId++);
+ records.push_back(dr);
+ recordIds += dr.getId();
+ }
+ return false;
+}
+
+void TxReplicator::DequeueState::addRecords(const EventMap::value_type& entry) {
+ // Process all the dequeues for a single queue, in one pass of seek()
+ boost::shared_ptr<broker::Queue> q = queues.get(entry.first);
+ q->seek(cursor, boost::bind(&TxReplicator::DequeueState::addRecord,
+ this, _1, q, entry.second));
+}
+
+boost::shared_ptr<TxAccept> TxReplicator::DequeueState::makeAccept() {
+ for_each(events.begin(), events.end(),
+ boost::bind(&TxReplicator::DequeueState::addRecords, this, _1));
+ return make_shared<TxAccept>(cref(recordIds), ref(records));
+}
+
+void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock&) {
+ QPID_LOG(trace, logPrefix << "Prepare");
+ txBuffer->enlist(dequeueState.makeAccept());
+ context = store->begin();
+ txBuffer->prepare(context.get());
+ // FIXME aconway 2013-07-26: notify the primary of prepare outcome.
+}
+
+void TxReplicator::commit(const string&, sys::Mutex::ScopedLock&) {
+ QPID_LOG(trace, logPrefix << "Commit");
+ if (context.get()) store->commit(*context);
+ txBuffer->commit();
+ end();
+}
+
+void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock&) {
+ QPID_LOG(trace, logPrefix << "Rollback");
+ if (context.get()) store->abort(*context);
+ txBuffer->rollback();
+ end();
+}
+
+void TxReplicator::end(){
+ // FIXME aconway 2013-07-26: destroying the txqueue (auto-delete?) will
+ // destroy this via QueueReplicator::destroy
+}
+}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/TxReplicator.h b/cpp/src/qpid/ha/TxReplicator.h
new file mode 100644
index 0000000000..c4df8c13f9
--- /dev/null
+++ b/cpp/src/qpid/ha/TxReplicator.h
@@ -0,0 +1,123 @@
+#ifndef QPID_HA_TRANSACTIONREPLICATOR_H
+#define QPID_HA_TRANSACTIONREPLICATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "QueueReplicator.h"
+#include "Event.h"
+#include "qpid/broker/DeliveryRecord.h"
+#include "qpid/broker/TransactionalStore.h"
+#include "qpid/sys/Mutex.h"
+
+namespace qpid {
+
+namespace broker {
+class TxBuffer;
+class TxAccept;
+class DtxBuffer;
+class Broker;
+class MessageStore;
+}
+
+namespace ha {
+class BrokerReplicator;
+
+/**
+ * Exchange created on a backup broker to replicate a transaction on the primary.
+ *
+ * Subscribes to a tx-queue like a normal queue but puts replicated messages and
+ * transaction events into a local TxBuffer.
+ *
+ * THREAD SAFE: Called in different connection threads.
+ */
+class TxReplicator : public QueueReplicator {
+ public:
+ typedef boost::shared_ptr<broker::Queue> QueuePtr;
+ typedef boost::shared_ptr<broker::Link> LinkPtr;
+
+ static bool isTxQueue(const std::string& queue);
+ static std::string getTxId(const std::string& queue);
+
+ TxReplicator(HaBroker&, const QueuePtr& txQueue, const LinkPtr& link);
+
+ std::string getType() const;
+
+ protected:
+
+ void deliver(const broker::Message&);
+
+ private:
+
+ typedef void (TxReplicator::*DispatchFunction)(
+ const std::string&, sys::Mutex::ScopedLock&);
+ typedef qpid::sys::unordered_map<std::string, DispatchFunction> DispatchMap;
+ typedef qpid::sys::unordered_map<std::string, ReplicationIdSet> DequeueMap;
+
+ void enqueue(const std::string& data, sys::Mutex::ScopedLock&);
+ void dequeue(const std::string& data, sys::Mutex::ScopedLock&);
+ void prepare(const std::string& data, sys::Mutex::ScopedLock&);
+ void commit(const std::string& data, sys::Mutex::ScopedLock&);
+ void rollback(const std::string& data, sys::Mutex::ScopedLock&);
+ void end();
+
+ std::string logPrefix;
+ const std::string id;
+ TxEnqueueEvent enq; // Enqueue data for next deliver.
+ boost::shared_ptr<broker::TxBuffer> txBuffer;
+ broker::Broker& broker;
+ broker::MessageStore* store;
+ boost::shared_ptr<BrokerReplicator> brokerReplicator;
+ std::auto_ptr<broker::TransactionContext> context;
+
+ // Class to process dequeues and create DeliveryRecords to populate a
+ // TxAccept.
+ class DequeueState {
+ public:
+ DequeueState(broker::QueueRegistry& qr) : queues(qr) {}
+ void add(const TxDequeueEvent&);
+ boost::shared_ptr<broker::TxAccept> makeAccept();
+
+ private:
+ // Delivery record IDs are command IDs from the session.
+ // On a backup we will just fake these Ids.
+ typedef framing::SequenceNumber Id;
+ typedef framing::SequenceSet IdSet;
+ typedef qpid::sys::unordered_map<std::string, ReplicationIdSet> EventMap;
+
+ bool addRecord(const broker::Message& m, const boost::shared_ptr<broker::Queue>&,
+ const ReplicationIdSet& );
+ void addRecords(const DequeueMap::value_type& entry);
+
+ broker::QueueRegistry& queues;
+ EventMap events;
+ broker::DeliveryRecords records;
+ broker::QueueCursor cursor;
+ framing::SequenceNumber nextId;
+ IdSet recordIds;
+ };
+ DequeueState dequeueState;
+};
+
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_TRANSACTIONREPLICATOR_H*/
diff --git a/cpp/src/qpid/ha/makeMessage.h b/cpp/src/qpid/ha/makeMessage.h
deleted file mode 100644
index 4427cdd948..0000000000
--- a/cpp/src/qpid/ha/makeMessage.h
+++ /dev/null
@@ -1,66 +0,0 @@
-#ifndef QPID_HA_MAKEMESSAGE_H
-#define QPID_HA_MAKEMESSAGE_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/broker/Message.h"
-#include "qpid/framing/Buffer.h"
-#include <string>
-
-/** Utilities for creating messages used by HA internally. */
-
-namespace qpid {
-namespace framing {
-class Buffer;
-}
-namespace ha {
-
-/**
- * Create internal messages used by HA components.
- */
-broker::Message makeMessage(
- const framing::Buffer& content,
- const std::string& destination=std::string()
-);
-
-broker::Message makeMessage(const std::string& content,
- const std::string& destination=std::string());
-
-/** Encode value as a string. */
-template <class T> std::string encodeStr(const T& value) {
- std::string encoded(value.encodedSize(), '\0');
- framing::Buffer buffer(&encoded[0], encoded.size());
- value.encode(buffer);
- return encoded;
-}
-
-/** Decode value from a string. */
-template <class T> T decodeStr(const std::string& encoded) {
- framing::Buffer buffer(const_cast<char*>(&encoded[0]), encoded.size());
- T value;
- value.decode(buffer);
- return value;
-}
-
-}} // namespace qpid::ha
-
-#endif /*!QPID_HA_MAKEMESSAGE_H*/
diff --git a/cpp/src/qpid/ha/types.cpp b/cpp/src/qpid/ha/types.cpp
index 2246355339..10d6bd4e3b 100644
--- a/cpp/src/qpid/ha/types.cpp
+++ b/cpp/src/qpid/ha/types.cpp
@@ -37,6 +37,14 @@ using namespace std;
const string QPID_REPLICATE("qpid.replicate");
const string QPID_HA_UUID("qpid.ha-uuid");
+const char* QPID_HA_PREFIX = "qpid.ha-";
+const char* QUEUE_REPLICATOR_PREFIX = "qpid.ha-q:";
+const char* TRANSACTION_REPLICATOR_PREFIX = "qpid.ha-tx:";
+
+bool startsWith(const string& name, const string& prefix) {
+ return name.compare(0, prefix.size(), prefix) == 0;
+}
+
string EnumBase::str() const {
assert(value < count);
return names[value];
diff --git a/cpp/src/qpid/ha/types.h b/cpp/src/qpid/ha/types.h
index 9a7e97c66d..3af095d470 100644
--- a/cpp/src/qpid/ha/types.h
+++ b/cpp/src/qpid/ha/types.h
@@ -105,10 +105,17 @@ inline bool isPrimary(BrokerStatus s) {
inline bool isBackup(BrokerStatus s) { return !isPrimary(s); }
-// String constants.
+// String constants, defined as char* to avoid initialization order problems.
extern const std::string QPID_REPLICATE;
extern const std::string QPID_HA_UUID;
+// Strings used as prefixes, defined as char* to avoid link order problems.
+extern const char* QPID_HA_PREFIX;
+extern const char* QUEUE_REPLICATOR_PREFIX;
+extern const char* TRANSACTION_REPLICATOR_PREFIX;
+
+bool startsWith(const std::string& name, const std::string& prefix);
+
/** Define IdSet type, not a typedef so we can overload operator << */
class IdSet : public std::set<types::Uuid> {};