summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-05-12 17:04:07 +0000
committerGordon Sim <gsim@apache.org>2008-05-12 17:04:07 +0000
commit0655ff5aceb9d53eb256a05d7beb55b1c803c8de (patch)
treed478a719d5a5d030c3e228d298c6be8378d4fe44 /cpp
parent4a1605e6b357c251398aca281b90452c1cbd5ab2 (diff)
downloadqpid-python-0655ff5aceb9d53eb256a05d7beb55b1c803c8de.tar.gz
QPID-1050: Patch from Ted Ross:
1) Durability for federation links (broker-to-broker connections) 2) Improved handling of federation links: a) Links can be created even if the remote broker is not reachable b) If links are lost, re-establishment will occur using an exponential back-off algorithm 3) Durability of exchanges is now viewable through management 4) ManagementAgent API has been moved to an interface class to reduce coupling between the broker and manageable plug-ins. 5) General configuration storage capability has been added to the store/recover interface. This is used for federation links. 6) Management object-ids for durable objects are now themselves durable. (Note: some refactoring needed around ProtocolAccess needed to try and reduce dependencies) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@655563 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/managementgen/templates/Class.h1
-rw-r--r--cpp/src/Makefile.am10
-rw-r--r--cpp/src/qpid/amqp_0_10/Connection.cpp17
-rw-r--r--cpp/src/qpid/amqp_0_10/Connection.h5
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp43
-rw-r--r--cpp/src/qpid/broker/Bridge.h19
-rw-r--r--cpp/src/qpid/broker/Broker.cpp42
-rw-r--r--cpp/src/qpid/broker/Broker.h19
-rw-r--r--cpp/src/qpid/broker/Connection.cpp130
-rw-r--r--cpp/src/qpid/broker/Connection.h4
-rw-r--r--cpp/src/qpid/broker/ConnectionFactory.cpp4
-rw-r--r--cpp/src/qpid/broker/ConnectionFactory.h3
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp11
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp17
-rw-r--r--cpp/src/qpid/broker/Exchange.h2
-rw-r--r--cpp/src/qpid/broker/Link.cpp281
-rw-r--r--cpp/src/qpid/broker/Link.h115
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp102
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.h87
-rw-r--r--cpp/src/qpid/broker/MessageStore.h11
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.cpp10
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.h2
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.cpp15
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.h3
-rw-r--r--cpp/src/qpid/broker/PersistableConfig.h45
-rw-r--r--cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--cpp/src/qpid/broker/RecoverableConfig.h45
-rw-r--r--cpp/src/qpid/broker/RecoveryManager.h3
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp34
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.h6
-rw-r--r--cpp/src/qpid/broker/System.h3
-rw-r--r--cpp/src/qpid/management/Manageable.cpp16
-rw-r--r--cpp/src/qpid/management/Manageable.h13
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp695
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h156
-rw-r--r--cpp/src/qpid/management/ManagementBroker.cpp746
-rw-r--r--cpp/src/qpid/management/ManagementBroker.h203
-rw-r--r--cpp/src/qpid/management/ManagementExchange.cpp2
-rw-r--r--cpp/src/qpid/management/ManagementExchange.h6
-rw-r--r--cpp/src/qpid/management/ManagementObject.h1
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.cpp7
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.h5
-rw-r--r--cpp/src/qpid/sys/ConnectionCodec.h5
-rw-r--r--cpp/src/qpid/sys/ProtocolAccess.h65
-rw-r--r--cpp/src/qpid/sys/ProtocolFactory.h5
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp54
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp3
-rwxr-xr-xcpp/src/tests/federation.py40
48 files changed, 2013 insertions, 1100 deletions
diff --git a/cpp/managementgen/templates/Class.h b/cpp/managementgen/templates/Class.h
index d95a06479e..628a70d2d9 100644
--- a/cpp/managementgen/templates/Class.h
+++ b/cpp/managementgen/templates/Class.h
@@ -24,6 +24,7 @@
/*MGEN:Root.Disclaimer*/
#include "qpid/management/ManagementObject.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/framing/Uuid.h"
namespace qpid {
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 4a49c83b65..3183aefd6c 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -258,6 +258,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/HeadersExchange.cpp \
qpid/broker/IncomingExecutionContext.cpp \
qpid/broker/IncompleteMessageList.cpp \
+ qpid/broker/Link.cpp \
+ qpid/broker/LinkRegistry.cpp \
qpid/broker/Message.cpp \
qpid/broker/MessageAdapter.cpp \
qpid/broker/MessageBuilder.cpp \
@@ -291,7 +293,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/TxPublish.cpp \
qpid/broker/Vhost.cpp \
qpid/management/Manageable.cpp \
- qpid/management/ManagementAgent.cpp \
+ qpid/management/ManagementBroker.cpp \
qpid/management/ManagementExchange.cpp \
qpid/management/ManagementObject.cpp \
qpid/sys/TCPIOPlugin.cpp
@@ -382,6 +384,8 @@ nobase_include_HEADERS = \
qpid/broker/HeadersExchange.h \
qpid/broker/IncomingExecutionContext.h \
qpid/broker/IncompleteMessageList.h \
+ qpid/broker/Link.h \
+ qpid/broker/LinkRegistry.h \
qpid/broker/Message.h \
qpid/broker/MessageAdapter.h \
qpid/broker/MessageBuilder.h \
@@ -391,6 +395,7 @@ nobase_include_HEADERS = \
qpid/broker/NameGenerator.h \
qpid/broker/NullMessageStore.h \
qpid/broker/Persistable.h \
+ qpid/broker/PersistableConfig.h \
qpid/broker/PersistableExchange.h \
qpid/broker/PersistableMessage.h \
qpid/broker/PersistableQueue.h \
@@ -398,6 +403,7 @@ nobase_include_HEADERS = \
qpid/broker/QueueBindings.h \
qpid/broker/QueuePolicy.h \
qpid/broker/QueueRegistry.h \
+ qpid/broker/RecoverableConfig.h \
qpid/broker/RecoverableExchange.h \
qpid/broker/RecoverableMessage.h \
qpid/broker/RecoverableQueue.h \
@@ -506,6 +512,7 @@ nobase_include_HEADERS = \
qpid/management/Args.h \
qpid/management/Manageable.h \
qpid/management/ManagementAgent.h \
+ qpid/management/ManagementBroker.h \
qpid/management/ManagementExchange.h \
qpid/management/ManagementObject.h \
qpid/sys/AggregateOutput.h \
@@ -527,6 +534,7 @@ nobase_include_HEADERS = \
qpid/sys/OutputControl.h \
qpid/sys/OutputTask.h \
qpid/sys/Poller.h \
+ qpid/sys/ProtocolAccess.h \
qpid/sys/ProtocolFactory.h \
qpid/sys/Runnable.h \
qpid/sys/ScopedIncrement.h \
diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp
index 03e553f180..9e860ab653 100644
--- a/cpp/src/qpid/amqp_0_10/Connection.cpp
+++ b/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -19,6 +19,7 @@
*
*/
#include "Connection.h"
+#include "qpid/sys/ProtocolAccess.h"
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/exceptions.h"
@@ -27,9 +28,13 @@ namespace amqp_0_10 {
using sys::Mutex;
-Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
- : frameQueueClosed(false), output(o), connection(this, broker, id, _isClient),
- identifier(id), initialized(false), isClient(_isClient) {}
+Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient, sys::ProtocolAccess* a)
+ : frameQueueClosed(false), output(o), connection(new broker::Connection(this, broker, id, _isClient)),
+ identifier(id), initialized(false), isClient(_isClient)
+{
+ if (a != 0)
+ a->callConnCb(connection);
+}
size_t Connection::decode(const char* buffer, size_t size) {
framing::Buffer in(const_cast<char*>(buffer), size);
@@ -45,13 +50,13 @@ size_t Connection::decode(const char* buffer, size_t size) {
framing::AMQFrame frame;
while(frame.decode(in)) {
QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
- connection.received(frame);
+ connection->received(frame);
}
return in.getPosition();
}
bool Connection::canEncode() {
- if (!frameQueueClosed) connection.doOutput();
+ if (!frameQueueClosed) connection->doOutput();
Mutex::ScopedLock l(frameQueueLock);
return (!isClient && !initialized) || !frameQueue.empty();
}
@@ -90,7 +95,7 @@ void Connection::close() {
}
void Connection::closed() {
- connection.closed();
+ connection->closed();
}
void Connection::send(framing::AMQFrame& f) {
diff --git a/cpp/src/qpid/amqp_0_10/Connection.h b/cpp/src/qpid/amqp_0_10/Connection.h
index 4369d401bd..ea8d183e01 100644
--- a/cpp/src/qpid/amqp_0_10/Connection.h
+++ b/cpp/src/qpid/amqp_0_10/Connection.h
@@ -29,6 +29,7 @@
#include <queue>
namespace qpid {
+namespace sys { class ProtocolAccess; }
namespace broker { class Broker; }
namespace amqp_0_10 {
@@ -40,13 +41,13 @@ class Connection : public sys::ConnectionCodec,
bool frameQueueClosed;
mutable sys::Mutex frameQueueLock;
sys::OutputControl& output;
- broker::Connection connection; // FIXME aconway 2008-03-18:
+ broker::Connection::shared_ptr connection; // FIXME aconway 2008-03-18:
std::string identifier;
bool initialized;
bool isClient;
public:
- Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false);
+ Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false, sys::ProtocolAccess* a =0);
size_t decode(const char* buffer, size_t size);
size_t encode(const char* buffer, size_t size);
bool isClosed() const;
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 456eba7f9d..a8e7b3c368 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -31,10 +31,12 @@ using qpid::framing::Uuid;
namespace qpid {
namespace broker {
-Bridge::Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l, const management::ArgsLinkBridge& _args) :
- args(_args), channel(id, &(c.getOutput())), peer(channel),
- mgmtObject(new management::Bridge(this, &c, id, args.i_src, args.i_dest, args.i_key, args.i_src_is_queue, args.i_src_is_local)),
- connection(c), listener(l), name(Uuid(true).str())
+Bridge::Bridge(Link* link, framing::ChannelId _id, CancellationListener l,
+ const management::ArgsLinkBridge& _args) :
+ id(_id), args(_args),
+ mgmtObject(new management::Bridge(this, link, id, args.i_src, args.i_dest,
+ args.i_key, args.i_src_is_queue, args.i_src_is_local)),
+ listener(l), name(Uuid(true).str())
{
management::ManagementAgent::getAgent()->addObject(mgmtObject);
}
@@ -44,18 +46,21 @@ Bridge::~Bridge()
mgmtObject->resourceDestroy();
}
-void Bridge::create()
+void Bridge::create(ConnectionState& c)
{
- framing::AMQP_ServerProxy::Session session(channel);
- session.attach(name, false);
+ channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput())));
+ session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
+ peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
+
+ session->attach(name, false);
if (args.i_src_is_local) {
//TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received()
} else {
if (args.i_src_is_queue) {
- peer.getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+ peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
} else {
string queue = "bridge_queue_";
queue += Uuid(true).str();
@@ -66,22 +71,22 @@ void Bridge::create()
if (args.i_excludes.size()) {
queueSettings.setString("qpid.trace.exclude", args.i_excludes);
}
+
bool durable = false;//should this be an arg, or would be use src_is_queue for durable queues?
bool autoDelete = !durable;//auto delete transient queues?
- peer.getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
- peer.getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
- peer.getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+ peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
+ peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
+ peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
}
}
-
}
void Bridge::cancel()
{
- peer.getMessage().cancel(args.i_dest);
- peer.getSession().detach(name);
+ peer->getMessage().cancel(args.i_dest);
+ peer->getSession().detach(name);
}
management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const
@@ -94,8 +99,6 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, man
if (methodId == management::Bridge::METHOD_CLOSE) {
//notify that we are closed
listener(this);
- //request time on the connections io thread
- connection.getOutput().activateOutput();
return management::Manageable::STATUS_OK;
} else {
return management::Manageable::STATUS_UNKNOWN_METHOD;
diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h
index 943050e244..15efcc6482 100644
--- a/cpp/src/qpid/broker/Bridge.h
+++ b/cpp/src/qpid/broker/Bridge.h
@@ -28,33 +28,36 @@
#include "qpid/management/Bridge.h"
#include <boost/function.hpp>
+#include <memory>
namespace qpid {
namespace broker {
class ConnectionState;
+class Link;
class Bridge : public management::Manageable
{
public:
typedef boost::function<void(Bridge*)> CancellationListener;
- Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l,
- const management::ArgsLinkBridge& args);
+ Bridge(Link* link, framing::ChannelId id, CancellationListener l, const management::ArgsLinkBridge& args);
~Bridge();
- void create();
+ void create(ConnectionState& c);
void cancel();
management::ManagementObject::shared_ptr GetManagementObject() const;
management::Manageable::status_t ManagementMethod(uint32_t methodId, management::Args& args);
private:
- management::ArgsLinkBridge args;
- framing::ChannelHandler channel;
- framing::AMQP_ServerProxy peer;
- management::Bridge::shared_ptr mgmtObject;
- ConnectionState& connection;
+ std::auto_ptr<framing::ChannelHandler> channelHandler;
+ std::auto_ptr<framing::AMQP_ServerProxy::Session> session;
+ std::auto_ptr<framing::AMQP_ServerProxy> peer;
+
+ framing::ChannelId id;
+ management::ArgsLinkBridge args;
+ management::Bridge::shared_ptr mgmtObject;
CancellationListener listener;
std::string name;
};
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index e9b1db0413..d80c13f12a 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -28,6 +28,7 @@
#include "NullMessageStore.h"
#include "RecoveryManagerImpl.h"
#include "TopicExchange.h"
+#include "Link.h"
#include "qpid/management/PackageQpid.h"
#include "qpid/management/ManagementExchange.h"
#include "qpid/management/ArgsBrokerEcho.h"
@@ -60,7 +61,7 @@ using qpid::sys::Dispatcher;
using qpid::sys::Thread;
using qpid::framing::FrameHandler;
using qpid::framing::ChannelId;
-using qpid::management::ManagementAgent;
+using qpid::management::ManagementBroker;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
@@ -129,15 +130,16 @@ Broker::Broker(const Broker::Options& conf) :
config(conf),
store(0),
dataDir(conf.noDataDir ? std::string () : conf.dataDir),
+ links(this),
factory(*this),
sessionManager(conf.ack)
{
if(conf.enableMgmt){
QPID_LOG(info, "Management enabled");
- ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (),
- conf.mgmtPubInterval);
- managementAgent = ManagementAgent::getAgent ();
- managementAgent->setInterval (conf.mgmtPubInterval);
+ ManagementBroker::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (),
+ conf.mgmtPubInterval, this);
+ managementAgent = management::ManagementAgent::getAgent ();
+ ((ManagementBroker*) managementAgent.get())->setInterval (conf.mgmtPubInterval);
qpid::management::PackageQpid packageInitializer (managementAgent);
System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ());
@@ -163,6 +165,7 @@ Broker::Broker(const Broker::Options& conf) :
queues.setParent (vhost);
exchanges.setParent (vhost);
+ links.setParent (vhost);
}
// Early-Initialize plugins
@@ -178,11 +181,12 @@ Broker::Broker(const Broker::Options& conf) :
queues.setStore (store);
dtxManager.setStore (store);
+ links.setStore (store);
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
if (store != 0) {
- RecoveryManagerImpl recoverer(queues, exchanges, dtxManager,
+ RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager,
conf.stagingThreshold);
store->recover(recoverer);
}
@@ -197,8 +201,9 @@ Broker::Broker(const Broker::Options& conf) :
exchanges.declare(qpid_management, ManagementExchange::typeName);
Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
Exchange::shared_ptr dExchange = exchanges.get (amq_direct);
- managementAgent->setExchange (mExchange, dExchange);
- dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent);
+ ((ManagementBroker*) managementAgent.get())->setExchange (mExchange, dExchange);
+ dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent
+ ((ManagementBroker*) managementAgent.get());
}
else
QPID_LOG(info, "Management not enabled");
@@ -285,7 +290,7 @@ void Broker::shutdown() {
Broker::~Broker() {
shutdown();
- ManagementAgent::shutdown ();
+ ManagementBroker::shutdown ();
delete store;
if (config.auth) {
#if HAVE_SASL
@@ -319,7 +324,15 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
case management::Broker::METHOD_CONNECT : {
management::ArgsBrokerConnect& hp=
dynamic_cast<management::ArgsBrokerConnect&>(args);
- connect(hp.i_host, hp.i_port);
+
+ if (hp.i_useSsl)
+ return Manageable::STATUS_FEATURE_NOT_IMPLEMENTED;
+
+ std::pair<Link::shared_ptr, bool> response =
+ links.declare (hp.i_host, hp.i_port, hp.i_useSsl, hp.i_durable);
+ if (hp.i_durable && response.second)
+ store->create(*response.first);
+
status = Manageable::STATUS_OK;
break;
}
@@ -355,10 +368,11 @@ void Broker::accept() {
// TODO: How to chose the protocolFactory to use for the connection
void Broker::connect(
- const std::string& host, uint16_t port,
- sys::ConnectionCodec::Factory* f)
+ const std::string& host, uint16_t port, bool /*useSsl*/,
+ sys::ConnectionCodec::Factory* f,
+ sys::ProtocolAccess* access)
{
- getProtocolFactory()->connect(poller, host, port, f ? f : &factory);
+ getProtocolFactory()->connect(poller, host, port, f ? f : &factory, access);
}
void Broker::connect(
@@ -366,7 +380,7 @@ void Broker::connect(
{
url.throwIfEmpty();
TcpAddress addr=boost::get<TcpAddress>(url[0]);
- connect(addr.host, addr.port, f);
+ connect(addr.host, addr.port, false, f, (sys::ProtocolAccess*) 0);
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index e48f3dc23f..a1eaf4f62f 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -29,11 +29,12 @@
#include "ExchangeRegistry.h"
#include "MessageStore.h"
#include "QueueRegistry.h"
+#include "LinkRegistry.h"
#include "SessionManager.h"
#include "Vhost.h"
#include "System.h"
#include "qpid/management/Manageable.h"
-#include "qpid/management/ManagementAgent.h"
+#include "qpid/management/ManagementBroker.h"
#include "qpid/management/Broker.h"
#include "qpid/management/ArgsBrokerConnect.h"
#include "qpid/Options.h"
@@ -43,6 +44,7 @@
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/Runnable.h"
+#include "qpid/sys/ProtocolAccess.h"
#include <vector>
@@ -111,6 +113,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
MessageStore& getStore() { return *store; }
QueueRegistry& getQueues() { return queues; }
ExchangeRegistry& getExchanges() { return exchanges; }
+ LinkRegistry& getLinks() { return links; }
uint64_t getStagingThreshold() { return config.stagingThreshold; }
DtxManager& getDtxManager() { return dtxManager; }
DataDir& getDataDir() { return dataDir; }
@@ -130,11 +133,16 @@ class Broker : public sys::Runnable, public Plugin::Target,
void accept();
/** Create a connection to another broker. */
- void connect(const std::string& host, uint16_t port,
- sys::ConnectionCodec::Factory* =0);
+ void connect(const std::string& host, uint16_t port, bool useSsl,
+ sys::ConnectionCodec::Factory* =0,
+ sys::ProtocolAccess* =0);
/** Create a connection to another broker. */
void connect(const Url& url, sys::ConnectionCodec::Factory* =0);
+ // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed
+ // For the present just return the first ProtocolFactory registered.
+ boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const;
+
private:
boost::shared_ptr<sys::Poller> poller;
Options config;
@@ -144,6 +152,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
QueueRegistry queues;
ExchangeRegistry exchanges;
+ LinkRegistry links;
ConnectionFactory factory;
DtxManager dtxManager;
SessionManager sessionManager;
@@ -152,10 +161,6 @@ class Broker : public sys::Runnable, public Plugin::Target,
Vhost::shared_ptr vhostObject;
System::shared_ptr systemObject;
- // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed
- // For the present just return the first ProtocolFactory registered.
- boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const;
-
void declareStandardExchange(const std::string& name, const std::string& type);
};
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 1994c4fdf5..d156b4a914 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -52,37 +52,14 @@ class Connection::MgmtClient : public Connection::MgmtWrapper
management::Client::shared_ptr mgmtClient;
public:
- MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+ MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent,
+ const std::string& mgmtId, bool incoming);
~MgmtClient();
void received(framing::AMQFrame& frame);
management::ManagementObject::shared_ptr getManagementObject() const;
void closing();
};
-class Connection::MgmtLink : public Connection::MgmtWrapper
-{
- typedef boost::ptr_vector<Bridge> Bridges;
-
- management::Link::shared_ptr mgmtLink;
- Bridges created;//holds list of bridges pending creation
- Bridges cancelled;//holds list of bridges pending cancellation
- Bridges active;//holds active bridges
- uint channelCounter;
- sys::Mutex linkLock;
-
- void cancel(Bridge*);
-
-public:
- MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
- ~MgmtLink();
- void received(framing::AMQFrame& frame);
- management::ManagementObject::shared_ptr getManagementObject() const;
- void closing();
- void processPending();
- void process(Connection& connection, const management::Args& args);
-};
-
-
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) :
ConnectionState(out_, broker_),
adapter(*this, isLink),
@@ -103,14 +80,21 @@ void Connection::initMgmt(bool asLink)
if (agent.get () != 0)
{
if (asLink) {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId));
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, false));
} else {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId));
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, true));
}
}
}
}
+void Connection::requestIOProcessing (boost::function0<void> callback)
+{
+ ioCallback = callback;
+ out->activateOutput();
+}
+
+
Connection::~Connection () {}
void Connection::received(framing::AMQFrame& frame){
@@ -160,8 +144,9 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
bool Connection::doOutput()
{
try{
- //process any pending mgmt commands:
- if (mgmtWrapper.get()) mgmtWrapper->processPending();
+ if (ioCallback)
+ ioCallback(); // Lend the IO thread for management processing
+ ioCallback = 0;
if (mgmtClosing) close (403, "Closed by Management Request", 0, 0);
//then do other output as needed:
@@ -192,8 +177,7 @@ ManagementObject::shared_ptr Connection::GetManagementObject (void) const
return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr();
}
-Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
- Args& args)
+Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
@@ -207,93 +191,17 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
out->activateOutput();
status = Manageable::STATUS_OK;
break;
- case management::Link::METHOD_BRIDGE :
- //queue this up and request chance to do output (i.e. get connections thread of control):
- mgmtWrapper->process(*this, args);
- out->activateOutput();
- status = Manageable::STATUS_OK;
- break;
}
return status;
}
-Connection::MgmtLink::MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
- : channelCounter(1)
-{
- mgmtLink = management::Link::shared_ptr
- (new management::Link(conn, parent, mgmtId));
- agent->addObject (mgmtLink);
-}
-
-Connection::MgmtLink::~MgmtLink()
-{
- if (mgmtLink.get () != 0)
- mgmtLink->resourceDestroy ();
-}
-
-void Connection::MgmtLink::received(framing::AMQFrame& frame)
-{
- if (mgmtLink.get () != 0)
- {
- mgmtLink->inc_framesFromPeer ();
- mgmtLink->inc_bytesFromPeer (frame.size ());
- }
-}
-
-management::ManagementObject::shared_ptr Connection::MgmtLink::getManagementObject() const
-{
- return dynamic_pointer_cast<ManagementObject>(mgmtLink);
-}
-
-void Connection::MgmtLink::closing()
-{
- if (mgmtLink) mgmtLink->set_closing (1);
-}
-
-void Connection::MgmtLink::processPending()
-{
- Mutex::ScopedLock l(linkLock);
- //process any pending creates
- if (!created.empty()) {
- for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
- i->create();
- }
- active.transfer(active.end(), created.begin(), created.end(), created);
- }
- if (!cancelled.empty()) {
- //process any pending cancellations
- for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
- i->cancel();
- }
- cancelled.clear();
- }
-}
-
-void Connection::MgmtLink::process(Connection& connection, const management::Args& args)
-{
- Mutex::ScopedLock l(linkLock);
- created.push_back(new Bridge(channelCounter++, connection,
- boost::bind(&MgmtLink::cancel, this, _1),
- dynamic_cast<const management::ArgsLinkBridge&>(args)));
-}
-
-void Connection::MgmtLink::cancel(Bridge* b)
-{
- Mutex::ScopedLock l(linkLock);
- //need to take this out the active map and add it to the cancelled map
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if (&(*i) == b) {
- cancelled.transfer(cancelled.end(), i, active);
- break;
- }
- }
-}
-
-Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
+Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent,
+ ManagementAgent::shared_ptr agent,
+ const std::string& mgmtId, bool incoming)
{
mgmtClient = management::Client::shared_ptr
- (new management::Client (conn, parent, mgmtId));
+ (new management::Client (conn, parent, mgmtId, incoming));
agent->addObject (mgmtClient);
}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index c8e7fb7079..dff1e0653b 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -54,6 +54,7 @@ class Connection : public sys::ConnectionInputHandler,
public ConnectionState
{
public:
+ typedef boost::shared_ptr<Connection> shared_ptr;
Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false);
~Connection ();
@@ -78,6 +79,7 @@ class Connection : public sys::ConnectionInputHandler,
ManagementMethod (uint32_t methodId, management::Args& args);
void initMgmt(bool asLink = false);
+ void requestIOProcessing (boost::function0<void>);
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
@@ -100,7 +102,6 @@ class Connection : public sys::ConnectionInputHandler,
virtual void process(Connection&, const management::Args&){}
};
class MgmtClient;
- class MgmtLink;
ChannelMap channels;
framing::AMQP_ClientProxy::Connection* client;
@@ -108,6 +109,7 @@ class Connection : public sys::ConnectionInputHandler,
std::auto_ptr<MgmtWrapper> mgmtWrapper;
bool mgmtClosing;
const std::string mgmtId;
+ boost::function0<void> ioCallback;
};
}}
diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp
index 5de5a0230a..cd015ce4f5 100644
--- a/cpp/src/qpid/broker/ConnectionFactory.cpp
+++ b/cpp/src/qpid/broker/ConnectionFactory.cpp
@@ -39,9 +39,9 @@ ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std:
}
sys::ConnectionCodec*
-ConnectionFactory::create(sys::OutputControl& out, const std::string& id) {
+ConnectionFactory::create(sys::OutputControl& out, const std::string& id, sys::ProtocolAccess* a) {
// used to create connections from one broker to another
- return new amqp_0_10::Connection(out, broker, id, true);
+ return new amqp_0_10::Connection(out, broker, id, true, a);
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/ConnectionFactory.h b/cpp/src/qpid/broker/ConnectionFactory.h
index 5797495054..bf55ab3b88 100644
--- a/cpp/src/qpid/broker/ConnectionFactory.h
+++ b/cpp/src/qpid/broker/ConnectionFactory.h
@@ -24,6 +24,7 @@
#include "qpid/sys/ConnectionCodec.h"
namespace qpid {
+namespace sys { class ProtocolAccess; }
namespace broker {
class Broker;
@@ -37,7 +38,7 @@ class ConnectionFactory : public sys::ConnectionCodec::Factory {
create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id);
sys::ConnectionCodec*
- create(sys::OutputControl&, const std::string& id);
+ create(sys::OutputControl&, const std::string& id, sys::ProtocolAccess* a =0);
private:
Broker& broker;
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index 4ed2f5bfa2..162664fb88 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -35,8 +35,9 @@ using namespace qpid::framing;
namespace
{
-const std::string PLAIN = "PLAIN";
-const std::string en_US = "en_US";
+const std::string ANONYMOUS = "ANONYMOUS";
+const std::string PLAIN = "PLAIN";
+const std::string en_US = "en_US";
}
void ConnectionHandler::close(ReplyCode code, const string& text, ClassId, MethodId)
@@ -135,10 +136,8 @@ void ConnectionHandler::Handler::start(const FieldTable& /*serverProperties*/,
const framing::Array& /*mechanisms*/,
const framing::Array& /*locales*/)
{
- string uid = "qpidd";
- string pwd = "qpidd";
- string response = ((char)0) + uid + ((char)0) + pwd;
- server.startOk(FieldTable(), PLAIN, response, en_US);
+ string response;
+ server.startOk(FieldTable(), ANONYMOUS, response, en_US);
connection.initMgmt(true);
}
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 47d616cf16..0d9ffb7122 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -40,7 +40,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) :
if (agent.get () != 0)
{
mgmtExchange = management::Exchange::shared_ptr
- (new management::Exchange (this, parent, _name));
+ (new management::Exchange (this, parent, _name, durable));
agent->addObject (mgmtExchange);
}
}
@@ -56,8 +56,9 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
if (agent.get () != 0)
{
mgmtExchange = management::Exchange::shared_ptr
- (new management::Exchange (this, parent, _name));
- agent->addObject (mgmtExchange);
+ (new management::Exchange (this, parent, _name, durable));
+ if (!durable)
+ agent->addObject (mgmtExchange);
}
}
}
@@ -68,6 +69,16 @@ Exchange::~Exchange ()
mgmtExchange->resourceDestroy ();
}
+void Exchange::setPersistenceId(uint64_t id) const
+{
+ if (mgmtExchange != 0 && persistenceId == 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ agent->addObject (mgmtExchange, id, 2);
+ }
+ persistenceId = id;
+}
+
Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffer)
{
string name;
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index 7902eb4219..9b18129857 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -90,7 +90,7 @@ namespace qpid {
virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
//PersistableExchange:
- void setPersistenceId(uint64_t id) const { persistenceId = id; }
+ void setPersistenceId(uint64_t id) const;
uint64_t getPersistenceId() const { return persistenceId; }
uint32_t encodedSize() const;
void encode(framing::Buffer& buffer) const;
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
new file mode 100644
index 0000000000..83c9a2a62e
--- /dev/null
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -0,0 +1,281 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Link.h"
+#include "LinkRegistry.h"
+#include "Broker.h"
+#include "Connection.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/management/Link.h"
+#include "boost/bind.hpp"
+#include "qpid/log/Statement.h"
+
+using namespace qpid::broker;
+using qpid::framing::Buffer;
+using qpid::framing::FieldTable;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+using qpid::sys::Mutex;
+
+Link::Link(LinkRegistry* _links,
+ string& _host,
+ uint16_t _port,
+ bool _useSsl,
+ bool _durable,
+ Broker* _broker,
+ management::Manageable* parent)
+ : links(_links), host(_host), port(_port), useSsl(_useSsl), durable(_durable),
+ persistenceId(0), broker(_broker), state(0),
+ access(boost::bind(&Link::established, this),
+ boost::bind(&Link::closed, this, _1, _2),
+ boost::bind(&Link::setConnection, this, _1)),
+ visitCount(0),
+ currentInterval(1),
+ closing(false),
+ channelCounter(1)
+{
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent();
+ if (agent.get() != 0)
+ {
+ mgmtObject = management::Link::shared_ptr
+ (new management::Link(this, parent, _host, _port, _useSsl, _durable));
+ if (!durable)
+ agent->addObject(mgmtObject);
+ }
+ }
+ setState(STATE_WAITING);
+}
+
+Link::~Link ()
+{
+ if (state == STATE_OPERATIONAL)
+ access.close();
+ if (mgmtObject.get () != 0)
+ mgmtObject->resourceDestroy ();
+}
+
+void Link::setState (int newState)
+{
+ if (newState == state)
+ return;
+
+ state = newState;
+ if (mgmtObject.get() == 0)
+ return;
+
+ switch (state)
+ {
+ case STATE_WAITING : mgmtObject->set_state("Waiting"); break;
+ case STATE_CONNECTING : mgmtObject->set_state("Connecting"); break;
+ case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break;
+ }
+}
+
+void Link::startConnection ()
+{
+ try {
+ broker->connect (host, port, useSsl, 0, &access);
+ setState(STATE_CONNECTING);
+ } catch(std::exception& e) {
+ setState(STATE_WAITING);
+ mgmtObject->set_lastError (e.what());
+ }
+}
+
+void Link::established ()
+{
+ Mutex::ScopedLock mutex(lock);
+
+ QPID_LOG (info, "Inter-broker link established to " << host << ":" << port);
+ setState(STATE_OPERATIONAL);
+ currentInterval = 1;
+ visitCount = 0;
+ if (closing)
+ destroy();
+}
+
+void Link::closed (int, std::string text)
+{
+ Mutex::ScopedLock mutex(lock);
+
+ if (state == STATE_OPERATIONAL)
+ QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port);
+
+ connection.reset();
+ created.transfer(created.end(), active.begin(), active.end(), active);
+ setState(STATE_WAITING);
+ mgmtObject->set_lastError (text);
+ if (closing)
+ destroy();
+}
+
+void Link::destroy ()
+{
+ QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
+ connection.reset();
+ links->destroy (host, port);
+}
+
+void Link::cancel(Bridge* bridge)
+{
+ Mutex::ScopedLock mutex(lock);
+
+ //need to take this out of the active map and add it to the cancelled map
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ if (&(*i) == bridge) {
+ cancelled.transfer(cancelled.end(), i, active);
+ break;
+ }
+ }
+
+ if (connection.get() != 0)
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+}
+
+void Link::ioThreadProcessing()
+{
+ Mutex::ScopedLock mutex(lock);
+
+ //process any pending creates
+ if (!created.empty()) {
+ for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
+ i->create(*connection);
+ }
+ active.transfer(active.end(), created.begin(), created.end(), created);
+ }
+ if (!cancelled.empty()) {
+ //process any pending cancellations
+ for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
+ i->cancel();
+ }
+ cancelled.clear();
+ }
+}
+
+void Link::setConnection(Connection::shared_ptr c)
+{
+ connection = c;
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+}
+
+void Link::maintenanceVisit ()
+{
+ Mutex::ScopedLock mutex(lock);
+
+ if (state == STATE_WAITING)
+ {
+ visitCount++;
+ if (visitCount >= currentInterval)
+ {
+ visitCount = 0;
+ currentInterval *= 2;
+ if (currentInterval > MAX_INTERVAL)
+ currentInterval = MAX_INTERVAL;
+ startConnection();
+ }
+ }
+}
+
+void Link::setPersistenceId(uint64_t id) const
+{
+ if (mgmtObject != 0 && persistenceId == 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ agent->addObject (mgmtObject, id);
+ }
+ persistenceId = id;
+}
+
+const string& Link::getName() const
+{
+ return host;
+}
+
+Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
+{
+ string host;
+ uint16_t port;
+
+ buffer.getShortString(host);
+ port = buffer.getShort();
+ bool useSsl(buffer.getOctet());
+ bool durable(buffer.getOctet());
+
+ return links.declare(host, port, useSsl, durable).first;
+}
+
+void Link::encode(Buffer& buffer) const
+{
+ buffer.putShortString(string("link"));
+ buffer.putShortString(host);
+ buffer.putShort(port);
+ buffer.putOctet(useSsl ? 1 : 0);
+ buffer.putOctet(durable ? 1 : 0);
+}
+
+uint32_t Link::encodedSize() const
+{
+ return host.size() + 1 // short-string (host)
+ + 5 // short-string ("link")
+ + 2 // port
+ + 1 // useSsl
+ + 1; // durable
+}
+
+ManagementObject::shared_ptr Link::GetManagementObject (void) const
+{
+ return boost::dynamic_pointer_cast<ManagementObject> (mgmtObject);
+}
+
+Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args)
+{
+ Mutex::ScopedLock mutex(lock);
+
+ switch (op)
+ {
+ case management::Link::METHOD_CLOSE :
+ closing = true;
+ if (state != STATE_CONNECTING)
+ destroy();
+ return Manageable::STATUS_OK;
+
+ case management::Link::METHOD_BRIDGE :
+ management::ArgsLinkBridge iargs =
+ dynamic_cast<const management::ArgsLinkBridge&>(args);
+
+ // Durable bridges are only valid on durable links
+ if (iargs.i_durable && !durable)
+ return Manageable::STATUS_INVALID_PARAMETER;
+
+ created.push_back(new Bridge(this, channelCounter++,
+ boost::bind(&Link::cancel, this, _1), iargs));
+
+ if (state == STATE_OPERATIONAL && connection.get() != 0)
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ return Manageable::STATUS_OK;
+ }
+
+ return Manageable::STATUS_UNKNOWN_METHOD;
+}
diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h
new file mode 100644
index 0000000000..838c3bf696
--- /dev/null
+++ b/cpp/src/qpid/broker/Link.h
@@ -0,0 +1,115 @@
+#ifndef _broker_Link_h
+#define _broker_Link_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include "MessageStore.h"
+#include "PersistableConfig.h"
+#include "Bridge.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/ProtocolAccess.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Link.h"
+#include <boost/ptr_container/ptr_vector.hpp>
+
+namespace qpid {
+ namespace broker {
+
+ using std::string;
+ class LinkRegistry;
+ class Broker;
+ class Connection;
+
+ class Link : public PersistableConfig, public management::Manageable {
+ private:
+ sys::Mutex lock;
+ LinkRegistry* links;
+ const string host;
+ const uint16_t port;
+ const bool useSsl;
+ const bool durable;
+ mutable uint64_t persistenceId;
+ management::Link::shared_ptr mgmtObject;
+ Broker* broker;
+ int state;
+ sys::ProtocolAccess access;
+ uint32_t visitCount;
+ uint32_t currentInterval;
+ bool closing;
+
+ typedef boost::ptr_vector<Bridge> Bridges;
+ Bridges created; // Bridges pending creation
+ Bridges active; // Bridges active
+ Bridges cancelled; // Bridges pending deletion
+ uint channelCounter;
+ boost::shared_ptr<Connection> connection;
+
+ static const int STATE_WAITING = 1;
+ static const int STATE_CONNECTING = 2;
+ static const int STATE_OPERATIONAL = 3;
+
+ static const uint32_t MAX_INTERVAL = 16;
+
+ void setState (int newState);
+ void startConnection(); // Start the IO Connection
+ void established(); // Called when connection is created
+ void closed(int, std::string); // Called when connection goes away
+ void destroy(); // Called when mgmt deletes this link
+ void cancel(Bridge*); // Called by self-cancelling bridge
+ void ioThreadProcessing(); // Called on connection's IO thread by request
+ void setConnection(boost::shared_ptr<Connection>); // Set pointer to the AMQP Connection
+
+ public:
+ typedef boost::shared_ptr<Link> shared_ptr;
+
+ Link(LinkRegistry* links,
+ string& host,
+ uint16_t port,
+ bool useSsl,
+ bool durable,
+ Broker* broker,
+ management::Manageable* parent = 0);
+ virtual ~Link();
+
+ bool isDurable() { return durable; }
+ void maintenanceVisit ();
+
+ // PersistableConfig:
+ void setPersistenceId(uint64_t id) const;
+ uint64_t getPersistenceId() const { return persistenceId; }
+ uint32_t encodedSize() const;
+ void encode(framing::Buffer& buffer) const;
+ const string& getName() const;
+
+ static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+
+ // Manageable entry points
+ management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::Manageable::status_t ManagementMethod (uint32_t, management::Args&);
+ };
+ }
+}
+
+
+#endif /*!_broker_Link.cpp_h*/
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
new file mode 100644
index 0000000000..6e20a3f7ce
--- /dev/null
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "LinkRegistry.h"
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using std::pair;
+using std::stringstream;
+using boost::intrusive_ptr;
+
+#define LINK_MAINT_INTERVAL 5
+
+LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0)
+{
+ timer.add (intrusive_ptr<TimerTask> (new Periodic(*this)));
+}
+
+LinkRegistry::Periodic::Periodic (LinkRegistry& _links) :
+ TimerTask (Duration (LINK_MAINT_INTERVAL * TIME_SEC)), links(_links) {}
+
+void LinkRegistry::Periodic::fire ()
+{
+ links.periodicMaintenance ();
+ links.timer.add (intrusive_ptr<TimerTask> (new Periodic(links)));
+}
+
+void LinkRegistry::periodicMaintenance ()
+{
+ Mutex::ScopedLock locker(lock);
+ linksToDestroy.clear();
+ for (LinkMap::iterator i = links.begin(); i != links.end(); i++)
+ i->second->maintenanceVisit();
+}
+
+pair<Link::shared_ptr, bool> LinkRegistry::declare(std::string& host,
+ uint16_t port,
+ bool useSsl,
+ bool durable)
+{
+ Mutex::ScopedLock locker(lock);
+ stringstream keystream;
+ keystream << host << ":" << port;
+ string key = string(keystream.str());
+
+ LinkMap::iterator i = links.find(key);
+ if (i == links.end())
+ {
+ Link::shared_ptr link;
+
+ link = Link::shared_ptr (new Link (this, host, port, useSsl, durable, broker, parent));
+ links[key] = link;
+ return std::pair<Link::shared_ptr, bool>(link, true);
+ }
+ return std::pair<Link::shared_ptr, bool>(i->second, false);
+}
+
+void LinkRegistry::destroy(const string& host, const uint16_t port)
+{
+ Mutex::ScopedLock locker(lock);
+ stringstream keystream;
+ keystream << host << ":" << port;
+ string key = string(keystream.str());
+
+ LinkMap::iterator i = links.find(key);
+ if (i != links.end())
+ {
+ if (i->second->isDurable() && store)
+ store->destroy(*(i->second));
+ linksToDestroy[key] = i->second;
+ links.erase(i);
+ }
+}
+
+void LinkRegistry::setStore (MessageStore* _store)
+{
+ assert (store == 0 && _store != 0);
+ store = _store;
+}
+
+MessageStore* LinkRegistry::getStore() const {
+ return store;
+}
+
diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h
new file mode 100644
index 0000000000..86d8c3d2f9
--- /dev/null
+++ b/cpp/src/qpid/broker/LinkRegistry.h
@@ -0,0 +1,87 @@
+#ifndef _broker_LinkRegistry_h
+#define _broker_LinkRegistry_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <map>
+#include "Link.h"
+#include "MessageStore.h"
+#include "Timer.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/management/Manageable.h"
+
+namespace qpid {
+namespace broker {
+
+ class Broker;
+ class LinkRegistry {
+
+ // Declare a timer task to manage the establishment of link connections and the
+ // re-establishment of lost link connections.
+ struct Periodic : public TimerTask
+ {
+ LinkRegistry& links;
+
+ Periodic(LinkRegistry& links);
+ virtual ~Periodic() {};
+ void fire();
+ };
+
+ typedef std::map<std::string, Link::shared_ptr> LinkMap;
+ LinkMap links;
+ LinkMap linksToDestroy;
+ qpid::sys::Mutex lock;
+ Broker* broker;
+ Timer timer;
+ management::Manageable* parent;
+ MessageStore* store;
+
+ void periodicMaintenance ();
+
+ public:
+ LinkRegistry (Broker* _broker);
+ std::pair<Link::shared_ptr, bool> declare(std::string& host,
+ uint16_t port,
+ bool useSsl,
+ bool durable);
+ void destroy(const std::string& host, const uint16_t port);
+
+ /**
+ * Register the manageable parent for declared queues
+ */
+ void setParent (management::Manageable* _parent) { parent = _parent; }
+
+ /**
+ * Set the store to use. May only be called once.
+ */
+ void setStore (MessageStore*);
+
+ /**
+ * Return the message store used.
+ */
+ MessageStore* getStore() const;
+ };
+}
+}
+
+
+#endif /*!_broker_LinkRegistry_h*/
diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h
index 76469ccc50..17fd6aefb8 100644
--- a/cpp/src/qpid/broker/MessageStore.h
+++ b/cpp/src/qpid/broker/MessageStore.h
@@ -24,6 +24,7 @@
#include "PersistableExchange.h"
#include "PersistableMessage.h"
#include "PersistableQueue.h"
+#include "PersistableConfig.h"
#include "RecoveryManager.h"
#include "TransactionalStore.h"
#include "qpid/framing/FieldTable.h"
@@ -87,6 +88,16 @@ public:
const std::string& key, const framing::FieldTable& args) = 0;
/**
+ * Record generic durable configuration
+ */
+ virtual void create(const PersistableConfig& config) = 0;
+
+ /**
+ * Destroy generic durable configuration
+ */
+ virtual void destroy(const PersistableConfig& config) = 0;
+
+ /**
* Stores a messages before it has been enqueued
* (enqueueing automatically stores the message so this is
* only required if storage is required prior to that
diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp
index e02c87f069..2544d5d533 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.cpp
+++ b/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -70,6 +70,16 @@ void MessageStoreModule::unbind(const PersistableExchange& e, const PersistableQ
TRANSFER_EXCEPTION(store->unbind(e, q, k, a));
}
+void MessageStoreModule::create(const PersistableConfig& config)
+{
+ TRANSFER_EXCEPTION(store->create(config));
+}
+
+void MessageStoreModule::destroy(const PersistableConfig& config)
+{
+ TRANSFER_EXCEPTION(store->destroy(config));
+}
+
void MessageStoreModule::recover(RecoveryManager& registry)
{
TRANSFER_EXCEPTION(store->recover(registry));
diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h
index c7ad76d8bb..f4d05e3e0d 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/cpp/src/qpid/broker/MessageStoreModule.h
@@ -57,6 +57,8 @@ public:
const std::string& key, const framing::FieldTable& args);
void unbind(const PersistableExchange& exchange, const PersistableQueue& queue,
const std::string& key, const framing::FieldTable& args);
+ void create(const PersistableConfig& config);
+ void destroy(const PersistableConfig& config);
void recover(RecoveryManager& queues);
void stage(boost::intrusive_ptr<PersistableMessage>& msg);
void destroy(PersistableMessage& msg);
diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp
index 8936b0440f..401c76f5a2 100644
--- a/cpp/src/qpid/broker/NullMessageStore.cpp
+++ b/cpp/src/qpid/broker/NullMessageStore.cpp
@@ -49,7 +49,7 @@ public:
using namespace qpid::broker;
-NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
+NullMessageStore::NullMessageStore(bool _warn) : warn(_warn), nextPersistenceId(1) {}
bool NullMessageStore::init(const Options* /*options*/) {return true;}
@@ -57,6 +57,7 @@ void NullMessageStore::create(PersistableQueue& queue, const framing::FieldTable
{
QPID_LOG(info, "Queue '" << queue.getName()
<< "' will not be durable. Persistence not enabled.");
+ queue.setPersistenceId(nextPersistenceId++);
}
void NullMessageStore::destroy(PersistableQueue&)
@@ -67,6 +68,7 @@ void NullMessageStore::create(const PersistableExchange& exchange, const framing
{
QPID_LOG(info, "Exchange'" << exchange.getName()
<< "' will not be durable. Persistence not enabled.");
+ exchange.setPersistenceId(nextPersistenceId++);
}
void NullMessageStore::destroy(const PersistableExchange& )
@@ -76,6 +78,17 @@ void NullMessageStore::bind(const PersistableExchange&, const PersistableQueue&,
void NullMessageStore::unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&){}
+void NullMessageStore::create(const PersistableConfig& config)
+{
+ QPID_LOG(info, "Persistence not enabled, configuration not stored.");
+ config.setPersistenceId(nextPersistenceId++);
+}
+
+void NullMessageStore::destroy(const PersistableConfig&)
+{
+ QPID_LOG(info, "Persistence not enabled, configuration not stored.");
+}
+
void NullMessageStore::recover(RecoveryManager&)
{
QPID_LOG(info, "Persistence not enabled, no recovery attempted.");
diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h
index 96d1c483a2..f06e749ebb 100644
--- a/cpp/src/qpid/broker/NullMessageStore.h
+++ b/cpp/src/qpid/broker/NullMessageStore.h
@@ -37,6 +37,7 @@ class NullMessageStore : public MessageStore
{
std::set<std::string> prepared;
const bool warn;
+ uint64_t nextPersistenceId;
public:
NullMessageStore(bool warn = false);
@@ -57,6 +58,8 @@ public:
const std::string& key, const framing::FieldTable& args);
virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue,
const std::string& key, const framing::FieldTable& args);
+ virtual void create(const PersistableConfig& config);
+ virtual void destroy(const PersistableConfig& config);
virtual void recover(RecoveryManager& queues);
virtual void stage(boost::intrusive_ptr<PersistableMessage>& msg);
virtual void destroy(PersistableMessage& msg);
diff --git a/cpp/src/qpid/broker/PersistableConfig.h b/cpp/src/qpid/broker/PersistableConfig.h
new file mode 100644
index 0000000000..914e91ea80
--- /dev/null
+++ b/cpp/src/qpid/broker/PersistableConfig.h
@@ -0,0 +1,45 @@
+#ifndef _broker_PersistableConfig_h
+#define _broker_PersistableConfig_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include "Persistable.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * The interface used by general-purpose persistable configuration for
+ * the message store.
+ */
+class PersistableConfig : public Persistable
+{
+public:
+ virtual const std::string& getName() const = 0;
+ virtual ~PersistableConfig() {};
+};
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index f7bad8ebc6..355ebdd81e 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -586,7 +586,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const
if (mgmtObject != 0 && persistenceId == 0)
{
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
- agent->addObject (mgmtObject, _persistenceId);
+ agent->addObject (mgmtObject, _persistenceId, 3);
}
persistenceId = _persistenceId;
}
diff --git a/cpp/src/qpid/broker/RecoverableConfig.h b/cpp/src/qpid/broker/RecoverableConfig.h
new file mode 100644
index 0000000000..838a8582dc
--- /dev/null
+++ b/cpp/src/qpid/broker/RecoverableConfig.h
@@ -0,0 +1,45 @@
+#ifndef _broker_RecoverableConfig_h
+#define _broker_RecoverableConfig_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * The interface through which configurations are recovered.
+ */
+class RecoverableConfig
+{
+public:
+ typedef boost::shared_ptr<RecoverableConfig> shared_ptr;
+
+ virtual void setPersistenceId(uint64_t id) = 0;
+ virtual ~RecoverableConfig() {};
+};
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/RecoveryManager.h b/cpp/src/qpid/broker/RecoveryManager.h
index bf1813a093..7dcbe3a2b0 100644
--- a/cpp/src/qpid/broker/RecoveryManager.h
+++ b/cpp/src/qpid/broker/RecoveryManager.h
@@ -25,6 +25,7 @@
#include "RecoverableQueue.h"
#include "RecoverableMessage.h"
#include "RecoverableTransaction.h"
+#include "RecoverableConfig.h"
#include "TransactionalStore.h"
#include "qpid/framing/Buffer.h"
@@ -39,6 +40,8 @@ class RecoveryManager{
virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer) = 0;
virtual RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid,
std::auto_ptr<TPCTransactionContext> txn) = 0;
+ virtual RecoverableConfig::shared_ptr recoverConfig(framing::Buffer& buffer) = 0;
+
virtual void recoveryComplete() = 0;
};
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index feb629e118..c6ec573822 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -22,6 +22,7 @@
#include "Message.h"
#include "Queue.h"
+#include "Link.h"
#include "RecoveredEnqueue.h"
#include "RecoveredDequeue.h"
#include "qpid/framing/reply_exceptions.h"
@@ -34,9 +35,9 @@ using boost::intrusive_ptr;
static const uint8_t BASIC = 1;
static const uint8_t MESSAGE = 2;
-RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges,
+RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links,
DtxManager& _dtxMgr, uint64_t _stagingThreshold)
- : queues(_queues), exchanges(_exchanges), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {}
+ : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {}
RecoveryManagerImpl::~RecoveryManagerImpl() {}
@@ -82,6 +83,15 @@ public:
void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args);
};
+class RecoverableConfigImpl : public RecoverableConfig
+{
+ // TODO: Add links for other config types, consider using super class (PersistableConfig?)
+ Link::shared_ptr link;
+public:
+ RecoverableConfigImpl(Link::shared_ptr _link) : link(_link) {}
+ void setPersistenceId(uint64_t id);
+};
+
class RecoverableTransactionImpl : public RecoverableTransaction
{
DtxBuffer::shared_ptr buffer;
@@ -125,6 +135,19 @@ RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const
return RecoverableTransaction::shared_ptr(new RecoverableTransactionImpl(buffer));
}
+RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer)
+{
+ string kind;
+
+ buffer.getShortString (kind);
+ if (kind == "link")
+ {
+ return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer)));
+ }
+
+ return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead
+}
+
void RecoveryManagerImpl::recoveryComplete()
{
//TODO (finalise binding setup etc)
@@ -185,6 +208,13 @@ void RecoverableExchangeImpl::setPersistenceId(uint64_t id)
exchange->setPersistenceId(id);
}
+void RecoverableConfigImpl::setPersistenceId(uint64_t id)
+{
+ if (link.get())
+ link->setPersistenceId(id);
+ // TODO: add calls to other types. Consider using a parent class.
+}
+
void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args)
{
Queue::shared_ptr queue = queues.find(queueName);
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.h b/cpp/src/qpid/broker/RecoveryManagerImpl.h
index 58ec63926c..cd34d464f5 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.h
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.h
@@ -25,6 +25,7 @@
#include "DtxManager.h"
#include "ExchangeRegistry.h"
#include "QueueRegistry.h"
+#include "LinkRegistry.h"
#include "RecoveryManager.h"
namespace qpid {
@@ -33,10 +34,12 @@ namespace broker {
class RecoveryManagerImpl : public RecoveryManager{
QueueRegistry& queues;
ExchangeRegistry& exchanges;
+ LinkRegistry& links;
DtxManager& dtxMgr;
const uint64_t stagingThreshold;
public:
- RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, DtxManager& dtxMgr, uint64_t stagingThreshold);
+ RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links,
+ DtxManager& dtxMgr, uint64_t stagingThreshold);
~RecoveryManagerImpl();
RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer);
@@ -44,6 +47,7 @@ namespace broker {
RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer);
RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid,
std::auto_ptr<TPCTransactionContext> txn);
+ RecoverableConfig::shared_ptr recoverConfig(framing::Buffer& buffer);
void recoveryComplete();
};
diff --git a/cpp/src/qpid/broker/System.h b/cpp/src/qpid/broker/System.h
index 0d63bd1b3d..65086abec0 100644
--- a/cpp/src/qpid/broker/System.h
+++ b/cpp/src/qpid/broker/System.h
@@ -42,9 +42,6 @@ class System : public management::Manageable
management::ManagementObject::shared_ptr GetManagementObject (void) const
{ return mgmtObject; }
-
- management::Manageable::status_t ManagementMethod (uint32_t, management::Args&)
- { return management::Manageable::STATUS_OK; }
};
}}
diff --git a/cpp/src/qpid/management/Manageable.cpp b/cpp/src/qpid/management/Manageable.cpp
index 479cb4e0ce..0f3fbab55c 100644
--- a/cpp/src/qpid/management/Manageable.cpp
+++ b/cpp/src/qpid/management/Manageable.cpp
@@ -25,13 +25,19 @@ std::string Manageable::StatusText (status_t status)
{
switch (status)
{
- case STATUS_OK : return "OK";
- case STATUS_UNKNOWN_OBJECT : return "UnknownObject";
- case STATUS_UNKNOWN_METHOD : return "UnknownMethod";
- case STATUS_NOT_IMPLEMENTED : return "NotImplemented";
- case STATUS_INVALID_PARAMETER : return "InvalidParameter";
+ case STATUS_OK : return "OK";
+ case STATUS_UNKNOWN_OBJECT : return "UnknownObject";
+ case STATUS_UNKNOWN_METHOD : return "UnknownMethod";
+ case STATUS_NOT_IMPLEMENTED : return "NotImplemented";
+ case STATUS_INVALID_PARAMETER : return "InvalidParameter";
+ case STATUS_FEATURE_NOT_IMPLEMENTED : return "FeatureNotImplemented";
}
return "??";
}
+Manageable::status_t Manageable::ManagementMethod (uint32_t, Args&)
+{
+ return STATUS_UNKNOWN_METHOD;
+}
+
diff --git a/cpp/src/qpid/management/Manageable.h b/cpp/src/qpid/management/Manageable.h
index 836ba03b23..25c24588fc 100644
--- a/cpp/src/qpid/management/Manageable.h
+++ b/cpp/src/qpid/management/Manageable.h
@@ -39,11 +39,12 @@ class Manageable
typedef uint32_t status_t;
static std::string StatusText (status_t status);
- static const status_t STATUS_OK = 0;
- static const status_t STATUS_UNKNOWN_OBJECT = 1;
- static const status_t STATUS_UNKNOWN_METHOD = 2;
- static const status_t STATUS_NOT_IMPLEMENTED = 3;
- static const status_t STATUS_INVALID_PARAMETER = 4;
+ static const status_t STATUS_OK = 0;
+ static const status_t STATUS_UNKNOWN_OBJECT = 1;
+ static const status_t STATUS_UNKNOWN_METHOD = 2;
+ static const status_t STATUS_NOT_IMPLEMENTED = 3;
+ static const status_t STATUS_INVALID_PARAMETER = 4;
+ static const status_t STATUS_FEATURE_NOT_IMPLEMENTED = 5;
// Every "Manageable" object must hold a reference to exactly one
// management object. This object is always of a class derived from
@@ -58,7 +59,7 @@ class Manageable
// on this object. The input and output arguments are specific to the
// method being called and must be down-cast to the appropriate sub class
// before use.
- virtual status_t ManagementMethod (uint32_t methodId, Args& args) = 0;
+ virtual status_t ManagementMethod (uint32_t methodId, Args& args);
};
inline Manageable::~Manageable (void) {}
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 9b4290232d..e69de29bb2 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -1,695 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ManagementAgent.h"
-#include "qpid/broker/DeliverableMessage.h"
-#include "qpid/log/Statement.h"
-#include <qpid/broker/Message.h>
-#include <qpid/broker/MessageDelivery.h>
-#include "qpid/framing/MessageTransferBody.h"
-#include <list>
-#include <iostream>
-#include <fstream>
-
-using boost::intrusive_ptr;
-using namespace qpid::framing;
-using namespace qpid::management;
-using namespace qpid::broker;
-using namespace qpid::sys;
-using namespace std;
-
-ManagementAgent::shared_ptr ManagementAgent::agent;
-bool ManagementAgent::enabled = 0;
-
-ManagementAgent::ManagementAgent (string _dataDir, uint16_t _interval) :
- dataDir (_dataDir), interval (_interval)
-{
- timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
- localBank = 3;
- nextObjectId = 1;
- nextRemotePrefix = 101;
-
- // Get from file or generate and save to file.
- if (dataDir.empty ())
- {
- uuid.generate ();
- bootSequence = 1;
- QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: "
- << uuid);
- }
- else
- {
- string filename (dataDir + "/brokerId");
- string seqFilename (dataDir + "/bootseq");
- ifstream inFile (filename.c_str ());
- ifstream seqFile (seqFilename.c_str ());
-
- if (inFile.good ())
- {
- inFile >> uuid;
- inFile.close ();
- QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid);
- }
- else
- {
- uuid.generate ();
- QPID_LOG (info, "ManagementAgent generated broker ID: " << uuid);
-
- ofstream outFile (filename.c_str ());
- if (outFile.good ())
- {
- outFile << uuid << endl;
- outFile.close ();
- QPID_LOG (debug, "ManagementAgent saved broker ID");
- }
- else
- {
- QPID_LOG (warning, "ManagementAgent unable to save broker ID");
- }
- }
-
- if (seqFile.good ())
- {
- seqFile >> bootSequence;
- seqFile.close ();
- }
- else
- bootSequence = 1;
-
- ofstream seqOut (seqFilename.c_str ());
- if (seqOut.good ())
- {
- uint16_t nextSeq = (bootSequence + 1) & 0x7FFF;
- if (nextSeq == 0)
- nextSeq = 1;
- seqOut << nextSeq << endl;
- seqOut.close ();
- }
-
- QPID_LOG (debug, "ManagementAgent boot sequence: " << bootSequence);
- }
-}
-
-ManagementAgent::~ManagementAgent () {}
-
-void ManagementAgent::enableManagement (string dataDir, uint16_t interval)
-{
- enabled = 1;
- if (agent.get () == 0)
- agent = shared_ptr (new ManagementAgent (dataDir, interval));
-}
-
-ManagementAgent::shared_ptr ManagementAgent::getAgent (void)
-{
- return agent;
-}
-
-void ManagementAgent::shutdown (void)
-{
- if (agent.get () != 0)
- {
- agent->mExchange.reset ();
- agent->dExchange.reset ();
- agent.reset ();
- }
-}
-
-void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange,
- broker::Exchange::shared_ptr _dexchange)
-{
- mExchange = _mexchange;
- dExchange = _dexchange;
-}
-
-void ManagementAgent::RegisterClass (string packageName,
- string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
-{
- Mutex::ScopedLock lock (userLock);
- PackageMap::iterator pIter = FindOrAddPackage (packageName);
- AddClassLocal (pIter, className, md5Sum, schemaCall);
-}
-
-void ManagementAgent::addObject (ManagementObject::shared_ptr object,
- uint32_t persistId,
- uint32_t persistBank)
-{
- Mutex::ScopedLock lock (userLock);
- uint64_t objectId;
-
- if (persistId == 0)
- objectId = ((uint64_t) bootSequence) << 48 |
- ((uint64_t) localBank) << 24 | nextObjectId++;
- else
- objectId = ((uint64_t) persistBank) << 24 | persistId;
-
- object->setObjectId (objectId);
- managementObjects[objectId] = object;
-}
-
-ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
- : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {}
-
-ManagementAgent::Periodic::~Periodic () {}
-
-void ManagementAgent::Periodic::fire ()
-{
- agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval)));
- agent.PeriodicProcessing ();
-}
-
-void ManagementAgent::clientAdded (void)
-{
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++)
- {
- ManagementObject::shared_ptr object = iter->second;
- object->setAllChanged ();
- }
-}
-
-void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
-{
- buf.putOctet ('A');
- buf.putOctet ('M');
- buf.putOctet ('1');
- buf.putOctet (opcode);
- buf.putLong (seq);
-}
-
-bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
-{
- uint8_t h1 = buf.getOctet ();
- uint8_t h2 = buf.getOctet ();
- uint8_t h3 = buf.getOctet ();
-
- *opcode = buf.getOctet ();
- *seq = buf.getLong ();
-
- return h1 == 'A' && h2 == 'M' && h3 == '1';
-}
-
-void ManagementAgent::SendBuffer (Buffer& buf,
- uint32_t length,
- broker::Exchange::shared_ptr exchange,
- string routingKey)
-{
- if (exchange.get() == 0)
- return;
-
- intrusive_ptr<Message> msg (new Message ());
- AMQFrame method (in_place<MessageTransferBody>(
- ProtocolVersion(), exchange->getName (), 0, 0));
- AMQFrame header (in_place<AMQHeaderBody>());
- AMQFrame content(in_place<AMQContentBody>());
-
- content.castBody<AMQContentBody>()->decode(buf, length);
-
- method.setEof (false);
- header.setBof (false);
- header.setEof (false);
- content.setBof (false);
-
- msg->getFrames().append(method);
- msg->getFrames().append(header);
-
- MessageProperties* props =
- msg->getFrames().getHeaders()->get<MessageProperties>(true);
- props->setContentLength(length);
- msg->getFrames().append(content);
-
- DeliverableMessage deliverable (msg);
- exchange->route (deliverable, routingKey, 0);
-}
-
-void ManagementAgent::PeriodicProcessing (void)
-{
-#define BUFSIZE 65536
- Mutex::ScopedLock lock (userLock);
- char msgChars[BUFSIZE];
- uint32_t contentSize;
- string routingKey;
- std::list<uint64_t> deleteList;
-
- if (managementObjects.empty ())
- return;
-
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++)
- {
- ManagementObject::shared_ptr object = iter->second;
-
- if (object->getConfigChanged () || object->isDeleted ())
- {
- Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'c');
- object->writeConfig (msgBuffer);
-
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "mgmt." + uuid.str() + ".config." + object->getClassName ();
- SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
- }
-
- if (object->getInstChanged ())
- {
- Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'i');
- object->writeInstrumentation (msgBuffer);
-
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "mgmt." + uuid.str () + ".inst." + object->getClassName ();
- SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
- }
-
- if (object->isDeleted ())
- deleteList.push_back (iter->first);
- }
-
- // Delete flagged objects
- for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin ();
- iter != deleteList.rend ();
- iter++)
- managementObjects.erase (*iter);
-
- deleteList.clear ();
-}
-
-void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence,
- uint32_t code, string text)
-{
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'z', sequence);
- outBuffer.putLong (code);
- outBuffer.putShortString (text);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
-}
-
-void ManagementAgent::dispatchCommand (Deliverable& deliverable,
- const string& routingKey,
- const FieldTable* /*args*/)
-{
- Mutex::ScopedLock lock (userLock);
- Message& msg = ((DeliverableMessage&) deliverable).getMessage ();
-
- if (routingKey.compare (0, 13, "agent.method.") == 0)
- dispatchMethod (msg, routingKey, 13);
-
- else if (routingKey.length () == 5 &&
- routingKey.compare (0, 5, "agent") == 0)
- dispatchAgentCommand (msg);
-
- else
- {
- QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey);
- return;
- }
-}
-
-void ManagementAgent::dispatchMethod (Message& msg,
- const string& routingKey,
- size_t first)
-{
- size_t pos, start = first;
- uint32_t contentSize;
-
- if (routingKey.length () == start)
- {
- QPID_LOG (debug, "Missing package-name in routing key: " << routingKey);
- return;
- }
-
- pos = routingKey.find ('.', start);
- if (pos == string::npos || routingKey.length () == pos + 1)
- {
- QPID_LOG (debug, "Missing class-name in routing key: " << routingKey);
- return;
- }
-
- string packageName = routingKey.substr (start, pos - start);
-
- start = pos + 1;
- pos = routingKey.find ('.', start);
- if (pos == string::npos || routingKey.length () == pos + 1)
- {
- QPID_LOG (debug, "Missing method-name in routing key: " << routingKey);
- return;
- }
-
- string className = routingKey.substr (start, pos - start);
-
- start = pos + 1;
- string methodName = routingKey.substr (start, routingKey.length () - start);
-
- contentSize = msg.encodedContentSize ();
- if (contentSize < 8 || contentSize > MA_BUFFER_SIZE)
- return;
-
- Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen, sequence;
- uint8_t opcode;
-
- msg.encodeContent (inBuffer);
- inBuffer.reset ();
-
- if (!CheckHeader (inBuffer, &opcode, &sequence))
- {
- QPID_LOG (debug, " Invalid content header");
- return;
- }
-
- if (opcode != 'M')
- {
- QPID_LOG (debug, " Unexpected opcode " << opcode);
- return;
- }
-
- uint64_t objId = inBuffer.getLongLong ();
- string replyToKey;
-
- const framing::MessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::MessageProperties>();
- if (p && p->hasReplyTo())
- {
- const framing::ReplyTo& rt = p->getReplyTo ();
- replyToKey = rt.getRoutingKey ();
- }
- else
- {
- QPID_LOG (debug, " Reply-to missing");
- return;
- }
-
- EncodeHeader (outBuffer, 'm', sequence);
-
- ManagementObjectMap::iterator iter = managementObjects.find (objId);
- if (iter == managementObjects.end () || iter->second->isDeleted ())
- {
- outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
- outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
- }
- else
- {
- iter->second->doMethod (methodName, inBuffer, outBuffer);
- }
-
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
-}
-
-void ManagementAgent::handleBrokerRequest (Buffer&, string replyToKey, uint32_t sequence)
-{
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'b', sequence);
- uuid.encode (outBuffer);
-
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
-}
-
-void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey, uint32_t sequence)
-{
- for (PackageMap::iterator pIter = packages.begin ();
- pIter != packages.end ();
- pIter++)
- {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'p', sequence);
- EncodePackageIndication (outBuffer, pIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
- }
-
- sendCommandComplete (replyToKey, sequence);
-}
-
-void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/)
-{
- std::string packageName;
-
- inBuffer.getShortString (packageName);
- FindOrAddPackage (packageName);
-}
-
-void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey, uint32_t sequence)
-{
- std::string packageName;
-
- inBuffer.getShortString (packageName);
- PackageMap::iterator pIter = packages.find (packageName);
- if (pIter != packages.end ())
- {
- ClassMap cMap = pIter->second;
- for (ClassMap::iterator cIter = cMap.begin ();
- cIter != cMap.end ();
- cIter++)
- {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'q', sequence);
- EncodeClassIndication (outBuffer, pIter, cIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
- }
- }
-
- sendCommandComplete (replyToKey, sequence);
-}
-
-void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey, uint32_t sequence)
-{
- string packageName;
- SchemaClassKey key;
-
- inBuffer.getShortString (packageName);
- inBuffer.getShortString (key.name);
- inBuffer.getBin128 (key.hash);
-
- PackageMap::iterator pIter = packages.find (packageName);
- if (pIter != packages.end ())
- {
- ClassMap cMap = pIter->second;
- ClassMap::iterator cIter = cMap.find (key);
- if (cIter != cMap.end ())
- {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
- SchemaClass classInfo = cIter->second;
-
- if (classInfo.writeSchemaCall != 0)
- {
- EncodeHeader (outBuffer, 's', sequence);
- classInfo.writeSchemaCall (outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
- }
- else
- {
- // TODO: Forward request to remote agent.
- }
-
- clientAdded ();
- // TODO: Send client-added to each remote agent.
- }
- }
-}
-
-uint32_t ManagementAgent::assignPrefix (uint32_t /*requestedPrefix*/)
-{
- // TODO: Allow remote agents to keep their requested prefixes if able.
- return nextRemotePrefix++;
-}
-
-void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey, uint32_t sequence)
-{
- string label;
- uint32_t requestedPrefix;
- uint32_t assignedPrefix;
-
- inBuffer.getShortString (label);
- requestedPrefix = inBuffer.getLong ();
- assignedPrefix = assignPrefix (requestedPrefix);
-
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'a', sequence);
- outBuffer.putLong (assignedPrefix);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
-}
-
-void ManagementAgent::handleGetRequest (Buffer& inBuffer, string replyToKey, uint32_t sequence)
-{
- FieldTable ft;
- FieldTable::ValuePtr value;
-
- ft.decode (inBuffer);
- value = ft.get ("_class");
- if (value->empty () || !value->convertsTo<string> ())
- {
- // TODO: Send completion with an error code
- return;
- }
-
- string className (value->get<string> ());
-
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++)
- {
- ManagementObject::shared_ptr object = iter->second;
- if (object->getClassName () == className)
- {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'g', sequence);
- object->writeConfig (outBuffer);
- object->writeInstrumentation (outBuffer, true);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
- }
- }
-
- sendCommandComplete (replyToKey, sequence);
-}
-
-void ManagementAgent::dispatchAgentCommand (Message& msg)
-{
- Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
- uint8_t opcode;
- uint32_t sequence;
- string replyToKey;
-
- const framing::MessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::MessageProperties>();
- if (p && p->hasReplyTo())
- {
- const framing::ReplyTo& rt = p->getReplyTo ();
- replyToKey = rt.getRoutingKey ();
- }
- else
- return;
-
- msg.encodeContent (inBuffer);
- inBuffer.reset ();
-
- if (!CheckHeader (inBuffer, &opcode, &sequence))
- return;
-
- if (opcode == 'B') handleBrokerRequest (inBuffer, replyToKey, sequence);
- else if (opcode == 'P') handlePackageQuery (inBuffer, replyToKey, sequence);
- else if (opcode == 'p') handlePackageInd (inBuffer, replyToKey, sequence);
- else if (opcode == 'Q') handleClassQuery (inBuffer, replyToKey, sequence);
- else if (opcode == 'S') handleSchemaQuery (inBuffer, replyToKey, sequence);
- else if (opcode == 'A') handleAttachRequest (inBuffer, replyToKey, sequence);
- else if (opcode == 'G') handleGetRequest (inBuffer, replyToKey, sequence);
-}
-
-ManagementAgent::PackageMap::iterator ManagementAgent::FindOrAddPackage (std::string name)
-{
- PackageMap::iterator pIter = packages.find (name);
- if (pIter != packages.end ())
- return pIter;
-
- // No such package found, create a new map entry.
- pair<PackageMap::iterator, bool> result =
- packages.insert (pair<string, ClassMap> (name, ClassMap ()));
- QPID_LOG (debug, "ManagementAgent added package " << name);
-
- // Publish a package-indication message
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'p');
- EncodePackageIndication (outBuffer, result.first);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package");
-
- return result.first;
-}
-
-void ManagementAgent::AddClassLocal (PackageMap::iterator pIter,
- string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
-{
- SchemaClassKey key;
- ClassMap& cMap = pIter->second;
-
- key.name = className;
- memcpy (&key.hash, md5Sum, 16);
-
- ClassMap::iterator cIter = cMap.find (key);
- if (cIter != cMap.end ())
- return;
-
- // No such class found, create a new class with local information.
- QPID_LOG (debug, "ManagementAgent added class " << pIter->first << "." <<
- key.name);
- SchemaClass classInfo;
-
- classInfo.writeSchemaCall = schemaCall;
- cMap[key] = classInfo;
-
- // TODO: Publish a class-indication message
-}
-
-void ManagementAgent::EncodePackageIndication (Buffer& buf,
- PackageMap::iterator pIter)
-{
- buf.putShortString ((*pIter).first);
-}
-
-void ManagementAgent::EncodeClassIndication (Buffer& buf,
- PackageMap::iterator pIter,
- ClassMap::iterator cIter)
-{
- SchemaClassKey key = (*cIter).first;
-
- buf.putShortString ((*pIter).first);
- buf.putShortString (key.name);
- buf.putBin128 (key.hash);
-}
-
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
index 4cd679a035..c38e273c49 100644
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -22,168 +22,28 @@
*
*/
-#include "qpid/Options.h"
-#include "qpid/broker/Exchange.h"
-#include "qpid/broker/Timer.h"
-#include "qpid/framing/Uuid.h"
-#include "qpid/sys/Mutex.h"
#include "ManagementObject.h"
-#include <qpid/framing/AMQFrame.h>
-#include <boost/shared_ptr.hpp>
namespace qpid {
namespace management {
class ManagementAgent
{
- private:
-
- ManagementAgent (std::string dataDir, uint16_t interval);
-
public:
- virtual ~ManagementAgent ();
+ virtual ~ManagementAgent () {}
typedef boost::shared_ptr<ManagementAgent> shared_ptr;
- static void enableManagement (std::string dataDir, uint16_t interval);
static shared_ptr getAgent (void);
- static void shutdown (void);
-
- void setInterval (uint16_t _interval) { interval = _interval; }
- void setExchange (broker::Exchange::shared_ptr mgmtExchange,
- broker::Exchange::shared_ptr directExchange);
- void RegisterClass (std::string packageName,
- std::string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall);
- void addObject (ManagementObject::shared_ptr object,
- uint32_t persistId = 0,
- uint32_t persistBank = 2);
- void clientAdded (void);
- void dispatchCommand (broker::Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
-
- private:
-
- struct Periodic : public broker::TimerTask
- {
- ManagementAgent& agent;
-
- Periodic (ManagementAgent& agent, uint32_t seconds);
- virtual ~Periodic ();
- void fire ();
- };
-
- // Storage for tracking remote management agents, attached via the client
- // management agent API.
- //
- struct RemoteAgent
- {
- std::string name;
- uint64_t objIdBase;
- };
-
- // TODO: Eventually replace string with entire reply-to structure. reply-to
- // currently assumes that the exchange is "amq.direct" even though it could
- // in theory be specified differently.
- typedef std::map<std::string, RemoteAgent> RemoteAgentMap;
- typedef std::vector<std::string> ReplyToVector;
-
- // Storage for known schema classes:
- //
- // SchemaClassKey -- Key elements for map lookups
- // SchemaClassKeyComp -- Comparison class for SchemaClassKey
- // SchemaClass -- Non-key elements for classes
- //
- struct SchemaClassKey
- {
- std::string name;
- uint8_t hash[16];
- };
-
- struct SchemaClassKeyComp
- {
- bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
- {
- if (lhs.name != rhs.name)
- return lhs.name < rhs.name;
- else
- for (int i = 0; i < 16; i++)
- if (lhs.hash[i] != rhs.hash[i])
- return lhs.hash[i] < rhs.hash[i];
- return false;
- }
- };
-
- struct SchemaClass
- {
- ManagementObject::writeSchemaCall_t writeSchemaCall;
- ReplyToVector remoteAgents;
-
- SchemaClass () : writeSchemaCall(0) {}
- };
-
- typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
- typedef std::map<std::string, ClassMap> PackageMap;
-
- RemoteAgentMap remoteAgents;
- PackageMap packages;
- ManagementObjectMap managementObjects;
-
- static shared_ptr agent;
- static bool enabled;
-
- qpid::framing::Uuid uuid;
- qpid::sys::Mutex userLock;
- broker::Timer timer;
- broker::Exchange::shared_ptr mExchange;
- broker::Exchange::shared_ptr dExchange;
- std::string dataDir;
- uint16_t interval;
- uint16_t bootSequence;
- uint32_t localBank;
- uint32_t nextObjectId;
- uint32_t nextRemotePrefix;
-
-# define MA_BUFFER_SIZE 65536
- char inputBuffer[MA_BUFFER_SIZE];
- char outputBuffer[MA_BUFFER_SIZE];
-
- void PeriodicProcessing (void);
- void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
- bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
- void SendBuffer (qpid::framing::Buffer& buf,
- uint32_t length,
- broker::Exchange::shared_ptr exchange,
- std::string routingKey);
-
- void dispatchMethod (broker::Message& msg,
- const std::string& routingKey,
- size_t first);
- void dispatchAgentCommand (broker::Message& msg);
- PackageMap::iterator FindOrAddPackage (std::string name);
- void AddClassLocal (PackageMap::iterator pIter,
- std::string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall);
- void EncodePackageIndication (qpid::framing::Buffer& buf,
- PackageMap::iterator pIter);
- void EncodeClassIndication (qpid::framing::Buffer& buf,
- PackageMap::iterator pIter,
- ClassMap::iterator cIter);
- uint32_t assignPrefix (uint32_t requestedPrefix);
- void sendCommandComplete (std::string replyToKey, uint32_t sequence,
- uint32_t code = 0, std::string text = std::string("OK"));
- void handleBrokerRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handlePackageQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handlePackageInd (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleClassQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleSchemaQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleAttachRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleGetRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ virtual void RegisterClass (std::string packageName,
+ std::string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall) = 0;
+ virtual void addObject (ManagementObject::shared_ptr object,
+ uint32_t persistId = 0,
+ uint32_t persistBank = 4) = 0;
};
}}
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp
new file mode 100644
index 0000000000..6466028c00
--- /dev/null
+++ b/cpp/src/qpid/management/ManagementBroker.cpp
@@ -0,0 +1,746 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementBroker.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/log/Statement.h"
+#include <qpid/broker/Message.h>
+#include <qpid/broker/MessageDelivery.h>
+#include "qpid/framing/MessageTransferBody.h"
+#include <list>
+#include <iostream>
+#include <fstream>
+
+using boost::intrusive_ptr;
+using qpid::framing::Uuid;
+using namespace qpid::framing;
+using namespace qpid::management;
+using namespace qpid::broker;
+using namespace qpid::sys;
+using namespace std;
+
+ManagementAgent::shared_ptr ManagementBroker::agent;
+bool ManagementBroker::enabled = 0;
+
+ManagementBroker::RemoteAgent::~RemoteAgent ()
+{
+ if (mgmtObject.get () != 0)
+ mgmtObject->resourceDestroy ();
+}
+
+ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Manageable* _broker) :
+ dataDir (_dataDir), interval (_interval), broker (_broker)
+{
+ timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
+ localBank = 5;
+ nextObjectId = 1;
+ bootSequence = 1;
+ nextRemoteBank = 10;
+
+ // Get from file or generate and save to file.
+ if (dataDir.empty ())
+ {
+ uuid.generate ();
+ QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: "
+ << uuid);
+ }
+ else
+ {
+ string filename (dataDir + "/.mbrokerdata");
+ ifstream inFile (filename.c_str ());
+
+ if (inFile.good ())
+ {
+ inFile >> uuid;
+ inFile >> bootSequence;
+ inFile >> nextRemoteBank;
+ inFile.close ();
+ QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid);
+
+ bootSequence++;
+ writeData ();
+ }
+ else
+ {
+ uuid.generate ();
+ QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid);
+ writeData ();
+ }
+
+ QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence);
+ }
+}
+
+ManagementBroker::~ManagementBroker () {}
+
+void ManagementBroker::writeData ()
+{
+ string filename (dataDir + "/.mbrokerdata");
+ ofstream outFile (filename.c_str ());
+
+ if (outFile.good ())
+ {
+ outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl;
+ outFile.close ();
+ }
+}
+
+void ManagementBroker::enableManagement (string dataDir, uint16_t interval, Manageable* broker)
+{
+ enabled = 1;
+ if (agent.get () == 0)
+ agent = shared_ptr (new ManagementBroker (dataDir, interval, broker));
+}
+
+ManagementAgent::shared_ptr ManagementAgent::getAgent (void)
+{
+ return ManagementBroker::agent;
+}
+
+void ManagementBroker::shutdown (void)
+{
+ if (agent.get () != 0)
+ {
+ ManagementBroker* broker = (ManagementBroker*) agent.get();
+
+ broker->mExchange.reset ();
+ broker->dExchange.reset ();
+ agent.reset ();
+ }
+}
+
+void ManagementBroker::setExchange (broker::Exchange::shared_ptr _mexchange,
+ broker::Exchange::shared_ptr _dexchange)
+{
+ mExchange = _mexchange;
+ dExchange = _dexchange;
+}
+
+void ManagementBroker::RegisterClass (string packageName,
+ string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
+{
+ Mutex::ScopedLock lock (userLock);
+ PackageMap::iterator pIter = FindOrAddPackage (packageName);
+ AddClassLocal (pIter, className, md5Sum, schemaCall);
+}
+
+void ManagementBroker::addObject (ManagementObject::shared_ptr object,
+ uint32_t persistId,
+ uint32_t persistBank)
+{
+ Mutex::ScopedLock lock (userLock);
+ uint64_t objectId;
+
+ if (persistId == 0)
+ objectId = ((uint64_t) bootSequence) << 48 |
+ ((uint64_t) localBank) << 24 | nextObjectId++;
+ else
+ objectId = ((uint64_t) persistBank) << 24 | persistId;
+
+ object->setObjectId (objectId);
+ managementObjects[objectId] = object;
+}
+
+ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds)
+ : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), broker(_broker) {}
+
+ManagementBroker::Periodic::~Periodic () {}
+
+void ManagementBroker::Periodic::fire ()
+{
+ broker.timer.add (intrusive_ptr<TimerTask> (new Periodic (broker, broker.interval)));
+ broker.PeriodicProcessing ();
+}
+
+void ManagementBroker::clientAdded (void)
+{
+ for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++)
+ {
+ ManagementObject::shared_ptr object = iter->second;
+ object->setAllChanged ();
+ }
+}
+
+void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
+{
+ buf.putOctet ('A');
+ buf.putOctet ('M');
+ buf.putOctet ('1');
+ buf.putOctet (opcode);
+ buf.putLong (seq);
+}
+
+bool ManagementBroker::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
+{
+ uint8_t h1 = buf.getOctet ();
+ uint8_t h2 = buf.getOctet ();
+ uint8_t h3 = buf.getOctet ();
+
+ *opcode = buf.getOctet ();
+ *seq = buf.getLong ();
+
+ return h1 == 'A' && h2 == 'M' && h3 == '1';
+}
+
+void ManagementBroker::SendBuffer (Buffer& buf,
+ uint32_t length,
+ broker::Exchange::shared_ptr exchange,
+ string routingKey)
+{
+ if (exchange.get() == 0)
+ return;
+
+ intrusive_ptr<Message> msg (new Message ());
+ AMQFrame method (in_place<MessageTransferBody>(
+ ProtocolVersion(), exchange->getName (), 0, 0));
+ AMQFrame header (in_place<AMQHeaderBody>());
+ AMQFrame content(in_place<AMQContentBody>());
+
+ content.castBody<AMQContentBody>()->decode(buf, length);
+
+ method.setEof (false);
+ header.setBof (false);
+ header.setEof (false);
+ content.setBof (false);
+
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+
+ MessageProperties* props =
+ msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setContentLength(length);
+ msg->getFrames().append(content);
+
+ DeliverableMessage deliverable (msg);
+ exchange->route (deliverable, routingKey, 0);
+}
+
+void ManagementBroker::PeriodicProcessing (void)
+{
+#define BUFSIZE 65536
+ Mutex::ScopedLock lock (userLock);
+ char msgChars[BUFSIZE];
+ uint32_t contentSize;
+ string routingKey;
+ std::list<uint64_t> deleteList;
+
+ if (managementObjects.empty ())
+ return;
+
+ for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++)
+ {
+ ManagementObject::shared_ptr object = iter->second;
+
+ if (object->getConfigChanged () || object->isDeleted ())
+ {
+ Buffer msgBuffer (msgChars, BUFSIZE);
+ EncodeHeader (msgBuffer, 'c');
+ object->writeConfig (msgBuffer);
+
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+ routingKey = "mgmt." + uuid.str() + ".config." + object->getClassName ();
+ SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ }
+
+ if (object->getInstChanged ())
+ {
+ Buffer msgBuffer (msgChars, BUFSIZE);
+ EncodeHeader (msgBuffer, 'i');
+ object->writeInstrumentation (msgBuffer);
+
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+ routingKey = "mgmt." + uuid.str () + ".inst." + object->getClassName ();
+ SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ }
+
+ if (object->isDeleted ())
+ deleteList.push_back (iter->first);
+ }
+
+ // Delete flagged objects
+ for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin ();
+ iter != deleteList.rend ();
+ iter++)
+ managementObjects.erase (*iter);
+
+ deleteList.clear ();
+}
+
+void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence,
+ uint32_t code, string text)
+{
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'z', sequence);
+ outBuffer.putLong (code);
+ outBuffer.putShortString (text);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+}
+
+void ManagementBroker::dispatchCommand (Deliverable& deliverable,
+ const string& routingKey,
+ const FieldTable* /*args*/)
+{
+ Mutex::ScopedLock lock (userLock);
+ Message& msg = ((DeliverableMessage&) deliverable).getMessage ();
+
+ if (routingKey.compare (0, 13, "agent.method.") == 0)
+ dispatchMethodLH (msg, routingKey, 13);
+
+ else if (routingKey.length () == 5 &&
+ routingKey.compare (0, 5, "agent") == 0)
+ dispatchAgentCommandLH (msg);
+
+ else
+ {
+ QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey);
+ return;
+ }
+}
+
+void ManagementBroker::dispatchMethodLH (Message& msg,
+ const string& routingKey,
+ size_t first)
+{
+ size_t pos, start = first;
+ uint32_t contentSize;
+
+ if (routingKey.length () == start)
+ {
+ QPID_LOG (debug, "Missing package-name in routing key: " << routingKey);
+ return;
+ }
+
+ pos = routingKey.find ('.', start);
+ if (pos == string::npos || routingKey.length () == pos + 1)
+ {
+ QPID_LOG (debug, "Missing class-name in routing key: " << routingKey);
+ return;
+ }
+
+ string packageName = routingKey.substr (start, pos - start);
+
+ start = pos + 1;
+ pos = routingKey.find ('.', start);
+ if (pos == string::npos || routingKey.length () == pos + 1)
+ {
+ QPID_LOG (debug, "Missing method-name in routing key: " << routingKey);
+ return;
+ }
+
+ string className = routingKey.substr (start, pos - start);
+
+ start = pos + 1;
+ string methodName = routingKey.substr (start, routingKey.length () - start);
+
+ contentSize = msg.encodedContentSize ();
+ if (contentSize < 8 || contentSize > MA_BUFFER_SIZE)
+ return;
+
+ Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen, sequence;
+ uint8_t opcode;
+
+ msg.encodeContent (inBuffer);
+ inBuffer.reset ();
+
+ if (!CheckHeader (inBuffer, &opcode, &sequence))
+ {
+ QPID_LOG (debug, " Invalid content header");
+ return;
+ }
+
+ if (opcode != 'M')
+ {
+ QPID_LOG (debug, " Unexpected opcode " << opcode);
+ return;
+ }
+
+ uint64_t objId = inBuffer.getLongLong ();
+ string replyToKey;
+
+ const framing::MessageProperties* p =
+ msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+ if (p && p->hasReplyTo())
+ {
+ const framing::ReplyTo& rt = p->getReplyTo ();
+ replyToKey = rt.getRoutingKey ();
+ }
+ else
+ {
+ QPID_LOG (debug, " Reply-to missing");
+ return;
+ }
+
+ EncodeHeader (outBuffer, 'm', sequence);
+
+ ManagementObjectMap::iterator iter = managementObjects.find (objId);
+ if (iter == managementObjects.end () || iter->second->isDeleted ())
+ {
+ outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
+ outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
+ }
+ else
+ {
+ iter->second->doMethod (methodName, inBuffer, outBuffer);
+ }
+
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+}
+
+void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
+{
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'b', sequence);
+ uuid.encode (outBuffer);
+
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+}
+
+void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence)
+{
+ for (PackageMap::iterator pIter = packages.begin ();
+ pIter != packages.end ();
+ pIter++)
+ {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'p', sequence);
+ EncodePackageIndication (outBuffer, pIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ }
+
+ sendCommandComplete (replyToKey, sequence);
+}
+
+void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/)
+{
+ std::string packageName;
+
+ inBuffer.getShortString (packageName);
+ FindOrAddPackage (packageName);
+}
+
+void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+{
+ std::string packageName;
+
+ inBuffer.getShortString (packageName);
+ PackageMap::iterator pIter = packages.find (packageName);
+ if (pIter != packages.end ())
+ {
+ ClassMap cMap = pIter->second;
+ for (ClassMap::iterator cIter = cMap.begin ();
+ cIter != cMap.end ();
+ cIter++)
+ {
+ if (cIter->second.hasSchema ())
+ {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'q', sequence);
+ EncodeClassIndication (outBuffer, pIter, cIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ }
+ }
+ }
+
+ sendCommandComplete (replyToKey, sequence);
+}
+
+void ManagementBroker::SchemaClass::appendSchema (Buffer& buf)
+{
+ // If the management package is attached locally (embedded in the broker or
+ // linked in via plug-in), call the schema handler directly. If the package
+ // is from a remote management agent, send the stored schema information.
+
+ if (writeSchemaCall != 0)
+ writeSchemaCall (buf);
+ else
+ buf.putRawData (buffer, bufferLen);
+}
+
+void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+{
+ string packageName;
+ SchemaClassKey key;
+
+ inBuffer.getShortString (packageName);
+ inBuffer.getShortString (key.name);
+ inBuffer.getBin128 (key.hash);
+
+ PackageMap::iterator pIter = packages.find (packageName);
+ if (pIter != packages.end ())
+ {
+ ClassMap cMap = pIter->second;
+ ClassMap::iterator cIter = cMap.find (key);
+ if (cIter != cMap.end ())
+ {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+ SchemaClass classInfo = cIter->second;
+
+ if (classInfo.hasSchema())
+ {
+ EncodeHeader (outBuffer, 's', sequence);
+ classInfo.appendSchema (outBuffer);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ }
+
+ clientAdded ();
+ // TODO: Send client-added to each remote agent.
+ }
+ }
+}
+
+bool ManagementBroker::bankInUse (uint32_t bank)
+{
+ for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
+ aIter != remoteAgents.end();
+ aIter++)
+ if (aIter->second->objIdBank == bank)
+ return true;
+ return false;
+}
+
+uint32_t ManagementBroker::allocateNewBank ()
+{
+ while (bankInUse (nextRemoteBank))
+ nextRemoteBank++;
+
+ uint32_t allocated = nextRemoteBank++;
+ writeData ();
+ return allocated;
+}
+
+uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank)
+{
+ if (requestedBank == 0 || bankInUse (requestedBank))
+ return allocateNewBank ();
+ return requestedBank;
+}
+
+void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+{
+ string label;
+ uint32_t requestedBank;
+ uint32_t assignedBank;
+ Uuid sessionId;
+ Uuid systemId;
+
+ inBuffer.getShortString (label);
+ sessionId.decode (inBuffer);
+ systemId.decode (inBuffer);
+ requestedBank = inBuffer.getLong ();
+ assignedBank = assignBankLH (requestedBank);
+
+ RemoteAgentMap::iterator aIter = remoteAgents.find (sessionId);
+ if (aIter != remoteAgents.end())
+ {
+ // There already exists an agent on this session. Reject the request.
+ sendCommandComplete (replyToKey, sequence, 1, "Session already has remote agent");
+ return;
+ }
+
+ RemoteAgent* agent = new RemoteAgent;
+ agent->objIdBank = assignedBank;
+ agent->mgmtObject = management::Agent::shared_ptr
+ (new management::Agent (agent));
+ agent->mgmtObject->set_sessionId (sessionId);
+ agent->mgmtObject->set_label (label);
+ agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
+ agent->mgmtObject->set_sysId (systemId);
+ addObject (agent->mgmtObject);
+
+ remoteAgents[sessionId] = agent;
+
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'a', sequence);
+ outBuffer.putLong (assignedBank);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+}
+
+void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+{
+ FieldTable ft;
+ FieldTable::ValuePtr value;
+
+ ft.decode (inBuffer);
+ value = ft.get ("_class");
+ if (value->empty () || !value->convertsTo<string> ())
+ {
+ // TODO: Send completion with an error code
+ return;
+ }
+
+ string className (value->get<string> ());
+
+ for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++)
+ {
+ ManagementObject::shared_ptr object = iter->second;
+ if (object->getClassName () == className)
+ {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'g', sequence);
+ object->writeConfig (outBuffer);
+ object->writeInstrumentation (outBuffer, true);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ }
+ }
+
+ sendCommandComplete (replyToKey, sequence);
+}
+
+void ManagementBroker::dispatchAgentCommandLH (Message& msg)
+{
+ Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
+ uint8_t opcode;
+ uint32_t sequence;
+ string replyToKey;
+
+ const framing::MessageProperties* p =
+ msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+ if (p && p->hasReplyTo())
+ {
+ const framing::ReplyTo& rt = p->getReplyTo ();
+ replyToKey = rt.getRoutingKey ();
+ }
+ else
+ return;
+
+ msg.encodeContent (inBuffer);
+ inBuffer.reset ();
+
+ if (!CheckHeader (inBuffer, &opcode, &sequence))
+ return;
+
+ if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
+}
+
+ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackage (std::string name)
+{
+ PackageMap::iterator pIter = packages.find (name);
+ if (pIter != packages.end ())
+ return pIter;
+
+ // No such package found, create a new map entry.
+ pair<PackageMap::iterator, bool> result =
+ packages.insert (pair<string, ClassMap> (name, ClassMap ()));
+ QPID_LOG (debug, "ManagementBroker added package " << name);
+
+ // Publish a package-indication message
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'p');
+ EncodePackageIndication (outBuffer, result.first);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package");
+
+ return result.first;
+}
+
+void ManagementBroker::AddClassLocal (PackageMap::iterator pIter,
+ string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
+{
+ SchemaClassKey key;
+ ClassMap& cMap = pIter->second;
+
+ key.name = className;
+ memcpy (&key.hash, md5Sum, 16);
+
+ ClassMap::iterator cIter = cMap.find (key);
+ if (cIter != cMap.end ())
+ return;
+
+ // No such class found, create a new class with local information.
+ QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." <<
+ key.name);
+ SchemaClass classInfo;
+
+ classInfo.writeSchemaCall = schemaCall;
+ cMap[key] = classInfo;
+
+ // TODO: Publish a class-indication message
+}
+
+void ManagementBroker::EncodePackageIndication (Buffer& buf,
+ PackageMap::iterator pIter)
+{
+ buf.putShortString ((*pIter).first);
+}
+
+void ManagementBroker::EncodeClassIndication (Buffer& buf,
+ PackageMap::iterator pIter,
+ ClassMap::iterator cIter)
+{
+ SchemaClassKey key = (*cIter).first;
+
+ buf.putShortString ((*pIter).first);
+ buf.putShortString (key.name);
+ buf.putBin128 (key.hash);
+}
+
diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h
new file mode 100644
index 0000000000..2e02cb2a43
--- /dev/null
+++ b/cpp/src/qpid/management/ManagementBroker.h
@@ -0,0 +1,203 @@
+#ifndef _ManagementBroker_
+#define _ManagementBroker_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Options.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Timer.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/sys/Mutex.h"
+#include "ManagementAgent.h"
+#include "ManagementObject.h"
+#include "Manageable.h"
+#include "qpid/management/Agent.h"
+#include <qpid/framing/AMQFrame.h>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace management {
+
+class ManagementBroker : public ManagementAgent
+{
+ private:
+
+ ManagementBroker (std::string dataDir, uint16_t interval, Manageable* broker);
+
+ public:
+
+ virtual ~ManagementBroker ();
+
+ static void enableManagement (std::string dataDir, uint16_t interval, Manageable* broker);
+ static shared_ptr getAgent (void);
+ static void shutdown (void);
+
+ void setInterval (uint16_t _interval) { interval = _interval; }
+ void setExchange (broker::Exchange::shared_ptr mgmtExchange,
+ broker::Exchange::shared_ptr directExchange);
+ void RegisterClass (std::string packageName,
+ std::string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ void addObject (ManagementObject::shared_ptr object,
+ uint32_t persistId = 0,
+ uint32_t persistBank = 4);
+ void clientAdded (void);
+ void dispatchCommand (broker::Deliverable& msg,
+ const std::string& routingKey,
+ const framing::FieldTable* args);
+
+ private:
+ friend class ManagementAgent;
+
+ struct Periodic : public broker::TimerTask
+ {
+ ManagementBroker& broker;
+
+ Periodic (ManagementBroker& broker, uint32_t seconds);
+ virtual ~Periodic ();
+ void fire ();
+ };
+
+ // Storage for tracking remote management agents, attached via the client
+ // management agent API.
+ //
+ struct RemoteAgent : public Manageable
+ {
+ uint32_t objIdBank;
+ Agent::shared_ptr mgmtObject;
+ ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; }
+ virtual ~RemoteAgent ();
+ };
+
+ // TODO: Eventually replace string with entire reply-to structure. reply-to
+ // currently assumes that the exchange is "amq.direct" even though it could
+ // in theory be specified differently.
+ typedef std::map<framing::Uuid, RemoteAgent*> RemoteAgentMap;
+ typedef std::vector<std::string> ReplyToVector;
+
+ // Storage for known schema classes:
+ //
+ // SchemaClassKey -- Key elements for map lookups
+ // SchemaClassKeyComp -- Comparison class for SchemaClassKey
+ // SchemaClass -- Non-key elements for classes
+ //
+ struct SchemaClassKey
+ {
+ std::string name;
+ uint8_t hash[16];
+ };
+
+ struct SchemaClassKeyComp
+ {
+ bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
+ {
+ if (lhs.name != rhs.name)
+ return lhs.name < rhs.name;
+ else
+ for (int i = 0; i < 16; i++)
+ if (lhs.hash[i] != rhs.hash[i])
+ return lhs.hash[i] < rhs.hash[i];
+ return false;
+ }
+ };
+
+ struct SchemaClass
+ {
+ ManagementObject::writeSchemaCall_t writeSchemaCall;
+ ReplyToVector remoteAgents;
+ size_t bufferLen;
+ uint8_t* buffer;
+
+ SchemaClass () : writeSchemaCall(0), bufferLen(0), buffer(0) {}
+ bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); }
+ void appendSchema (framing::Buffer& buf);
+ };
+
+ typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
+ typedef std::map<std::string, ClassMap> PackageMap;
+
+ RemoteAgentMap remoteAgents;
+ PackageMap packages;
+ ManagementObjectMap managementObjects;
+
+ static shared_ptr agent;
+ static bool enabled;
+
+ framing::Uuid uuid;
+ sys::Mutex userLock;
+ broker::Timer timer;
+ broker::Exchange::shared_ptr mExchange;
+ broker::Exchange::shared_ptr dExchange;
+ std::string dataDir;
+ uint16_t interval;
+ Manageable* broker;
+ uint16_t bootSequence;
+ uint32_t localBank;
+ uint32_t nextObjectId;
+ uint32_t nextRemoteBank;
+
+# define MA_BUFFER_SIZE 65536
+ char inputBuffer[MA_BUFFER_SIZE];
+ char outputBuffer[MA_BUFFER_SIZE];
+
+ void writeData ();
+ void PeriodicProcessing (void);
+ void EncodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+ bool CheckHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+ void SendBuffer (framing::Buffer& buf,
+ uint32_t length,
+ broker::Exchange::shared_ptr exchange,
+ std::string routingKey);
+
+ void dispatchMethodLH (broker::Message& msg,
+ const std::string& routingKey,
+ size_t first);
+ void dispatchAgentCommandLH (broker::Message& msg);
+
+ PackageMap::iterator FindOrAddPackage (std::string name);
+ void AddClassLocal (PackageMap::iterator pIter,
+ std::string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ void EncodePackageIndication (framing::Buffer& buf,
+ PackageMap::iterator pIter);
+ void EncodeClassIndication (framing::Buffer& buf,
+ PackageMap::iterator pIter,
+ ClassMap::iterator cIter);
+ bool bankInUse (uint32_t bank);
+ uint32_t allocateNewBank ();
+ uint32_t assignBankLH (uint32_t requestedPrefix);
+ void sendCommandComplete (std::string replyToKey, uint32_t sequence,
+ uint32_t code = 0, std::string text = std::string("OK"));
+ void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+};
+
+}}
+
+#endif /*!_ManagementBroker_*/
diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp
index c589aefba0..28e6fb8d0a 100644
--- a/cpp/src/qpid/management/ManagementExchange.cpp
+++ b/cpp/src/qpid/management/ManagementExchange.cpp
@@ -53,7 +53,7 @@ void ManagementExchange::route (Deliverable& msg,
TopicExchange::route (msg, routingKey, args);
}
-void ManagementExchange::setManagmentAgent (ManagementAgent::shared_ptr agent)
+void ManagementExchange::setManagmentAgent (ManagementBroker* agent)
{
managementAgent = agent;
}
diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h
index 7faec32b0f..28066b1e80 100644
--- a/cpp/src/qpid/management/ManagementExchange.h
+++ b/cpp/src/qpid/management/ManagementExchange.h
@@ -22,7 +22,7 @@
#define _ManagementExchange_
#include "qpid/broker/TopicExchange.h"
-#include "ManagementAgent.h"
+#include "ManagementBroker.h"
namespace qpid {
namespace broker {
@@ -30,7 +30,7 @@ namespace broker {
class ManagementExchange : public virtual TopicExchange
{
private:
- management::ManagementAgent::shared_ptr managementAgent;
+ management::ManagementBroker* managementAgent;
public:
static const std::string typeName;
@@ -46,7 +46,7 @@ class ManagementExchange : public virtual TopicExchange
const string& routingKey,
const qpid::framing::FieldTable* args);
- void setManagmentAgent (management::ManagementAgent::shared_ptr agent);
+ void setManagmentAgent (management::ManagementBroker* agent);
virtual ~ManagementExchange();
};
diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h
index 48a3372d16..047f8c5754 100644
--- a/cpp/src/qpid/management/ManagementObject.h
+++ b/cpp/src/qpid/management/ManagementObject.h
@@ -22,7 +22,6 @@
*
*/
-#include "Manageable.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/Mutex.h"
#include <qpid/framing/Buffer.h>
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp
index ca2bd7c93c..31974993bb 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -36,13 +36,14 @@ struct Buff : public AsynchIO::BufferBase {
{ delete [] bytes;}
};
-AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
+AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f, ProtocolAccess* a) :
identifier(id),
aio(0),
factory(f),
codec(0),
readError(false),
- isClient(false)
+ isClient(false),
+ access(a)
{}
AsynchIOHandler::~AsynchIOHandler() {
@@ -152,7 +153,7 @@ void AsynchIOHandler::nobuffs(AsynchIO&) {
void AsynchIOHandler::idle(AsynchIO&){
if (isClient && codec == 0) {
- codec = factory->create(*this, identifier);
+ codec = factory->create(*this, identifier, access);
write(framing::ProtocolInitiation(codec->getVersion()));
return;
}
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h
index 530613367a..ece52f57c4 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.h
+++ b/cpp/src/qpid/sys/AsynchIOHandler.h
@@ -32,7 +32,7 @@ namespace framing {
}
namespace sys {
-
+class ProtocolAccess;
class AsynchIOHandler : public OutputControl {
std::string identifier;
AsynchIO* aio;
@@ -40,11 +40,12 @@ class AsynchIOHandler : public OutputControl {
ConnectionCodec* codec;
bool readError;
bool isClient;
+ ProtocolAccess* access;
void write(const framing::ProtocolInitiation&);
public:
- AsynchIOHandler(std::string id, ConnectionCodec::Factory* f);
+ AsynchIOHandler(std::string id, ConnectionCodec::Factory* f, ProtocolAccess* a =0);
~AsynchIOHandler();
void init(AsynchIO* a, int numBuffs);
diff --git a/cpp/src/qpid/sys/ConnectionCodec.h b/cpp/src/qpid/sys/ConnectionCodec.h
index 205596c709..4c5a68e576 100644
--- a/cpp/src/qpid/sys/ConnectionCodec.h
+++ b/cpp/src/qpid/sys/ConnectionCodec.h
@@ -28,9 +28,8 @@
namespace qpid {
-namespace broker { class Broker; }
-
namespace sys {
+class ProtocolAccess;
/**
* Interface of coder/decoder for a connection of a specific protocol
@@ -70,7 +69,7 @@ class ConnectionCodec {
/** Return "preferred" codec for outbound connections. */
virtual ConnectionCodec* create(
- OutputControl&, const std::string& id
+ OutputControl&, const std::string& id, ProtocolAccess* a = 0
) = 0;
};
};
diff --git a/cpp/src/qpid/sys/ProtocolAccess.h b/cpp/src/qpid/sys/ProtocolAccess.h
new file mode 100644
index 0000000000..433bf0ef97
--- /dev/null
+++ b/cpp/src/qpid/sys/ProtocolAccess.h
@@ -0,0 +1,65 @@
+#ifndef _sys_ProtocolAccess_h
+#define _sys_ProtocolAccess_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AsynchIO.h"
+#include "AsynchIOHandler.h"
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+
+namespace broker
+{
+class Connection;
+}
+
+namespace sys {
+
+class ProtocolAccess
+{
+public:
+ typedef boost::function0<void> Callback;
+ typedef boost::function2<void, int, std::string> ClosedCallback;
+ typedef boost::function1<void, boost::shared_ptr<broker::Connection> > SetConnCallback;
+
+ ProtocolAccess (Callback ecb, ClosedCallback ccb, SetConnCallback sccb)
+ : aio(0), establishedCb(ecb), closedCb(ccb), setConnCb(sccb) {}
+ ~ProtocolAccess() {}
+ inline void close() { if (aio) aio->queueWriteClose(); }
+
+ inline void setAio(AsynchIO *_aio) { aio = _aio; establishedCb(); }
+ inline void closedEof(AsynchIOHandler* async) { async->eof(*aio); closedCb(-1, "Closed by Peer"); }
+ inline void closed(int err, std::string str) { closedCb(err, str); }
+ inline void callConnCb(boost::shared_ptr<broker::Connection> c) { setConnCb(c); }
+
+private:
+ AsynchIO* aio;
+ Callback establishedCb;
+ ClosedCallback closedCb;
+ SetConnCallback setConnCb;
+};
+
+}}
+
+#endif //!_sys_ProtocolAccess_h
diff --git a/cpp/src/qpid/sys/ProtocolFactory.h b/cpp/src/qpid/sys/ProtocolFactory.h
index 5f80771e49..e61a94b205 100644
--- a/cpp/src/qpid/sys/ProtocolFactory.h
+++ b/cpp/src/qpid/sys/ProtocolFactory.h
@@ -25,7 +25,7 @@
#include <stdint.h>
#include "qpid/SharedObject.h"
#include "ConnectionCodec.h"
-
+#include "ProtocolAccess.h"
namespace qpid {
namespace sys {
@@ -42,7 +42,8 @@ class ProtocolFactory : public qpid::SharedObject<ProtocolFactory>
virtual void connect(
boost::shared_ptr<Poller>,
const std::string& host, int16_t port,
- ConnectionCodec::Factory* codec) = 0;
+ ConnectionCodec::Factory* codec,
+ ProtocolAccess* access = 0) = 0;
};
inline ProtocolFactory::~ProtocolFactory() {}
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index 045bc56e90..5d2cadbe03 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -41,13 +41,15 @@ class AsynchIOProtocolFactory : public ProtocolFactory {
public:
AsynchIOProtocolFactory(int16_t port, int backlog);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*);
+ void connect(Poller::shared_ptr, const std::string& host, int16_t port,
+ ConnectionCodec::Factory*, ProtocolAccess*);
uint16_t getPort() const;
std::string getHost() const;
private:
- void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, bool isClient);
+ void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
+ bool isClient, ProtocolAccess*);
};
// Static instance to initialise plugin
@@ -72,17 +74,32 @@ AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog) :
{}
void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, bool isClient) {
- AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f);
+ ConnectionCodec::Factory* f, bool isClient,
+ ProtocolAccess* a) {
+ AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f, a);
+ AsynchIO* aio;
+
if (isClient)
async->setClient();
- AsynchIO* aio = new AsynchIO(s,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
+ if (a == 0)
+ aio = new AsynchIO(s,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+ else {
+ aio = new AsynchIO(s,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&ProtocolAccess::closedEof, a, async),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+ a->setAio(aio);
+ }
+
async->init(aio, 4);
aio->start(poller);
}
@@ -95,26 +112,31 @@ std::string AsynchIOProtocolFactory::getHost() const {
return listener.getSockname();
}
-void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
+void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
+ ConnectionCodec::Factory* fact) {
acceptor.reset(
new AsynchAcceptor(listener,
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
+ boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false,
+ (ProtocolAccess*) 0)));
acceptor->start(poller);
}
void AsynchIOProtocolFactory::connect(
Poller::shared_ptr poller,
const std::string& host, int16_t port,
- ConnectionCodec::Factory* f)
+ ConnectionCodec::Factory* fact,
+ ProtocolAccess* access)
{
// Note that the following logic does not cause a memory leak.
// The allocated Socket is freed either by the AsynchConnector
// upon connection failure or by the AsynchIO upon connection
// shutdown. The allocated AsynchConnector frees itself when it
// is no longer needed.
+
Socket* socket = new Socket();
- new AsynchConnector(*socket, poller, host, port,
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, f, true));
+ new AsynchConnector (*socket, poller, host, port,
+ boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, true, access),
+ boost::bind(&ProtocolAccess::closed, access, _1, _2));
}
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 9dcb841992..470db4c614 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -130,7 +130,7 @@ void AsynchConnector::connComplete(DispatchHandle& h)
h.stopWatch();
if (errCode == 0) {
connCallback(socket);
- DispatchHandle::doDelete();
+ DispatchHandle::doDelete();
} else {
failure(errCode, std::string(strerror(errCode)));
}
@@ -148,6 +148,7 @@ void AsynchConnector::failure(int errCode, std::string message)
}
/*
+>>>>>>> .r654667
* Asynch reader/writer
*/
AsynchIO::AsynchIO(const Socket& s,
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py
index ce9c4a8757..25654fe1c7 100755
--- a/cpp/src/tests/federation.py
+++ b/cpp/src/tests/federation.py
@@ -23,6 +23,7 @@ from qpid.testlib import TestBase010, testrunner
from qpid.management import managementChannel, managementClient
from qpid.datatypes import Message
from qpid.queue import Empty
+from time import sleep
def add_module(args=sys.argv[1:]):
for a in args:
@@ -89,18 +90,18 @@ class FederationTests(TestBase010):
mgmt = Helper(self)
broker = mgmt.get_object("broker")
- for i in range(10):
- mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
- link = mgmt.get_object("link")
-
- mgmt.call_method(link, "bridge", {"src":"amq.direct", "dest":"amq.direct", "key":"my-key"})
- bridge = mgmt.get_object("bridge")
+ mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
+ link = mgmt.get_object("link")
- mgmt.call_method(bridge, "close")
- self.assertEqual(len(mgmt.get_objects("bridge")), 0)
+ mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.direct", "key":"my-key"})
+ bridge = mgmt.get_object("bridge")
- mgmt.call_method(link, "close")
- self.assertEqual(len(mgmt.get_objects("link")), 0)
+ mgmt.call_method(bridge, "close")
+ mgmt.call_method(link, "close")
+
+ sleep(6)
+ self.assertEqual(len(mgmt.get_objects("bridge")), 0)
+ self.assertEqual(len(mgmt.get_objects("link")), 0)
mgmt.shutdown ()
@@ -113,7 +114,7 @@ class FederationTests(TestBase010):
mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
link = mgmt.get_object("link")
- mgmt.call_method(link, "bridge", {"src":"amq.direct", "dest":"amq.fanout", "key":"my-key"})
+ mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", "key":"my-key"})
bridge = mgmt.get_object("bridge")
#setup queue to receive messages from local broker
@@ -121,6 +122,7 @@ class FederationTests(TestBase010):
session.exchange_bind(queue="fed1", exchange="amq.fanout")
self.subscribe(queue="fed1", destination="f1")
queue = session.incoming("f1")
+ sleep(6)
#send messages to remote broker and confirm it is routed to local broker
r_conn = self.connect(host=remote_host(), port=remote_port())
@@ -138,12 +140,8 @@ class FederationTests(TestBase010):
self.fail("Got unexpected message in queue: " + extra.body)
except Empty: None
-
mgmt.call_method(bridge, "close")
- self.assertEqual(len(mgmt.get_objects("bridge")), 0)
-
mgmt.call_method(link, "close")
- self.assertEqual(len(mgmt.get_objects("link")), 0)
mgmt.shutdown()
@@ -170,7 +168,8 @@ class FederationTests(TestBase010):
mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
link = mgmt.get_object("link")
- mgmt.call_method(link, "bridge", {"src":"my-bridge-queue", "dest":"amq.fanout", "key":"", "id":"", "excludes":"", "src_is_queue":1})
+ mgmt.call_method(link, "bridge", {"durable":0, "src":"my-bridge-queue", "dest":"amq.fanout", "key":"", "id":"", "excludes":"", "src_is_queue":1})
+ sleep(6)
bridge = mgmt.get_object("bridge")
#add some more messages (i.e. after bridge was created)
@@ -191,10 +190,7 @@ class FederationTests(TestBase010):
mgmt.call_method(bridge, "close")
- self.assertEqual(len(mgmt.get_objects("bridge")), 0)
-
mgmt.call_method(link, "close")
- self.assertEqual(len(mgmt.get_objects("link")), 0)
mgmt.shutdown ()
@@ -207,8 +203,9 @@ class FederationTests(TestBase010):
mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
link = mgmt.get_object("link")
- mgmt.call_method(link, "bridge", {"src":"amq.direct", "dest":"amq.fanout", "key":"my-key",
+ mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", "key":"my-key",
"id":"my-bridge-id", "excludes":"exclude-me,also-exclude-me"})
+ sleep(6)
bridge = mgmt.get_object("bridge")
#setup queue to receive messages from local broker
@@ -241,10 +238,7 @@ class FederationTests(TestBase010):
except Empty: None
mgmt.call_method(bridge, "close")
- self.assertEqual(len(mgmt.get_objects("bridge")), 0)
-
mgmt.call_method(link, "close")
- self.assertEqual(len(mgmt.get_objects("link")), 0)
mgmt.shutdown ()