summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid')
-rw-r--r--qpid/cpp/src/qpid/Url.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp60
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.h19
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h8
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp38
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h5
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionObserver.h59
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionObservers.h79
-rw-r--r--qpid/cpp/src/qpid/broker/Consumer.h22
-rw-r--r--qpid/cpp/src/qpid/broker/ConsumerFactory.h70
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp51
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.h17
-rw-r--r--qpid/cpp/src/qpid/broker/FifoDistributor.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/LegacyLVQ.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/LegacyLVQ.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp185
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h221
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp126
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.h58
-rw-r--r--qpid/cpp/src/qpid/broker/MessageDeque.cpp161
-rw-r--r--qpid/cpp/src/qpid/broker/MessageDeque.h21
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.cpp13
-rw-r--r--qpid/cpp/src/qpid/broker/MessageMap.cpp36
-rw-r--r--qpid/cpp/src/qpid/broker/MessageMap.h12
-rw-r--r--qpid/cpp/src/qpid/broker/Messages.h47
-rw-r--r--qpid/cpp/src/qpid/broker/PriorityQueue.cpp33
-rw-r--r--qpid/cpp/src/qpid/broker/PriorityQueue.h13
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp145
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h10
-rw-r--r--qpid/cpp/src/qpid/broker/QueuedMessage.h1
-rw-r--r--qpid/cpp/src/qpid/broker/RetryList.h3
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp38
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h19
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h1
-rw-r--r--qpid/cpp/src/qpid/client/TCPConnector.cpp8
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp42
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp51
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h6
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp5
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp5
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp90
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.h67
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp497
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h85
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp41
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.h54
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp137
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h74
-rw-r--r--qpid/cpp/src/qpid/ha/HaPlugin.cpp67
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp174
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h86
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp292
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h132
-rw-r--r--qpid/cpp/src/qpid/ha/Settings.h45
-rw-r--r--qpid/cpp/src/qpid/ha/management-schema.xml38
-rw-r--r--qpid/cpp/src/qpid/log/Logger.cpp2
-rw-r--r--qpid/cpp/src/qpid/log/Options.cpp2
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Time.cpp4
-rw-r--r--qpid/cpp/src/qpid/types/Variant.cpp42
64 files changed, 2946 insertions, 713 deletions
diff --git a/qpid/cpp/src/qpid/Url.cpp b/qpid/cpp/src/qpid/Url.cpp
index f699b60c17..2061499ec3 100644
--- a/qpid/cpp/src/qpid/Url.cpp
+++ b/qpid/cpp/src/qpid/Url.cpp
@@ -255,6 +255,7 @@ void Url::parse(const char* url) {
}
void Url::parseNoThrow(const char* url) {
+ clear();
cache.clear();
if (!UrlParser(*this, url).parse())
clear();
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 7ac1edd1db..9a1f4be468 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,16 +24,27 @@
#include "qpid/broker/Connection.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/LinkRegistry.h"
+#include "qpid/ha/BrokerReplicator.h"
#include "qpid/broker/SessionState.h"
#include "qpid/management/ManagementAgent.h"
+#include "qpid/types/Variant.h"
+#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/framing/Uuid.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
#include <iostream>
using qpid::framing::FieldTable;
using qpid::framing::Uuid;
using qpid::framing::Buffer;
+using qpid::framing::AMQFrame;
+using qpid::framing::AMQContentBody;
+using qpid::framing::AMQHeaderBody;
+using qpid::framing::MessageProperties;
+using qpid::framing::MessageTransferBody;
+using qpid::types::Variant;
using qpid::management::ManagementAgent;
using std::string;
namespace _qmf = qmf::org::apache::qpid::broker;
@@ -47,9 +58,11 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame)
}
Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
- const _qmf::ArgsLinkBridge& _args) :
+ const _qmf::ArgsLinkBridge& _args,
+ InitializeCallback init) :
link(_link), id(_id), args(_args), mgmtObject(0),
- listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0)
+ listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0),
+ initialize(init)
{
std::stringstream title;
title << id << "_" << name;
@@ -62,12 +75,12 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
agent->addObject(mgmtObject);
}
- QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest);
+ QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest);
}
-Bridge::~Bridge()
+Bridge::~Bridge()
{
- mgmtObject->resourceDestroy();
+ mgmtObject->resourceDestroy();
}
void Bridge::create(Connection& c)
@@ -86,7 +99,7 @@ void Bridge::create(Connection& c)
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
-
+
session->attach(name, false);
session->commandPoint(0,0);
} else {
@@ -96,11 +109,12 @@ void Bridge::create(Connection& c)
}
if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking();
- if (args.i_srcIsQueue) {
+ if (initialize) initialize(*this, sessionHandler);
+ else if (args.i_srcIsQueue) {
peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
- QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest);
+ QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest);
} else {
FieldTable queueSettings;
@@ -134,9 +148,9 @@ void Bridge::create(Connection& c)
if (exchange.get() == 0)
throw Exception("Exchange not found for dynamic route");
exchange->registerDynamicBridge(this);
- QPID_LOG(debug, "Activated dynamic route for exchange " << args.i_src);
+ QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src);
} else {
- QPID_LOG(debug, "Activated static route from exchange " << args.i_src << " to " << args.i_dest);
+ QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest);
}
}
if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking();
@@ -148,15 +162,16 @@ void Bridge::cancel(Connection&)
peer->getMessage().cancel(args.i_dest);
peer->getSession().detach(name);
}
+ QPID_LOG(debug, "Cancelled bridge " << name);
}
void Bridge::closed()
{
if (args.i_dynamic) {
- Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
- if (exchange.get() != 0)
- exchange->removeDynamicBridge(this);
+ Exchange::shared_ptr exchange = link->getBroker()->getExchanges().find(args.i_src);
+ if (exchange.get()) exchange->removeDynamicBridge(this);
}
+ QPID_LOG(debug, "Closed bridge " << name);
}
void Bridge::destroy()
@@ -175,11 +190,6 @@ void Bridge::setPersistenceId(uint64_t pId) const
persistenceId = pId;
}
-const string& Bridge::getName() const
-{
- return name;
-}
-
Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
{
string host;
@@ -207,7 +217,7 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
is_queue, is_local, id, excludes, dynamic, sync).first;
}
-void Bridge::encode(Buffer& buffer) const
+void Bridge::encode(Buffer& buffer) const
{
buffer.putShortString(string("bridge"));
buffer.putShortString(link->getHost());
@@ -224,8 +234,8 @@ void Bridge::encode(Buffer& buffer) const
buffer.putShort(args.i_sync);
}
-uint32_t Bridge::encodedSize() const
-{
+uint32_t Bridge::encodedSize() const
+{
return link->getHost().size() + 1 // short-string (host)
+ 7 // short-string ("bridge")
+ 2 // port
@@ -250,7 +260,7 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId,
management::Args& /*args*/,
string&)
{
- if (methodId == _qmf::Bridge::METHOD_CLOSE) {
+ if (methodId == _qmf::Bridge::METHOD_CLOSE) {
//notify that we are closed
destroy();
return management::Manageable::STATUS_OK;
@@ -297,7 +307,7 @@ void Bridge::sendReorigin()
conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
queueName, args.i_src, args.i_key, bindArgs));
}
-bool Bridge::resetProxy()
+bool Bridge::resetProxy()
{
SessionHandler& sessionHandler = conn->getChannel(id);
if (!sessionHandler.getSession()) peer.reset();
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index 8b4559a871..b849b11ba8 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -42,15 +42,19 @@ class Connection;
class ConnectionState;
class Link;
class LinkRegistry;
+class SessionHandler;
class Bridge : public PersistableConfig, public management::Manageable, public Exchange::DynamicBridge
{
public:
typedef boost::shared_ptr<Bridge> shared_ptr;
typedef boost::function<void(Bridge*)> CancellationListener;
+ typedef boost::function<void(Bridge&, SessionHandler&)> InitializeCallback;
Bridge(Link* link, framing::ChannelId id, CancellationListener l,
- const qmf::org::apache::qpid::broker::ArgsLinkBridge& args);
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& args,
+ InitializeCallback init
+ );
~Bridge();
void create(Connection& c);
@@ -70,8 +74,8 @@ public:
void setPersistenceId(uint64_t id) const;
uint64_t getPersistenceId() const { return persistenceId; }
uint32_t encodedSize() const;
- void encode(framing::Buffer& buffer) const;
- const std::string& getName() const;
+ void encode(framing::Buffer& buffer) const;
+ const std::string& getName() const { return name; }
static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
// Exchange::DynamicBridge methods
@@ -81,6 +85,10 @@ public:
bool containsLocalTag(const std::string& tagList) const;
const std::string& getLocalTag() const;
+ // Methods needed by initialization functions
+ std::string getQueueName() const { return queueName; }
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; }
+
private:
struct PushHandler : framing::FrameHandler {
PushHandler(Connection* c) { conn = c; }
@@ -103,6 +111,7 @@ private:
mutable uint64_t persistenceId;
ConnectionState* connState;
Connection* conn;
+ InitializeCallback initialize;
bool resetProxy();
};
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index ff6da087c3..221c31583b 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -127,7 +127,8 @@ Broker::Options::Options(const std::string& name) :
queueFlowResumeRatio(70),
queueThresholdEventRatio(80),
defaultMsgGroup("qpid.no-group"),
- timestampRcvMsgs(false) // set the 0.10 timestamp delivery property
+ timestampRcvMsgs(false), // set the 0.10 timestamp delivery property
+ linkMaintenanceInterval(2)
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -149,6 +150,8 @@ Broker::Options::Options(const std::string& name) :
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Enable broadcast of management information over QMF v2")
("mgmt-qmf1", optValue(qmf1Support,"yes|no"), "Enable broadcast of management information over QMF v1")
+ // FIXME aconway 2012-02-13: consistent treatment of values in SECONDS
+ // allow sub-second intervals.
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"),
"Interval between attempts to purge any expired messages from queues")
@@ -164,7 +167,9 @@ Broker::Options::Options(const std::string& name) :
("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.")
("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised")
("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.")
- ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.");
+ ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.")
+ ("link-maintenace-interval", optValue(linkMaintenanceInterval, "SECONDS"))
+ ;
}
const std::string empty;
@@ -904,7 +909,7 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
//event instead?
managementAgent->raiseEvent(
_qmf::EventQueueDeclare(connectionId, userId, name,
- durable, owner, autodelete,
+ durable, owner, autodelete, alternateExchange,
ManagementAgent::toMap(arguments),
"created"));
}
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 840d47ac38..b6eab894f3 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -37,6 +37,8 @@
#include "qpid/broker/Vhost.h"
#include "qpid/broker/System.h"
#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/ConsumerFactory.h"
+#include "qpid/broker/ConnectionObservers.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
@@ -122,6 +124,7 @@ public:
uint16_t queueThresholdEventRatio;
std::string defaultMsgGroup;
bool timestampRcvMsgs;
+ double linkMaintenanceInterval; // FIXME aconway 2012-02-13: consistent parsing of SECONDS values.
private:
std::string getHome();
@@ -177,6 +180,7 @@ public:
std::auto_ptr<MessageStore> store;
AclModule* acl;
DataDir dataDir;
+ ConnectionObservers connectionObservers;
QueueRegistry queues;
ExchangeRegistry exchanges;
@@ -198,6 +202,7 @@ public:
bool inCluster, clusterUpdatee;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConnectionCounter connectionCounter;
+ ConsumerFactories consumerFactories;
public:
virtual ~Broker();
@@ -356,6 +361,9 @@ public:
const std::string& key,
const std::string& userId,
const std::string& connectionId);
+
+ ConsumerFactories& getConsumerFactories() { return consumerFactories; }
+ ConnectionObservers& getConnectionObservers() { return connectionObservers; }
};
}}
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 14e9abc0d1..1e6aab217c 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -19,6 +19,7 @@
*
*/
#include "qpid/broker/Connection.h"
+#include "qpid/broker/ConnectionObserver.h"
#include "qpid/broker/SessionOutputException.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Bridge.h"
@@ -103,8 +104,7 @@ Connection::Connection(ConnectionOutputHandler* out_,
outboundTracker(*this)
{
outboundTracker.wrap(out);
- if (link)
- links.notifyConnection(mgmtId, this);
+ broker.getConnectionObservers().connection(*this);
// In a cluster, allow adding the management object to be delayed.
if (!delayManagement) addManagementObject();
if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
@@ -130,7 +130,7 @@ void Connection::requestIOProcessing(boost::function0<void> callback)
{
ScopedLock<Mutex> l(ioCallbackLock);
ioCallbacks.push(callback);
- out.activateOutput();
+ if (isOpen()) out.activateOutput();
}
Connection::~Connection()
@@ -142,8 +142,7 @@ Connection::~Connection()
if (!link && isClusterSafe())
agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId()));
}
- if (link)
- links.notifyClosed(mgmtId);
+ broker.getConnectionObservers().closed(*this);
if (heartbeatTimer)
heartbeatTimer->cancel();
@@ -156,11 +155,16 @@ Connection::~Connection()
void Connection::received(framing::AMQFrame& frame) {
// Received frame on connection so delay timeout
restartTimeout();
+ bool wasOpen = isOpen();
adapter.handle(frame);
if (link) //i.e. we are acting as the client to another broker
recordFromServer(frame);
else
recordFromClient(frame);
+ if (!wasOpen && isOpen()) {
+ doIoCallbacks(); // Do any callbacks registered before we opened.
+ broker.getConnectionObservers().opened(*this);
+ }
}
void Connection::sent(const framing::AMQFrame& frame)
@@ -260,8 +264,7 @@ string Connection::getAuthCredentials()
void Connection::notifyConnectionForced(const string& text)
{
- if (link)
- links.notifyConnectionForced(mgmtId, text);
+ broker.getConnectionObservers().forced(*this, text);
}
void Connection::setUserId(const string& userId)
@@ -329,17 +332,16 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
}
void Connection::doIoCallbacks() {
- {
- ScopedLock<Mutex> l(ioCallbackLock);
- // Although IO callbacks execute in the connection thread context, they are
- // not cluster safe because they are queued for execution in non-IO threads.
- ClusterUnsafeScope cus;
- while (!ioCallbacks.empty()) {
- boost::function0<void> cb = ioCallbacks.front();
- ioCallbacks.pop();
- ScopedUnlock<Mutex> ul(ioCallbackLock);
- cb(); // Lend the IO thread for management processing
- }
+ if (!isOpen()) return; // Don't process IO callbacks until we are open.
+ ScopedLock<Mutex> l(ioCallbackLock);
+ // Although IO callbacks execute in the connection thread context, they are
+ // not cluster safe because they are queued for execution in non-IO threads.
+ ClusterUnsafeScope cus;
+ while (!ioCallbacks.empty()) {
+ boost::function0<void> cb = ioCallbacks.front();
+ ioCallbacks.pop();
+ ScopedUnlock<Mutex> ul(ioCallbackLock);
+ cb(); // Lend the IO thread for management processing
}
}
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 6186c06a3c..855172bc43 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -165,6 +165,9 @@ class Connection : public sys::ConnectionInputHandler,
// Used by cluster during catch-up, see cluster::OutputInterceptor
void doIoCallbacks();
+ void setClientProperties(const framing::FieldTable& cp) { clientProperties = cp; }
+ const framing::FieldTable& getClientProperties() const { return clientProperties; }
+
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<boost::shared_ptr<Queue> >::iterator queue_iterator;
@@ -186,6 +189,8 @@ class Connection : public sys::ConnectionInputHandler,
ErrorListener* errorListener;
uint64_t objectId;
bool shadow;
+ framing::FieldTable clientProperties;
+
/**
* Chained ConnectionOutputHandler that allows outgoing frames to be
* tracked (for updating mgmt stats).
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index 6048a46f79..f1d43c5cdb 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -158,6 +158,8 @@ void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body)
throw;
}
const framing::FieldTable& clientProperties = body.getClientProperties();
+ connection.setClientProperties(clientProperties);
+
connection.setFederationLink(clientProperties.get(QPID_FED_LINK));
if (clientProperties.isSet(QPID_FED_TAG)) {
connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG));
diff --git a/qpid/cpp/src/qpid/broker/ConnectionObserver.h b/qpid/cpp/src/qpid/broker/ConnectionObserver.h
new file mode 100644
index 0000000000..eea2981185
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/ConnectionObserver.h
@@ -0,0 +1,59 @@
+#ifndef QPID_BROKER_CONNECTIONOBSERVER_H
+#define QPID_BROKER_CONNECTIONOBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+
+namespace qpid {
+namespace broker {
+
+class Connection;
+
+/**
+ * Observer that is informed of connection events. For use by
+ * plug-ins that want to be notified of, or influence, connection
+ * events.
+ */
+class ConnectionObserver
+{
+ public:
+ virtual ~ConnectionObserver() {}
+
+ /** Called when a connection is first established. */
+ virtual void connection(Connection&) {}
+
+ /** Called when the opening negotiation is done and the connection is authenticated.
+ * @exception Throwing an exception will abort the connection.
+ */
+ virtual void opened(Connection&) {}
+
+ /** Called when a connection is closed. */
+ virtual void closed(Connection&) {}
+
+ /** Called when a connection is forced closed. */
+ virtual void forced(Connection&, const std::string& /*message*/) {}
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CONNECTIONOBSERVER_H*/
diff --git a/qpid/cpp/src/qpid/broker/ConnectionObservers.h b/qpid/cpp/src/qpid/broker/ConnectionObservers.h
new file mode 100644
index 0000000000..07e515f3c9
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/ConnectionObservers.h
@@ -0,0 +1,79 @@
+#ifndef QPID_BROKER_CONNECTIONOBSERVERS_H
+#define QPID_BROKER_CONNECTIONOBSERVERS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionObserver.h"
+#include "qpid/sys/Mutex.h"
+#include <set>
+#include <algorithm>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * A collection of connection observers.
+ * Calling a ConnectionObserver function will call that function on each observer.
+ * THREAD SAFE.
+ */
+class ConnectionObservers : public ConnectionObserver {
+ public:
+ void add(boost::shared_ptr<ConnectionObserver> observer) {
+ sys::Mutex::ScopedLock l(lock);
+ observers.insert(observer);
+ }
+
+ void remove(boost::shared_ptr<ConnectionObserver> observer) {
+ sys::Mutex::ScopedLock l(lock);
+ observers.erase(observer);
+ }
+
+ void connection(Connection& c) {
+ each(boost::bind(&ConnectionObserver::connection, _1, boost::ref(c)));
+ }
+
+ void opened(Connection& c) {
+ each(boost::bind(&ConnectionObserver::opened, _1, boost::ref(c)));
+ }
+
+ void closed(Connection& c) {
+ each(boost::bind(&ConnectionObserver::closed, _1, boost::ref(c)));
+ }
+
+ void forced(Connection& c, const std::string& text) {
+ each(boost::bind(&ConnectionObserver::forced, _1, boost::ref(c), text));
+ }
+
+ private:
+ typedef std::set<boost::shared_ptr<ConnectionObserver> > Observers;
+ sys::Mutex lock;
+ Observers observers;
+
+ template <class F> void each(F f) {
+ sys::Mutex::ScopedLock l(lock);
+ std::for_each(observers.begin(), observers.end(), f);
+ }
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CONNECTIONOBSERVERS_H*/
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h
index 647f082e44..b3d6f23732 100644
--- a/qpid/cpp/src/qpid/broker/Consumer.h
+++ b/qpid/cpp/src/qpid/broker/Consumer.h
@@ -31,10 +31,15 @@ namespace broker {
class Queue;
class QueueListeners;
-class Consumer {
+/**
+ * Base class for consumers which represent a subscription to a queue.
+ */
+class Consumer
+{
const bool acquires;
- // inListeners allows QueueListeners to efficiently track if this instance is registered
- // for notifications without having to search its containers
+ // inListeners allows QueueListeners to efficiently track if this
+ // instance is registered for notifications without having to
+ // search its containers
bool inListeners;
// the name is generated by broker and is unique within broker scope. It is not
// provided or known by the remote Consumer.
@@ -59,6 +64,17 @@ class Consumer {
virtual OwnershipToken* getSession() = 0;
virtual void cancel() = 0;
+ /** Called when the peer has acknowledged receipt of the message.
+ * Not to be confused with accept() above, which is asking if
+ * this consumer will consume/browse the message.
+ */
+ virtual void acknowledged(const QueuedMessage&) = 0;
+
+ /** Called if queue has been deleted, if true suppress the error message.
+ * Used by HA ReplicatingSubscriptions where such errors are normal.
+ */
+ virtual bool hideDeletedError() { return false; }
+
protected:
framing::SequenceNumber position;
diff --git a/qpid/cpp/src/qpid/broker/ConsumerFactory.h b/qpid/cpp/src/qpid/broker/ConsumerFactory.h
new file mode 100644
index 0000000000..abd39fb3f8
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/ConsumerFactory.h
@@ -0,0 +1,70 @@
+#ifndef QPID_BROKER_CONSUMERFACTORY_H
+#define QPID_BROKER_CONSUMERFACTORY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2011-11-25: it's ugly exposing SemanticState::ConsumerImpl in public.
+// Refactor to use a more abstract interface.
+
+#include "qpid/broker/SemanticState.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Base class for consumer factoires. Plugins can register a
+ * ConsumerFactory via Broker:: getConsumerFactories() Each time a
+ * conumer is created, each factory is tried in turn till one returns
+ * non-0.
+ */
+class ConsumerFactory
+{
+ public:
+ virtual ~ConsumerFactory() {}
+
+ virtual boost::shared_ptr<SemanticState::ConsumerImpl> create(
+ SemanticState* parent,
+ const std::string& name, boost::shared_ptr<Queue> queue,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments) = 0;
+};
+
+/** A set of factories held by the broker
+ * THREAD UNSAFE: see notes on member functions.
+ */
+class ConsumerFactories {
+ public:
+ typedef std::vector<boost::shared_ptr<ConsumerFactory> > Factories;
+
+ /** Thread safety: May only be called during plug-in initialization. */
+ void add(const boost::shared_ptr<ConsumerFactory>& cf) { factories.push_back(cf); }
+
+ /** Thread safety: May only be called after plug-in initialization. */
+ const Factories& get() const { return factories; }
+
+ private:
+ Factories factories;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CONSUMERFACTORY_H*/
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index adc145dc84..fdb562b7c5 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,7 @@
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/SemanticState.h"
+#include "qpid/broker/Consumer.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Queue.h"
#include "qpid/log/Statement.h"
@@ -31,22 +32,25 @@ using namespace qpid;
using namespace qpid::broker;
using std::string;
-DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
- const Queue::shared_ptr& _queue,
+DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
+ const Queue::shared_ptr& _queue,
const std::string& _tag,
+ const boost::shared_ptr<Consumer>& _consumer,
bool _acquired,
- bool accepted,
+ bool accepted,
bool _windowing,
- uint32_t _credit) : msg(_msg),
- queue(_queue),
- tag(_tag),
- acquired(_acquired),
- acceptExpected(!accepted),
- cancelled(false),
- completed(false),
- ended(accepted && acquired),
- windowing(_windowing),
- credit(msg.payload ? msg.payload->getRequiredCredit() : _credit)
+ uint32_t _credit):
+ msg(_msg),
+ queue(_queue),
+ tag(_tag),
+ consumer(_consumer),
+ acquired(_acquired),
+ acceptExpected(!accepted),
+ cancelled(false),
+ completed(false),
+ ended(accepted && acquired),
+ windowing(_windowing),
+ credit(msg.payload ? msg.payload->getRequiredCredit() : _credit)
{}
bool DeliveryRecord::setEnded()
@@ -94,7 +98,7 @@ void DeliveryRecord::requeue() const
}
}
-void DeliveryRecord::release(bool setRedelivered)
+void DeliveryRecord::release(bool setRedelivered)
{
if (acquired && !ended) {
if (setRedelivered) msg.payload->redeliver();
@@ -107,12 +111,13 @@ void DeliveryRecord::release(bool setRedelivered)
}
void DeliveryRecord::complete() {
- completed = true;
+ completed = true;
}
bool DeliveryRecord::accept(TransactionContext* ctxt) {
- if (acquired && !ended) {
- queue->dequeue(ctxt, msg);
+ if (!ended) {
+ consumer->acknowledged(getMessage());
+ if (acquired) queue->dequeue(ctxt, msg);
setEnded();
QPID_LOG(debug, "Accepted " << id);
}
@@ -129,8 +134,8 @@ void DeliveryRecord::committed() const{
queue->dequeueCommitted(msg);
}
-void DeliveryRecord::reject()
-{
+void DeliveryRecord::reject()
+{
if (acquired && !ended) {
Exchange::shared_ptr alternate = queue->getAlternateExchange();
if (alternate) {
@@ -166,7 +171,7 @@ void DeliveryRecord::acquire(DeliveryIds& results) {
}
}
-void DeliveryRecord::cancel(const std::string& cancelledTag)
+void DeliveryRecord::cancel(const std::string& cancelledTag)
{
if (tag == cancelledTag)
cancelled = true;
@@ -185,7 +190,7 @@ AckRange DeliveryRecord::findRange(DeliveryRecords& records, DeliveryId first, D
namespace qpid {
namespace broker {
-std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r)
+std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r)
{
out << "{" << "id=" << r.id.getValue();
out << ", tag=" << r.tag << "}";
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
index 5a331357be..21074d4274 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,6 +38,7 @@ namespace broker {
class TransactionContext;
class SemanticState;
struct AckRange;
+class Consumer;
/**
* Record of a delivery for which an ack is outstanding.
@@ -47,6 +48,7 @@ class DeliveryRecord
QueuedMessage msg;
mutable boost::shared_ptr<Queue> queue;
std::string tag; // name of consumer
+ boost::shared_ptr<Consumer> consumer;
DeliveryId id;
bool acquired : 1;
bool acceptExpected : 1;
@@ -68,14 +70,15 @@ class DeliveryRecord
QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg,
const boost::shared_ptr<Queue>& queue,
const std::string& tag,
+ const boost::shared_ptr<Consumer>& consumer,
bool acquired,
bool accepted,
bool windowing,
- uint32_t credit=0 // Only used if msg is empty.
+ uint32_t credit=0 // Only used if msg is empty.
);
-
+
bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id); }
-
+
void dequeue(TransactionContext* ctxt = 0) const;
void requeue() const;
void release(bool setRedelivered);
@@ -95,7 +98,7 @@ class DeliveryRecord
bool isAccepted() const { return !acceptExpected; }
bool isEnded() const { return ended; }
bool isWindowing() const { return windowing; }
-
+
uint32_t getCredit() const;
const std::string& getTag() const { return tag; }
@@ -132,7 +135,7 @@ typedef DeliveryRecord::DeliveryRecords DeliveryRecords;
struct AckRange
{
DeliveryRecords::iterator start;
- DeliveryRecords::iterator end;
+ DeliveryRecords::iterator end;
AckRange(DeliveryRecords::iterator _start, DeliveryRecords::iterator _end) : start(_start), end(_end) {}
};
diff --git a/qpid/cpp/src/qpid/broker/FifoDistributor.cpp b/qpid/cpp/src/qpid/broker/FifoDistributor.cpp
index eb1f0a402e..074c2b9a9d 100644
--- a/qpid/cpp/src/qpid/broker/FifoDistributor.cpp
+++ b/qpid/cpp/src/qpid/broker/FifoDistributor.cpp
@@ -30,11 +30,7 @@ FifoDistributor::FifoDistributor(Messages& container)
bool FifoDistributor::nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage& next )
{
- if (!messages.empty()) {
- next = messages.front(); // by default, consume oldest msg
- return true;
- }
- return false;
+ return messages.consume(next);
}
bool FifoDistributor::allocate(const std::string&, const QueuedMessage& )
@@ -46,9 +42,7 @@ bool FifoDistributor::allocate(const std::string&, const QueuedMessage& )
bool FifoDistributor::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
- if (!messages.empty() && messages.next(c->getPosition(), next))
- return true;
- return false;
+ return messages.browse(c->getPosition(), next, false);
}
void FifoDistributor::query(qpid::types::Variant::Map&) const
diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
index 3262e343a3..49c0a32c19 100644
--- a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
+++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
@@ -32,7 +32,7 @@ void LegacyLVQ::setNoBrowse(bool b)
noBrowse = b;
}
-bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+bool LegacyLVQ::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
Ordering::iterator i = messages.find(position);
if (i != messages.end() && i->second.payload == message.payload) {
@@ -44,9 +44,9 @@ bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& m
}
}
-bool LegacyLVQ::next(const framing::SequenceNumber& position, QueuedMessage& message)
+bool LegacyLVQ::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
{
- if (MessageMap::next(position, message)) {
+ if (MessageMap::browse(position, message, unacquired)) {
if (!noBrowse) index.erase(getKey(message));
return true;
} else {
diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.h b/qpid/cpp/src/qpid/broker/LegacyLVQ.h
index dd0fd7aaec..695e51131d 100644
--- a/qpid/cpp/src/qpid/broker/LegacyLVQ.h
+++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.h
@@ -40,8 +40,8 @@ class LegacyLVQ : public MessageMap
{
public:
LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0);
- bool remove(const framing::SequenceNumber&, QueuedMessage&);
- bool next(const framing::SequenceNumber&, QueuedMessage&);
+ bool acquire(const framing::SequenceNumber&, QueuedMessage&);
+ bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
bool push(const QueuedMessage& added, QueuedMessage& removed);
void removeIf(Predicate);
void setNoBrowse(bool);
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 0bc7d8f47b..e3b2b1f29c 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -23,6 +23,7 @@
#include "qpid/broker/LinkRegistry.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
+#include "qpid/sys/Timer.h"
#include "qmf/org/apache/qpid/broker/EventBrokerLinkUp.h"
#include "qmf/org/apache/qpid/broker/EventBrokerLinkDown.h"
#include "boost/bind.hpp"
@@ -31,29 +32,48 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/AclModule.h"
-using namespace qpid::broker;
-using qpid::framing::Buffer;
-using qpid::framing::FieldTable;
-using qpid::framing::UnauthorizedAccessException;
-using qpid::framing::connection::CLOSE_CODE_CONNECTION_FORCED;
-using qpid::management::ManagementAgent;
-using qpid::management::ManagementObject;
-using qpid::management::Manageable;
-using qpid::management::Args;
-using qpid::sys::Mutex;
+namespace qpid {
+namespace broker {
+
+using framing::Buffer;
+using framing::FieldTable;
+using framing::UnauthorizedAccessException;
+using framing::connection::CLOSE_CODE_CONNECTION_FORCED;
+using management::ManagementAgent;
+using management::ManagementObject;
+using management::Manageable;
+using management::Args;
+using sys::Mutex;
using std::stringstream;
using std::string;
-namespace _qmf = qmf::org::apache::qpid::broker;
+namespace _qmf = ::qmf::org::apache::qpid::broker;
+
+struct LinkTimerTask : public sys::TimerTask {
+ LinkTimerTask(Link& l, sys::Timer& t)
+ : TimerTask(int64_t(l.getBroker()->getOptions().linkMaintenanceInterval*
+ sys::TIME_SEC),
+ "Link retry timer"),
+ link(l), timer(t) {}
+
+ void fire() {
+ link.maintenanceVisit();
+ setupNextFire();
+ timer.add(this);
+ }
+
+ Link& link;
+ sys::Timer& timer;
+};
Link::Link(LinkRegistry* _links,
MessageStore* _store,
- string& _host,
+ const string& _host,
uint16_t _port,
- string& _transport,
+ const string& _transport,
bool _durable,
- string& _authMechanism,
- string& _username,
- string& _password,
+ const string& _authMechanism,
+ const string& _username,
+ const string& _password,
Broker* _broker,
Manageable* parent)
: links(_links), store(_store), host(_host), port(_port),
@@ -64,10 +84,11 @@ Link::Link(LinkRegistry* _links,
visitCount(0),
currentInterval(1),
closing(false),
- updateUrls(false),
+ reconnectNext(0), // Index of next address for reconnecting in url.
channelCounter(1),
connection(0),
- agent(0)
+ agent(0),
+ timerTask(new LinkTimerTask(*this, broker->getTimer()))
{
if (parent != 0 && broker != 0)
{
@@ -79,13 +100,14 @@ Link::Link(LinkRegistry* _links,
}
}
setStateLH(STATE_WAITING);
+ startConnectionLH();
+ broker->getTimer().add(timerTask);
}
Link::~Link ()
{
- if (state == STATE_OPERATIONAL && connection != 0)
- connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
-
+ assert(state == STATE_CLOSED); // Can only get here after destroy()
+ assert(connection == 0);
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
}
@@ -113,6 +135,7 @@ void Link::setStateLH (int newState)
void Link::startConnectionLH ()
{
+ assert(state == STATE_WAITING);
try {
// Set the state before calling connect. It is possible that connect
// will fail synchronously and call Link::closed before returning.
@@ -120,14 +143,16 @@ void Link::startConnectionLH ()
broker->connect (host, boost::lexical_cast<std::string>(port), transport,
boost::bind (&Link::closed, this, _1, _2));
QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port);
- } catch(std::exception& e) {
+ } catch(const std::exception& e) {
+ QPID_LOG(error, "Link connection to " << host << ":" << port << " failed: "
+ << e.what());
setStateLH(STATE_WAITING);
if (!hideManagement())
mgmtObject->set_lastError (e.what());
}
}
-void Link::established ()
+void Link::established(Connection* c)
{
stringstream addr;
addr << host << ":" << port;
@@ -136,17 +161,41 @@ void Link::established ()
if (!hideManagement() && agent)
agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
- {
- Mutex::ScopedLock mutex(lock);
- setStateLH(STATE_OPERATIONAL);
- currentInterval = 1;
- visitCount = 0;
- if (closing)
- destroy();
+ Mutex::ScopedLock mutex(lock);
+ assert(state == STATE_CONNECTING);
+ setStateLH(STATE_OPERATIONAL);
+ currentInterval = 1;
+ visitCount = 0;
+ connection = c;
+ if (closing)
+ destroy();
+ else // Process any IO tasks bridges added before established.
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+}
+
+
+void Link::setUrl(const Url& u) {
+ Mutex::ScopedLock mutex(lock);
+ url = u;
+ reconnectNext = 0;
+}
+
+void Link::opened() {
+ Mutex::ScopedLock mutex(lock);
+ assert(connection);
+ // Get default URL from known-hosts if not already set
+ if (url.empty()) {
+ const std::vector<Url>& known = connection->getKnownHosts();
+ // Flatten vector of URLs into a single URL listing all addresses.
+ url.clear();
+ for(size_t i = 0; i < known.size(); ++i)
+ url.insert(url.end(), known[i].begin(), known[i].end());
+ reconnectNext = 0;
+ QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
}
}
-void Link::closed (int, std::string text)
+void Link::closed(int, std::string text)
{
Mutex::ScopedLock mutex(lock);
QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
@@ -156,7 +205,7 @@ void Link::closed (int, std::string text)
if (state == STATE_OPERATIONAL) {
stringstream addr;
addr << host << ":" << port;
- QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
+ QPID_LOG(warning, "Inter-broker link disconnected from " << addr.str());
if (!hideManagement() && agent)
agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
}
@@ -178,6 +227,7 @@ void Link::closed (int, std::string text)
destroy();
}
+// Called in connection IO thread.
void Link::destroy ()
{
Bridges toDelete;
@@ -187,7 +237,7 @@ void Link::destroy ()
QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
if (connection)
connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
-
+ connection = 0;
setStateLH(STATE_CLOSED);
// Move the bridges to be deleted into a local vector so there is no
@@ -201,6 +251,8 @@ void Link::destroy ()
for (Bridges::iterator i = created.begin(); i != created.end(); i++)
toDelete.push_back(*i);
created.clear();
+
+ timerTask->cancel();
}
// Now delete all bridges on this link (don't hold the lock for this).
for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
@@ -213,10 +265,14 @@ void Link::add(Bridge::shared_ptr bridge)
{
Mutex::ScopedLock mutex(lock);
created.push_back (bridge);
+ if (connection)
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+
}
void Link::cancel(Bridge::shared_ptr bridge)
{
+ bool needIOProcessing = false;
{
Mutex::ScopedLock mutex(lock);
@@ -234,10 +290,10 @@ void Link::cancel(Bridge::shared_ptr bridge)
break;
}
}
+ needIOProcessing = !cancellations.empty();
}
- if (!cancellations.empty()) {
+ if (needIOProcessing && connection)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
- }
}
void Link::ioThreadProcessing()
@@ -246,7 +302,6 @@ void Link::ioThreadProcessing()
if (state != STATE_OPERATIONAL)
return;
- QPID_LOG(debug, "Link::ioThreadProcessing()");
// check for bridge session errors and recover
if (!active.empty()) {
@@ -279,23 +334,10 @@ void Link::ioThreadProcessing()
}
}
-void Link::setConnection(Connection* c)
-{
- Mutex::ScopedLock mutex(lock);
- connection = c;
- updateUrls = true;
-}
-
void Link::maintenanceVisit ()
{
Mutex::ScopedLock mutex(lock);
- if (connection && updateUrls) {
- urls.reset(connection->getKnownHosts());
- QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls);
- updateUrls = false;
- }
-
if (state == STATE_WAITING)
{
visitCount++;
@@ -303,7 +345,7 @@ void Link::maintenanceVisit ()
{
visitCount = 0;
//switch host and port to next in url list if possible
- if (!tryFailover()) {
+ if (!tryFailoverLH()) {
currentInterval *= 2;
if (currentInterval > MAX_INTERVAL)
currentInterval = MAX_INTERVAL;
@@ -313,11 +355,10 @@ void Link::maintenanceVisit ()
}
else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
-}
+ }
-void Link::reconnect(const qpid::Address& a)
+void Link::reconnectLH(const Address& a)
{
- Mutex::ScopedLock mutex(lock);
host = a.host;
port = a.port;
transport = a.protocol;
@@ -329,17 +370,17 @@ void Link::reconnect(const qpid::Address& a)
}
}
-bool Link::tryFailover()
-{
- Address next;
- if (urls.next(next) &&
- (next.host != host || next.port != port || next.protocol != transport)) {
+bool Link::tryFailoverLH() {
+ if (reconnectNext >= url.size()) reconnectNext = 0;
+ if (url.empty()) return false;
+ Address next = url[reconnectNext++];
+ if (next.host != host || next.port != port || next.protocol != transport) {
links->changeAddress(Address(transport, host, port), next);
- QPID_LOG(debug, "Link failing over to " << host << ":" << port);
+ QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port);
+ reconnectLH(next);
return true;
- } else {
- return false;
}
+ return false;
}
// Management updates for a linke are inconsistent in a cluster, so they are
@@ -423,18 +464,24 @@ ManagementObject* Link::GetManagementObject (void) const
return (ManagementObject*) mgmtObject;
}
+void Link::close() {
+ Mutex::ScopedLock mutex(lock);
+ if (!closing) {
+ closing = true;
+ if (state != STATE_CONNECTING && connection) {
+ //connection can only be closed on the connections own IO processing thread
+ connection->requestIOProcessing(boost::bind(&Link::destroy, this));
+ }
+ }
+}
+
+
Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& text)
{
switch (op)
{
case _qmf::Link::METHOD_CLOSE :
- if (!closing) {
- closing = true;
- if (state != STATE_CONNECTING && connection) {
- //connection can only be closed on the connections own IO processing thread
- connection->requestIOProcessing(boost::bind(&Link::destroy, this));
- }
- }
+ close();
return Manageable::STATUS_OK;
case _qmf::Link::METHOD_BRIDGE :
@@ -487,3 +534,5 @@ void Link::setPassive(bool passive)
}
}
}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index 4badd8b3a1..4085c3bfcf 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -23,10 +23,10 @@
*/
#include <boost/shared_ptr.hpp>
+#include "qpid/Url.h"
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/PersistableConfig.h"
#include "qpid/broker/Bridge.h"
-#include "qpid/broker/RetryList.h"
#include "qpid/sys/Mutex.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
@@ -35,110 +35,121 @@
#include <boost/ptr_container/ptr_vector.hpp>
namespace qpid {
- namespace broker {
-
- class LinkRegistry;
- class Broker;
- class Connection;
-
- class Link : public PersistableConfig, public management::Manageable {
- private:
- sys::Mutex lock;
- LinkRegistry* links;
- MessageStore* store;
- std::string host;
- uint16_t port;
- std::string transport;
- bool durable;
- std::string authMechanism;
- std::string username;
- std::string password;
- mutable uint64_t persistenceId;
- qmf::org::apache::qpid::broker::Link* mgmtObject;
- Broker* broker;
- int state;
- uint32_t visitCount;
- uint32_t currentInterval;
- bool closing;
- RetryList urls;
- bool updateUrls;
-
- typedef std::vector<Bridge::shared_ptr> Bridges;
- Bridges created; // Bridges pending creation
- Bridges active; // Bridges active
- Bridges cancellations; // Bridges pending cancellation
- uint channelCounter;
- Connection* connection;
- management::ManagementAgent* agent;
-
- static const int STATE_WAITING = 1;
- static const int STATE_CONNECTING = 2;
- static const int STATE_OPERATIONAL = 3;
- static const int STATE_FAILED = 4;
- static const int STATE_CLOSED = 5;
- static const int STATE_PASSIVE = 6;
-
- static const uint32_t MAX_INTERVAL = 32;
-
- void setStateLH (int newState);
- void startConnectionLH(); // Start the IO Connection
- void destroy(); // Called when mgmt deletes this link
- void ioThreadProcessing(); // Called on connection's IO thread by request
- bool tryFailover(); // Called during maintenance visit
- bool hideManagement() const;
-
- public:
- typedef boost::shared_ptr<Link> shared_ptr;
-
- Link(LinkRegistry* links,
- MessageStore* store,
- std::string& host,
- uint16_t port,
- std::string& transport,
- bool durable,
- std::string& authMechanism,
- std::string& username,
- std::string& password,
- Broker* broker,
- management::Manageable* parent = 0);
- virtual ~Link();
-
- std::string getHost() { return host; }
- uint16_t getPort() { return port; }
- bool isDurable() { return durable; }
- void maintenanceVisit ();
- uint nextChannel();
- void add(Bridge::shared_ptr);
- void cancel(Bridge::shared_ptr);
-
- void established(); // Called when connection is created
- void closed(int, std::string); // Called when connection goes away
- void setConnection(Connection*); // Set pointer to the AMQP Connection
- void reconnect(const Address&); //called by LinkRegistry
-
- std::string getAuthMechanism() { return authMechanism; }
- std::string getUsername() { return username; }
- std::string getPassword() { return password; }
- Broker* getBroker() { return broker; }
-
- void notifyConnectionForced(const std::string text);
- void setPassive(bool p);
-
- // PersistableConfig:
- void setPersistenceId(uint64_t id) const;
- uint64_t getPersistenceId() const { return persistenceId; }
- uint32_t encodedSize() const;
- void encode(framing::Buffer& buffer) const;
- const std::string& getName() const;
-
- static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
-
- // Manageable entry points
- management::ManagementObject* GetManagementObject(void) const;
- management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&);
-
- };
- }
+
+namespace sys {
+class TimerTask;
+}
+
+namespace broker {
+
+class LinkRegistry;
+class Broker;
+class Connection;
+
+class Link : public PersistableConfig, public management::Manageable {
+ private:
+ sys::Mutex lock;
+ LinkRegistry* links;
+ MessageStore* store;
+ std::string host;
+ uint16_t port;
+ std::string transport;
+ bool durable;
+ std::string authMechanism;
+ std::string username;
+ std::string password;
+ mutable uint64_t persistenceId;
+ qmf::org::apache::qpid::broker::Link* mgmtObject;
+ Broker* broker;
+ int state;
+ uint32_t visitCount;
+ uint32_t currentInterval;
+ bool closing;
+ Url url; // URL can contain many addresses.
+ size_t reconnectNext; // Index for next re-connect attempt
+
+ typedef std::vector<Bridge::shared_ptr> Bridges;
+ Bridges created; // Bridges pending creation
+ Bridges active; // Bridges active
+ Bridges cancellations; // Bridges pending cancellation
+ uint channelCounter;
+ Connection* connection;
+ management::ManagementAgent* agent;
+
+ boost::intrusive_ptr<sys::TimerTask> timerTask;
+
+ static const int STATE_WAITING = 1;
+ static const int STATE_CONNECTING = 2;
+ static const int STATE_OPERATIONAL = 3;
+ static const int STATE_FAILED = 4;
+ static const int STATE_CLOSED = 5;
+ static const int STATE_PASSIVE = 6;
+
+ static const uint32_t MAX_INTERVAL = 32;
+
+ void setStateLH (int newState);
+ void startConnectionLH(); // Start the IO Connection
+ void destroy(); // Called when mgmt deletes this link
+ void ioThreadProcessing(); // Called on connection's IO thread by request
+ bool tryFailoverLH(); // Called during maintenance visit
+ bool hideManagement() const;
+
+ public:
+ typedef boost::shared_ptr<Link> shared_ptr;
+
+ Link(LinkRegistry* links,
+ MessageStore* store,
+ const std::string& host,
+ uint16_t port,
+ const std::string& transport,
+ bool durable,
+ const std::string& authMechanism,
+ const std::string& username,
+ const std::string& password,
+ Broker* broker,
+ management::Manageable* parent = 0);
+ virtual ~Link();
+
+ std::string getHost() { return host; }
+ uint16_t getPort() { return port; }
+ std::string getTransport() { return transport; }
+
+ bool isDurable() { return durable; }
+ void maintenanceVisit ();
+ uint nextChannel();
+ void add(Bridge::shared_ptr);
+ void cancel(Bridge::shared_ptr);
+ void setUrl(const Url&); // Set URL for reconnection.
+
+ void established(Connection*); // Called when connection is create
+ void opened(); // Called when connection is open (after create)
+ void closed(int, std::string); // Called when connection goes away
+ void reconnectLH(const Address&); //called by LinkRegistry
+ void close(); // Close the link from within the broker.
+
+ std::string getAuthMechanism() { return authMechanism; }
+ std::string getUsername() { return username; }
+ std::string getPassword() { return password; }
+ Broker* getBroker() { return broker; }
+
+ void notifyConnectionForced(const std::string text);
+ void setPassive(bool p);
+
+ // PersistableConfig:
+ void setPersistenceId(uint64_t id) const;
+ uint64_t getPersistenceId() const { return persistenceId; }
+ uint32_t encodedSize() const;
+ void encode(framing::Buffer& buffer) const;
+ const std::string& getName() const;
+
+ static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+
+ // Manageable entry points
+ management::ManagementObject* GetManagementObject(void) const;
+ management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&);
+
+};
+}
}
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index e9885f5462..a4fd90684e 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -35,102 +35,65 @@ using boost::format;
using boost::str;
namespace _qmf = qmf::org::apache::qpid::broker;
-#define LINK_MAINT_INTERVAL 2
-
// TODO: This constructor is only used by the store unit tests -
// That probably indicates that LinkRegistry isn't correctly
-// factored: The persistence element and maintenance element
-// should be factored separately
+// factored: The persistence element should be factored separately
LinkRegistry::LinkRegistry () :
- broker(0), timer(0),
- parent(0), store(0), passive(false), passiveChanged(false),
+ broker(0),
+ parent(0), store(0), passive(false),
realm("")
{
}
-LinkRegistry::LinkRegistry (Broker* _broker) :
- broker(_broker), timer(&broker->getTimer()),
- maintenanceTask(new Periodic(*this)),
- parent(0), store(0), passive(false), passiveChanged(false),
- realm(broker->getOptions().realm)
-{
- timer->add(maintenanceTask);
-}
-
-LinkRegistry::~LinkRegistry()
-{
- // This test is only necessary if the default constructor above is present
- if (maintenanceTask)
- maintenanceTask->cancel();
+namespace {
+struct ConnectionObserverImpl : public ConnectionObserver {
+ LinkRegistry& links;
+ ConnectionObserverImpl(LinkRegistry& l) : links(l) {}
+ void connection(Connection& c) { links.notifyConnection(c.getMgmtId(), &c); }
+ void opened(Connection& c) { links.notifyOpened(c.getMgmtId()); }
+ void closed(Connection& c) { links.notifyClosed(c.getMgmtId()); }
+ void forced(Connection& c, const string& text) { links.notifyConnectionForced(c.getMgmtId(), text); }
+};
}
-LinkRegistry::Periodic::Periodic (LinkRegistry& _links) :
- TimerTask (Duration (LINK_MAINT_INTERVAL * TIME_SEC),"LinkRegistry"), links(_links) {}
-
-void LinkRegistry::Periodic::fire ()
+LinkRegistry::LinkRegistry (Broker* _broker) :
+ broker(_broker),
+ parent(0), store(0), passive(false),
+ realm(broker->getOptions().realm)
{
- links.periodicMaintenance ();
- setupNextFire();
- links.timer->add(this);
+ broker->getConnectionObservers().add(
+ boost::shared_ptr<ConnectionObserver>(new ConnectionObserverImpl(*this)));
}
-void LinkRegistry::periodicMaintenance ()
-{
- Mutex::ScopedLock locker(lock);
+LinkRegistry::~LinkRegistry() {}
- linksToDestroy.clear();
- bridgesToDestroy.clear();
- if (passiveChanged) {
- if (passive) { QPID_LOG(info, "Passivating links"); }
- else { QPID_LOG(info, "Activating links"); }
- for (LinkMap::iterator i = links.begin(); i != links.end(); i++) {
- i->second->setPassive(passive);
- }
- passiveChanged = false;
- }
- for (LinkMap::iterator i = links.begin(); i != links.end(); i++)
- i->second->maintenanceVisit();
- //now process any requests for re-addressing
- for (AddressMap::iterator i = reMappings.begin(); i != reMappings.end(); i++)
- updateAddress(i->first, i->second);
- reMappings.clear();
-}
void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress)
{
- //done on periodic maintenance thread; hold changes in separate
- //map to avoid modifying the link map that is iterated over
- reMappings[createKey(oldAddress)] = newAddress;
-}
-
-bool LinkRegistry::updateAddress(const std::string& oldKey, const qpid::Address& newAddress)
-{
+ Mutex::ScopedLock locker(lock);
+ std::string oldKey = createKey(oldAddress);
std::string newKey = createKey(newAddress);
if (links.find(newKey) != links.end()) {
QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use");
- return false;
} else {
LinkMap::iterator i = links.find(oldKey);
if (i == links.end()) {
QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey);
- return false;
} else {
links[newKey] = i->second;
- i->second->reconnect(newAddress);
links.erase(oldKey);
QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey);
- return true;
}
}
}
-pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host,
+pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& host,
uint16_t port,
- string& transport,
+ const string& transport,
bool durable,
- string& authMechanism,
- string& username,
- string& password)
+ const string& authMechanism,
+ const string& username,
+ const string& password)
{
Mutex::ScopedLock locker(lock);
@@ -151,18 +114,20 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host,
return std::pair<Link::shared_ptr, bool>(i->second, false);
}
-pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
+pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
uint16_t port,
bool durable,
- std::string& src,
- std::string& dest,
- std::string& key,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key,
bool isQueue,
bool isLocal,
- std::string& tag,
- std::string& excludes,
+ const std::string& tag,
+ const std::string& excludes,
bool dynamic,
- uint16_t sync)
+ uint16_t sync,
+ Bridge::InitializeCallback init
+)
{
Mutex::ScopedLock locker(lock);
QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")");
@@ -196,7 +161,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
bridge = Bridge::shared_ptr
(new Bridge (l->second.get(), l->second->nextChannel(),
boost::bind(&LinkRegistry::destroy, this,
- host, port, src, dest, key), args));
+ host, port, src, dest, key),
+ args, init));
bridges[bridgeKey] = bridge;
l->second->add(bridge);
return std::pair<Bridge::shared_ptr, bool>(bridge, true);
@@ -214,7 +180,6 @@ void LinkRegistry::destroy(const string& host, const uint16_t port)
{
if (i->second->isDurable() && store)
store->destroy(*(i->second));
- linksToDestroy[key] = i->second;
links.erase(i);
}
}
@@ -242,7 +207,6 @@ void LinkRegistry::destroy(const std::string& host,
l->second->cancel(b->second);
if (b->second->isDurable())
store->destroy(*(b->second));
- bridgesToDestroy[bridgeKey] = b->second;
bridges.erase(b);
}
@@ -276,12 +240,17 @@ void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
{
Link::shared_ptr link = findLink(key);
if (link) {
- link->established();
- link->setConnection(c);
+ link->established(c);
c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm));
}
}
+void LinkRegistry::notifyOpened(const std::string& key)
+{
+ Link::shared_ptr link = findLink(key);
+ if (link) link->opened();
+}
+
void LinkRegistry::notifyClosed(const std::string& key)
{
Link::shared_ptr link = findLink(key);
@@ -384,9 +353,12 @@ std::string LinkRegistry::createKey(const std::string& host, uint16_t port) {
void LinkRegistry::setPassive(bool p)
{
Mutex::ScopedLock locker(lock);
- passiveChanged = p != passive;
passive = p;
- //will activate or passivate links on maintenance visit
+ if (passive) { QPID_LOG(info, "Passivating links"); }
+ else { QPID_LOG(info, "Activating links"); }
+ for (LinkMap::iterator i = links.begin(); i != links.end(); i++) {
+ i->second->setPassive(passive);
+ }
}
void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) {
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h
index 4c97e4f9d8..ef4871192f 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.h
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,7 +27,6 @@
#include "qpid/broker/MessageStore.h"
#include "qpid/Address.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
@@ -40,40 +39,19 @@ namespace broker {
class Broker;
class Connection;
class LinkRegistry {
-
- // Declare a timer task to manage the establishment of link connections and the
- // re-establishment of lost link connections.
- struct Periodic : public sys::TimerTask
- {
- LinkRegistry& links;
-
- Periodic(LinkRegistry& links);
- virtual ~Periodic() {};
- void fire();
- };
-
typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap;
typedef std::map<std::string, Bridge::shared_ptr> BridgeMap;
- typedef std::map<std::string, Address> AddressMap;
LinkMap links;
- LinkMap linksToDestroy;
BridgeMap bridges;
- BridgeMap bridgesToDestroy;
- AddressMap reMappings;
qpid::sys::Mutex lock;
Broker* broker;
- sys::Timer* timer;
- boost::intrusive_ptr<qpid::sys::TimerTask> maintenanceTask;
management::Manageable* parent;
MessageStore* store;
bool passive;
- bool passiveChanged;
std::string realm;
- void periodicMaintenance ();
- bool updateAddress(const std::string& oldKey, const Address& newAddress);
boost::shared_ptr<Link> findLink(const std::string& key);
static std::string createKey(const Address& address);
static std::string createKey(const std::string& host, uint16_t port);
@@ -84,28 +62,32 @@ namespace broker {
~LinkRegistry();
std::pair<boost::shared_ptr<Link>, bool>
- declare(std::string& host,
+ declare(const std::string& host,
uint16_t port,
- std::string& transport,
+ const std::string& transport,
bool durable,
- std::string& authMechanism,
- std::string& username,
- std::string& password);
+ const std::string& authMechanism,
+ const std::string& username,
+ const std::string& password);
+
std::pair<Bridge::shared_ptr, bool>
- declare(std::string& host,
+ declare(const std::string& host,
uint16_t port,
bool durable,
- std::string& src,
- std::string& dest,
- std::string& key,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key,
bool isQueue,
bool isLocal,
- std::string& id,
- std::string& excludes,
+ const std::string& id,
+ const std::string& excludes,
bool dynamic,
- uint16_t sync);
+ uint16_t sync,
+ Bridge::InitializeCallback=0
+ );
void destroy(const std::string& host, const uint16_t port);
+
void destroy(const std::string& host,
const uint16_t port,
const std::string& src,
@@ -128,6 +110,7 @@ namespace broker {
MessageStore* getStore() const;
void notifyConnection (const std::string& key, Connection* c);
+ void notifyOpened (const std::string& key);
void notifyClosed (const std::string& key);
void notifyConnectionForced (const std::string& key, const std::string& text);
std::string getAuthMechanism (const std::string& key);
@@ -142,6 +125,7 @@ namespace broker {
* Called by links failing over to new address
*/
void changeAddress(const Address& oldAddress, const Address& newAddress);
+
/**
* Called to alter passive state. In passive state the links
* and bridges managed by a link registry will be recorded and
@@ -150,7 +134,7 @@ namespace broker {
*/
void setPassive(bool);
-
+
/** Iterate over each link in the registry. Used for cluster updates. */
void eachLink(boost::function<void(boost::shared_ptr<Link>)> f);
/** Iterate over each bridge in the registry. Used for cluster updates. */
diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.cpp b/qpid/cpp/src/qpid/broker/MessageDeque.cpp
index 24b8f6f895..9f874e4c9a 100644
--- a/qpid/cpp/src/qpid/broker/MessageDeque.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageDeque.cpp
@@ -20,121 +20,156 @@
*/
#include "qpid/broker/MessageDeque.h"
#include "qpid/broker/QueuedMessage.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
-size_t MessageDeque::size()
-{
- return messages.size();
-}
-
-bool MessageDeque::empty()
-{
- return messages.empty();
-}
+MessageDeque::MessageDeque() : available(0), head(0) {}
-void MessageDeque::reinsert(const QueuedMessage& message)
+size_t MessageDeque::index(const framing::SequenceNumber& position)
{
- messages.insert(lower_bound(messages.begin(), messages.end(), message), message);
-}
-
-MessageDeque::Deque::iterator MessageDeque::seek(const framing::SequenceNumber& position)
-{
- if (!messages.empty()) {
- QueuedMessage comp;
- comp.position = position;
- unsigned long diff = position.getValue() - messages.front().position.getValue();
- long maxEnd = diff < messages.size()? diff : messages.size();
- return lower_bound(messages.begin(),messages.begin()+maxEnd,comp);
- } else {
- return messages.end();
- }
+ //assuming a monotonic sequence, with no messages removed except
+ //from the ends of the deque, we can use the position to determin
+ //an index into the deque
+ if (messages.empty() || position < messages.front().position) return 0;
+ return position - messages.front().position;
}
-bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove)
+bool MessageDeque::deleted(const QueuedMessage& m)
{
- Deque::iterator i = seek(position);
- if (i != messages.end() && i->position == position) {
- message = *i;
- if (remove) messages.erase(i);
+ size_t i = index(m.position);
+ if (i < messages.size()) {
+ messages[i].status = QueuedMessage::DELETED;
+ clean();
return true;
} else {
return false;
}
}
-bool MessageDeque::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+size_t MessageDeque::size()
{
- return find(position, message, true);
+ return available;
}
-bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message)
+void MessageDeque::release(const QueuedMessage& message)
{
- return find(position, message, false);
+ size_t i = index(message.position);
+ if (i < messages.size()) {
+ QueuedMessage& m = messages[i];
+ if (m.status == QueuedMessage::ACQUIRED) {
+ if (head > i) head = i;
+ m.status = QueuedMessage::AVAILABLE;
+ ++available;
+ }
+ } else {
+ QPID_LOG(error, "Failed to release message at " << message.position << " " << message.payload->getFrames().getContent() << "; no such message (index=" << i << ", size=" << messages.size() << ")");
+ }
}
-bool MessageDeque::next(const framing::SequenceNumber& position, QueuedMessage& message)
+bool MessageDeque::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
- if (messages.empty()) {
- return false;
- } else if (position < front().position) {
- message = front();
- return true;
- } else {
- Deque::iterator i = seek(position+1);
- if (i != messages.end()) {
- message = *i;
+ if (position < messages.front().position) return false;
+ size_t i = index(position);
+ if (i < messages.size()) {
+ QueuedMessage& temp = messages[i];
+ if (temp.status == QueuedMessage::AVAILABLE) {
+ temp.status = QueuedMessage::ACQUIRED;
+ --available;
+ message = temp;
return true;
- } else {
- return false;
}
}
+ return false;
}
-QueuedMessage& MessageDeque::front()
+bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message)
{
- return messages.front();
+ size_t i = index(position);
+ if (i < messages.size()) {
+ message = messages[i];
+ return true;
+ } else {
+ return false;
+ }
}
-void MessageDeque::pop()
+bool MessageDeque::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
{
- if (!messages.empty()) {
- messages.pop_front();
+ //get first message that is greater than position
+ size_t i = index(position + 1);
+ while (i < messages.size()) {
+ QueuedMessage& m = messages[i++];
+ if (m.status == QueuedMessage::AVAILABLE || (!unacquired && m.status == QueuedMessage::ACQUIRED)) {
+ message = m;
+ return true;
+ }
}
+ return false;
}
-bool MessageDeque::pop(QueuedMessage& out)
+bool MessageDeque::consume(QueuedMessage& message)
{
- if (messages.empty()) {
- return false;
- } else {
- out = front();
- messages.pop_front();
- return true;
+ while (head < messages.size()) {
+ QueuedMessage& i = messages[head++];
+ if (i.status == QueuedMessage::AVAILABLE) {
+ i.status = QueuedMessage::ACQUIRED;
+ --available;
+ message = i;
+ return true;
+ }
}
+ return false;
}
bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
{
+ //add padding to prevent gaps in sequence, which break the index
+ //calculation (needed for queue replication)
+ while (messages.size() && (added.position - messages.back().position) > 1) {
+ QueuedMessage dummy;
+ dummy.position = messages.back().position + 1;
+ dummy.status = QueuedMessage::DELETED;
+ messages.push_back(dummy);
+ QPID_LOG(debug, "Adding padding at " << dummy.position << ", between " << messages.back().position << " and " << added.position);
+ }
messages.push_back(added);
+ messages.back().status = QueuedMessage::AVAILABLE;
+ if (head >= messages.size()) head = messages.size() - 1;
+ ++available;
return false;//adding a message never causes one to be removed for deque
}
+void MessageDeque::clean()
+{
+ while (messages.size() && messages.front().status == QueuedMessage::DELETED) {
+ messages.pop_front();
+ if (head) --head;
+ }
+}
+
void MessageDeque::foreach(Functor f)
{
- std::for_each(messages.begin(), messages.end(), f);
+ for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->status == QueuedMessage::AVAILABLE) {
+ f(*i);
+ }
+ }
}
void MessageDeque::removeIf(Predicate p)
{
- for (Deque::iterator i = messages.begin(); i != messages.end();) {
- if (p(*i)) {
- i = messages.erase(i);
- } else {
- ++i;
+ for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->status == QueuedMessage::AVAILABLE && p(*i)) {
+ //Use special status for this as messages are not yet
+ //dequeued, but should not be considered on the queue
+ //either (used for purging and moving)
+ i->status = QueuedMessage::REMOVED;
+ --available;
}
}
+ clean();
}
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.h b/qpid/cpp/src/qpid/broker/MessageDeque.h
index 0e1aef2986..4d3a5dcdd5 100644
--- a/qpid/cpp/src/qpid/broker/MessageDeque.h
+++ b/qpid/cpp/src/qpid/broker/MessageDeque.h
@@ -34,17 +34,14 @@ namespace broker {
class MessageDeque : public Messages
{
public:
+ MessageDeque();
size_t size();
- bool empty();
-
- void reinsert(const QueuedMessage&);
- bool remove(const framing::SequenceNumber&, QueuedMessage&);
+ bool deleted(const QueuedMessage&);
+ void release(const QueuedMessage&);
+ bool acquire(const framing::SequenceNumber&, QueuedMessage&);
bool find(const framing::SequenceNumber&, QueuedMessage&);
- bool next(const framing::SequenceNumber&, QueuedMessage&);
-
- QueuedMessage& front();
- void pop();
- bool pop(QueuedMessage&);
+ bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
+ bool consume(QueuedMessage&);
bool push(const QueuedMessage& added, QueuedMessage& removed);
void foreach(Functor);
@@ -53,9 +50,11 @@ class MessageDeque : public Messages
private:
typedef std::deque<QueuedMessage> Deque;
Deque messages;
+ size_t available;
+ size_t head;
- Deque::iterator seek(const framing::SequenceNumber&);
- bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove);
+ size_t index(const framing::SequenceNumber&);
+ void clean();
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
index 0aef732e54..5f450cd556 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -204,7 +204,7 @@ MessageGroupManager::~MessageGroupManager()
}
bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
- if (messages.empty())
+ if (!messages.size())
return false;
next.position = c->getPosition();
@@ -216,15 +216,16 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued
}
}
- while (messages.next( next.position, next )) {
+ while (messages.browse( next.position, next, true )) {
GroupState& group = findGroup(next);
if (!group.owned()) {
- if (group.members.front() == next.position) { // only take from head!
+ //TODO: make acquire more efficient when we already have the message in question
+ if (group.members.front() == next.position && messages.acquire(next.position, next)) { // only take from head!
return true;
}
QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group
<< "'s head message still pending. pos=" << group.members.front());
- } else if (group.owner == c->getName()) {
+ } else if (group.owner == c->getName() && messages.acquire(next.position, next)) {
return true;
}
}
@@ -249,9 +250,7 @@ bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMess
bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
// browse: allow access to any available msg, regardless of group ownership (?ok?)
- if (!messages.empty() && messages.next(c->getPosition(), next))
- return true;
- return false;
+ return messages.browse(c->getPosition(), next, false);
}
void MessageGroupManager::query(qpid::types::Variant::Map& status) const
diff --git a/qpid/cpp/src/qpid/broker/MessageMap.cpp b/qpid/cpp/src/qpid/broker/MessageMap.cpp
index 39e23df533..048df45434 100644
--- a/qpid/cpp/src/qpid/broker/MessageMap.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageMap.cpp
@@ -27,6 +27,8 @@ namespace {
const std::string EMPTY;
}
+bool MessageMap::deleted(const QueuedMessage&) { return true; }
+
std::string MessageMap::getKey(const QueuedMessage& message)
{
const framing::FieldTable* ft = message.payload->getApplicationHeaders();
@@ -44,7 +46,7 @@ bool MessageMap::empty()
return messages.empty();
}
-void MessageMap::reinsert(const QueuedMessage& message)
+void MessageMap::release(const QueuedMessage& message)
{
std::string key = getKey(message);
Index::iterator i = index.find(key);
@@ -54,7 +56,7 @@ void MessageMap::reinsert(const QueuedMessage& message)
} //else message has already been replaced
}
-bool MessageMap::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
Ordering::iterator i = messages.find(position);
if (i != messages.end()) {
@@ -77,38 +79,22 @@ bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& me
}
}
-bool MessageMap::next(const framing::SequenceNumber& position, QueuedMessage& message)
+bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool)
{
- if (!messages.empty() && position < front().position) {
- message = front();
+ Ordering::iterator i = messages.lower_bound(position+1);
+ if (i != messages.end()) {
+ message = i->second;
return true;
} else {
- Ordering::iterator i = messages.lower_bound(position+1);
- if (i != messages.end()) {
- message = i->second;
- return true;
- } else {
- return false;
- }
+ return false;
}
}
-QueuedMessage& MessageMap::front()
-{
- return messages.begin()->second;
-}
-
-void MessageMap::pop()
-{
- QueuedMessage dummy;
- pop(dummy);
-}
-
-bool MessageMap::pop(QueuedMessage& out)
+bool MessageMap::consume(QueuedMessage& message)
{
Ordering::iterator i = messages.begin();
if (i != messages.end()) {
- out = i->second;
+ message = i->second;
erase(i);
return true;
} else {
diff --git a/qpid/cpp/src/qpid/broker/MessageMap.h b/qpid/cpp/src/qpid/broker/MessageMap.h
index 1128a1d54a..d1b8217f9b 100644
--- a/qpid/cpp/src/qpid/broker/MessageMap.h
+++ b/qpid/cpp/src/qpid/broker/MessageMap.h
@@ -43,14 +43,12 @@ class MessageMap : public Messages
size_t size();
bool empty();
- void reinsert(const QueuedMessage&);
- virtual bool remove(const framing::SequenceNumber&, QueuedMessage&);
+ bool deleted(const QueuedMessage&);
+ void release(const QueuedMessage&);
+ virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&);
bool find(const framing::SequenceNumber&, QueuedMessage&);
- virtual bool next(const framing::SequenceNumber&, QueuedMessage&);
-
- QueuedMessage& front();
- void pop();
- bool pop(QueuedMessage&);
+ virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
+ bool consume(QueuedMessage&);
virtual bool push(const QueuedMessage& added, QueuedMessage& removed);
void foreach(Functor);
diff --git a/qpid/cpp/src/qpid/broker/Messages.h b/qpid/cpp/src/qpid/broker/Messages.h
index 448f17432a..89f6d383ae 100644
--- a/qpid/cpp/src/qpid/broker/Messages.h
+++ b/qpid/cpp/src/qpid/broker/Messages.h
@@ -46,22 +46,21 @@ class Messages
* @return the number of messages available for delivery.
*/
virtual size_t size() = 0;
+
/**
- * @return true if there are no messages for delivery, false otherwise
+ * Called when a message is deleted from the queue.
*/
- virtual bool empty() = 0;
-
+ virtual bool deleted(const QueuedMessage&) = 0;
/**
- * Re-inserts a message back into its original position - used
- * when requeing released messages.
+ * Releases an acquired message, making it available again.
*/
- virtual void reinsert(const QueuedMessage&) = 0;
+ virtual void release(const QueuedMessage&) = 0;
/**
- * Remove the message at the specified position, returning true if
- * found, false otherwise. The removed message is passed back via
- * the second parameter.
+ * Acquire the message at the specified position, returning true
+ * if found, false otherwise. The acquired message is passed back
+ * via the second parameter.
*/
- virtual bool remove(const framing::SequenceNumber&, QueuedMessage&) = 0;
+ virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&) = 0;
/**
* Find the message at the specified position, returning true if
* found, false otherwise. The matched message is passed back via
@@ -69,30 +68,22 @@ class Messages
*/
virtual bool find(const framing::SequenceNumber&, QueuedMessage&) = 0;
/**
- * Return the next message to be given to a browsing subscrption
- * that has reached the specified poisition. The next messages is
- * passed back via the second parameter.
+ * Retrieve the next message to be given to a browsing
+ * subscription that has reached the specified position. The next
+ * message is passed back via the second parameter.
+ *
+ * @param unacquired, if true, will only browse unacquired messages
*
* @return true if there is another message, false otherwise.
*/
- virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0;
+ virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool unacquired) = 0;
/**
- * Note: Caller is responsible for ensuring that there is a front
- * (e.g. empty() returns false)
+ * Retrieve the next message available for a consuming
+ * subscription.
*
- * @return the next message to be delivered
- */
- virtual QueuedMessage& front() = 0;
- /**
- * Removes the front message
- */
- virtual void pop() = 0;
- /**
- * @return true if there is a mesage to be delivered - in which
- * case that message will be returned via the parameter and
- * removed - otherwise false.
+ * @return true if there is such a message, false otherwise.
*/
- virtual bool pop(QueuedMessage&) = 0;
+ virtual bool consume(QueuedMessage&) = 0;
/**
* Pushes a message to the back of the 'queue'. For some types of
* queue this may cause another message to be removed; if that is
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
index e07e73d323..d807ef22b1 100644
--- a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
+++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
@@ -32,6 +32,8 @@ PriorityQueue::PriorityQueue(int l) :
messages(levels, Deque()),
frontLevel(0), haveFront(false), cached(false) {}
+bool PriorityQueue::deleted(const QueuedMessage&) { return true; }
+
size_t PriorityQueue::size()
{
size_t total(0);
@@ -41,15 +43,7 @@ size_t PriorityQueue::size()
return total;
}
-bool PriorityQueue::empty()
-{
- for (int i = 0; i < levels; ++i) {
- if (!messages[i].empty()) return false;
- }
- return true;
-}
-
-void PriorityQueue::reinsert(const QueuedMessage& message)
+void PriorityQueue::release(const QueuedMessage& message)
{
uint p = getPriorityLevel(message);
messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), message), message);
@@ -78,7 +72,7 @@ bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage&
return false;
}
-bool PriorityQueue::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
return find(position, message, true);
}
@@ -88,7 +82,7 @@ bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage&
return find(position, message, false);
}
-bool PriorityQueue::next(const framing::SequenceNumber& position, QueuedMessage& message)
+bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool)
{
QueuedMessage match;
match.position = position+1;
@@ -112,16 +106,7 @@ bool PriorityQueue::next(const framing::SequenceNumber& position, QueuedMessage&
return found;
}
-QueuedMessage& PriorityQueue::front()
-{
- if (checkFront()) {
- return messages[frontLevel].front();
- } else {
- throw qpid::framing::InternalErrorException(QPID_MSG("No message available"));
- }
-}
-
-bool PriorityQueue::pop(QueuedMessage& message)
+bool PriorityQueue::consume(QueuedMessage& message)
{
if (checkFront()) {
message = messages[frontLevel].front();
@@ -133,12 +118,6 @@ bool PriorityQueue::pop(QueuedMessage& message)
}
}
-void PriorityQueue::pop()
-{
- QueuedMessage dummy;
- pop(dummy);
-}
-
bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
{
messages[getPriorityLevel(added)].push_back(added);
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.h b/qpid/cpp/src/qpid/broker/PriorityQueue.h
index 4bf9d26a9d..67c31468d2 100644
--- a/qpid/cpp/src/qpid/broker/PriorityQueue.h
+++ b/qpid/cpp/src/qpid/broker/PriorityQueue.h
@@ -40,16 +40,13 @@ class PriorityQueue : public Messages
PriorityQueue(int levels);
virtual ~PriorityQueue() {}
size_t size();
- bool empty();
- void reinsert(const QueuedMessage&);
- bool remove(const framing::SequenceNumber&, QueuedMessage&);
+ bool deleted(const QueuedMessage&);
+ void release(const QueuedMessage&);
+ bool acquire(const framing::SequenceNumber&, QueuedMessage&);
bool find(const framing::SequenceNumber&, QueuedMessage&);
- bool next(const framing::SequenceNumber&, QueuedMessage&);
-
- QueuedMessage& front();
- void pop();
- bool pop(QueuedMessage&);
+ bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
+ bool consume(QueuedMessage&);
bool push(const QueuedMessage& added, QueuedMessage& removed);
void foreach(Functor);
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 969d510e26..0e822d3d4a 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -240,7 +240,7 @@ void Queue::requeue(const QueuedMessage& msg){
}
mgntDeqStats(msg.payload);
} else {
- messages->reinsert(msg);
+ messages->release(msg);
listeners.populate(copy);
// for persistLastNode - don't force a message twice to disk, but force it if no force before
@@ -306,7 +306,7 @@ void Queue::notifyListener()
bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
{
- checkNotDeleted();
+ checkNotDeleted(c);
if (c->preAcquires()) {
switch (consumeNextMessage(m, c)) {
case CONSUMED:
@@ -327,48 +327,43 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
while (true) {
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg;
+ if (allocator->nextConsumableMessage(c, msg)) {
+ if (msg.payload->hasExpired()) {
+ QPID_LOG(debug, "Message expired from queue '" << name << "'");
+ c->setPosition(msg.position);
+ dequeue(0, msg);
+ if (mgmtObject) {
+ mgmtObject->inc_discardsTtl();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsTtl();
+ }
- if (!allocator->nextConsumableMessage(c, msg)) { // no next available
- QPID_LOG(debug, "No messages available to dispatch to consumer " <<
- c->getName() << " on queue '" << name << "'");
- listeners.addListener(c);
- return NO_MESSAGES;
- }
-
- if (msg.payload->hasExpired()) {
- QPID_LOG(debug, "Message expired from queue '" << name << "'");
- c->setPosition(msg.position);
- acquire( msg.position, msg, locker);
- dequeue( 0, msg );
- if (mgmtObject) {
- mgmtObject->inc_discardsTtl();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsTtl();
+ continue;
}
- continue;
- }
- // a message is available for this consumer - can the consumer use it?
-
- if (c->filter(msg.payload)) {
- if (c->accept(msg.payload)) {
- bool ok = allocator->allocate( c->getName(), msg ); // inform allocator
- (void) ok; assert(ok);
- ok = acquire( msg.position, msg, locker);
- (void) ok; assert(ok);
- m = msg;
- c->setPosition(m.position);
- return CONSUMED;
+ if (c->filter(msg.payload)) {
+ if (c->accept(msg.payload)) {
+ bool ok = allocator->allocate( c->getName(), msg ); // inform allocator
+ (void) ok; assert(ok);
+ observeAcquire(msg, locker);
+ m = msg;
+ return CONSUMED;
+ } else {
+ //message(s) are available but consumer hasn't got enough credit
+ QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ messages->release(msg);
+ return CANT_CONSUME;
+ }
} else {
- //message(s) are available but consumer hasn't got enough credit
- QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ //consumer will never want this message
+ QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+ messages->release(msg);
return CANT_CONSUME;
}
} else {
- //consumer will never want this message
- QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
- c->setPosition(msg.position);
- return CANT_CONSUME;
+ QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+ listeners.addListener(c);
+ return NO_MESSAGES;
}
}
}
@@ -431,7 +426,6 @@ bool Queue::dispatch(Consumer::shared_ptr c)
}
bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const {
-
Mutex::ScopedLock locker(messageLock);
if (messages->find(pos, msg))
return true;
@@ -493,7 +487,7 @@ void Queue::cancel(Consumer::shared_ptr c){
QueuedMessage Queue::get(){
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
- if (messages->pop(msg))
+ if (messages->consume(msg))
observeAcquire(msg, locker);
return msg;
}
@@ -687,6 +681,7 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange>
// Update observers and message state:
observeAcquire(*qmsg, locker);
dequeue(0, *qmsg);
+ QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName());
// now reroute if necessary
if (dest.get()) {
assert(qmsg->payload);
@@ -718,24 +713,11 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
return c.matches.size();
}
-/** Acquire the front (oldest) message from the in-memory queue.
- * assumes messageLock held by caller
- */
-void Queue::pop(const Mutex::ScopedLock& locker)
-{
- assertClusterSafe();
- QueuedMessage msg;
- if (messages->pop(msg)) {
- observeAcquire(msg, locker);
- ++dequeueSincePurge;
- }
-}
-
/** Acquire the message at the given position, return true and msg if acquire succeeds */
bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
const Mutex::ScopedLock& locker)
{
- if (messages->remove(position, msg)) {
+ if (messages->acquire(position, msg)) {
observeAcquire(msg, locker);
++dequeueSincePurge;
return true;
@@ -952,12 +934,14 @@ void Queue::dequeueCommitted(const QueuedMessage& msg)
* Removes the first (oldest) message from the in-memory delivery queue as well dequeing
* it from the logical (and persistent if applicable) queue
*/
-void Queue::popAndDequeue(const Mutex::ScopedLock& held)
+bool Queue::popAndDequeue(QueuedMessage& msg, const Mutex::ScopedLock& locker)
{
- if (!messages->empty()) {
- QueuedMessage msg = messages->front();
- pop(held);
+ if (messages->consume(msg)) {
+ observeAcquire(msg, locker);
dequeue(0, msg);
+ return true;
+ } else {
+ return false;
}
}
@@ -969,6 +953,7 @@ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
{
mgntDeqStats(msg.payload);
if (policy.get()) policy->dequeued(msg);
+ messages->deleted(msg);
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->dequeued(msg);
@@ -1167,8 +1152,9 @@ void Queue::destroyed()
unbind(broker->getExchanges());
{
Mutex::ScopedLock locker(messageLock);
- while(!messages->empty()){
- DeliverableMessage msg(messages->front().payload);
+ QueuedMessage m;
+ while(popAndDequeue(m, locker)) {
+ DeliverableMessage msg(m.payload);
if (alternateExchange.get()) {
if (brokerMgmtObject)
brokerMgmtObject->inc_abandonedViaAlt();
@@ -1177,7 +1163,6 @@ void Queue::destroyed()
if (brokerMgmtObject)
brokerMgmtObject->inc_abandoned();
}
- popAndDequeue(locker);
}
if (alternateExchange.get())
alternateExchange->decAlternateUsers();
@@ -1191,6 +1176,10 @@ void Queue::destroyed()
}
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
notifyDeleted();
+ {
+ Mutex::ScopedLock locker(messageLock);
+ observers.clear();
+ }
}
void Queue::notifyDeleted()
@@ -1477,6 +1466,7 @@ void Queue::query(qpid::types::Variant::Map& results) const
void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
sequence = n;
+ QPID_LOG(trace, "Set position to " << sequence << " on " << getName());
}
SequenceNumber Queue::getPosition() {
@@ -1549,9 +1539,9 @@ QueueListeners& Queue::getListeners() { return listeners; }
Messages& Queue::getMessages() { return *messages; }
const Messages& Queue::getMessages() const { return *messages; }
-void Queue::checkNotDeleted()
+void Queue::checkNotDeleted(const Consumer::shared_ptr& c)
{
- if (deleted) {
+ if (deleted && !c->hideDeletedError()) {
throw ResourceDeletedException(QPID_MSG("Queue " << getName() << " has been deleted."));
}
}
@@ -1562,6 +1552,12 @@ void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
observers.insert(observer);
}
+void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer)
+{
+ Mutex::ScopedLock locker(messageLock);
+ observers.erase(observer);
+}
+
void Queue::flush()
{
ScopedUse u(barrier);
@@ -1584,7 +1580,7 @@ bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
}
-const Broker* Queue::getBroker()
+Broker* Queue::getBroker()
{
return broker;
}
@@ -1593,6 +1589,29 @@ void Queue::setDequeueSincePurge(uint32_t value) {
dequeueSincePurge = value;
}
+namespace{
+class FindLowest
+{
+ public:
+ FindLowest() : init(false) {}
+ void process(const QueuedMessage& message) {
+ QPID_LOG(debug, "FindLowest processing: " << message.position);
+ if (!init || message.position < lowest) lowest = message.position;
+ init = true;
+ }
+ bool getLowest(qpid::framing::SequenceNumber& result) {
+ if (init) {
+ result = lowest;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ private:
+ bool init;
+ qpid::framing::SequenceNumber lowest;
+};
+}
Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 5eca1e9b0c..e8573c17cc 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -149,10 +149,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
-
- /** modify the Queue's message container - assumes messageLock held */
- void pop(const sys::Mutex::ScopedLock& held); // acquire front msg
- void popAndDequeue(const sys::Mutex::ScopedLock& held); // acquire and dequeue front msg
+ bool popAndDequeue(QueuedMessage&, const sys::Mutex::ScopedLock& lock);
// acquire message @ position, return true and set msg if acquire succeeds
bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
const sys::Mutex::ScopedLock& held);
@@ -192,7 +189,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
}
}
- void checkNotDeleted();
+ void checkNotDeleted(const Consumer::shared_ptr& c);
void notifyDeleted();
public:
@@ -400,6 +397,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
*/
QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
void addObserver(boost::shared_ptr<QueueObserver>);
+ void removeObserver(boost::shared_ptr<QueueObserver>);
QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key);
/**
* Notify queue that recovery has completed.
@@ -419,7 +417,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void flush();
- const Broker* getBroker();
+ Broker* getBroker();
uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
void setDequeueSincePurge(uint32_t value);
diff --git a/qpid/cpp/src/qpid/broker/QueuedMessage.h b/qpid/cpp/src/qpid/broker/QueuedMessage.h
index 35e48b11f3..051ade41ea 100644
--- a/qpid/cpp/src/qpid/broker/QueuedMessage.h
+++ b/qpid/cpp/src/qpid/broker/QueuedMessage.h
@@ -32,6 +32,7 @@ struct QueuedMessage
{
boost::intrusive_ptr<Message> payload;
framing::SequenceNumber position;
+ enum {AVAILABLE, ACQUIRED, DELETED, REMOVED} status;
Queue* queue;
QueuedMessage() : queue(0) {}
diff --git a/qpid/cpp/src/qpid/broker/RetryList.h b/qpid/cpp/src/qpid/broker/RetryList.h
index 242a7d2122..9c4b779bcb 100644
--- a/qpid/cpp/src/qpid/broker/RetryList.h
+++ b/qpid/cpp/src/qpid/broker/RetryList.h
@@ -23,7 +23,6 @@
*/
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/Address.h"
#include "qpid/Url.h"
namespace qpid {
@@ -36,7 +35,7 @@ namespace broker {
class RetryList
{
public:
- QPID_BROKER_EXTERN RetryList();
+ QPID_BROKER_EXTERN RetryList();
QPID_BROKER_EXTERN void reset(const std::vector<Url>& urls);
QPID_BROKER_EXTERN bool next(Address& address);
private:
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 2b9fd247f5..e7d2259c80 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -106,15 +106,25 @@ bool SemanticState::exists(const string& consumerTag){
namespace {
const std::string SEPARATOR("::");
}
-
+
void SemanticState::consume(const string& tag,
Queue::shared_ptr queue, bool ackRequired, bool acquire,
- bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
+ bool exclusive, const string& resumeId, uint64_t resumeTtl,
+ const FieldTable& arguments)
{
// "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination).
// Create a globally unique name so the broker can identify individual consumers
std::string name = session.getSessionId().str() + SEPARATOR + tag;
- ConsumerImpl::shared_ptr c(new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
+ const ConsumerFactories::Factories& cf(
+ session.getBroker().getConsumerFactories().get());
+ ConsumerImpl::shared_ptr c;
+ for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i != cf.end() && !c; ++i)
+ c = (*i)->create(this, name, queue, ackRequired, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments);
+ if (!c) // Create plain consumer
+ c = ConsumerImpl::shared_ptr(
+ new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
consumers[tag] = c;
}
@@ -275,7 +285,6 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
uint64_t _resumeTtl,
const framing::FieldTable& _arguments
-
) :
Consumer(_name, _acquire),
parent(_parent),
@@ -332,7 +341,8 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
assertClusterSafe();
allocateCredit(msg.payload);
- DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, credit.isWindowMode());
+ DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(),
+ shared_from_this(), acquire, !ackExpected, credit.isWindowMode(), 0);
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
parent->deliver(record, sync);
@@ -340,7 +350,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
parent->record(record);
}
if (acquire && !ackExpected) { // auto acquire && auto accept
- queue->dequeue(0 /*ctxt*/, msg);
+ msg.queue->dequeue(0, msg);
record.setEnded();
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
@@ -355,7 +365,7 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
{
assertClusterSafe();
- // FIXME aconway 2009-06-08: if we have byte & message credit but
+ // TODO aconway 2009-06-08: if we have byte & message credit but
// checkCredit fails because the message is to big, we should
// remain on queue's listener list for possible smaller messages
// in future.
@@ -455,8 +465,11 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
std::string exchangeName = msg->getExchangeName();
- if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
+ if (!cacheExchange || cacheExchange->getName() != exchangeName
+ || cacheExchange->isDestroyed())
+ {
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
+ }
cacheExchange->setProperties(msg);
/* verify the userid if specified: */
@@ -646,9 +659,14 @@ bool SemanticState::ConsumerImpl::haveCredit()
}
}
+bool SemanticState::ConsumerImpl::doDispatch()
+{
+ return queue->dispatch(shared_from_this());
+}
+
void SemanticState::ConsumerImpl::flush()
{
- while(haveCredit() && queue->dispatch(shared_from_this()))
+ while(haveCredit() && doDispatch())
;
credit.cancel();
}
@@ -710,7 +728,7 @@ void SemanticState::reject(DeliveryId first, DeliveryId last)
bool SemanticState::ConsumerImpl::doOutput()
{
try {
- return haveCredit() && queue->dispatch(shared_from_this());
+ return haveCredit() && doDispatch();
} catch (const SessionException& e) {
throw SessionOutputException(e, parent->session.getChannel());
}
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 26fd815424..5a83fd0fb3 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -30,6 +30,7 @@
#include "qpid/broker/DtxBuffer.h"
#include "qpid/broker/DtxManager.h"
#include "qpid/broker/NameGenerator.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/TxBuffer.h"
#include "qpid/framing/FrameHandler.h"
@@ -74,8 +75,10 @@ class SemanticState : private boost::noncopyable {
public boost::enable_shared_from_this<ConsumerImpl>,
public management::Manageable
{
+ protected:
mutable qpid::sys::Mutex lock;
SemanticState* const parent;
+ private:
const boost::shared_ptr<Queue> queue;
const bool ackExpected;
const bool acquire;
@@ -95,17 +98,20 @@ class SemanticState : private boost::noncopyable {
void allocateCredit(boost::intrusive_ptr<Message>& msg);
bool haveCredit();
+ protected:
+ virtual bool doDispatch();
+ size_t unacked() { return parent->unacked.size(); }
+
public:
typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
ConsumerImpl(SemanticState* parent,
const std::string& name, boost::shared_ptr<Queue> queue,
bool ack, bool acquire, bool exclusive,
- const std::string& tag, const std::string& resumeId,
- uint64_t resumeTtl, const framing::FieldTable& arguments);
- ~ConsumerImpl();
+ const std::string& tag, const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
+ virtual ~ConsumerImpl();
OwnershipToken* getSession();
- bool deliver(QueuedMessage& msg);
+ virtual bool deliver(QueuedMessage& msg);
bool filter(boost::intrusive_ptr<Message> msg);
bool accept(boost::intrusive_ptr<Message> msg);
void cancel() {}
@@ -142,7 +148,10 @@ class SemanticState : private boost::noncopyable {
SemanticState& getParent() { return *parent; }
const SemanticState& getParent() const { return *parent; }
- // Manageable entry points
+
+ void acknowledged(const broker::QueuedMessage&) {}
+
+ // manageable entry points
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
};
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index c50bf10f7b..4aad46f782 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -316,8 +316,8 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
- name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
- "existing"));
+ name, durable, exclusive, autoDelete, alternateExchange, ManagementAgent::toMap(arguments),
+ "existing"));
}
}
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index ca6d6bb193..8cd5072574 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -23,6 +23,7 @@
*/
#include "qpid/amqp_0_10/SessionHandler.h"
+#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/AMQP_ClientProxy.h"
namespace qpid {
diff --git a/qpid/cpp/src/qpid/client/TCPConnector.cpp b/qpid/cpp/src/qpid/client/TCPConnector.cpp
index 4660a41c07..51eacf77e8 100644
--- a/qpid/cpp/src/qpid/client/TCPConnector.cpp
+++ b/qpid/cpp/src/qpid/client/TCPConnector.cpp
@@ -97,7 +97,7 @@ void TCPConnector::connect(const std::string& host, const std::string& port) {
boost::bind(&TCPConnector::connected, this, _1),
boost::bind(&TCPConnector::connectFailed, this, _3));
closed = false;
-
+ identifier = str(format("[%1%]") % socket.getFullAddress());
connector->start(poller);
}
@@ -120,8 +120,6 @@ void TCPConnector::start(sys::AsynchIO* aio_) {
for (int i = 0; i < 4; i++) {
aio->queueReadBuffer(new Buff(maxFrameSize));
}
-
- identifier = str(format("[%1%]") % socket.getFullAddress());
}
void TCPConnector::initAmqp() {
@@ -131,7 +129,7 @@ void TCPConnector::initAmqp() {
void TCPConnector::connectFailed(const std::string& msg) {
connector = 0;
- QPID_LOG(warning, "Connect failed: " << msg);
+ QPID_LOG(warning, "Connect failed: " << msg << " " << identifier);
socket.close();
if (!closed)
closed = true;
@@ -185,7 +183,7 @@ sys::ShutdownHandler* TCPConnector::getShutdownHandler() const {
return shutdownHandler;
}
-const std::string& TCPConnector::getIdentifier() const {
+const std::string& TCPConnector::getIdentifier() const {
return identifier;
}
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index 16e5fde075..5924e30dd8 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -140,7 +140,7 @@ struct Binding
{
Binding(const Variant::Map&);
Binding(const std::string& exchange, const std::string& queue, const std::string& key);
-
+
std::string exchange;
std::string queue;
std::string key;
@@ -243,7 +243,7 @@ class Subscription : public Exchange, public MessageSource
FieldTable queueOptions;
FieldTable subscriptionOptions;
Bindings bindings;
-
+
void bindSubject(const std::string& subject);
void bindAll();
void add(const std::string& exchange, const std::string& key);
@@ -328,7 +328,7 @@ Opt& Opt::operator/(const std::string& name)
{
if (options) {
Variant::Map::const_iterator j = options->find(name);
- if (j == options->end()) {
+ if (j == options->end()) {
value = 0;
options = 0;
} else {
@@ -373,7 +373,7 @@ void Opt::collect(qpid::framing::FieldTable& args) const
bool AddressResolution::is_unreliable(const Address& address)
{
-
+
return in((Opt(address)/LINK/RELIABILITY).str(),
list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
}
@@ -475,7 +475,7 @@ void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::stri
checkCreate(session, FOR_RECEIVER);
checkAssert(session, FOR_RECEIVER);
linkBindings.bind(session);
- session.messageSubscribe(arg::queue=name,
+ session.messageSubscribe(arg::queue=name,
arg::destination=destination,
arg::acceptMode=acceptMode,
arg::acquireMode=acquireMode,
@@ -524,7 +524,7 @@ void Subscription::bindSubject(const std::string& subject)
bindings.push_back(b);
} else if (actualType == XML_EXCHANGE) {
Binding b(name, queue, subject);
- std::string query = (boost::format("declare variable $qpid.subject external; $qpid.subject = '%1%'")
+ std::string query = (boost::format("declare variable $qpid.subject external; $qpid.subject = '%1%'")
% subject).str();
b.arguments.setString("xquery", query);
bindings.push_back(b);
@@ -540,7 +540,7 @@ void Subscription::bindAll()
if (actualType == TOPIC_EXCHANGE) {
add(name, WILDCARD_ANY);
} else if (actualType == FANOUT_EXCHANGE) {
- add(name, queue);
+ add(name, queue);
} else if (actualType == HEADERS_EXCHANGE) {
Binding b(name, queue, "match-all");
b.arguments.setString("x-match", "all");
@@ -549,7 +549,7 @@ void Subscription::bindAll()
Binding b(name, queue, EMPTY_STRING);
b.arguments.setString("xquery", "true()");
bindings.push_back(b);
- } else {
+ } else {
add(name, EMPTY_STRING);
}
}
@@ -600,12 +600,13 @@ void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&,
{
m.message.getDeliveryProperties().setRoutingKey(m.getSubject());
m.status = session.messageTransfer(arg::destination=name, arg::content=m.message);
+ QPID_LOG(debug, "Sending to exchange " << name << " " << m.message.getMessageProperties() << " " << m.message.getDeliveryProperties());
}
void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&)
{
linkBindings.unbind(session);
- checkDelete(session, FOR_SENDER);
+ checkDelete(session, FOR_SENDER);
}
QueueSink::QueueSink(const Address& address) : Queue(address) {}
@@ -620,6 +621,7 @@ void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, Ou
{
m.message.getDeliveryProperties().setRoutingKey(name);
m.status = session.messageTransfer(arg::content=m.message);
+ QPID_LOG(debug, "Sending to queue " << name << " " << m.message.getMessageProperties() << " " << m.message.getDeliveryProperties());
}
void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&)
@@ -654,9 +656,9 @@ qpid::framing::ReplyTo AddressResolution::convert(const Address& address)
}
}
-bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address)
+bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address)
{
- return address.getType() == QUEUE_ADDRESS ||
+ return address.getType() == QUEUE_ADDRESS ||
(address.getType().empty() && session.queueQuery(address.getName()).getQueue() == address.getName());
}
@@ -695,7 +697,7 @@ void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
{
if (enabled(createPolicy, mode)) {
QPID_LOG(debug, "Auto-creating queue '" << name << "'");
- try {
+ try {
session.queueDeclare(arg::queue=name,
arg::durable=durable,
arg::autoDelete=autoDelete,
@@ -749,7 +751,7 @@ void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
throw AssertionFailed((boost::format("Queue not exclusive: %1%") % name).str());
}
if (!alternateExchange.empty() && result.getAlternateExchange() != alternateExchange) {
- throw AssertionFailed((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%")
+ throw AssertionFailed((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%")
% name % alternateExchange % result.getAlternateExchange()).str());
}
for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) {
@@ -839,7 +841,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
throw NotFound((boost::format("Exchange not found: %1%") % name).str());
} else {
if (specifiedType.size() && result.getType() != specifiedType) {
- throw AssertionFailed((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
+ throw AssertionFailed((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
% name % specifiedType % result.getType()).str());
}
if (durable && !result.getDurable()) {
@@ -862,7 +864,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
}
}
-Binding::Binding(const Variant::Map& b) :
+Binding::Binding(const Variant::Map& b) :
exchange((Opt(b)/EXCHANGE).str()),
queue((Opt(b)/QUEUE).str()),
key((Opt(b)/KEY).str())
@@ -916,11 +918,11 @@ void Bindings::unbind(qpid::client::AsyncSession& session)
void Bindings::check(qpid::client::AsyncSession& session)
{
for (Bindings::const_iterator i = begin(); i != end(); ++i) {
- ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=i->queue,
+ ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=i->queue,
arg::exchange=i->exchange,
arg::bindingKey=i->key);
if (result.getQueueNotMatched() || result.getKeyNotMatched()) {
- throw AssertionFailed((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]")
+ throw AssertionFailed((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]")
% i->exchange % i->queue % i->key).str());
}
}
@@ -950,7 +952,7 @@ void Node::convert(const Variant& options, FieldTable& arguments)
{
if (!options.isVoid()) {
translate(options.asMap(), arguments);
- }
+ }
}
std::vector<std::string> Node::RECEIVER_MODES = list_of<std::string>(ALWAYS) (RECEIVER);
std::vector<std::string> Node::SENDER_MODES = list_of<std::string>(ALWAYS) (SENDER);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index cc6e9b9ab2..3cfd2e37f2 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -29,6 +29,7 @@
#include <boost/intrusive_ptr.hpp>
#include <vector>
#include <sstream>
+#include <limits>
namespace qpid {
namespace client {
@@ -39,6 +40,16 @@ using qpid::types::VAR_LIST;
using qpid::framing::Uuid;
namespace {
+
+double FOREVER(std::numeric_limits<double>::max());
+
+// Time values in seconds can be specified as integer or floating point values.
+double timeValue(const Variant& value) {
+ if (types::isIntegerType(value.getType()))
+ return double(value.asInt64());
+ return value.asDouble();
+}
+
void merge(const std::string& value, std::vector<std::string>& list) {
if (std::find(list.begin(), list.end(), value) == list.end())
list.push_back(value);
@@ -60,11 +71,21 @@ std::string asString(const std::vector<std::string>& v) {
os << "]";
return os.str();
}
+
+bool expired(const sys::AbsTime& start, double timeout)
+{
+ if (timeout == 0) return true;
+ if (timeout == FOREVER) return false;
+ sys::Duration used(start, sys::now());
+ sys::Duration allowed(int64_t(timeout*sys::TIME_SEC));
+ return allowed < used;
}
+} // namespace
+
ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
- replaceUrls(false), reconnect(false), timeout(-1), limit(-1),
- minReconnectInterval(3), maxReconnectInterval(60),
+ replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1),
+ minReconnectInterval(0.001), maxReconnectInterval(2),
retries(0), reconnectOnLimitExceeded(true)
{
setOptions(options);
@@ -85,15 +106,15 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value)
if (name == "reconnect") {
reconnect = value;
} else if (name == "reconnect-timeout" || name == "reconnect_timeout") {
- timeout = value;
+ timeout = timeValue(value);
} else if (name == "reconnect-limit" || name == "reconnect_limit") {
limit = value;
} else if (name == "reconnect-interval" || name == "reconnect_interval") {
- maxReconnectInterval = minReconnectInterval = value;
+ maxReconnectInterval = minReconnectInterval = timeValue(value);
} else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") {
- minReconnectInterval = value;
+ minReconnectInterval = timeValue(value);
} else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") {
- maxReconnectInterval = value;
+ maxReconnectInterval = timeValue(value);
} else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") {
replaceUrls = value.asBool();
} else if (name == "reconnect-urls" || name == "reconnect_urls") {
@@ -236,18 +257,10 @@ void ConnectionImpl::reopen()
}
-bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
-{
- if (timeout == 0) return true;
- if (timeout < 0) return false;
- qpid::sys::Duration used(start, qpid::sys::now());
- qpid::sys::Duration allowed = timeout * qpid::sys::TIME_SEC;
- return allowed < used;
-}
-
void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
{
- for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) {
+ QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
+ for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) {
if (!reconnect) {
throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
}
@@ -257,8 +270,11 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
if (expired(started, timeout)) {
throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout");
}
- else qpid::sys::sleep(i);
+ QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds, urls="
+ << asString(urls));
+ qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds.
}
+ QPID_LOG(debug, "Connection successful, urls=" << asString(urls));
retries = 0;
}
@@ -320,6 +336,7 @@ bool ConnectionImpl::backoff()
return false;
}
}
+
std::string ConnectionImpl::getAuthenticatedUsername()
{
return connection.getNegotiatedSettings().username;
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
index 1b58cbbe3e..d1ac4533d5 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -64,10 +64,10 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
std::vector<std::string> urls;
qpid::client::ConnectionSettings settings;
bool reconnect;
- int64_t timeout;
+ double timeout;
int32_t limit;
- int64_t minReconnectInterval;
- int64_t maxReconnectInterval;
+ double minReconnectInterval;
+ double maxReconnectInterval;
int32_t retries;
bool reconnectOnLimitExceeded;
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 715376fd8d..e832cd2567 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -198,7 +198,8 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
if (content->isA<MessageTransferBody>()) {
MessageTransfer transfer(content, *this);
if (handler && handler->accept(transfer)) {
- QPID_LOG(debug, "Delivered " << *content->getMethod());
+ QPID_LOG(debug, "Delivered " << *content->getMethod() << " "
+ << *content->getHeaders());
return true;
} else {
//received message for another destination, keep for later
@@ -275,7 +276,7 @@ void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* m
populate(*message, *command);
}
const MessageTransferBody* transfer = command->as<MessageTransferBody>();
- if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) {
+ if (transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) {
sys::Mutex::ScopedLock l(lock);
acceptTracker.delivered(transfer->getDestination(), command->getId());
}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index c16ab72876..00a343d71e 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -549,7 +549,7 @@ void Connection::deliveryRecord(const string& qname,
} else { // Message at original position in original queue
queue->find(position, m);
}
- // FIXME aconway 2011-08-19: removed:
+ // NOTE: removed:
// if (!m.payload)
// throw Exception(QPID_MSG("deliveryRecord no update message"));
//
@@ -561,7 +561,8 @@ void Connection::deliveryRecord(const string& qname,
//
}
- broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit);
+ broker::DeliveryRecord dr(m, queue, tag, semanticState().find(tag),
+ acquired, accepted, windowing, credit);
dr.setId(id);
if (cancelled) dr.cancel(dr.getTag());
if (completed) dr.complete();
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index f656ace45e..920c4937db 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -209,6 +209,8 @@ class Connection :
void queueDequeueSincePurgeState(const std::string&, uint32_t);
+ bool isAnnounced() const { return announced; }
+
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 4bf03eefa2..2cd1cf9a83 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -97,7 +97,7 @@ void OutputInterceptor::deliverDoOutput(uint32_t limit) {
}
void OutputInterceptor::sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&) {
- if (parent.isLocal() && !sentDoOutput && !closing) {
+ if (parent.isLocal() && !sentDoOutput && !closing && parent.isAnnounced()) {
sentDoOutput = true;
parent.getCluster().getMulticast().mcastControl(
ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), newLimit),
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
new file mode 100644
index 0000000000..5acbfb9d5f
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Backup.h"
+#include "Settings.h"
+#include "BrokerReplicator.h"
+#include "ReplicatingSubscription.h"
+#include "ConnectionExcluder.h"
+#include "qpid/Url.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/broker/Link.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace ha {
+
+using namespace framing;
+using namespace broker;
+using types::Variant;
+using std::string;
+
+Backup::Backup(broker::Broker& b, const Settings& s) :
+ broker(b), settings(s), excluder(new ConnectionExcluder())
+{
+ // Empty brokerUrl means delay initialization until setUrl() is called.
+ if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
+}
+
+void Backup::initialize(const Url& url) {
+ assert(!url.empty());
+ QPID_LOG(notice, "Ha: Backup started: " << url);
+ string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+ // Declare the link
+ std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
+ url[0].host, url[0].port, protocol,
+ false, // durable
+ settings.mechanism, settings.username, settings.password);
+ assert(result.second); // FIXME aconway 2011-11-23: error handling
+ link = result.first;
+ link->setUrl(url);
+
+ replicator.reset(new BrokerReplicator(link));
+ broker.getExchanges().registerExchange(replicator);
+ broker.getConnectionObservers().add(excluder);
+}
+
+void Backup::setBrokerUrl(const Url& url) {
+ // Ignore empty URLs seen during start-up for some tests.
+ if (url.empty()) return;
+ sys::Mutex::ScopedLock l(lock);
+ if (link) { // URL changed after we initialized.
+ QPID_LOG(info, "HA: Backup failover URL set to " << url);
+ link->setUrl(url);
+ }
+ else {
+ initialize(url); // Deferred initialization
+ }
+}
+
+Backup::~Backup() {
+ if (link) link->close();
+ if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
+ broker.getConnectionObservers().remove(excluder); // This allows client connections.
+}
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
new file mode 100644
index 0000000000..526b238b82
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -0,0 +1,67 @@
+#ifndef QPID_HA_BACKUP_H
+#define QPID_HA_BACKUP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Settings.h"
+#include "qpid/Url.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+
+namespace broker {
+class Broker;
+class Link;
+}
+
+namespace ha {
+class Settings;
+class ConnectionExcluder;
+class BrokerReplicator;
+
+/**
+ * State associated with a backup broker. Manages connections to primary.
+ *
+ * THREAD SAFE
+ */
+class Backup
+{
+ public:
+ Backup(broker::Broker&, const Settings&);
+ ~Backup();
+ void setBrokerUrl(const Url&);
+
+ private:
+ void initialize(const Url&);
+
+ sys::Mutex lock;
+ broker::Broker& broker;
+ Settings settings;
+ boost::shared_ptr<broker::Link> link;
+ boost::shared_ptr<BrokerReplicator> replicator;
+ boost::shared_ptr<ConnectionExcluder> excluder;
+};
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_BACKUP_H*/
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
new file mode 100644
index 0000000000..a8f05c1fe3
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -0,0 +1,497 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "BrokerReplicator.h"
+#include "QueueReplicator.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/Link.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qmf/org/apache/qpid/broker/EventBind.h"
+#include "qmf/org/apache/qpid/broker/EventUnbind.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
+#include <algorithm>
+
+namespace qpid {
+namespace ha {
+
+using qmf::org::apache::qpid::broker::EventBind;
+using qmf::org::apache::qpid::broker::EventUnbind;
+using qmf::org::apache::qpid::broker::EventExchangeDeclare;
+using qmf::org::apache::qpid::broker::EventExchangeDelete;
+using qmf::org::apache::qpid::broker::EventQueueDeclare;
+using qmf::org::apache::qpid::broker::EventQueueDelete;
+using qmf::org::apache::qpid::broker::EventSubscribe;
+using namespace framing;
+using std::string;
+using types::Variant;
+using namespace broker;
+
+namespace {
+
+const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator");
+const string QPID_REPLICATE("qpid.replicate");
+
+const string CLASS_NAME("_class_name");
+const string EVENT("_event");
+const string OBJECT_NAME("_object_name");
+const string PACKAGE_NAME("_package_name");
+const string QUERY_RESPONSE("_query_response");
+const string SCHEMA_ID("_schema_id");
+const string VALUES("_values");
+
+const string ALTEX("altEx");
+const string ARGS("args");
+const string ARGUMENTS("arguments");
+const string AUTODEL("autoDel");
+const string AUTODELETE("autoDelete");
+const string BIND("bind");
+const string UNBIND("unbind");
+const string BINDING("binding");
+const string CREATED("created");
+const string DISP("disp");
+const string DURABLE("durable");
+const string EXCHANGE("exchange");
+const string EXNAME("exName");
+const string EXTYPE("exType");
+const string KEY("key");
+const string NAME("name");
+const string QNAME("qName");
+const string QUEUE("queue");
+const string RHOST("rhost");
+const string TYPE("type");
+const string USER("user");
+
+const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
+const string QMF2("qmf2");
+const string QMF_CONTENT("qmf.content");
+const string QMF_DEFAULT_TOPIC("qmf.default.topic");
+const string QMF_OPCODE("qmf.opcode");
+
+const string _WHAT("_what");
+const string _CLASS_NAME("_class_name");
+const string _PACKAGE_NAME("_package_name");
+const string _SCHEMA_ID("_schema_id");
+const string OBJECT("OBJECT");
+const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
+const string QMF_DEFAULT_DIRECT("qmf.default.direct");
+const string _QUERY_REQUEST("_query_request");
+const string BROKER("broker");
+
+bool isQMFv2(const Message& message) {
+ const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>();
+ return props && props->getAppId() == QMF2;
+}
+
+template <class T> bool match(Variant::Map& schema) {
+ return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
+}
+
+enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_MESSAGES };
+const string S_NONE="none";
+const string S_CONFIGURATION="configuration";
+const string S_MESSAGES="messages";
+
+ReplicateLevel replicateLevel(const string& level) {
+ if (level == S_NONE) return RL_NONE;
+ if (level == S_CONFIGURATION) return RL_CONFIGURATION;
+ if (level == S_MESSAGES) return RL_MESSAGES;
+ throw Exception("Invalid value for "+QPID_REPLICATE+": "+level);
+}
+
+ReplicateLevel replicateLevel(const framing::FieldTable& f) {
+ if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE));
+ else return RL_NONE;
+}
+
+ReplicateLevel replicateLevel(const Variant::Map& m) {
+ Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
+ if (i != m.end()) return replicateLevel(i->second.asString());
+ else return RL_NONE;
+}
+
+void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) {
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ Variant::Map request;
+ request[_WHAT] = OBJECT;
+ Variant::Map schema;
+ schema[_CLASS_NAME] = className;
+ schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
+ request[_SCHEMA_ID] = schema;
+
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0)));
+ method.setBof(true);
+ method.setEof(false);
+ method.setBos(true);
+ method.setEos(true);
+ AMQHeaderBody headerBody;
+ MessageProperties* props = headerBody.get<MessageProperties>(true);
+ props->setReplyTo(qpid::framing::ReplyTo("", queueName));
+ props->setAppId(QMF2);
+ props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
+ headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
+ AMQFrame header(headerBody);
+ header.setBof(false);
+ header.setEof(false);
+ header.setBos(true);
+ header.setEos(true);
+ AMQContentBody data;
+ qpid::amqp_0_10::MapCodec::encode(request, data.getData());
+ AMQFrame content(data);
+ content.setBof(false);
+ content.setEof(true);
+ content.setBos(true);
+ content.setEos(true);
+ sessionHandler.out->handle(method);
+ sessionHandler.out->handle(header);
+ sessionHandler.out->handle(content);
+}
+
+// Like Variant::asMap but treat void value as an empty map.
+Variant::Map asMapVoid(const Variant& value) {
+ if (!value.isVoid()) return value.asMap();
+ else return Variant::Map();
+}
+
+} // namespace
+
+BrokerReplicator::~BrokerReplicator() {}
+
+BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l)
+ : Exchange(QPID_CONFIGURATION_REPLICATOR), broker(*l->getBroker()), link(l)
+{
+ QPID_LOG(info, "HA: Backup replicating from " <<
+ link->getTransport() << ":" << link->getHost() << ":" << link->getPort());
+ broker.getLinks().declare(
+ link->getHost(), link->getPort(),
+ false, // durable
+ QPID_CONFIGURATION_REPLICATOR, // src
+ QPID_CONFIGURATION_REPLICATOR, // dest
+ "", // key
+ false, // isQueue
+ false, // isLocal
+ "", // id/tag
+ "", // excludes
+ false, // dynamic
+ 0, // sync?
+ boost::bind(&BrokerReplicator::initializeBridge, this, _1, _2)
+ );
+}
+
+// This is called in the connection IO thread when the bridge is started.
+void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ string queueName = bridge.getQueueName();
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
+
+ //declare and bind an event queue
+ peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable());
+ peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
+ //subscribe to the queue
+ peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+
+ //issue a query request for queues and another for exchanges using event queue as the reply-to address
+ sendQuery(QUEUE, queueName, sessionHandler);
+ sendQuery(EXCHANGE, queueName, sessionHandler);
+ sendQuery(BINDING, queueName, sessionHandler);
+ QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName);
+}
+
+// FIXME aconway 2011-12-02: error handling in route.
+void BrokerReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) {
+ Variant::List list;
+ try {
+ if (!isQMFv2(msg.getMessage()) || !headers)
+ throw Exception("Unexpected message, not QMF2 event or query response.");
+ // decode as list
+ string content = msg.getMessage().getFrames().getContent();
+ amqp_0_10::ListCodec::decode(content, list);
+
+ if (headers->getAsString(QMF_CONTENT) == EVENT) {
+ for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
+ Variant::Map& map = i->asMap();
+ Variant::Map& schema = map[SCHEMA_ID].asMap();
+ Variant::Map& values = map[VALUES].asMap();
+ if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
+ else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
+ else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
+ else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values);
+ else if (match<EventBind>(schema)) doEventBind(values);
+ else if (match<EventUnbind>(schema)) doEventUnbind(values);
+ }
+ } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
+ for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
+ string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME];
+ Variant::Map& values = i->asMap()[VALUES].asMap();
+ framing::FieldTable args;
+ amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+ if (type == QUEUE) doResponseQueue(values);
+ else if (type == EXCHANGE) doResponseExchange(values);
+ else if (type == BINDING) doResponseBind(values);
+ else QPID_LOG(error, "HA: Backup received unknown response type=" << type
+ << " values=" << values);
+ }
+ } else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers);
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list);
+ }
+}
+
+void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
+ string name = values[QNAME].asString();
+ Variant::Map argsMap = asMapVoid(values[ARGS]);
+ if (values[DISP] == CREATED && replicateLevel(argsMap)) {
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ broker.createQueue(
+ name,
+ values[DURABLE].asBool(),
+ values[AUTODEL].asBool(),
+ 0 /*i.e. no owner regardless of exclusivity on master*/,
+ values[ALTEX].asString(),
+ args,
+ values[USER].asString(),
+ values[RHOST].asString());
+ if (result.second) {
+ // FIXME aconway 2011-11-22: should delete old queue and
+ // re-create from event.
+ // Events are always up to date, whereas responses may be
+ // out of date.
+ QPID_LOG(debug, "HA: Backup created queue: " << name);
+ startQueueReplicator(result.first);
+ } else {
+ // FIXME aconway 2011-12-02: what's the right way to handle this?
+ QPID_LOG(warning, "HA: Backup queue already exists: " << name);
+ }
+ }
+}
+
+void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
+ // The remote queue has already been deleted so replicator
+ // sessions may be closed by a "queue deleted" exception.
+ string name = values[QNAME].asString();
+ boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
+ if (queue && replicateLevel(queue->getSettings())) {
+ QPID_LOG(debug, "HA: Backup deleting queue: " << name);
+ string rname = QueueReplicator::replicatorName(name);
+ boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname);
+ boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex);
+ if (qr) qr->deactivate();
+ // QueueReplicator's bridge is now queued for destruction but may not
+ // actually be destroyed, deleting the exhange
+ broker.getExchanges().destroy(rname);
+ broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString());
+ }
+}
+
+void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
+ Variant::Map argsMap(asMapVoid(values[ARGS]));
+ if (values[DISP] == CREATED && replicateLevel(argsMap)) {
+ string name = values[EXNAME].asString();
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+ if (broker.createExchange(
+ name,
+ values[EXTYPE].asString(),
+ values[DURABLE].asBool(),
+ values[ALTEX].asString(),
+ args,
+ values[USER].asString(),
+ values[RHOST].asString()).second)
+ {
+ QPID_LOG(debug, "HA: Backup created exchange: " << name);
+ } else {
+ // FIXME aconway 2011-11-22: should delete pre-exisitng exchange
+ // and re-create from event. See comment in doEventQueueDeclare.
+ QPID_LOG(warning, "HA: Backup exchange already exists: " << name);
+ }
+ }
+}
+
+void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
+ string name = values[EXNAME].asString();
+ try {
+ boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
+ if (exchange && replicateLevel(exchange->getArgs())) {
+ QPID_LOG(debug, "HA: Backup deleting exchange:" << name);
+ broker.deleteExchange(
+ name,
+ values[USER].asString(),
+ values[RHOST].asString());
+ }
+ } catch (const framing::NotFoundException&) {}
+}
+
+void BrokerReplicator::doEventBind(Variant::Map& values) {
+ boost::shared_ptr<Exchange> exchange =
+ broker.getExchanges().find(values[EXNAME].asString());
+ boost::shared_ptr<Queue> queue =
+ broker.getQueues().find(values[QNAME].asString());
+ // We only replicate binds for a replicated queue to replicated
+ // exchange that both exist locally.
+ if (exchange && replicateLevel(exchange->getArgs()) &&
+ queue && replicateLevel(queue->getSettings()))
+ {
+ framing::FieldTable args;
+ amqp_0_10::translate(asMapVoid(values[ARGS]), args);
+ string key = values[KEY].asString();
+ QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName()
+ << " queue=" << queue->getName()
+ << " key=" << key);
+ exchange->bind(queue, key, &args);
+ }
+}
+
+void BrokerReplicator::doEventUnbind(Variant::Map& values) {
+ boost::shared_ptr<Exchange> exchange =
+ broker.getExchanges().find(values[EXNAME].asString());
+ boost::shared_ptr<Queue> queue =
+ broker.getQueues().find(values[QNAME].asString());
+ // We only replicate unbinds for a replicated queue to replicated
+ // exchange that both exist locally.
+ if (exchange && replicateLevel(exchange->getArgs()) &&
+ queue && replicateLevel(queue->getSettings()))
+ {
+ framing::FieldTable args;
+ amqp_0_10::translate(asMapVoid(values[ARGS]), args);
+ string key = values[KEY].asString();
+ QPID_LOG(debug, "HA: Backup replicated unbinding exchange=" << exchange->getName()
+ << " queue=" << queue->getName()
+ << " key=" << key);
+ exchange->unbind(queue, key, &args);
+ }
+}
+
+void BrokerReplicator::doResponseQueue(Variant::Map& values) {
+ // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication
+ Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
+ if (!replicateLevel(argsMap)) return;
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+ string name(values[NAME].asString());
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ broker.createQueue(
+ name,
+ values[DURABLE].asBool(),
+ values[AUTODELETE].asBool(),
+ 0 /*i.e. no owner regardless of exclusivity on master*/,
+ ""/*TODO: need to include alternate-exchange*/,
+ args,
+ ""/*TODO: who is the user?*/,
+ ""/*TODO: what should we use as connection id?*/);
+ if (result.second) {
+ QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]);
+ startQueueReplicator(result.first);
+ } else {
+ // FIXME aconway 2011-11-22: Normal to find queue already
+ // exists if we're failing over.
+ QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << name);
+ }
+}
+
+void BrokerReplicator::doResponseExchange(Variant::Map& values) {
+ Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
+ if (!replicateLevel(argsMap)) return;
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+ if (broker.createExchange(
+ values[NAME].asString(),
+ values[TYPE].asString(),
+ values[DURABLE].asBool(),
+ ""/*TODO: need to include alternate-exchange*/,
+ args,
+ ""/*TODO: who is the user?*/,
+ ""/*TODO: what should we use as connection id?*/).second)
+ {
+ QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME]);
+ } else {
+ QPID_LOG(warning, "HA: Backup catch-up exchange already exists: " << values[QNAME]);
+ }
+}
+
+namespace {
+const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:");
+const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:");
+
+std::string getRefName(const std::string& prefix, const Variant& ref) {
+ Variant::Map map(ref.asMap());
+ Variant::Map::const_iterator i = map.find(OBJECT_NAME);
+ if (i == map.end())
+ throw Exception(QPID_MSG("Replicator: invalid object reference: " << ref));
+ const std::string name = i->second.asString();
+ if (name.compare(0, prefix.size(), prefix) != 0)
+ throw Exception(QPID_MSG("Replicator: unexpected reference prefix: " << name));
+ std::string ret = name.substr(prefix.size());
+ return ret;
+}
+
+const std::string EXCHANGE_REF("exchangeRef");
+const std::string QUEUE_REF("queueRef");
+
+} // namespace
+
+void BrokerReplicator::doResponseBind(Variant::Map& values) {
+ std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
+ std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
+ boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName);
+ boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
+ // FIXME aconway 2011-11-24: more flexible configuration for binding replication.
+
+ // Automatically replicate binding if queue and exchange exist and are replicated
+ if (exchange && replicateLevel(exchange->getArgs()) &&
+ queue && replicateLevel(queue->getSettings()))
+ {
+ framing::FieldTable args;
+ amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+ string key = values[KEY].asString();
+ exchange->bind(queue, key, &args);
+ QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName()
+ << " queue=" << queue->getName()
+ << " key=" << key);
+ }
+}
+
+void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
+ if (replicateLevel(queue->getSettings()) == RL_MESSAGES) {
+ boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
+ broker.getExchanges().registerExchange(qr);
+ qr->activate();
+ }
+}
+
+bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
+bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
+bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
+
+string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
+
+}} // namespace broker
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
new file mode 100644
index 0000000000..cfb6cf9a28
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -0,0 +1,85 @@
+#ifndef QPID_HA_REPLICATOR_H
+#define QPID_HA_REPLICATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Exchange.h"
+#include "qpid/types/Variant.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+
+namespace broker {
+class Broker;
+class Link;
+class Bridge;
+class SessionHandler;
+}
+
+namespace ha {
+
+/**
+ * Replicate configuration on a backup broker.
+ *
+ * Implemented as an exchange that subscribes to receive QMF
+ * configuration events from the primary. It configures local queues
+ * exchanges and bindings to replicate the primary.
+ * It also creates QueueReplicators for newly replicated queues.
+ *
+ * THREAD SAFE: Has no mutable state.
+ *
+ */
+class BrokerReplicator : public broker::Exchange
+{
+ public:
+ BrokerReplicator(const boost::shared_ptr<broker::Link>&);
+ ~BrokerReplicator();
+ std::string getType() const;
+
+ // Exchange methods
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
+ bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+
+ private:
+ void initializeBridge(broker::Bridge&, broker::SessionHandler&);
+
+ void doEventQueueDeclare(types::Variant::Map& values);
+ void doEventQueueDelete(types::Variant::Map& values);
+ void doEventExchangeDeclare(types::Variant::Map& values);
+ void doEventExchangeDelete(types::Variant::Map& values);
+ void doEventBind(types::Variant::Map&);
+ void doEventUnbind(types::Variant::Map&);
+
+ void doResponseQueue(types::Variant::Map& values);
+ void doResponseExchange(types::Variant::Map& values);
+ void doResponseBind(types::Variant::Map& values);
+
+ void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
+
+ broker::Broker& broker;
+ boost::shared_ptr<broker::Link> link;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_HA_REPLICATOR_H*/
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
new file mode 100644
index 0000000000..67409803e8
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionExcluder.h"
+#include "qpid/broker/Connection.h"
+#include <boost/function.hpp>
+#include <sstream>
+
+namespace qpid {
+namespace ha {
+
+ConnectionExcluder::ConnectionExcluder() {}
+
+void ConnectionExcluder::opened(broker::Connection& connection) {
+ if (!connection.isLink() && !connection.getClientProperties().isSet(ADMIN_TAG))
+ throw Exception(
+ QPID_MSG("HA: Backup broker rejected connection " << connection.getMgmtId()));
+}
+
+const std::string ConnectionExcluder::ADMIN_TAG="qpid.ha-admin";
+
+}} // namespace qpid::ha
+
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
new file mode 100644
index 0000000000..f8f2843a0c
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
@@ -0,0 +1,54 @@
+#ifndef QPID_HA_CONNECTIONEXCLUDER_H
+#define QPID_HA_CONNECTIONEXCLUDER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/ConnectionObserver.h"
+#include <boost/function.hpp>
+
+namespace qpid {
+
+namespace broker {
+class Connection;
+}
+
+namespace ha {
+
+/**
+ * Exclude normal connections to a backup broker.
+ * Admin connections are identified by a special flag in client-properties
+ * during connection negotiation.
+ */
+class ConnectionExcluder : public broker::ConnectionObserver
+{
+ public:
+ ConnectionExcluder();
+
+ void opened(broker::Connection& connection);
+
+ private:
+ static const std::string ADMIN_TAG;
+};
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_CONNECTIONEXCLUDER_H*/
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
new file mode 100644
index 0000000000..0d3bd51439
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -0,0 +1,137 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Backup.h"
+#include "ConnectionExcluder.h"
+#include "HaBroker.h"
+#include "Settings.h"
+#include "ReplicatingSubscription.h"
+#include "qpid/Exception.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qmf/org/apache/qpid/ha/Package.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetClientAddresses.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokerAddresses.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace ha {
+
+namespace _qmf = ::qmf::org::apache::qpid::ha;
+using namespace management;
+using namespace std;
+
+namespace {
+
+const std::string PRIMARY="primary";
+const std::string BACKUP="backup";
+
+} // namespace
+
+
+HaBroker::HaBroker(broker::Broker& b, const Settings& s)
+ : broker(b),
+ settings(s),
+ backup(new Backup(b, s)),
+ mgmtObject(0)
+{
+ // Register a factory for replicating subscriptions.
+ broker.getConsumerFactories().add(
+ boost::shared_ptr<ReplicatingSubscription::Factory>(
+ new ReplicatingSubscription::Factory()));
+
+ broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
+
+ ManagementAgent* ma = broker.getManagementAgent();
+ if (!ma)
+ throw Exception("Cannot start HA: management is disabled");
+ if (ma) {
+ _qmf::Package packageInit(ma);
+ mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
+ mgmtObject->set_status(BACKUP);
+ ma->addObject(mgmtObject);
+ }
+ sys::Mutex::ScopedLock l(lock);
+ if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l);
+ if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
+}
+
+HaBroker::~HaBroker() {}
+
+Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
+ sys::Mutex::ScopedLock l(lock);
+ switch (methodId) {
+ case _qmf::HaBroker::METHOD_PROMOTE: {
+ if (backup.get()) { // I am a backup
+ // FIXME aconway 2012-01-26: create primary state before resetting backup
+ // as that allows client connections.
+ backup.reset();
+ QPID_LOG(notice, "HA: Primary promoted from backup");
+ mgmtObject->set_status(PRIMARY);
+ }
+ break;
+ }
+ case _qmf::HaBroker::METHOD_SETCLIENTADDRESSES: {
+ setClientUrl(
+ Url(dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args).
+ i_clientAddresses), l);
+ break;
+ }
+ case _qmf::HaBroker::METHOD_SETBROKERADDRESSES: {
+ setBrokerUrl(
+ Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args)
+ .i_brokerAddresses), l);
+ break;
+ }
+ default:
+ return Manageable::STATUS_UNKNOWN_METHOD;
+ }
+ return Manageable::STATUS_OK;
+}
+
+void HaBroker::setClientUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
+ if (url.empty()) throw Exception("Invalid empty URL for HA client failover");
+ clientUrl = url;
+ updateClientUrl(l);
+}
+
+void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) {
+ Url url = clientUrl.empty() ? brokerUrl : clientUrl;
+ assert(!url.empty());
+ mgmtObject->set_clientAddresses(url.str());
+ knownBrokers.clear();
+ knownBrokers.push_back(url);
+ QPID_LOG(debug, "HA: Setting client known-brokers to: " << url);
+}
+
+void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
+ if (url.empty()) throw Exception("Invalid empty URL for HA broker failover");
+ brokerUrl = url;
+ mgmtObject->set_brokerAddresses(brokerUrl.str());
+ if (backup.get()) backup->setBrokerUrl(brokerUrl);
+ // Updating broker URL also updates defaulted client URL:
+ if (clientUrl.empty()) updateClientUrl(l);
+}
+
+std::vector<Url> HaBroker::getKnownBrokers() const {
+ return knownBrokers;
+}
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
new file mode 100644
index 0000000000..4d7bf80c90
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -0,0 +1,74 @@
+#ifndef QPID_HA_BROKER_H
+#define QPID_HA_BROKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Settings.h"
+#include "qpid/Url.h"
+#include "qpid/sys/Mutex.h"
+#include "qmf/org/apache/qpid/ha/HaBroker.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetStatus.h"
+#include "qpid/management/Manageable.h"
+#include <memory>
+
+namespace qpid {
+namespace broker {
+class Broker;
+}
+namespace ha {
+class Backup;
+
+/**
+ * HA state and actions associated with a broker.
+ *
+ * THREAD SAFE: may be called in arbitrary broker IO or timer threads.
+ */
+class HaBroker : public management::Manageable
+{
+ public:
+ HaBroker(broker::Broker&, const Settings&);
+ ~HaBroker();
+
+ // Implement Manageable.
+ qpid::management::ManagementObject* GetManagementObject() const { return mgmtObject; }
+ management::Manageable::status_t ManagementMethod (
+ uint32_t methodId, management::Args& args, std::string& text);
+
+ private:
+ void setClientUrl(const Url&, const sys::Mutex::ScopedLock&);
+ void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&);
+ void updateClientUrl(const sys::Mutex::ScopedLock&);
+ bool isPrimary(const sys::Mutex::ScopedLock&) { return !backup.get(); }
+ std::vector<Url> getKnownBrokers() const;
+
+ broker::Broker& broker;
+ const Settings settings;
+
+ sys::Mutex lock;
+ std::auto_ptr<Backup> backup;
+ qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
+ Url clientUrl, brokerUrl;
+ std::vector<Url> knownBrokers;
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_BROKER_H*/
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
new file mode 100644
index 0000000000..fc9e48411d
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -0,0 +1,67 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "HaBroker.h"
+#include "Settings.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "qpid/broker/Broker.h"
+
+
+namespace qpid {
+namespace ha {
+
+using namespace std;
+
+struct Options : public qpid::Options {
+ Settings& settings;
+ Options(Settings& s) : qpid::Options("HA Options"), settings(s) {
+ addOptions()
+ ("ha-enable", optValue(settings.enabled, "yes|no"), "Enable High Availability features")
+ ("ha-client-url", optValue(settings.clientUrl,"URL"), "URL that clients use to connect and fail over.")
+ ("ha-broker-url", optValue(settings.brokerUrl,"URL"), "URL that backup brokers use to connect and fail over.")
+ ("ha-username", optValue(settings.username, "USER"), "Username for connections between brokers")
+ ("ha-password", optValue(settings.password, "PASS"), "Password for connections between brokers")
+ ("ha-mechanism", optValue(settings.mechanism, "MECH"), "Authentication mechanism for connections between brokers")
+ ;
+ }
+};
+
+struct HaPlugin : public Plugin {
+
+ Settings settings;
+ Options options;
+ auto_ptr<HaBroker> haBroker;
+
+ HaPlugin() : options(settings) {}
+
+ Options* getOptions() { return &options; }
+
+ void earlyInitialize(Plugin::Target& ) {}
+
+ void initialize(Plugin::Target& target) {
+ broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+ if (broker && settings.enabled) {
+ haBroker.reset(new ha::HaBroker(*broker, settings));
+ } else
+ QPID_LOG(notice, "HA: Disabled");
+ }
+};
+
+static HaPlugin instance; // Static initialization.
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
new file mode 100644
index 0000000000..0017cc82cd
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -0,0 +1,174 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "QueueReplicator.h"
+#include "ReplicatingSubscription.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+#include <sstream>
+
+namespace {
+const std::string QPID_REPLICATOR_("qpid.replicator-");
+const std::string TYPE_NAME("qpid.queue-replicator");
+const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
+}
+
+namespace qpid {
+namespace ha {
+using namespace broker;
+using namespace framing;
+
+const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event");
+const std::string QueueReplicator::POSITION_EVENT_KEY("qpid.position-event");
+
+std::string QueueReplicator::replicatorName(const std::string& queueName) {
+ return QPID_REPLICATOR_ + queueName;
+}
+
+QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
+ : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l)
+{
+ std::stringstream ss;
+ ss << "HA: Backup " << queue->getName() << ": ";
+ logPrefix = ss.str();
+ QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings());
+}
+
+// This must be separate from the constructor so we can call shared_from_this.
+void QueueReplicator::activate() {
+ // Note this may create a new bridge or use an existing one.
+ queue->getBroker()->getLinks().declare(
+ link->getHost(), link->getPort(),
+ false, // durable
+ queue->getName(), // src
+ getName(), // dest
+ "", // key
+ false, // isQueue
+ false, // isLocal
+ "", // id/tag
+ "", // excludes
+ false, // dynamic
+ 0, // sync?
+ // Include shared_ptr to self to ensure we are not deleted
+ // before initializeBridge is called.
+ boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, shared_from_this())
+ );
+}
+
+QueueReplicator::~QueueReplicator() {}
+
+void QueueReplicator::deactivate() {
+ sys::Mutex::ScopedLock l(lock);
+ queue->getBroker()->getLinks().destroy(
+ link->getHost(), link->getPort(), queue->getName(), getName(), string());
+ QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName);
+}
+
+// Called in a broker connection thread when the bridge is created.
+// shared_ptr to self ensures we are not deleted before initializeBridge is called.
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler,
+ boost::shared_ptr<QueueReplicator> /*self*/) {
+ sys::Mutex::ScopedLock l(lock);
+ bridgeName = bridge.getName();
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
+ framing::FieldTable settings;
+
+ // FIXME aconway 2011-12-09: Failover optimization removed.
+ // There was code here to re-use messages already on the backup
+ // during fail-over. This optimization was removed to simplify
+ // the logic till we get the basic replication stable, it
+ // can be re-introduced later. Last revision with the optimization:
+ // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
+
+ // Clear out any old messages, reset the queue to start replicating fresh.
+ queue->purge();
+ queue->setPosition(0);
+
+ settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
+ // TODO aconway 2011-12-19: optimize.
+ settings.setInt(QPID_SYNC_FREQUENCY, 1);
+ peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false/*exclusive*/, "", 0, settings);
+ peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
+ peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
+ QPID_LOG(debug, logPrefix << "Activated bridge " << bridgeName);
+}
+
+namespace {
+template <class T> T decodeContent(Message& m) {
+ std::string content;
+ m.getFrames().getContent(content);
+ Buffer buffer(const_cast<char*>(content.c_str()), content.size());
+ T result;
+ result.decode(buffer);
+ return result;
+}
+}
+
+void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) {
+ // Thread safe: only calls thread safe Queue functions.
+ if (queue->getPosition() >= n) { // Ignore messages we haven't reached yet
+ QueuedMessage message;
+ if (queue->acquireMessageAt(n, message))
+ queue->dequeue(0, message);
+ }
+}
+
+// Called in connection thread of the queues bridge to primary.
+void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable*)
+{
+ sys::Mutex::ScopedLock l(lock);
+ if (key == DEQUEUE_EVENT_KEY) {
+ SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
+ QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
+ //TODO: should be able to optimise the following
+ for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
+ dequeue(*i, l);
+ } else if (key == POSITION_EVENT_KEY) {
+ SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
+ QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
+ << " to " << position);
+ assert(queue->getPosition() <= position);
+ //TODO aconway 2011-12-14: Optimize this?
+ for (SequenceNumber i = queue->getPosition(); i < position; ++i)
+ dequeue(i,l);
+ queue->setPosition(position);
+ } else {
+ msg.deliverTo(queue);
+ QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
+ }
+}
+
+// Unused Exchange methods.
+bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
+bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
+bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }
+std::string QueueReplicator::getType() const { return TYPE_NAME; }
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
new file mode 100644
index 0000000000..9de7dd480c
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -0,0 +1,86 @@
+#ifndef QPID_HA_QUEUEREPLICATOR_H
+#define QPID_HA_QUEUEREPLICATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Exchange.h"
+#include "qpid/framing/SequenceSet.h"
+#include <boost/enable_shared_from_this.hpp>
+#include <iosfwd>
+
+namespace qpid {
+
+namespace broker {
+class Bridge;
+class Link;
+class Queue;
+class QueueRegistry;
+class SessionHandler;
+class Deliverable;
+}
+
+namespace ha {
+
+/**
+ * Exchange created on a backup broker to replicate a queue on the primary.
+ *
+ * Puts replicated messages on the local queue, handles dequeue events.
+ * Creates a ReplicatingSubscription on the primary by passing special
+ * arguments to the consume command.
+ *
+ * THREAD SAFE: Called in different connection threads.
+ */
+class QueueReplicator : public broker::Exchange,
+ public boost::enable_shared_from_this<QueueReplicator>
+{
+ public:
+ static const std::string DEQUEUE_EVENT_KEY;
+ static const std::string POSITION_EVENT_KEY;
+ static std::string replicatorName(const std::string& queueName);
+
+ QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l);
+ ~QueueReplicator();
+
+ void activate(); // Call after ctor
+ void deactivate(); // Call before dtor
+
+ std::string getType() const;
+ bool bind(boost::shared_ptr<broker::Queue
+ >, const std::string&, const framing::FieldTable*);
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
+ bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+
+ private:
+ void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler,
+ boost::shared_ptr<QueueReplicator> self);
+ void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
+
+ std::string logPrefix;
+ std::string bridgeName;
+ sys::Mutex lock;
+ boost::shared_ptr<broker::Queue> queue;
+ boost::shared_ptr<broker::Link> link;
+};
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_QUEUEREPLICATOR_H*/
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
new file mode 100644
index 0000000000..e8571cf871
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -0,0 +1,292 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ReplicatingSubscription.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/SessionContext.h"
+#include "qpid/broker/ConnectionState.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Statement.h"
+#include <sstream>
+
+namespace qpid {
+namespace ha {
+
+using namespace framing;
+using namespace broker;
+using namespace std;
+
+const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
+
+namespace {
+const string DOLLAR("$");
+const string INTERNAL("-internal");
+} // namespace
+
+string mask(const string& in)
+{
+ return DOLLAR + in + INTERNAL;
+}
+
+/* Called by SemanticState::consume to create a consumer */
+boost::shared_ptr<broker::SemanticState::ConsumerImpl>
+ReplicatingSubscription::Factory::create(
+ SemanticState* parent,
+ const string& name,
+ Queue::shared_ptr queue,
+ bool ack,
+ bool acquire,
+ bool exclusive,
+ const string& tag,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const framing::FieldTable& arguments
+) {
+ boost::shared_ptr<ReplicatingSubscription> rs;
+ if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) {
+ rs.reset(new ReplicatingSubscription(
+ parent, name, queue, ack, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments));
+ queue->addObserver(rs);
+ }
+ return rs;
+}
+
+ReplicatingSubscription::ReplicatingSubscription(
+ SemanticState* parent,
+ const string& name,
+ Queue::shared_ptr queue,
+ bool ack,
+ bool acquire,
+ bool exclusive,
+ const string& tag,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const framing::FieldTable& arguments
+) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments),
+ events(new Queue(mask(name))),
+ consumer(new DelegatingConsumer(*this))
+{
+ stringstream ss;
+ ss << "HA: Primary: " << getQueue()->getName() << " at "
+ << parent->getSession().getConnection().getUrl() << ": ";
+ logPrefix = ss.str();
+
+ // FIXME aconway 2011-12-09: Failover optimization removed.
+ // There was code here to re-use messages already on the backup
+ // during fail-over. This optimization was removed to simplify
+ // the logic till we get the basic replication stable, it
+ // can be re-introduced later. Last revision with the optimization:
+ // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
+
+ QPID_LOG(debug, logPrefix << "Created backup subscription " << getName());
+
+ // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0
+ // so we will start consuming from the lowest numbered message.
+ // This is incorrect if the sequence number wraps around, but
+ // this is what all consumers currently do.
+}
+
+// Message is delivered in the subscription's connection thread.
+bool ReplicatingSubscription::deliver(QueuedMessage& m) {
+ // Add position events for the subscribed queue, not for the internal event queue.
+ if (m.queue && m.queue == getQueue().get()) {
+ sys::Mutex::ScopedLock l(lock);
+ assert(position == m.position);
+ // m.position is the position of the newly enqueued m on the local queue.
+ // backupPosition is latest position on the backup queue (before enqueueing m.)
+ assert(m.position > backupPosition);
+ if (m.position - backupPosition > 1) {
+ // Position has advanced because of messages dequeued ahead of us.
+ SequenceNumber send(m.position);
+ --send; // Send the position before m was enqueued.
+ sendPositionEvent(send, l);
+ }
+ backupPosition = m.position;
+ QPID_LOG(trace, logPrefix << "Replicating message " << m.position);
+ }
+ return ConsumerImpl::deliver(m);
+}
+
+ReplicatingSubscription::~ReplicatingSubscription() {}
+
+
+// INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg
+
+// Mark a message completed. May be called by acknowledge or dequeued
+void ReplicatingSubscription::complete(
+ const QueuedMessage& qm, const sys::Mutex::ScopedLock&)
+{
+ // Handle completions for the subscribed queue, not the internal event queue.
+ if (qm.queue && qm.queue == getQueue().get()) {
+ QPID_LOG(trace, logPrefix << "Completed message " << qm.position);
+ Delayed::iterator i= delayed.find(qm.position);
+ // The same message can be completed twice, by acknowledged and
+ // dequeued, remove it from the set so it only gets completed
+ // once.
+ if (i != delayed.end()) {
+ assert(i->second.payload == qm.payload);
+ qm.payload->getIngressCompletion().finishCompleter();
+ delayed.erase(i);
+ }
+ }
+}
+
+// Called before we get notified of the message being available and
+// under the message lock in the queue. Called in arbitrary connection thread.
+void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
+ sys::Mutex::ScopedLock l(lock);
+ // Delay completion
+ QPID_LOG(trace, logPrefix << "Delaying completion of message " << qm.position);
+ qm.payload->getIngressCompletion().startCompleter();
+ assert(delayed.find(qm.position) == delayed.end());
+ delayed[qm.position] = qm;
+}
+
+
+// Function to complete a delayed message, called by cancel()
+void ReplicatingSubscription::cancelComplete(
+ const Delayed::value_type& v, const sys::Mutex::ScopedLock&)
+{
+ QPID_LOG(trace, logPrefix << "Cancel completed message " << v.second.position);
+ v.second.payload->getIngressCompletion().finishCompleter();
+}
+
+// Called in the subscription's connection thread.
+void ReplicatingSubscription::cancel()
+{
+ getQueue()->removeObserver(
+ boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+ {
+ sys::Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName());
+ for_each(delayed.begin(), delayed.end(),
+ boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l)));
+ delayed.clear();
+ }
+ ConsumerImpl::cancel();
+}
+
+// Called on primary in the backups IO thread.
+void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) {
+ sys::Mutex::ScopedLock l(lock);
+ // Finish completion of message, it has been acknowledged by the backup.
+ complete(msg, l);
+}
+
+// Hide the "queue deleted" error for a ReplicatingSubscription when a
+// queue is deleted, this is normal and not an error.
+bool ReplicatingSubscription::hideDeletedError() { return true; }
+
+// Called with lock held. Called in subscription's connection thread.
+void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
+{
+ QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
+ string buf(dequeues.encodedSize(),'\0');
+ framing::Buffer buffer(&buf[0], buf.size());
+ dequeues.encode(buffer);
+ dequeues.clear();
+ buffer.reset();
+ sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
+}
+
+// Called after the message has been removed from the deque and under
+// the messageLock in the queue. Called in arbitrary connection threads.
+void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
+{
+ {
+ sys::Mutex::ScopedLock l(lock);
+ QPID_LOG(trace, logPrefix << "Dequeued message " << qm.position);
+ dequeues.add(qm.position);
+ // If we have not yet sent this message to the backup, then
+ // complete it now as it will never be accepted.
+ if (qm.position > position) complete(qm, l);
+ }
+ notify(); // Ensure a call to doDispatch
+}
+
+// Called with lock held. Called in subscription's connection thread.
+void ReplicatingSubscription::sendPositionEvent(
+ SequenceNumber position, const sys::Mutex::ScopedLock&l )
+{
+ QPID_LOG(trace, logPrefix << "Sending position " << position
+ << ", was " << backupPosition);
+ string buf(backupPosition.encodedSize(),'\0');
+ framing::Buffer buffer(&buf[0], buf.size());
+ position.encode(buffer);
+ buffer.reset();
+ sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l);
+}
+
+void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer,
+ const sys::Mutex::ScopedLock&)
+{
+ //generate event message
+ boost::intrusive_ptr<Message> event = new Message();
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+ AMQFrame content((AMQContentBody()));
+ content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize());
+ header.setBof(false);
+ header.setEof(false);
+ header.setBos(true);
+ header.setEos(true);
+ content.setBof(false);
+ content.setEof(true);
+ content.setBos(true);
+ content.setEos(true);
+ event->getFrames().append(method);
+ event->getFrames().append(header);
+ event->getFrames().append(content);
+
+ DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ props->setRoutingKey(key);
+ // Send the event using the events queue. Consumer is a
+ // DelegatingConsumer that delegates to *this for everything but
+ // has an independnet position. We put an event on events and
+ // dispatch it through ourselves to send it in line with the
+ // normal browsing messages.
+ events->deliver(event);
+ events->dispatch(consumer);
+}
+
+
+// Called in subscription's connection thread.
+bool ReplicatingSubscription::doDispatch()
+{
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (!dequeues.empty()) sendDequeueEvent(l);
+ }
+ return ConsumerImpl::doDispatch();
+}
+
+ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
+ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
+bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { return delegate.deliver(m); }
+void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
+bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
+bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
+OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
new file mode 100644
index 0000000000..fa2093ac61
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -0,0 +1,132 @@
+#ifndef QPID_BROKER_REPLICATINGSUBSCRIPTION_H
+#define QPID_BROKER_REPLICATINGSUBSCRIPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "QueueReplicator.h" // For DEQUEUE_EVENT_KEY
+#include "qpid/broker/SemanticState.h"
+#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/ConsumerFactory.h"
+#include <iosfwd>
+
+namespace qpid {
+
+namespace broker {
+class Message;
+class Queue;
+class QueuedMessage;
+class OwnershipToken;
+}
+
+namespace framing {
+class Buffer;
+}
+
+namespace ha {
+
+/**
+ * A susbcription that represents a backup replicating a queue.
+ *
+ * Runs on the primary. Delays completion of messages till the backup
+ * has acknowledged, informs backup of locally dequeued messages.
+ *
+ * THREAD SAFE: Used as a consumer in subscription's connection
+ * thread, and as a QueueObserver in arbitrary connection threads.
+ */
+class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
+ public broker::QueueObserver
+{
+ public:
+ struct Factory : public broker::ConsumerFactory {
+ boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
+ broker::SemanticState* parent,
+ const std::string& name, boost::shared_ptr<broker::Queue> ,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl,
+ const framing::FieldTable& arguments);
+ };
+
+ // Argument names for consume command.
+ static const std::string QPID_REPLICATING_SUBSCRIPTION;
+
+ ReplicatingSubscription(broker::SemanticState* parent,
+ const std::string& name, boost::shared_ptr<broker::Queue> ,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl,
+ const framing::FieldTable& arguments);
+
+ ~ReplicatingSubscription();
+
+ // QueueObserver overrides.
+ bool deliver(broker::QueuedMessage& msg);
+ void enqueued(const broker::QueuedMessage&);
+ void dequeued(const broker::QueuedMessage&);
+ void acquired(const broker::QueuedMessage&) {}
+ void requeued(const broker::QueuedMessage&) {}
+
+ // Consumer overrides.
+ void cancel();
+ void acknowledged(const broker::QueuedMessage&);
+
+ bool hideDeletedError();
+
+ protected:
+ bool doDispatch();
+ private:
+ typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed;
+ std::string logPrefix;
+ boost::shared_ptr<broker::Queue> events;
+ boost::shared_ptr<broker::Consumer> consumer;
+ Delayed delayed;
+ framing::SequenceSet dequeues;
+ framing::SequenceNumber backupPosition;
+
+ void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&);
+ void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&);
+ void sendDequeueEvent(const sys::Mutex::ScopedLock&);
+ void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
+ void sendEvent(const std::string& key, framing::Buffer&,
+ const sys::Mutex::ScopedLock&);
+
+ class DelegatingConsumer : public Consumer
+ {
+ public:
+ DelegatingConsumer(ReplicatingSubscription&);
+ ~DelegatingConsumer();
+ bool deliver(broker::QueuedMessage& msg);
+ void notify();
+ bool filter(boost::intrusive_ptr<broker::Message>);
+ bool accept(boost::intrusive_ptr<broker::Message>);
+ void cancel() {}
+ void acknowledged(const broker::QueuedMessage&) {}
+
+ broker::OwnershipToken* getSession();
+
+ private:
+ ReplicatingSubscription& delegate;
+ };
+};
+
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_REPLICATINGSUBSCRIPTION_H*/
diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h
new file mode 100644
index 0000000000..049c873b9f
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Settings.h
@@ -0,0 +1,45 @@
+#ifndef QPID_HA_SETTINGS_H
+#define QPID_HA_SETTINGS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+
+namespace qpid {
+namespace ha {
+
+/**
+ * Configurable settings for HA.
+ */
+class Settings
+{
+ public:
+ Settings() : enabled(false) {}
+ bool enabled;
+ std::string clientUrl;
+ std::string brokerUrl;
+ std::string username, password, mechanism;
+ private:
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_SETTINGS_H*/
diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml
new file mode 100644
index 0000000000..fe4a14d111
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/management-schema.xml
@@ -0,0 +1,38 @@
+<schema package="org.apache.qpid.ha">
+
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+ <!-- Monitor and control HA status of a broker. -->
+ <class name="HaBroker">
+ <property name="name" type="sstr" access="RC" index="y" desc="Primary Key"/>
+ <property name="status" type="sstr" desc="HA status: primary or backup"/>
+ <property name="clientAddresses" type="sstr" desc="List of addresses used by clients to connect to the HA cluster."/>
+ <property name="brokerAddresses" type="sstr" desc="List of addresses used by HA brokers to connect to each other."/>
+
+ <method name="promote" desc="Promote a backup broker to primary."/>
+ <method name="setClientAddresses" desc="Set HA client addresses">
+ <arg name="clientAddresses" type="sstr" dir="I"/>
+ </method>
+ <method name="setBrokerAddresses" desc="Set HA broker addresses">
+ <arg name="brokerAddresses" type="sstr" dir="I"/>
+ </method>
+ </class>
+
+</schema>
diff --git a/qpid/cpp/src/qpid/log/Logger.cpp b/qpid/cpp/src/qpid/log/Logger.cpp
index 1600822142..92578b8357 100644
--- a/qpid/cpp/src/qpid/log/Logger.cpp
+++ b/qpid/cpp/src/qpid/log/Logger.cpp
@@ -83,7 +83,7 @@ void Logger::log(const Statement& s, const std::string& msg) {
if (flags&HIRES)
qpid::sys::outputHiresNow(os);
else
- qpid::sys::outputFormattedNow(os);
+ qpid::sys::outputFormattedNow(os);
}
if (flags&LEVEL)
os << LevelTraits::name(s.level) << " ";
diff --git a/qpid/cpp/src/qpid/log/Options.cpp b/qpid/cpp/src/qpid/log/Options.cpp
index 0001d00bdf..1259244297 100644
--- a/qpid/cpp/src/qpid/log/Options.cpp
+++ b/qpid/cpp/src/qpid/log/Options.cpp
@@ -66,7 +66,7 @@ Options::Options(const std::string& argv0_, const std::string& name_) :
("log-source", optValue(source,"yes|no"), "Include source file:line in log messages")
("log-thread", optValue(thread,"yes|no"), "Include thread ID in log messages")
("log-function", optValue(function,"yes|no"), "Include function signature in log messages")
- ("log-hires-timestamp", optValue(hiresTs,"yes|no"), "Use unformatted hi-res timestamp in log messages")
+ ("log-hires-timestamp", optValue(hiresTs,"yes|no"), "Use hi-resolution timestamps in log messages")
("log-prefix", optValue(prefix,"STRING"), "Prefix to append to all log messages")
;
add(*sinkOptions);
diff --git a/qpid/cpp/src/qpid/sys/posix/Time.cpp b/qpid/cpp/src/qpid/sys/posix/Time.cpp
index dee393f4bf..272c6c21a5 100644
--- a/qpid/cpp/src/qpid/sys/posix/Time.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/Time.cpp
@@ -114,7 +114,9 @@ void outputFormattedNow(std::ostream& o) {
void outputHiresNow(std::ostream& o) {
::timespec time;
::clock_gettime(CLOCK_REALTIME, &time);
- o << time.tv_sec << "." << std::setw(9) << std::setfill('0') << time.tv_nsec << "s ";
+ ::time_t seconds = time.tv_sec;
+ outputFormattedTime(o, &seconds);
+ o << "." << std::setw(9) << std::setfill('0') << time.tv_nsec << " ";
}
void sleep(int secs) {
diff --git a/qpid/cpp/src/qpid/types/Variant.cpp b/qpid/cpp/src/qpid/types/Variant.cpp
index f563d5de5b..6af06ede5d 100644
--- a/qpid/cpp/src/qpid/types/Variant.cpp
+++ b/qpid/cpp/src/qpid/types/Variant.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -88,7 +88,7 @@ class VariantImpl
bool isEqualTo(VariantImpl&) const;
bool isEquivalentTo(VariantImpl&) const;
- static VariantImpl* create(const Variant&);
+ static VariantImpl* create(const Variant&);
private:
const VariantType type;
union {
@@ -150,7 +150,7 @@ VariantImpl::VariantImpl(const Variant::Map& m) : type(VAR_MAP) { value.v = new
VariantImpl::VariantImpl(const Variant::List& l) : type(VAR_LIST) { value.v = new Variant::List(l); }
VariantImpl::VariantImpl(const Uuid& u) : type(VAR_UUID) { value.v = new Uuid(u); }
-VariantImpl::~VariantImpl() {
+VariantImpl::~VariantImpl() {
switch (type) {
case VAR_STRING:
delete reinterpret_cast<std::string*>(value.v);
@@ -173,7 +173,7 @@ VariantType VariantImpl::getType() const { return type; }
namespace {
-bool same_char(char a, char b)
+bool same_char(char a, char b)
{
return toupper(a) == toupper(b);
}
@@ -191,7 +191,7 @@ bool toBool(const std::string& s)
if (caseInsensitiveMatch(s, TRUE)) return true;
if (caseInsensitiveMatch(s, FALSE)) return false;
try { return boost::lexical_cast<int>(s); } catch(const boost::bad_lexical_cast&) {}
- throw InvalidConversion(QPID_MSG("Cannot convert " << s << " to bool"));
+ throw InvalidConversion(QPID_MSG("Cannot convert " << s << " to bool"));
}
template <class T> std::string toString(const T& t)
@@ -531,9 +531,9 @@ bool VariantImpl::isEqualTo(VariantImpl& other) const
case VAR_INT64: return value.i64 == other.value.i64;
case VAR_DOUBLE: return value.d == other.value.d;
case VAR_FLOAT: return value.f == other.value.f;
- case VAR_STRING: return *reinterpret_cast<std::string*>(value.v)
+ case VAR_STRING: return *reinterpret_cast<std::string*>(value.v)
== *reinterpret_cast<std::string*>(other.value.v);
- case VAR_UUID: return *reinterpret_cast<Uuid*>(value.v)
+ case VAR_UUID: return *reinterpret_cast<Uuid*>(value.v)
== *reinterpret_cast<Uuid*>(other.value.v);
case VAR_LIST: return equal(asList(), other.asList());
case VAR_MAP: return equal(asMap(), other.asMap());
@@ -616,7 +616,25 @@ std::string getTypeName(VariantType type)
return "<unknown>";//should never happen
}
-VariantImpl* VariantImpl::create(const Variant& v)
+bool isIntegerType(VariantType type)
+{
+ switch (type) {
+ case VAR_BOOL:
+ case VAR_UINT8:
+ case VAR_UINT16:
+ case VAR_UINT32:
+ case VAR_UINT64:
+ case VAR_INT8:
+ case VAR_INT16:
+ case VAR_INT32:
+ case VAR_INT64:
+ return true;
+ default:
+ return false;
+ }
+}
+
+VariantImpl* VariantImpl::create(const Variant& v)
{
switch (v.getType()) {
case VAR_BOOL: return new VariantImpl(v.asBool());
@@ -815,9 +833,9 @@ const Variant::List& Variant::asList() const { if (!impl) throw InvalidConversio
Variant::List& Variant::asList() { if (!impl) throw InvalidConversion("Can't convert VOID to LIST"); return impl->asList(); }
const std::string& Variant::getString() const { if (!impl) throw InvalidConversion("Can't convert VOID to STRING"); return impl->getString(); }
std::string& Variant::getString() { if (!impl) throw InvalidConversion("Can't convert VOID to STRING"); return impl->getString(); }
-void Variant::setEncoding(const std::string& s) {
+void Variant::setEncoding(const std::string& s) {
if (!impl) impl = new VariantImpl();
- impl->setEncoding(s);
+ impl->setEncoding(s);
}
const std::string& Variant::getEncoding() const { return impl ? impl->getEncoding() : EMPTY; }
@@ -873,7 +891,7 @@ std::ostream& operator<<(std::ostream& out, const Variant& value)
out << value.asString();
break;
}
- return out;
+ return out;
}
bool operator==(const Variant& a, const Variant& b)