diff options
author | Gordon Sim <gsim@apache.org> | 2008-05-12 17:04:07 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-05-12 17:04:07 +0000 |
commit | 0655ff5aceb9d53eb256a05d7beb55b1c803c8de (patch) | |
tree | d478a719d5a5d030c3e228d298c6be8378d4fe44 /cpp | |
parent | 4a1605e6b357c251398aca281b90452c1cbd5ab2 (diff) | |
download | qpid-python-0655ff5aceb9d53eb256a05d7beb55b1c803c8de.tar.gz |
QPID-1050: Patch from Ted Ross:
1) Durability for federation links (broker-to-broker connections)
2) Improved handling of federation links:
a) Links can be created even if the remote broker is not reachable
b) If links are lost, re-establishment will occur using an exponential back-off algorithm
3) Durability of exchanges is now viewable through management
4) ManagementAgent API has been moved to an interface class to reduce coupling between the broker and manageable plug-ins.
5) General configuration storage capability has been added to the store/recover interface. This is used for federation links.
6) Management object-ids for durable objects are now themselves durable.
(Note: some refactoring needed around ProtocolAccess needed to try and reduce dependencies)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@655563 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
48 files changed, 2013 insertions, 1100 deletions
diff --git a/cpp/managementgen/templates/Class.h b/cpp/managementgen/templates/Class.h index d95a06479e..628a70d2d9 100644 --- a/cpp/managementgen/templates/Class.h +++ b/cpp/managementgen/templates/Class.h @@ -24,6 +24,7 @@ /*MGEN:Root.Disclaimer*/ #include "qpid/management/ManagementObject.h" +#include "qpid/framing/FieldTable.h" #include "qpid/framing/Uuid.h" namespace qpid { diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 4a49c83b65..3183aefd6c 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -258,6 +258,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/HeadersExchange.cpp \ qpid/broker/IncomingExecutionContext.cpp \ qpid/broker/IncompleteMessageList.cpp \ + qpid/broker/Link.cpp \ + qpid/broker/LinkRegistry.cpp \ qpid/broker/Message.cpp \ qpid/broker/MessageAdapter.cpp \ qpid/broker/MessageBuilder.cpp \ @@ -291,7 +293,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/TxPublish.cpp \ qpid/broker/Vhost.cpp \ qpid/management/Manageable.cpp \ - qpid/management/ManagementAgent.cpp \ + qpid/management/ManagementBroker.cpp \ qpid/management/ManagementExchange.cpp \ qpid/management/ManagementObject.cpp \ qpid/sys/TCPIOPlugin.cpp @@ -382,6 +384,8 @@ nobase_include_HEADERS = \ qpid/broker/HeadersExchange.h \ qpid/broker/IncomingExecutionContext.h \ qpid/broker/IncompleteMessageList.h \ + qpid/broker/Link.h \ + qpid/broker/LinkRegistry.h \ qpid/broker/Message.h \ qpid/broker/MessageAdapter.h \ qpid/broker/MessageBuilder.h \ @@ -391,6 +395,7 @@ nobase_include_HEADERS = \ qpid/broker/NameGenerator.h \ qpid/broker/NullMessageStore.h \ qpid/broker/Persistable.h \ + qpid/broker/PersistableConfig.h \ qpid/broker/PersistableExchange.h \ qpid/broker/PersistableMessage.h \ qpid/broker/PersistableQueue.h \ @@ -398,6 +403,7 @@ nobase_include_HEADERS = \ qpid/broker/QueueBindings.h \ qpid/broker/QueuePolicy.h \ qpid/broker/QueueRegistry.h \ + qpid/broker/RecoverableConfig.h \ qpid/broker/RecoverableExchange.h \ qpid/broker/RecoverableMessage.h \ qpid/broker/RecoverableQueue.h \ @@ -506,6 +512,7 @@ nobase_include_HEADERS = \ qpid/management/Args.h \ qpid/management/Manageable.h \ qpid/management/ManagementAgent.h \ + qpid/management/ManagementBroker.h \ qpid/management/ManagementExchange.h \ qpid/management/ManagementObject.h \ qpid/sys/AggregateOutput.h \ @@ -527,6 +534,7 @@ nobase_include_HEADERS = \ qpid/sys/OutputControl.h \ qpid/sys/OutputTask.h \ qpid/sys/Poller.h \ + qpid/sys/ProtocolAccess.h \ qpid/sys/ProtocolFactory.h \ qpid/sys/Runnable.h \ qpid/sys/ScopedIncrement.h \ diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp index 03e553f180..9e860ab653 100644 --- a/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -19,6 +19,7 @@ * */ #include "Connection.h" +#include "qpid/sys/ProtocolAccess.h" #include "qpid/log/Statement.h" #include "qpid/amqp_0_10/exceptions.h" @@ -27,9 +28,13 @@ namespace amqp_0_10 { using sys::Mutex; -Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient) - : frameQueueClosed(false), output(o), connection(this, broker, id, _isClient), - identifier(id), initialized(false), isClient(_isClient) {} +Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient, sys::ProtocolAccess* a) + : frameQueueClosed(false), output(o), connection(new broker::Connection(this, broker, id, _isClient)), + identifier(id), initialized(false), isClient(_isClient) +{ + if (a != 0) + a->callConnCb(connection); +} size_t Connection::decode(const char* buffer, size_t size) { framing::Buffer in(const_cast<char*>(buffer), size); @@ -45,13 +50,13 @@ size_t Connection::decode(const char* buffer, size_t size) { framing::AMQFrame frame; while(frame.decode(in)) { QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); - connection.received(frame); + connection->received(frame); } return in.getPosition(); } bool Connection::canEncode() { - if (!frameQueueClosed) connection.doOutput(); + if (!frameQueueClosed) connection->doOutput(); Mutex::ScopedLock l(frameQueueLock); return (!isClient && !initialized) || !frameQueue.empty(); } @@ -90,7 +95,7 @@ void Connection::close() { } void Connection::closed() { - connection.closed(); + connection->closed(); } void Connection::send(framing::AMQFrame& f) { diff --git a/cpp/src/qpid/amqp_0_10/Connection.h b/cpp/src/qpid/amqp_0_10/Connection.h index 4369d401bd..ea8d183e01 100644 --- a/cpp/src/qpid/amqp_0_10/Connection.h +++ b/cpp/src/qpid/amqp_0_10/Connection.h @@ -29,6 +29,7 @@ #include <queue> namespace qpid { +namespace sys { class ProtocolAccess; } namespace broker { class Broker; } namespace amqp_0_10 { @@ -40,13 +41,13 @@ class Connection : public sys::ConnectionCodec, bool frameQueueClosed; mutable sys::Mutex frameQueueLock; sys::OutputControl& output; - broker::Connection connection; // FIXME aconway 2008-03-18: + broker::Connection::shared_ptr connection; // FIXME aconway 2008-03-18: std::string identifier; bool initialized; bool isClient; public: - Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false); + Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false, sys::ProtocolAccess* a =0); size_t decode(const char* buffer, size_t size); size_t encode(const char* buffer, size_t size); bool isClosed() const; diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 456eba7f9d..a8e7b3c368 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -31,10 +31,12 @@ using qpid::framing::Uuid; namespace qpid { namespace broker { -Bridge::Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l, const management::ArgsLinkBridge& _args) : - args(_args), channel(id, &(c.getOutput())), peer(channel), - mgmtObject(new management::Bridge(this, &c, id, args.i_src, args.i_dest, args.i_key, args.i_src_is_queue, args.i_src_is_local)), - connection(c), listener(l), name(Uuid(true).str()) +Bridge::Bridge(Link* link, framing::ChannelId _id, CancellationListener l, + const management::ArgsLinkBridge& _args) : + id(_id), args(_args), + mgmtObject(new management::Bridge(this, link, id, args.i_src, args.i_dest, + args.i_key, args.i_src_is_queue, args.i_src_is_local)), + listener(l), name(Uuid(true).str()) { management::ManagementAgent::getAgent()->addObject(mgmtObject); } @@ -44,18 +46,21 @@ Bridge::~Bridge() mgmtObject->resourceDestroy(); } -void Bridge::create() +void Bridge::create(ConnectionState& c) { - framing::AMQP_ServerProxy::Session session(channel); - session.attach(name, false); + channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput()))); + session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); + peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); + + session->attach(name, false); if (args.i_src_is_local) { //TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received() } else { if (args.i_src_is_queue) { - peer.getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); } else { string queue = "bridge_queue_"; queue += Uuid(true).str(); @@ -66,22 +71,22 @@ void Bridge::create() if (args.i_excludes.size()) { queueSettings.setString("qpid.trace.exclude", args.i_excludes); } + bool durable = false;//should this be an arg, or would be use src_is_queue for durable queues? bool autoDelete = !durable;//auto delete transient queues? - peer.getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings); - peer.getExchange().bind(queue, args.i_src, args.i_key, FieldTable()); - peer.getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings); + peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable()); + peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); } } - } void Bridge::cancel() { - peer.getMessage().cancel(args.i_dest); - peer.getSession().detach(name); + peer->getMessage().cancel(args.i_dest); + peer->getSession().detach(name); } management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const @@ -94,8 +99,6 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, man if (methodId == management::Bridge::METHOD_CLOSE) { //notify that we are closed listener(this); - //request time on the connections io thread - connection.getOutput().activateOutput(); return management::Manageable::STATUS_OK; } else { return management::Manageable::STATUS_UNKNOWN_METHOD; diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h index 943050e244..15efcc6482 100644 --- a/cpp/src/qpid/broker/Bridge.h +++ b/cpp/src/qpid/broker/Bridge.h @@ -28,33 +28,36 @@ #include "qpid/management/Bridge.h" #include <boost/function.hpp> +#include <memory> namespace qpid { namespace broker { class ConnectionState; +class Link; class Bridge : public management::Manageable { public: typedef boost::function<void(Bridge*)> CancellationListener; - Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l, - const management::ArgsLinkBridge& args); + Bridge(Link* link, framing::ChannelId id, CancellationListener l, const management::ArgsLinkBridge& args); ~Bridge(); - void create(); + void create(ConnectionState& c); void cancel(); management::ManagementObject::shared_ptr GetManagementObject() const; management::Manageable::status_t ManagementMethod(uint32_t methodId, management::Args& args); private: - management::ArgsLinkBridge args; - framing::ChannelHandler channel; - framing::AMQP_ServerProxy peer; - management::Bridge::shared_ptr mgmtObject; - ConnectionState& connection; + std::auto_ptr<framing::ChannelHandler> channelHandler; + std::auto_ptr<framing::AMQP_ServerProxy::Session> session; + std::auto_ptr<framing::AMQP_ServerProxy> peer; + + framing::ChannelId id; + management::ArgsLinkBridge args; + management::Bridge::shared_ptr mgmtObject; CancellationListener listener; std::string name; }; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index e9b1db0413..d80c13f12a 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -28,6 +28,7 @@ #include "NullMessageStore.h" #include "RecoveryManagerImpl.h" #include "TopicExchange.h" +#include "Link.h" #include "qpid/management/PackageQpid.h" #include "qpid/management/ManagementExchange.h" #include "qpid/management/ArgsBrokerEcho.h" @@ -60,7 +61,7 @@ using qpid::sys::Dispatcher; using qpid::sys::Thread; using qpid::framing::FrameHandler; using qpid::framing::ChannelId; -using qpid::management::ManagementAgent; +using qpid::management::ManagementBroker; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; @@ -129,15 +130,16 @@ Broker::Broker(const Broker::Options& conf) : config(conf), store(0), dataDir(conf.noDataDir ? std::string () : conf.dataDir), + links(this), factory(*this), sessionManager(conf.ack) { if(conf.enableMgmt){ QPID_LOG(info, "Management enabled"); - ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), - conf.mgmtPubInterval); - managementAgent = ManagementAgent::getAgent (); - managementAgent->setInterval (conf.mgmtPubInterval); + ManagementBroker::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), + conf.mgmtPubInterval, this); + managementAgent = management::ManagementAgent::getAgent (); + ((ManagementBroker*) managementAgent.get())->setInterval (conf.mgmtPubInterval); qpid::management::PackageQpid packageInitializer (managementAgent); System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ()); @@ -163,6 +165,7 @@ Broker::Broker(const Broker::Options& conf) : queues.setParent (vhost); exchanges.setParent (vhost); + links.setParent (vhost); } // Early-Initialize plugins @@ -178,11 +181,12 @@ Broker::Broker(const Broker::Options& conf) : queues.setStore (store); dtxManager.setStore (store); + links.setStore (store); exchanges.declare(empty, DirectExchange::typeName); // Default exchange. if (store != 0) { - RecoveryManagerImpl recoverer(queues, exchanges, dtxManager, + RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, conf.stagingThreshold); store->recover(recoverer); } @@ -197,8 +201,9 @@ Broker::Broker(const Broker::Options& conf) : exchanges.declare(qpid_management, ManagementExchange::typeName); Exchange::shared_ptr mExchange = exchanges.get (qpid_management); Exchange::shared_ptr dExchange = exchanges.get (amq_direct); - managementAgent->setExchange (mExchange, dExchange); - dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent); + ((ManagementBroker*) managementAgent.get())->setExchange (mExchange, dExchange); + dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent + ((ManagementBroker*) managementAgent.get()); } else QPID_LOG(info, "Management not enabled"); @@ -285,7 +290,7 @@ void Broker::shutdown() { Broker::~Broker() { shutdown(); - ManagementAgent::shutdown (); + ManagementBroker::shutdown (); delete store; if (config.auth) { #if HAVE_SASL @@ -319,7 +324,15 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, case management::Broker::METHOD_CONNECT : { management::ArgsBrokerConnect& hp= dynamic_cast<management::ArgsBrokerConnect&>(args); - connect(hp.i_host, hp.i_port); + + if (hp.i_useSsl) + return Manageable::STATUS_FEATURE_NOT_IMPLEMENTED; + + std::pair<Link::shared_ptr, bool> response = + links.declare (hp.i_host, hp.i_port, hp.i_useSsl, hp.i_durable); + if (hp.i_durable && response.second) + store->create(*response.first); + status = Manageable::STATUS_OK; break; } @@ -355,10 +368,11 @@ void Broker::accept() { // TODO: How to chose the protocolFactory to use for the connection void Broker::connect( - const std::string& host, uint16_t port, - sys::ConnectionCodec::Factory* f) + const std::string& host, uint16_t port, bool /*useSsl*/, + sys::ConnectionCodec::Factory* f, + sys::ProtocolAccess* access) { - getProtocolFactory()->connect(poller, host, port, f ? f : &factory); + getProtocolFactory()->connect(poller, host, port, f ? f : &factory, access); } void Broker::connect( @@ -366,7 +380,7 @@ void Broker::connect( { url.throwIfEmpty(); TcpAddress addr=boost::get<TcpAddress>(url[0]); - connect(addr.host, addr.port, f); + connect(addr.host, addr.port, false, f, (sys::ProtocolAccess*) 0); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index e48f3dc23f..a1eaf4f62f 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -29,11 +29,12 @@ #include "ExchangeRegistry.h" #include "MessageStore.h" #include "QueueRegistry.h" +#include "LinkRegistry.h" #include "SessionManager.h" #include "Vhost.h" #include "System.h" #include "qpid/management/Manageable.h" -#include "qpid/management/ManagementAgent.h" +#include "qpid/management/ManagementBroker.h" #include "qpid/management/Broker.h" #include "qpid/management/ArgsBrokerConnect.h" #include "qpid/Options.h" @@ -43,6 +44,7 @@ #include "qpid/framing/OutputHandler.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/Runnable.h" +#include "qpid/sys/ProtocolAccess.h" #include <vector> @@ -111,6 +113,7 @@ class Broker : public sys::Runnable, public Plugin::Target, MessageStore& getStore() { return *store; } QueueRegistry& getQueues() { return queues; } ExchangeRegistry& getExchanges() { return exchanges; } + LinkRegistry& getLinks() { return links; } uint64_t getStagingThreshold() { return config.stagingThreshold; } DtxManager& getDtxManager() { return dtxManager; } DataDir& getDataDir() { return dataDir; } @@ -130,11 +133,16 @@ class Broker : public sys::Runnable, public Plugin::Target, void accept(); /** Create a connection to another broker. */ - void connect(const std::string& host, uint16_t port, - sys::ConnectionCodec::Factory* =0); + void connect(const std::string& host, uint16_t port, bool useSsl, + sys::ConnectionCodec::Factory* =0, + sys::ProtocolAccess* =0); /** Create a connection to another broker. */ void connect(const Url& url, sys::ConnectionCodec::Factory* =0); + // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed + // For the present just return the first ProtocolFactory registered. + boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const; + private: boost::shared_ptr<sys::Poller> poller; Options config; @@ -144,6 +152,7 @@ class Broker : public sys::Runnable, public Plugin::Target, QueueRegistry queues; ExchangeRegistry exchanges; + LinkRegistry links; ConnectionFactory factory; DtxManager dtxManager; SessionManager sessionManager; @@ -152,10 +161,6 @@ class Broker : public sys::Runnable, public Plugin::Target, Vhost::shared_ptr vhostObject; System::shared_ptr systemObject; - // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed - // For the present just return the first ProtocolFactory registered. - boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const; - void declareStandardExchange(const std::string& name, const std::string& type); }; diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 1994c4fdf5..d156b4a914 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -52,37 +52,14 @@ class Connection::MgmtClient : public Connection::MgmtWrapper management::Client::shared_ptr mgmtClient; public: - MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId); + MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, + const std::string& mgmtId, bool incoming); ~MgmtClient(); void received(framing::AMQFrame& frame); management::ManagementObject::shared_ptr getManagementObject() const; void closing(); }; -class Connection::MgmtLink : public Connection::MgmtWrapper -{ - typedef boost::ptr_vector<Bridge> Bridges; - - management::Link::shared_ptr mgmtLink; - Bridges created;//holds list of bridges pending creation - Bridges cancelled;//holds list of bridges pending cancellation - Bridges active;//holds active bridges - uint channelCounter; - sys::Mutex linkLock; - - void cancel(Bridge*); - -public: - MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId); - ~MgmtLink(); - void received(framing::AMQFrame& frame); - management::ManagementObject::shared_ptr getManagementObject() const; - void closing(); - void processPending(); - void process(Connection& connection, const management::Args& args); -}; - - Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) : ConnectionState(out_, broker_), adapter(*this, isLink), @@ -103,14 +80,21 @@ void Connection::initMgmt(bool asLink) if (agent.get () != 0) { if (asLink) { - mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId)); + mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, false)); } else { - mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId)); + mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, true)); } } } } +void Connection::requestIOProcessing (boost::function0<void> callback) +{ + ioCallback = callback; + out->activateOutput(); +} + + Connection::~Connection () {} void Connection::received(framing::AMQFrame& frame){ @@ -160,8 +144,9 @@ void Connection::closed(){ // Physically closed, suspend open sessions. bool Connection::doOutput() { try{ - //process any pending mgmt commands: - if (mgmtWrapper.get()) mgmtWrapper->processPending(); + if (ioCallback) + ioCallback(); // Lend the IO thread for management processing + ioCallback = 0; if (mgmtClosing) close (403, "Closed by Management Request", 0, 0); //then do other output as needed: @@ -192,8 +177,7 @@ ManagementObject::shared_ptr Connection::GetManagementObject (void) const return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr(); } -Manageable::status_t Connection::ManagementMethod (uint32_t methodId, - Args& args) +Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; @@ -207,93 +191,17 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId, out->activateOutput(); status = Manageable::STATUS_OK; break; - case management::Link::METHOD_BRIDGE : - //queue this up and request chance to do output (i.e. get connections thread of control): - mgmtWrapper->process(*this, args); - out->activateOutput(); - status = Manageable::STATUS_OK; - break; } return status; } -Connection::MgmtLink::MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId) - : channelCounter(1) -{ - mgmtLink = management::Link::shared_ptr - (new management::Link(conn, parent, mgmtId)); - agent->addObject (mgmtLink); -} - -Connection::MgmtLink::~MgmtLink() -{ - if (mgmtLink.get () != 0) - mgmtLink->resourceDestroy (); -} - -void Connection::MgmtLink::received(framing::AMQFrame& frame) -{ - if (mgmtLink.get () != 0) - { - mgmtLink->inc_framesFromPeer (); - mgmtLink->inc_bytesFromPeer (frame.size ()); - } -} - -management::ManagementObject::shared_ptr Connection::MgmtLink::getManagementObject() const -{ - return dynamic_pointer_cast<ManagementObject>(mgmtLink); -} - -void Connection::MgmtLink::closing() -{ - if (mgmtLink) mgmtLink->set_closing (1); -} - -void Connection::MgmtLink::processPending() -{ - Mutex::ScopedLock l(linkLock); - //process any pending creates - if (!created.empty()) { - for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { - i->create(); - } - active.transfer(active.end(), created.begin(), created.end(), created); - } - if (!cancelled.empty()) { - //process any pending cancellations - for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) { - i->cancel(); - } - cancelled.clear(); - } -} - -void Connection::MgmtLink::process(Connection& connection, const management::Args& args) -{ - Mutex::ScopedLock l(linkLock); - created.push_back(new Bridge(channelCounter++, connection, - boost::bind(&MgmtLink::cancel, this, _1), - dynamic_cast<const management::ArgsLinkBridge&>(args))); -} - -void Connection::MgmtLink::cancel(Bridge* b) -{ - Mutex::ScopedLock l(linkLock); - //need to take this out the active map and add it to the cancelled map - for (Bridges::iterator i = active.begin(); i != active.end(); i++) { - if (&(*i) == b) { - cancelled.transfer(cancelled.end(), i, active); - break; - } - } -} - -Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId) +Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent, + ManagementAgent::shared_ptr agent, + const std::string& mgmtId, bool incoming) { mgmtClient = management::Client::shared_ptr - (new management::Client (conn, parent, mgmtId)); + (new management::Client (conn, parent, mgmtId, incoming)); agent->addObject (mgmtClient); } diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index c8e7fb7079..dff1e0653b 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -54,6 +54,7 @@ class Connection : public sys::ConnectionInputHandler, public ConnectionState { public: + typedef boost::shared_ptr<Connection> shared_ptr; Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false); ~Connection (); @@ -78,6 +79,7 @@ class Connection : public sys::ConnectionInputHandler, ManagementMethod (uint32_t methodId, management::Args& args); void initMgmt(bool asLink = false); + void requestIOProcessing (boost::function0<void>); private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; @@ -100,7 +102,6 @@ class Connection : public sys::ConnectionInputHandler, virtual void process(Connection&, const management::Args&){} }; class MgmtClient; - class MgmtLink; ChannelMap channels; framing::AMQP_ClientProxy::Connection* client; @@ -108,6 +109,7 @@ class Connection : public sys::ConnectionInputHandler, std::auto_ptr<MgmtWrapper> mgmtWrapper; bool mgmtClosing; const std::string mgmtId; + boost::function0<void> ioCallback; }; }} diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp index 5de5a0230a..cd015ce4f5 100644 --- a/cpp/src/qpid/broker/ConnectionFactory.cpp +++ b/cpp/src/qpid/broker/ConnectionFactory.cpp @@ -39,9 +39,9 @@ ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std: } sys::ConnectionCodec* -ConnectionFactory::create(sys::OutputControl& out, const std::string& id) { +ConnectionFactory::create(sys::OutputControl& out, const std::string& id, sys::ProtocolAccess* a) { // used to create connections from one broker to another - return new amqp_0_10::Connection(out, broker, id, true); + return new amqp_0_10::Connection(out, broker, id, true, a); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/ConnectionFactory.h b/cpp/src/qpid/broker/ConnectionFactory.h index 5797495054..bf55ab3b88 100644 --- a/cpp/src/qpid/broker/ConnectionFactory.h +++ b/cpp/src/qpid/broker/ConnectionFactory.h @@ -24,6 +24,7 @@ #include "qpid/sys/ConnectionCodec.h" namespace qpid { +namespace sys { class ProtocolAccess; } namespace broker { class Broker; @@ -37,7 +38,7 @@ class ConnectionFactory : public sys::ConnectionCodec::Factory { create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id); sys::ConnectionCodec* - create(sys::OutputControl&, const std::string& id); + create(sys::OutputControl&, const std::string& id, sys::ProtocolAccess* a =0); private: Broker& broker; diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index 4ed2f5bfa2..162664fb88 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -35,8 +35,9 @@ using namespace qpid::framing; namespace { -const std::string PLAIN = "PLAIN"; -const std::string en_US = "en_US"; +const std::string ANONYMOUS = "ANONYMOUS"; +const std::string PLAIN = "PLAIN"; +const std::string en_US = "en_US"; } void ConnectionHandler::close(ReplyCode code, const string& text, ClassId, MethodId) @@ -135,10 +136,8 @@ void ConnectionHandler::Handler::start(const FieldTable& /*serverProperties*/, const framing::Array& /*mechanisms*/, const framing::Array& /*locales*/) { - string uid = "qpidd"; - string pwd = "qpidd"; - string response = ((char)0) + uid + ((char)0) + pwd; - server.startOk(FieldTable(), PLAIN, response, en_US); + string response; + server.startOk(FieldTable(), ANONYMOUS, response, en_US); connection.initMgmt(true); } diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 47d616cf16..0d9ffb7122 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -40,7 +40,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) : if (agent.get () != 0) { mgmtExchange = management::Exchange::shared_ptr - (new management::Exchange (this, parent, _name)); + (new management::Exchange (this, parent, _name, durable)); agent->addObject (mgmtExchange); } } @@ -56,8 +56,9 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel if (agent.get () != 0) { mgmtExchange = management::Exchange::shared_ptr - (new management::Exchange (this, parent, _name)); - agent->addObject (mgmtExchange); + (new management::Exchange (this, parent, _name, durable)); + if (!durable) + agent->addObject (mgmtExchange); } } } @@ -68,6 +69,16 @@ Exchange::~Exchange () mgmtExchange->resourceDestroy (); } +void Exchange::setPersistenceId(uint64_t id) const +{ + if (mgmtExchange != 0 && persistenceId == 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + agent->addObject (mgmtExchange, id, 2); + } + persistenceId = id; +} + Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffer) { string name; diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 7902eb4219..9b18129857 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -90,7 +90,7 @@ namespace qpid { virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0; //PersistableExchange: - void setPersistenceId(uint64_t id) const { persistenceId = id; } + void setPersistenceId(uint64_t id) const; uint64_t getPersistenceId() const { return persistenceId; } uint32_t encodedSize() const; void encode(framing::Buffer& buffer) const; diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp new file mode 100644 index 0000000000..83c9a2a62e --- /dev/null +++ b/cpp/src/qpid/broker/Link.cpp @@ -0,0 +1,281 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Link.h" +#include "LinkRegistry.h" +#include "Broker.h" +#include "Connection.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/management/Link.h" +#include "boost/bind.hpp" +#include "qpid/log/Statement.h" + +using namespace qpid::broker; +using qpid::framing::Buffer; +using qpid::framing::FieldTable; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; +using qpid::sys::Mutex; + +Link::Link(LinkRegistry* _links, + string& _host, + uint16_t _port, + bool _useSsl, + bool _durable, + Broker* _broker, + management::Manageable* parent) + : links(_links), host(_host), port(_port), useSsl(_useSsl), durable(_durable), + persistenceId(0), broker(_broker), state(0), + access(boost::bind(&Link::established, this), + boost::bind(&Link::closed, this, _1, _2), + boost::bind(&Link::setConnection, this, _1)), + visitCount(0), + currentInterval(1), + closing(false), + channelCounter(1) +{ + if (parent != 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent(); + if (agent.get() != 0) + { + mgmtObject = management::Link::shared_ptr + (new management::Link(this, parent, _host, _port, _useSsl, _durable)); + if (!durable) + agent->addObject(mgmtObject); + } + } + setState(STATE_WAITING); +} + +Link::~Link () +{ + if (state == STATE_OPERATIONAL) + access.close(); + if (mgmtObject.get () != 0) + mgmtObject->resourceDestroy (); +} + +void Link::setState (int newState) +{ + if (newState == state) + return; + + state = newState; + if (mgmtObject.get() == 0) + return; + + switch (state) + { + case STATE_WAITING : mgmtObject->set_state("Waiting"); break; + case STATE_CONNECTING : mgmtObject->set_state("Connecting"); break; + case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break; + } +} + +void Link::startConnection () +{ + try { + broker->connect (host, port, useSsl, 0, &access); + setState(STATE_CONNECTING); + } catch(std::exception& e) { + setState(STATE_WAITING); + mgmtObject->set_lastError (e.what()); + } +} + +void Link::established () +{ + Mutex::ScopedLock mutex(lock); + + QPID_LOG (info, "Inter-broker link established to " << host << ":" << port); + setState(STATE_OPERATIONAL); + currentInterval = 1; + visitCount = 0; + if (closing) + destroy(); +} + +void Link::closed (int, std::string text) +{ + Mutex::ScopedLock mutex(lock); + + if (state == STATE_OPERATIONAL) + QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port); + + connection.reset(); + created.transfer(created.end(), active.begin(), active.end(), active); + setState(STATE_WAITING); + mgmtObject->set_lastError (text); + if (closing) + destroy(); +} + +void Link::destroy () +{ + QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management"); + connection.reset(); + links->destroy (host, port); +} + +void Link::cancel(Bridge* bridge) +{ + Mutex::ScopedLock mutex(lock); + + //need to take this out of the active map and add it to the cancelled map + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + if (&(*i) == bridge) { + cancelled.transfer(cancelled.end(), i, active); + break; + } + } + + if (connection.get() != 0) + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); +} + +void Link::ioThreadProcessing() +{ + Mutex::ScopedLock mutex(lock); + + //process any pending creates + if (!created.empty()) { + for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { + i->create(*connection); + } + active.transfer(active.end(), created.begin(), created.end(), created); + } + if (!cancelled.empty()) { + //process any pending cancellations + for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) { + i->cancel(); + } + cancelled.clear(); + } +} + +void Link::setConnection(Connection::shared_ptr c) +{ + connection = c; + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); +} + +void Link::maintenanceVisit () +{ + Mutex::ScopedLock mutex(lock); + + if (state == STATE_WAITING) + { + visitCount++; + if (visitCount >= currentInterval) + { + visitCount = 0; + currentInterval *= 2; + if (currentInterval > MAX_INTERVAL) + currentInterval = MAX_INTERVAL; + startConnection(); + } + } +} + +void Link::setPersistenceId(uint64_t id) const +{ + if (mgmtObject != 0 && persistenceId == 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + agent->addObject (mgmtObject, id); + } + persistenceId = id; +} + +const string& Link::getName() const +{ + return host; +} + +Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) +{ + string host; + uint16_t port; + + buffer.getShortString(host); + port = buffer.getShort(); + bool useSsl(buffer.getOctet()); + bool durable(buffer.getOctet()); + + return links.declare(host, port, useSsl, durable).first; +} + +void Link::encode(Buffer& buffer) const +{ + buffer.putShortString(string("link")); + buffer.putShortString(host); + buffer.putShort(port); + buffer.putOctet(useSsl ? 1 : 0); + buffer.putOctet(durable ? 1 : 0); +} + +uint32_t Link::encodedSize() const +{ + return host.size() + 1 // short-string (host) + + 5 // short-string ("link") + + 2 // port + + 1 // useSsl + + 1; // durable +} + +ManagementObject::shared_ptr Link::GetManagementObject (void) const +{ + return boost::dynamic_pointer_cast<ManagementObject> (mgmtObject); +} + +Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args) +{ + Mutex::ScopedLock mutex(lock); + + switch (op) + { + case management::Link::METHOD_CLOSE : + closing = true; + if (state != STATE_CONNECTING) + destroy(); + return Manageable::STATUS_OK; + + case management::Link::METHOD_BRIDGE : + management::ArgsLinkBridge iargs = + dynamic_cast<const management::ArgsLinkBridge&>(args); + + // Durable bridges are only valid on durable links + if (iargs.i_durable && !durable) + return Manageable::STATUS_INVALID_PARAMETER; + + created.push_back(new Bridge(this, channelCounter++, + boost::bind(&Link::cancel, this, _1), iargs)); + + if (state == STATE_OPERATIONAL && connection.get() != 0) + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + return Manageable::STATUS_OK; + } + + return Manageable::STATUS_UNKNOWN_METHOD; +} diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h new file mode 100644 index 0000000000..838c3bf696 --- /dev/null +++ b/cpp/src/qpid/broker/Link.h @@ -0,0 +1,115 @@ +#ifndef _broker_Link_h +#define _broker_Link_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/shared_ptr.hpp> +#include "MessageStore.h" +#include "PersistableConfig.h" +#include "Bridge.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/ProtocolAccess.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/management/Manageable.h" +#include "qpid/management/Link.h" +#include <boost/ptr_container/ptr_vector.hpp> + +namespace qpid { + namespace broker { + + using std::string; + class LinkRegistry; + class Broker; + class Connection; + + class Link : public PersistableConfig, public management::Manageable { + private: + sys::Mutex lock; + LinkRegistry* links; + const string host; + const uint16_t port; + const bool useSsl; + const bool durable; + mutable uint64_t persistenceId; + management::Link::shared_ptr mgmtObject; + Broker* broker; + int state; + sys::ProtocolAccess access; + uint32_t visitCount; + uint32_t currentInterval; + bool closing; + + typedef boost::ptr_vector<Bridge> Bridges; + Bridges created; // Bridges pending creation + Bridges active; // Bridges active + Bridges cancelled; // Bridges pending deletion + uint channelCounter; + boost::shared_ptr<Connection> connection; + + static const int STATE_WAITING = 1; + static const int STATE_CONNECTING = 2; + static const int STATE_OPERATIONAL = 3; + + static const uint32_t MAX_INTERVAL = 16; + + void setState (int newState); + void startConnection(); // Start the IO Connection + void established(); // Called when connection is created + void closed(int, std::string); // Called when connection goes away + void destroy(); // Called when mgmt deletes this link + void cancel(Bridge*); // Called by self-cancelling bridge + void ioThreadProcessing(); // Called on connection's IO thread by request + void setConnection(boost::shared_ptr<Connection>); // Set pointer to the AMQP Connection + + public: + typedef boost::shared_ptr<Link> shared_ptr; + + Link(LinkRegistry* links, + string& host, + uint16_t port, + bool useSsl, + bool durable, + Broker* broker, + management::Manageable* parent = 0); + virtual ~Link(); + + bool isDurable() { return durable; } + void maintenanceVisit (); + + // PersistableConfig: + void setPersistenceId(uint64_t id) const; + uint64_t getPersistenceId() const { return persistenceId; } + uint32_t encodedSize() const; + void encode(framing::Buffer& buffer) const; + const string& getName() const; + + static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); + + // Manageable entry points + management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::Manageable::status_t ManagementMethod (uint32_t, management::Args&); + }; + } +} + + +#endif /*!_broker_Link.cpp_h*/ diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp new file mode 100644 index 0000000000..6e20a3f7ce --- /dev/null +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "LinkRegistry.h" +#include <iostream> + +using namespace qpid::broker; +using namespace qpid::sys; +using std::pair; +using std::stringstream; +using boost::intrusive_ptr; + +#define LINK_MAINT_INTERVAL 5 + +LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0) +{ + timer.add (intrusive_ptr<TimerTask> (new Periodic(*this))); +} + +LinkRegistry::Periodic::Periodic (LinkRegistry& _links) : + TimerTask (Duration (LINK_MAINT_INTERVAL * TIME_SEC)), links(_links) {} + +void LinkRegistry::Periodic::fire () +{ + links.periodicMaintenance (); + links.timer.add (intrusive_ptr<TimerTask> (new Periodic(links))); +} + +void LinkRegistry::periodicMaintenance () +{ + Mutex::ScopedLock locker(lock); + linksToDestroy.clear(); + for (LinkMap::iterator i = links.begin(); i != links.end(); i++) + i->second->maintenanceVisit(); +} + +pair<Link::shared_ptr, bool> LinkRegistry::declare(std::string& host, + uint16_t port, + bool useSsl, + bool durable) +{ + Mutex::ScopedLock locker(lock); + stringstream keystream; + keystream << host << ":" << port; + string key = string(keystream.str()); + + LinkMap::iterator i = links.find(key); + if (i == links.end()) + { + Link::shared_ptr link; + + link = Link::shared_ptr (new Link (this, host, port, useSsl, durable, broker, parent)); + links[key] = link; + return std::pair<Link::shared_ptr, bool>(link, true); + } + return std::pair<Link::shared_ptr, bool>(i->second, false); +} + +void LinkRegistry::destroy(const string& host, const uint16_t port) +{ + Mutex::ScopedLock locker(lock); + stringstream keystream; + keystream << host << ":" << port; + string key = string(keystream.str()); + + LinkMap::iterator i = links.find(key); + if (i != links.end()) + { + if (i->second->isDurable() && store) + store->destroy(*(i->second)); + linksToDestroy[key] = i->second; + links.erase(i); + } +} + +void LinkRegistry::setStore (MessageStore* _store) +{ + assert (store == 0 && _store != 0); + store = _store; +} + +MessageStore* LinkRegistry::getStore() const { + return store; +} + diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h new file mode 100644 index 0000000000..86d8c3d2f9 --- /dev/null +++ b/cpp/src/qpid/broker/LinkRegistry.h @@ -0,0 +1,87 @@ +#ifndef _broker_LinkRegistry_h +#define _broker_LinkRegistry_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <map> +#include "Link.h" +#include "MessageStore.h" +#include "Timer.h" +#include "qpid/sys/Mutex.h" +#include "qpid/management/Manageable.h" + +namespace qpid { +namespace broker { + + class Broker; + class LinkRegistry { + + // Declare a timer task to manage the establishment of link connections and the + // re-establishment of lost link connections. + struct Periodic : public TimerTask + { + LinkRegistry& links; + + Periodic(LinkRegistry& links); + virtual ~Periodic() {}; + void fire(); + }; + + typedef std::map<std::string, Link::shared_ptr> LinkMap; + LinkMap links; + LinkMap linksToDestroy; + qpid::sys::Mutex lock; + Broker* broker; + Timer timer; + management::Manageable* parent; + MessageStore* store; + + void periodicMaintenance (); + + public: + LinkRegistry (Broker* _broker); + std::pair<Link::shared_ptr, bool> declare(std::string& host, + uint16_t port, + bool useSsl, + bool durable); + void destroy(const std::string& host, const uint16_t port); + + /** + * Register the manageable parent for declared queues + */ + void setParent (management::Manageable* _parent) { parent = _parent; } + + /** + * Set the store to use. May only be called once. + */ + void setStore (MessageStore*); + + /** + * Return the message store used. + */ + MessageStore* getStore() const; + }; +} +} + + +#endif /*!_broker_LinkRegistry_h*/ diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h index 76469ccc50..17fd6aefb8 100644 --- a/cpp/src/qpid/broker/MessageStore.h +++ b/cpp/src/qpid/broker/MessageStore.h @@ -24,6 +24,7 @@ #include "PersistableExchange.h" #include "PersistableMessage.h" #include "PersistableQueue.h" +#include "PersistableConfig.h" #include "RecoveryManager.h" #include "TransactionalStore.h" #include "qpid/framing/FieldTable.h" @@ -87,6 +88,16 @@ public: const std::string& key, const framing::FieldTable& args) = 0; /** + * Record generic durable configuration + */ + virtual void create(const PersistableConfig& config) = 0; + + /** + * Destroy generic durable configuration + */ + virtual void destroy(const PersistableConfig& config) = 0; + + /** * Stores a messages before it has been enqueued * (enqueueing automatically stores the message so this is * only required if storage is required prior to that diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp index e02c87f069..2544d5d533 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -70,6 +70,16 @@ void MessageStoreModule::unbind(const PersistableExchange& e, const PersistableQ TRANSFER_EXCEPTION(store->unbind(e, q, k, a)); } +void MessageStoreModule::create(const PersistableConfig& config) +{ + TRANSFER_EXCEPTION(store->create(config)); +} + +void MessageStoreModule::destroy(const PersistableConfig& config) +{ + TRANSFER_EXCEPTION(store->destroy(config)); +} + void MessageStoreModule::recover(RecoveryManager& registry) { TRANSFER_EXCEPTION(store->recover(registry)); diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h index c7ad76d8bb..f4d05e3e0d 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.h +++ b/cpp/src/qpid/broker/MessageStoreModule.h @@ -57,6 +57,8 @@ public: const std::string& key, const framing::FieldTable& args); void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const framing::FieldTable& args); + void create(const PersistableConfig& config); + void destroy(const PersistableConfig& config); void recover(RecoveryManager& queues); void stage(boost::intrusive_ptr<PersistableMessage>& msg); void destroy(PersistableMessage& msg); diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp index 8936b0440f..401c76f5a2 100644 --- a/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/cpp/src/qpid/broker/NullMessageStore.cpp @@ -49,7 +49,7 @@ public: using namespace qpid::broker; -NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){} +NullMessageStore::NullMessageStore(bool _warn) : warn(_warn), nextPersistenceId(1) {} bool NullMessageStore::init(const Options* /*options*/) {return true;} @@ -57,6 +57,7 @@ void NullMessageStore::create(PersistableQueue& queue, const framing::FieldTable { QPID_LOG(info, "Queue '" << queue.getName() << "' will not be durable. Persistence not enabled."); + queue.setPersistenceId(nextPersistenceId++); } void NullMessageStore::destroy(PersistableQueue&) @@ -67,6 +68,7 @@ void NullMessageStore::create(const PersistableExchange& exchange, const framing { QPID_LOG(info, "Exchange'" << exchange.getName() << "' will not be durable. Persistence not enabled."); + exchange.setPersistenceId(nextPersistenceId++); } void NullMessageStore::destroy(const PersistableExchange& ) @@ -76,6 +78,17 @@ void NullMessageStore::bind(const PersistableExchange&, const PersistableQueue&, void NullMessageStore::unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&){} +void NullMessageStore::create(const PersistableConfig& config) +{ + QPID_LOG(info, "Persistence not enabled, configuration not stored."); + config.setPersistenceId(nextPersistenceId++); +} + +void NullMessageStore::destroy(const PersistableConfig&) +{ + QPID_LOG(info, "Persistence not enabled, configuration not stored."); +} + void NullMessageStore::recover(RecoveryManager&) { QPID_LOG(info, "Persistence not enabled, no recovery attempted."); diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h index 96d1c483a2..f06e749ebb 100644 --- a/cpp/src/qpid/broker/NullMessageStore.h +++ b/cpp/src/qpid/broker/NullMessageStore.h @@ -37,6 +37,7 @@ class NullMessageStore : public MessageStore { std::set<std::string> prepared; const bool warn; + uint64_t nextPersistenceId; public: NullMessageStore(bool warn = false); @@ -57,6 +58,8 @@ public: const std::string& key, const framing::FieldTable& args); virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const framing::FieldTable& args); + virtual void create(const PersistableConfig& config); + virtual void destroy(const PersistableConfig& config); virtual void recover(RecoveryManager& queues); virtual void stage(boost::intrusive_ptr<PersistableMessage>& msg); virtual void destroy(PersistableMessage& msg); diff --git a/cpp/src/qpid/broker/PersistableConfig.h b/cpp/src/qpid/broker/PersistableConfig.h new file mode 100644 index 0000000000..914e91ea80 --- /dev/null +++ b/cpp/src/qpid/broker/PersistableConfig.h @@ -0,0 +1,45 @@ +#ifndef _broker_PersistableConfig_h +#define _broker_PersistableConfig_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include "Persistable.h" + +namespace qpid { +namespace broker { + +/** + * The interface used by general-purpose persistable configuration for + * the message store. + */ +class PersistableConfig : public Persistable +{ +public: + virtual const std::string& getName() const = 0; + virtual ~PersistableConfig() {}; +}; + +}} + + +#endif diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index f7bad8ebc6..355ebdd81e 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -586,7 +586,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const if (mgmtObject != 0 && persistenceId == 0) { ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); - agent->addObject (mgmtObject, _persistenceId); + agent->addObject (mgmtObject, _persistenceId, 3); } persistenceId = _persistenceId; } diff --git a/cpp/src/qpid/broker/RecoverableConfig.h b/cpp/src/qpid/broker/RecoverableConfig.h new file mode 100644 index 0000000000..838a8582dc --- /dev/null +++ b/cpp/src/qpid/broker/RecoverableConfig.h @@ -0,0 +1,45 @@ +#ifndef _broker_RecoverableConfig_h +#define _broker_RecoverableConfig_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { + +/** + * The interface through which configurations are recovered. + */ +class RecoverableConfig +{ +public: + typedef boost::shared_ptr<RecoverableConfig> shared_ptr; + + virtual void setPersistenceId(uint64_t id) = 0; + virtual ~RecoverableConfig() {}; +}; + +}} + + +#endif diff --git a/cpp/src/qpid/broker/RecoveryManager.h b/cpp/src/qpid/broker/RecoveryManager.h index bf1813a093..7dcbe3a2b0 100644 --- a/cpp/src/qpid/broker/RecoveryManager.h +++ b/cpp/src/qpid/broker/RecoveryManager.h @@ -25,6 +25,7 @@ #include "RecoverableQueue.h" #include "RecoverableMessage.h" #include "RecoverableTransaction.h" +#include "RecoverableConfig.h" #include "TransactionalStore.h" #include "qpid/framing/Buffer.h" @@ -39,6 +40,8 @@ class RecoveryManager{ virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer) = 0; virtual RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn) = 0; + virtual RecoverableConfig::shared_ptr recoverConfig(framing::Buffer& buffer) = 0; + virtual void recoveryComplete() = 0; }; diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index feb629e118..c6ec573822 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -22,6 +22,7 @@ #include "Message.h" #include "Queue.h" +#include "Link.h" #include "RecoveredEnqueue.h" #include "RecoveredDequeue.h" #include "qpid/framing/reply_exceptions.h" @@ -34,9 +35,9 @@ using boost::intrusive_ptr; static const uint8_t BASIC = 1; static const uint8_t MESSAGE = 2; -RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, +RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links, DtxManager& _dtxMgr, uint64_t _stagingThreshold) - : queues(_queues), exchanges(_exchanges), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {} + : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {} RecoveryManagerImpl::~RecoveryManagerImpl() {} @@ -82,6 +83,15 @@ public: void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args); }; +class RecoverableConfigImpl : public RecoverableConfig +{ + // TODO: Add links for other config types, consider using super class (PersistableConfig?) + Link::shared_ptr link; +public: + RecoverableConfigImpl(Link::shared_ptr _link) : link(_link) {} + void setPersistenceId(uint64_t id); +}; + class RecoverableTransactionImpl : public RecoverableTransaction { DtxBuffer::shared_ptr buffer; @@ -125,6 +135,19 @@ RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const return RecoverableTransaction::shared_ptr(new RecoverableTransactionImpl(buffer)); } +RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer) +{ + string kind; + + buffer.getShortString (kind); + if (kind == "link") + { + return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer))); + } + + return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead +} + void RecoveryManagerImpl::recoveryComplete() { //TODO (finalise binding setup etc) @@ -185,6 +208,13 @@ void RecoverableExchangeImpl::setPersistenceId(uint64_t id) exchange->setPersistenceId(id); } +void RecoverableConfigImpl::setPersistenceId(uint64_t id) +{ + if (link.get()) + link->setPersistenceId(id); + // TODO: add calls to other types. Consider using a parent class. +} + void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args) { Queue::shared_ptr queue = queues.find(queueName); diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.h b/cpp/src/qpid/broker/RecoveryManagerImpl.h index 58ec63926c..cd34d464f5 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.h +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.h @@ -25,6 +25,7 @@ #include "DtxManager.h" #include "ExchangeRegistry.h" #include "QueueRegistry.h" +#include "LinkRegistry.h" #include "RecoveryManager.h" namespace qpid { @@ -33,10 +34,12 @@ namespace broker { class RecoveryManagerImpl : public RecoveryManager{ QueueRegistry& queues; ExchangeRegistry& exchanges; + LinkRegistry& links; DtxManager& dtxMgr; const uint64_t stagingThreshold; public: - RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, DtxManager& dtxMgr, uint64_t stagingThreshold); + RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links, + DtxManager& dtxMgr, uint64_t stagingThreshold); ~RecoveryManagerImpl(); RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer); @@ -44,6 +47,7 @@ namespace broker { RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer); RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn); + RecoverableConfig::shared_ptr recoverConfig(framing::Buffer& buffer); void recoveryComplete(); }; diff --git a/cpp/src/qpid/broker/System.h b/cpp/src/qpid/broker/System.h index 0d63bd1b3d..65086abec0 100644 --- a/cpp/src/qpid/broker/System.h +++ b/cpp/src/qpid/broker/System.h @@ -42,9 +42,6 @@ class System : public management::Manageable management::ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } - - management::Manageable::status_t ManagementMethod (uint32_t, management::Args&) - { return management::Manageable::STATUS_OK; } }; }} diff --git a/cpp/src/qpid/management/Manageable.cpp b/cpp/src/qpid/management/Manageable.cpp index 479cb4e0ce..0f3fbab55c 100644 --- a/cpp/src/qpid/management/Manageable.cpp +++ b/cpp/src/qpid/management/Manageable.cpp @@ -25,13 +25,19 @@ std::string Manageable::StatusText (status_t status) { switch (status) { - case STATUS_OK : return "OK"; - case STATUS_UNKNOWN_OBJECT : return "UnknownObject"; - case STATUS_UNKNOWN_METHOD : return "UnknownMethod"; - case STATUS_NOT_IMPLEMENTED : return "NotImplemented"; - case STATUS_INVALID_PARAMETER : return "InvalidParameter"; + case STATUS_OK : return "OK"; + case STATUS_UNKNOWN_OBJECT : return "UnknownObject"; + case STATUS_UNKNOWN_METHOD : return "UnknownMethod"; + case STATUS_NOT_IMPLEMENTED : return "NotImplemented"; + case STATUS_INVALID_PARAMETER : return "InvalidParameter"; + case STATUS_FEATURE_NOT_IMPLEMENTED : return "FeatureNotImplemented"; } return "??"; } +Manageable::status_t Manageable::ManagementMethod (uint32_t, Args&) +{ + return STATUS_UNKNOWN_METHOD; +} + diff --git a/cpp/src/qpid/management/Manageable.h b/cpp/src/qpid/management/Manageable.h index 836ba03b23..25c24588fc 100644 --- a/cpp/src/qpid/management/Manageable.h +++ b/cpp/src/qpid/management/Manageable.h @@ -39,11 +39,12 @@ class Manageable typedef uint32_t status_t; static std::string StatusText (status_t status); - static const status_t STATUS_OK = 0; - static const status_t STATUS_UNKNOWN_OBJECT = 1; - static const status_t STATUS_UNKNOWN_METHOD = 2; - static const status_t STATUS_NOT_IMPLEMENTED = 3; - static const status_t STATUS_INVALID_PARAMETER = 4; + static const status_t STATUS_OK = 0; + static const status_t STATUS_UNKNOWN_OBJECT = 1; + static const status_t STATUS_UNKNOWN_METHOD = 2; + static const status_t STATUS_NOT_IMPLEMENTED = 3; + static const status_t STATUS_INVALID_PARAMETER = 4; + static const status_t STATUS_FEATURE_NOT_IMPLEMENTED = 5; // Every "Manageable" object must hold a reference to exactly one // management object. This object is always of a class derived from @@ -58,7 +59,7 @@ class Manageable // on this object. The input and output arguments are specific to the // method being called and must be down-cast to the appropriate sub class // before use. - virtual status_t ManagementMethod (uint32_t methodId, Args& args) = 0; + virtual status_t ManagementMethod (uint32_t methodId, Args& args); }; inline Manageable::~Manageable (void) {} diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 9b4290232d..e69de29bb2 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -1,695 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementAgent.h" -#include "qpid/broker/DeliverableMessage.h" -#include "qpid/log/Statement.h" -#include <qpid/broker/Message.h> -#include <qpid/broker/MessageDelivery.h> -#include "qpid/framing/MessageTransferBody.h" -#include <list> -#include <iostream> -#include <fstream> - -using boost::intrusive_ptr; -using namespace qpid::framing; -using namespace qpid::management; -using namespace qpid::broker; -using namespace qpid::sys; -using namespace std; - -ManagementAgent::shared_ptr ManagementAgent::agent; -bool ManagementAgent::enabled = 0; - -ManagementAgent::ManagementAgent (string _dataDir, uint16_t _interval) : - dataDir (_dataDir), interval (_interval) -{ - timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); - localBank = 3; - nextObjectId = 1; - nextRemotePrefix = 101; - - // Get from file or generate and save to file. - if (dataDir.empty ()) - { - uuid.generate (); - bootSequence = 1; - QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: " - << uuid); - } - else - { - string filename (dataDir + "/brokerId"); - string seqFilename (dataDir + "/bootseq"); - ifstream inFile (filename.c_str ()); - ifstream seqFile (seqFilename.c_str ()); - - if (inFile.good ()) - { - inFile >> uuid; - inFile.close (); - QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid); - } - else - { - uuid.generate (); - QPID_LOG (info, "ManagementAgent generated broker ID: " << uuid); - - ofstream outFile (filename.c_str ()); - if (outFile.good ()) - { - outFile << uuid << endl; - outFile.close (); - QPID_LOG (debug, "ManagementAgent saved broker ID"); - } - else - { - QPID_LOG (warning, "ManagementAgent unable to save broker ID"); - } - } - - if (seqFile.good ()) - { - seqFile >> bootSequence; - seqFile.close (); - } - else - bootSequence = 1; - - ofstream seqOut (seqFilename.c_str ()); - if (seqOut.good ()) - { - uint16_t nextSeq = (bootSequence + 1) & 0x7FFF; - if (nextSeq == 0) - nextSeq = 1; - seqOut << nextSeq << endl; - seqOut.close (); - } - - QPID_LOG (debug, "ManagementAgent boot sequence: " << bootSequence); - } -} - -ManagementAgent::~ManagementAgent () {} - -void ManagementAgent::enableManagement (string dataDir, uint16_t interval) -{ - enabled = 1; - if (agent.get () == 0) - agent = shared_ptr (new ManagementAgent (dataDir, interval)); -} - -ManagementAgent::shared_ptr ManagementAgent::getAgent (void) -{ - return agent; -} - -void ManagementAgent::shutdown (void) -{ - if (agent.get () != 0) - { - agent->mExchange.reset (); - agent->dExchange.reset (); - agent.reset (); - } -} - -void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange, - broker::Exchange::shared_ptr _dexchange) -{ - mExchange = _mexchange; - dExchange = _dexchange; -} - -void ManagementAgent::RegisterClass (string packageName, - string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) -{ - Mutex::ScopedLock lock (userLock); - PackageMap::iterator pIter = FindOrAddPackage (packageName); - AddClassLocal (pIter, className, md5Sum, schemaCall); -} - -void ManagementAgent::addObject (ManagementObject::shared_ptr object, - uint32_t persistId, - uint32_t persistBank) -{ - Mutex::ScopedLock lock (userLock); - uint64_t objectId; - - if (persistId == 0) - objectId = ((uint64_t) bootSequence) << 48 | - ((uint64_t) localBank) << 24 | nextObjectId++; - else - objectId = ((uint64_t) persistBank) << 24 | persistId; - - object->setObjectId (objectId); - managementObjects[objectId] = object; -} - -ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) - : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {} - -ManagementAgent::Periodic::~Periodic () {} - -void ManagementAgent::Periodic::fire () -{ - agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval))); - agent.PeriodicProcessing (); -} - -void ManagementAgent::clientAdded (void) -{ - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) - { - ManagementObject::shared_ptr object = iter->second; - object->setAllChanged (); - } -} - -void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) -{ - buf.putOctet ('A'); - buf.putOctet ('M'); - buf.putOctet ('1'); - buf.putOctet (opcode); - buf.putLong (seq); -} - -bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) -{ - uint8_t h1 = buf.getOctet (); - uint8_t h2 = buf.getOctet (); - uint8_t h3 = buf.getOctet (); - - *opcode = buf.getOctet (); - *seq = buf.getLong (); - - return h1 == 'A' && h2 == 'M' && h3 == '1'; -} - -void ManagementAgent::SendBuffer (Buffer& buf, - uint32_t length, - broker::Exchange::shared_ptr exchange, - string routingKey) -{ - if (exchange.get() == 0) - return; - - intrusive_ptr<Message> msg (new Message ()); - AMQFrame method (in_place<MessageTransferBody>( - ProtocolVersion(), exchange->getName (), 0, 0)); - AMQFrame header (in_place<AMQHeaderBody>()); - AMQFrame content(in_place<AMQContentBody>()); - - content.castBody<AMQContentBody>()->decode(buf, length); - - method.setEof (false); - header.setBof (false); - header.setEof (false); - content.setBof (false); - - msg->getFrames().append(method); - msg->getFrames().append(header); - - MessageProperties* props = - msg->getFrames().getHeaders()->get<MessageProperties>(true); - props->setContentLength(length); - msg->getFrames().append(content); - - DeliverableMessage deliverable (msg); - exchange->route (deliverable, routingKey, 0); -} - -void ManagementAgent::PeriodicProcessing (void) -{ -#define BUFSIZE 65536 - Mutex::ScopedLock lock (userLock); - char msgChars[BUFSIZE]; - uint32_t contentSize; - string routingKey; - std::list<uint64_t> deleteList; - - if (managementObjects.empty ()) - return; - - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) - { - ManagementObject::shared_ptr object = iter->second; - - if (object->getConfigChanged () || object->isDeleted ()) - { - Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'c'); - object->writeConfig (msgBuffer); - - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "mgmt." + uuid.str() + ".config." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, mExchange, routingKey); - } - - if (object->getInstChanged ()) - { - Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'i'); - object->writeInstrumentation (msgBuffer); - - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "mgmt." + uuid.str () + ".inst." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, mExchange, routingKey); - } - - if (object->isDeleted ()) - deleteList.push_back (iter->first); - } - - // Delete flagged objects - for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin (); - iter != deleteList.rend (); - iter++) - managementObjects.erase (*iter); - - deleteList.clear (); -} - -void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence, - uint32_t code, string text) -{ - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - EncodeHeader (outBuffer, 'z', sequence); - outBuffer.putLong (code); - outBuffer.putShortString (text); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); -} - -void ManagementAgent::dispatchCommand (Deliverable& deliverable, - const string& routingKey, - const FieldTable* /*args*/) -{ - Mutex::ScopedLock lock (userLock); - Message& msg = ((DeliverableMessage&) deliverable).getMessage (); - - if (routingKey.compare (0, 13, "agent.method.") == 0) - dispatchMethod (msg, routingKey, 13); - - else if (routingKey.length () == 5 && - routingKey.compare (0, 5, "agent") == 0) - dispatchAgentCommand (msg); - - else - { - QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey); - return; - } -} - -void ManagementAgent::dispatchMethod (Message& msg, - const string& routingKey, - size_t first) -{ - size_t pos, start = first; - uint32_t contentSize; - - if (routingKey.length () == start) - { - QPID_LOG (debug, "Missing package-name in routing key: " << routingKey); - return; - } - - pos = routingKey.find ('.', start); - if (pos == string::npos || routingKey.length () == pos + 1) - { - QPID_LOG (debug, "Missing class-name in routing key: " << routingKey); - return; - } - - string packageName = routingKey.substr (start, pos - start); - - start = pos + 1; - pos = routingKey.find ('.', start); - if (pos == string::npos || routingKey.length () == pos + 1) - { - QPID_LOG (debug, "Missing method-name in routing key: " << routingKey); - return; - } - - string className = routingKey.substr (start, pos - start); - - start = pos + 1; - string methodName = routingKey.substr (start, routingKey.length () - start); - - contentSize = msg.encodedContentSize (); - if (contentSize < 8 || contentSize > MA_BUFFER_SIZE) - return; - - Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen, sequence; - uint8_t opcode; - - msg.encodeContent (inBuffer); - inBuffer.reset (); - - if (!CheckHeader (inBuffer, &opcode, &sequence)) - { - QPID_LOG (debug, " Invalid content header"); - return; - } - - if (opcode != 'M') - { - QPID_LOG (debug, " Unexpected opcode " << opcode); - return; - } - - uint64_t objId = inBuffer.getLongLong (); - string replyToKey; - - const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); - if (p && p->hasReplyTo()) - { - const framing::ReplyTo& rt = p->getReplyTo (); - replyToKey = rt.getRoutingKey (); - } - else - { - QPID_LOG (debug, " Reply-to missing"); - return; - } - - EncodeHeader (outBuffer, 'm', sequence); - - ManagementObjectMap::iterator iter = managementObjects.find (objId); - if (iter == managementObjects.end () || iter->second->isDeleted ()) - { - outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); - outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); - } - else - { - iter->second->doMethod (methodName, inBuffer, outBuffer); - } - - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); -} - -void ManagementAgent::handleBrokerRequest (Buffer&, string replyToKey, uint32_t sequence) -{ - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - EncodeHeader (outBuffer, 'b', sequence); - uuid.encode (outBuffer); - - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); -} - -void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey, uint32_t sequence) -{ - for (PackageMap::iterator pIter = packages.begin (); - pIter != packages.end (); - pIter++) - { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - EncodeHeader (outBuffer, 'p', sequence); - EncodePackageIndication (outBuffer, pIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); - } - - sendCommandComplete (replyToKey, sequence); -} - -void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/) -{ - std::string packageName; - - inBuffer.getShortString (packageName); - FindOrAddPackage (packageName); -} - -void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey, uint32_t sequence) -{ - std::string packageName; - - inBuffer.getShortString (packageName); - PackageMap::iterator pIter = packages.find (packageName); - if (pIter != packages.end ()) - { - ClassMap cMap = pIter->second; - for (ClassMap::iterator cIter = cMap.begin (); - cIter != cMap.end (); - cIter++) - { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - EncodeHeader (outBuffer, 'q', sequence); - EncodeClassIndication (outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); - } - } - - sendCommandComplete (replyToKey, sequence); -} - -void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey, uint32_t sequence) -{ - string packageName; - SchemaClassKey key; - - inBuffer.getShortString (packageName); - inBuffer.getShortString (key.name); - inBuffer.getBin128 (key.hash); - - PackageMap::iterator pIter = packages.find (packageName); - if (pIter != packages.end ()) - { - ClassMap cMap = pIter->second; - ClassMap::iterator cIter = cMap.find (key); - if (cIter != cMap.end ()) - { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - SchemaClass classInfo = cIter->second; - - if (classInfo.writeSchemaCall != 0) - { - EncodeHeader (outBuffer, 's', sequence); - classInfo.writeSchemaCall (outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); - } - else - { - // TODO: Forward request to remote agent. - } - - clientAdded (); - // TODO: Send client-added to each remote agent. - } - } -} - -uint32_t ManagementAgent::assignPrefix (uint32_t /*requestedPrefix*/) -{ - // TODO: Allow remote agents to keep their requested prefixes if able. - return nextRemotePrefix++; -} - -void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey, uint32_t sequence) -{ - string label; - uint32_t requestedPrefix; - uint32_t assignedPrefix; - - inBuffer.getShortString (label); - requestedPrefix = inBuffer.getLong (); - assignedPrefix = assignPrefix (requestedPrefix); - - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - EncodeHeader (outBuffer, 'a', sequence); - outBuffer.putLong (assignedPrefix); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); -} - -void ManagementAgent::handleGetRequest (Buffer& inBuffer, string replyToKey, uint32_t sequence) -{ - FieldTable ft; - FieldTable::ValuePtr value; - - ft.decode (inBuffer); - value = ft.get ("_class"); - if (value->empty () || !value->convertsTo<string> ()) - { - // TODO: Send completion with an error code - return; - } - - string className (value->get<string> ()); - - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) - { - ManagementObject::shared_ptr object = iter->second; - if (object->getClassName () == className) - { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - EncodeHeader (outBuffer, 'g', sequence); - object->writeConfig (outBuffer); - object->writeInstrumentation (outBuffer, true); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); - } - } - - sendCommandComplete (replyToKey, sequence); -} - -void ManagementAgent::dispatchAgentCommand (Message& msg) -{ - Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); - uint8_t opcode; - uint32_t sequence; - string replyToKey; - - const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); - if (p && p->hasReplyTo()) - { - const framing::ReplyTo& rt = p->getReplyTo (); - replyToKey = rt.getRoutingKey (); - } - else - return; - - msg.encodeContent (inBuffer); - inBuffer.reset (); - - if (!CheckHeader (inBuffer, &opcode, &sequence)) - return; - - if (opcode == 'B') handleBrokerRequest (inBuffer, replyToKey, sequence); - else if (opcode == 'P') handlePackageQuery (inBuffer, replyToKey, sequence); - else if (opcode == 'p') handlePackageInd (inBuffer, replyToKey, sequence); - else if (opcode == 'Q') handleClassQuery (inBuffer, replyToKey, sequence); - else if (opcode == 'S') handleSchemaQuery (inBuffer, replyToKey, sequence); - else if (opcode == 'A') handleAttachRequest (inBuffer, replyToKey, sequence); - else if (opcode == 'G') handleGetRequest (inBuffer, replyToKey, sequence); -} - -ManagementAgent::PackageMap::iterator ManagementAgent::FindOrAddPackage (std::string name) -{ - PackageMap::iterator pIter = packages.find (name); - if (pIter != packages.end ()) - return pIter; - - // No such package found, create a new map entry. - pair<PackageMap::iterator, bool> result = - packages.insert (pair<string, ClassMap> (name, ClassMap ())); - QPID_LOG (debug, "ManagementAgent added package " << name); - - // Publish a package-indication message - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - EncodeHeader (outBuffer, 'p'); - EncodePackageIndication (outBuffer, result.first); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package"); - - return result.first; -} - -void ManagementAgent::AddClassLocal (PackageMap::iterator pIter, - string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) -{ - SchemaClassKey key; - ClassMap& cMap = pIter->second; - - key.name = className; - memcpy (&key.hash, md5Sum, 16); - - ClassMap::iterator cIter = cMap.find (key); - if (cIter != cMap.end ()) - return; - - // No such class found, create a new class with local information. - QPID_LOG (debug, "ManagementAgent added class " << pIter->first << "." << - key.name); - SchemaClass classInfo; - - classInfo.writeSchemaCall = schemaCall; - cMap[key] = classInfo; - - // TODO: Publish a class-indication message -} - -void ManagementAgent::EncodePackageIndication (Buffer& buf, - PackageMap::iterator pIter) -{ - buf.putShortString ((*pIter).first); -} - -void ManagementAgent::EncodeClassIndication (Buffer& buf, - PackageMap::iterator pIter, - ClassMap::iterator cIter) -{ - SchemaClassKey key = (*cIter).first; - - buf.putShortString ((*pIter).first); - buf.putShortString (key.name); - buf.putBin128 (key.hash); -} - diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index 4cd679a035..c38e273c49 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -22,168 +22,28 @@ * */ -#include "qpid/Options.h" -#include "qpid/broker/Exchange.h" -#include "qpid/broker/Timer.h" -#include "qpid/framing/Uuid.h" -#include "qpid/sys/Mutex.h" #include "ManagementObject.h" -#include <qpid/framing/AMQFrame.h> -#include <boost/shared_ptr.hpp> namespace qpid { namespace management { class ManagementAgent { - private: - - ManagementAgent (std::string dataDir, uint16_t interval); - public: - virtual ~ManagementAgent (); + virtual ~ManagementAgent () {} typedef boost::shared_ptr<ManagementAgent> shared_ptr; - static void enableManagement (std::string dataDir, uint16_t interval); static shared_ptr getAgent (void); - static void shutdown (void); - - void setInterval (uint16_t _interval) { interval = _interval; } - void setExchange (broker::Exchange::shared_ptr mgmtExchange, - broker::Exchange::shared_ptr directExchange); - void RegisterClass (std::string packageName, - std::string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); - void addObject (ManagementObject::shared_ptr object, - uint32_t persistId = 0, - uint32_t persistBank = 2); - void clientAdded (void); - void dispatchCommand (broker::Deliverable& msg, - const std::string& routingKey, - const qpid::framing::FieldTable* args); - - private: - - struct Periodic : public broker::TimerTask - { - ManagementAgent& agent; - - Periodic (ManagementAgent& agent, uint32_t seconds); - virtual ~Periodic (); - void fire (); - }; - - // Storage for tracking remote management agents, attached via the client - // management agent API. - // - struct RemoteAgent - { - std::string name; - uint64_t objIdBase; - }; - - // TODO: Eventually replace string with entire reply-to structure. reply-to - // currently assumes that the exchange is "amq.direct" even though it could - // in theory be specified differently. - typedef std::map<std::string, RemoteAgent> RemoteAgentMap; - typedef std::vector<std::string> ReplyToVector; - - // Storage for known schema classes: - // - // SchemaClassKey -- Key elements for map lookups - // SchemaClassKeyComp -- Comparison class for SchemaClassKey - // SchemaClass -- Non-key elements for classes - // - struct SchemaClassKey - { - std::string name; - uint8_t hash[16]; - }; - - struct SchemaClassKeyComp - { - bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const - { - if (lhs.name != rhs.name) - return lhs.name < rhs.name; - else - for (int i = 0; i < 16; i++) - if (lhs.hash[i] != rhs.hash[i]) - return lhs.hash[i] < rhs.hash[i]; - return false; - } - }; - - struct SchemaClass - { - ManagementObject::writeSchemaCall_t writeSchemaCall; - ReplyToVector remoteAgents; - - SchemaClass () : writeSchemaCall(0) {} - }; - - typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; - typedef std::map<std::string, ClassMap> PackageMap; - - RemoteAgentMap remoteAgents; - PackageMap packages; - ManagementObjectMap managementObjects; - - static shared_ptr agent; - static bool enabled; - - qpid::framing::Uuid uuid; - qpid::sys::Mutex userLock; - broker::Timer timer; - broker::Exchange::shared_ptr mExchange; - broker::Exchange::shared_ptr dExchange; - std::string dataDir; - uint16_t interval; - uint16_t bootSequence; - uint32_t localBank; - uint32_t nextObjectId; - uint32_t nextRemotePrefix; - -# define MA_BUFFER_SIZE 65536 - char inputBuffer[MA_BUFFER_SIZE]; - char outputBuffer[MA_BUFFER_SIZE]; - - void PeriodicProcessing (void); - void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); - bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); - void SendBuffer (qpid::framing::Buffer& buf, - uint32_t length, - broker::Exchange::shared_ptr exchange, - std::string routingKey); - - void dispatchMethod (broker::Message& msg, - const std::string& routingKey, - size_t first); - void dispatchAgentCommand (broker::Message& msg); - PackageMap::iterator FindOrAddPackage (std::string name); - void AddClassLocal (PackageMap::iterator pIter, - std::string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); - void EncodePackageIndication (qpid::framing::Buffer& buf, - PackageMap::iterator pIter); - void EncodeClassIndication (qpid::framing::Buffer& buf, - PackageMap::iterator pIter, - ClassMap::iterator cIter); - uint32_t assignPrefix (uint32_t requestedPrefix); - void sendCommandComplete (std::string replyToKey, uint32_t sequence, - uint32_t code = 0, std::string text = std::string("OK")); - void handleBrokerRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handlePackageQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handlePackageInd (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleClassQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleSchemaQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleAttachRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleGetRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + virtual void RegisterClass (std::string packageName, + std::string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) = 0; + virtual void addObject (ManagementObject::shared_ptr object, + uint32_t persistId = 0, + uint32_t persistBank = 4) = 0; }; }} diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp new file mode 100644 index 0000000000..6466028c00 --- /dev/null +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -0,0 +1,746 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementBroker.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/log/Statement.h" +#include <qpid/broker/Message.h> +#include <qpid/broker/MessageDelivery.h> +#include "qpid/framing/MessageTransferBody.h" +#include <list> +#include <iostream> +#include <fstream> + +using boost::intrusive_ptr; +using qpid::framing::Uuid; +using namespace qpid::framing; +using namespace qpid::management; +using namespace qpid::broker; +using namespace qpid::sys; +using namespace std; + +ManagementAgent::shared_ptr ManagementBroker::agent; +bool ManagementBroker::enabled = 0; + +ManagementBroker::RemoteAgent::~RemoteAgent () +{ + if (mgmtObject.get () != 0) + mgmtObject->resourceDestroy (); +} + +ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Manageable* _broker) : + dataDir (_dataDir), interval (_interval), broker (_broker) +{ + timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); + localBank = 5; + nextObjectId = 1; + bootSequence = 1; + nextRemoteBank = 10; + + // Get from file or generate and save to file. + if (dataDir.empty ()) + { + uuid.generate (); + QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: " + << uuid); + } + else + { + string filename (dataDir + "/.mbrokerdata"); + ifstream inFile (filename.c_str ()); + + if (inFile.good ()) + { + inFile >> uuid; + inFile >> bootSequence; + inFile >> nextRemoteBank; + inFile.close (); + QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid); + + bootSequence++; + writeData (); + } + else + { + uuid.generate (); + QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid); + writeData (); + } + + QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence); + } +} + +ManagementBroker::~ManagementBroker () {} + +void ManagementBroker::writeData () +{ + string filename (dataDir + "/.mbrokerdata"); + ofstream outFile (filename.c_str ()); + + if (outFile.good ()) + { + outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl; + outFile.close (); + } +} + +void ManagementBroker::enableManagement (string dataDir, uint16_t interval, Manageable* broker) +{ + enabled = 1; + if (agent.get () == 0) + agent = shared_ptr (new ManagementBroker (dataDir, interval, broker)); +} + +ManagementAgent::shared_ptr ManagementAgent::getAgent (void) +{ + return ManagementBroker::agent; +} + +void ManagementBroker::shutdown (void) +{ + if (agent.get () != 0) + { + ManagementBroker* broker = (ManagementBroker*) agent.get(); + + broker->mExchange.reset (); + broker->dExchange.reset (); + agent.reset (); + } +} + +void ManagementBroker::setExchange (broker::Exchange::shared_ptr _mexchange, + broker::Exchange::shared_ptr _dexchange) +{ + mExchange = _mexchange; + dExchange = _dexchange; +} + +void ManagementBroker::RegisterClass (string packageName, + string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock (userLock); + PackageMap::iterator pIter = FindOrAddPackage (packageName); + AddClassLocal (pIter, className, md5Sum, schemaCall); +} + +void ManagementBroker::addObject (ManagementObject::shared_ptr object, + uint32_t persistId, + uint32_t persistBank) +{ + Mutex::ScopedLock lock (userLock); + uint64_t objectId; + + if (persistId == 0) + objectId = ((uint64_t) bootSequence) << 48 | + ((uint64_t) localBank) << 24 | nextObjectId++; + else + objectId = ((uint64_t) persistBank) << 24 | persistId; + + object->setObjectId (objectId); + managementObjects[objectId] = object; +} + +ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds) + : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), broker(_broker) {} + +ManagementBroker::Periodic::~Periodic () {} + +void ManagementBroker::Periodic::fire () +{ + broker.timer.add (intrusive_ptr<TimerTask> (new Periodic (broker, broker.interval))); + broker.PeriodicProcessing (); +} + +void ManagementBroker::clientAdded (void) +{ + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject::shared_ptr object = iter->second; + object->setAllChanged (); + } +} + +void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) +{ + buf.putOctet ('A'); + buf.putOctet ('M'); + buf.putOctet ('1'); + buf.putOctet (opcode); + buf.putLong (seq); +} + +bool ManagementBroker::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) +{ + uint8_t h1 = buf.getOctet (); + uint8_t h2 = buf.getOctet (); + uint8_t h3 = buf.getOctet (); + + *opcode = buf.getOctet (); + *seq = buf.getLong (); + + return h1 == 'A' && h2 == 'M' && h3 == '1'; +} + +void ManagementBroker::SendBuffer (Buffer& buf, + uint32_t length, + broker::Exchange::shared_ptr exchange, + string routingKey) +{ + if (exchange.get() == 0) + return; + + intrusive_ptr<Message> msg (new Message ()); + AMQFrame method (in_place<MessageTransferBody>( + ProtocolVersion(), exchange->getName (), 0, 0)); + AMQFrame header (in_place<AMQHeaderBody>()); + AMQFrame content(in_place<AMQContentBody>()); + + content.castBody<AMQContentBody>()->decode(buf, length); + + method.setEof (false); + header.setBof (false); + header.setEof (false); + content.setBof (false); + + msg->getFrames().append(method); + msg->getFrames().append(header); + + MessageProperties* props = + msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(length); + msg->getFrames().append(content); + + DeliverableMessage deliverable (msg); + exchange->route (deliverable, routingKey, 0); +} + +void ManagementBroker::PeriodicProcessing (void) +{ +#define BUFSIZE 65536 + Mutex::ScopedLock lock (userLock); + char msgChars[BUFSIZE]; + uint32_t contentSize; + string routingKey; + std::list<uint64_t> deleteList; + + if (managementObjects.empty ()) + return; + + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject::shared_ptr object = iter->second; + + if (object->getConfigChanged () || object->isDeleted ()) + { + Buffer msgBuffer (msgChars, BUFSIZE); + EncodeHeader (msgBuffer, 'c'); + object->writeConfig (msgBuffer); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "mgmt." + uuid.str() + ".config." + object->getClassName (); + SendBuffer (msgBuffer, contentSize, mExchange, routingKey); + } + + if (object->getInstChanged ()) + { + Buffer msgBuffer (msgChars, BUFSIZE); + EncodeHeader (msgBuffer, 'i'); + object->writeInstrumentation (msgBuffer); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "mgmt." + uuid.str () + ".inst." + object->getClassName (); + SendBuffer (msgBuffer, contentSize, mExchange, routingKey); + } + + if (object->isDeleted ()) + deleteList.push_back (iter->first); + } + + // Delete flagged objects + for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin (); + iter != deleteList.rend (); + iter++) + managementObjects.erase (*iter); + + deleteList.clear (); +} + +void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence, + uint32_t code, string text) +{ + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'z', sequence); + outBuffer.putLong (code); + outBuffer.putShortString (text); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + +void ManagementBroker::dispatchCommand (Deliverable& deliverable, + const string& routingKey, + const FieldTable* /*args*/) +{ + Mutex::ScopedLock lock (userLock); + Message& msg = ((DeliverableMessage&) deliverable).getMessage (); + + if (routingKey.compare (0, 13, "agent.method.") == 0) + dispatchMethodLH (msg, routingKey, 13); + + else if (routingKey.length () == 5 && + routingKey.compare (0, 5, "agent") == 0) + dispatchAgentCommandLH (msg); + + else + { + QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey); + return; + } +} + +void ManagementBroker::dispatchMethodLH (Message& msg, + const string& routingKey, + size_t first) +{ + size_t pos, start = first; + uint32_t contentSize; + + if (routingKey.length () == start) + { + QPID_LOG (debug, "Missing package-name in routing key: " << routingKey); + return; + } + + pos = routingKey.find ('.', start); + if (pos == string::npos || routingKey.length () == pos + 1) + { + QPID_LOG (debug, "Missing class-name in routing key: " << routingKey); + return; + } + + string packageName = routingKey.substr (start, pos - start); + + start = pos + 1; + pos = routingKey.find ('.', start); + if (pos == string::npos || routingKey.length () == pos + 1) + { + QPID_LOG (debug, "Missing method-name in routing key: " << routingKey); + return; + } + + string className = routingKey.substr (start, pos - start); + + start = pos + 1; + string methodName = routingKey.substr (start, routingKey.length () - start); + + contentSize = msg.encodedContentSize (); + if (contentSize < 8 || contentSize > MA_BUFFER_SIZE) + return; + + Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen, sequence; + uint8_t opcode; + + msg.encodeContent (inBuffer); + inBuffer.reset (); + + if (!CheckHeader (inBuffer, &opcode, &sequence)) + { + QPID_LOG (debug, " Invalid content header"); + return; + } + + if (opcode != 'M') + { + QPID_LOG (debug, " Unexpected opcode " << opcode); + return; + } + + uint64_t objId = inBuffer.getLongLong (); + string replyToKey; + + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + if (p && p->hasReplyTo()) + { + const framing::ReplyTo& rt = p->getReplyTo (); + replyToKey = rt.getRoutingKey (); + } + else + { + QPID_LOG (debug, " Reply-to missing"); + return; + } + + EncodeHeader (outBuffer, 'm', sequence); + + ManagementObjectMap::iterator iter = managementObjects.find (objId); + if (iter == managementObjects.end () || iter->second->isDeleted ()) + { + outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); + outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); + } + else + { + iter->second->doMethod (methodName, inBuffer, outBuffer); + } + + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + +void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) +{ + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'b', sequence); + uuid.encode (outBuffer); + + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + +void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence) +{ + for (PackageMap::iterator pIter = packages.begin (); + pIter != packages.end (); + pIter++) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'p', sequence); + EncodePackageIndication (outBuffer, pIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } + + sendCommandComplete (replyToKey, sequence); +} + +void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/) +{ + std::string packageName; + + inBuffer.getShortString (packageName); + FindOrAddPackage (packageName); +} + +void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + std::string packageName; + + inBuffer.getShortString (packageName); + PackageMap::iterator pIter = packages.find (packageName); + if (pIter != packages.end ()) + { + ClassMap cMap = pIter->second; + for (ClassMap::iterator cIter = cMap.begin (); + cIter != cMap.end (); + cIter++) + { + if (cIter->second.hasSchema ()) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'q', sequence); + EncodeClassIndication (outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } + } + } + + sendCommandComplete (replyToKey, sequence); +} + +void ManagementBroker::SchemaClass::appendSchema (Buffer& buf) +{ + // If the management package is attached locally (embedded in the broker or + // linked in via plug-in), call the schema handler directly. If the package + // is from a remote management agent, send the stored schema information. + + if (writeSchemaCall != 0) + writeSchemaCall (buf); + else + buf.putRawData (buffer, bufferLen); +} + +void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + string packageName; + SchemaClassKey key; + + inBuffer.getShortString (packageName); + inBuffer.getShortString (key.name); + inBuffer.getBin128 (key.hash); + + PackageMap::iterator pIter = packages.find (packageName); + if (pIter != packages.end ()) + { + ClassMap cMap = pIter->second; + ClassMap::iterator cIter = cMap.find (key); + if (cIter != cMap.end ()) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + SchemaClass classInfo = cIter->second; + + if (classInfo.hasSchema()) + { + EncodeHeader (outBuffer, 's', sequence); + classInfo.appendSchema (outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } + + clientAdded (); + // TODO: Send client-added to each remote agent. + } + } +} + +bool ManagementBroker::bankInUse (uint32_t bank) +{ + for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); + aIter != remoteAgents.end(); + aIter++) + if (aIter->second->objIdBank == bank) + return true; + return false; +} + +uint32_t ManagementBroker::allocateNewBank () +{ + while (bankInUse (nextRemoteBank)) + nextRemoteBank++; + + uint32_t allocated = nextRemoteBank++; + writeData (); + return allocated; +} + +uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank) +{ + if (requestedBank == 0 || bankInUse (requestedBank)) + return allocateNewBank (); + return requestedBank; +} + +void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + string label; + uint32_t requestedBank; + uint32_t assignedBank; + Uuid sessionId; + Uuid systemId; + + inBuffer.getShortString (label); + sessionId.decode (inBuffer); + systemId.decode (inBuffer); + requestedBank = inBuffer.getLong (); + assignedBank = assignBankLH (requestedBank); + + RemoteAgentMap::iterator aIter = remoteAgents.find (sessionId); + if (aIter != remoteAgents.end()) + { + // There already exists an agent on this session. Reject the request. + sendCommandComplete (replyToKey, sequence, 1, "Session already has remote agent"); + return; + } + + RemoteAgent* agent = new RemoteAgent; + agent->objIdBank = assignedBank; + agent->mgmtObject = management::Agent::shared_ptr + (new management::Agent (agent)); + agent->mgmtObject->set_sessionId (sessionId); + agent->mgmtObject->set_label (label); + agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); + agent->mgmtObject->set_sysId (systemId); + addObject (agent->mgmtObject); + + remoteAgents[sessionId] = agent; + + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'a', sequence); + outBuffer.putLong (assignedBank); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + +void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + FieldTable ft; + FieldTable::ValuePtr value; + + ft.decode (inBuffer); + value = ft.get ("_class"); + if (value->empty () || !value->convertsTo<string> ()) + { + // TODO: Send completion with an error code + return; + } + + string className (value->get<string> ()); + + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject::shared_ptr object = iter->second; + if (object->getClassName () == className) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'g', sequence); + object->writeConfig (outBuffer); + object->writeInstrumentation (outBuffer, true); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } + } + + sendCommandComplete (replyToKey, sequence); +} + +void ManagementBroker::dispatchAgentCommandLH (Message& msg) +{ + Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); + uint8_t opcode; + uint32_t sequence; + string replyToKey; + + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + if (p && p->hasReplyTo()) + { + const framing::ReplyTo& rt = p->getReplyTo (); + replyToKey = rt.getRoutingKey (); + } + else + return; + + msg.encodeContent (inBuffer); + inBuffer.reset (); + + if (!CheckHeader (inBuffer, &opcode, &sequence)) + return; + + if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence); + else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); +} + +ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackage (std::string name) +{ + PackageMap::iterator pIter = packages.find (name); + if (pIter != packages.end ()) + return pIter; + + // No such package found, create a new map entry. + pair<PackageMap::iterator, bool> result = + packages.insert (pair<string, ClassMap> (name, ClassMap ())); + QPID_LOG (debug, "ManagementBroker added package " << name); + + // Publish a package-indication message + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'p'); + EncodePackageIndication (outBuffer, result.first); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package"); + + return result.first; +} + +void ManagementBroker::AddClassLocal (PackageMap::iterator pIter, + string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + SchemaClassKey key; + ClassMap& cMap = pIter->second; + + key.name = className; + memcpy (&key.hash, md5Sum, 16); + + ClassMap::iterator cIter = cMap.find (key); + if (cIter != cMap.end ()) + return; + + // No such class found, create a new class with local information. + QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." << + key.name); + SchemaClass classInfo; + + classInfo.writeSchemaCall = schemaCall; + cMap[key] = classInfo; + + // TODO: Publish a class-indication message +} + +void ManagementBroker::EncodePackageIndication (Buffer& buf, + PackageMap::iterator pIter) +{ + buf.putShortString ((*pIter).first); +} + +void ManagementBroker::EncodeClassIndication (Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter) +{ + SchemaClassKey key = (*cIter).first; + + buf.putShortString ((*pIter).first); + buf.putShortString (key.name); + buf.putBin128 (key.hash); +} + diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h new file mode 100644 index 0000000000..2e02cb2a43 --- /dev/null +++ b/cpp/src/qpid/management/ManagementBroker.h @@ -0,0 +1,203 @@ +#ifndef _ManagementBroker_ +#define _ManagementBroker_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Options.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/Timer.h" +#include "qpid/framing/Uuid.h" +#include "qpid/sys/Mutex.h" +#include "ManagementAgent.h" +#include "ManagementObject.h" +#include "Manageable.h" +#include "qpid/management/Agent.h" +#include <qpid/framing/AMQFrame.h> +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace management { + +class ManagementBroker : public ManagementAgent +{ + private: + + ManagementBroker (std::string dataDir, uint16_t interval, Manageable* broker); + + public: + + virtual ~ManagementBroker (); + + static void enableManagement (std::string dataDir, uint16_t interval, Manageable* broker); + static shared_ptr getAgent (void); + static void shutdown (void); + + void setInterval (uint16_t _interval) { interval = _interval; } + void setExchange (broker::Exchange::shared_ptr mgmtExchange, + broker::Exchange::shared_ptr directExchange); + void RegisterClass (std::string packageName, + std::string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + void addObject (ManagementObject::shared_ptr object, + uint32_t persistId = 0, + uint32_t persistBank = 4); + void clientAdded (void); + void dispatchCommand (broker::Deliverable& msg, + const std::string& routingKey, + const framing::FieldTable* args); + + private: + friend class ManagementAgent; + + struct Periodic : public broker::TimerTask + { + ManagementBroker& broker; + + Periodic (ManagementBroker& broker, uint32_t seconds); + virtual ~Periodic (); + void fire (); + }; + + // Storage for tracking remote management agents, attached via the client + // management agent API. + // + struct RemoteAgent : public Manageable + { + uint32_t objIdBank; + Agent::shared_ptr mgmtObject; + ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } + virtual ~RemoteAgent (); + }; + + // TODO: Eventually replace string with entire reply-to structure. reply-to + // currently assumes that the exchange is "amq.direct" even though it could + // in theory be specified differently. + typedef std::map<framing::Uuid, RemoteAgent*> RemoteAgentMap; + typedef std::vector<std::string> ReplyToVector; + + // Storage for known schema classes: + // + // SchemaClassKey -- Key elements for map lookups + // SchemaClassKeyComp -- Comparison class for SchemaClassKey + // SchemaClass -- Non-key elements for classes + // + struct SchemaClassKey + { + std::string name; + uint8_t hash[16]; + }; + + struct SchemaClassKeyComp + { + bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const + { + if (lhs.name != rhs.name) + return lhs.name < rhs.name; + else + for (int i = 0; i < 16; i++) + if (lhs.hash[i] != rhs.hash[i]) + return lhs.hash[i] < rhs.hash[i]; + return false; + } + }; + + struct SchemaClass + { + ManagementObject::writeSchemaCall_t writeSchemaCall; + ReplyToVector remoteAgents; + size_t bufferLen; + uint8_t* buffer; + + SchemaClass () : writeSchemaCall(0), bufferLen(0), buffer(0) {} + bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); } + void appendSchema (framing::Buffer& buf); + }; + + typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; + typedef std::map<std::string, ClassMap> PackageMap; + + RemoteAgentMap remoteAgents; + PackageMap packages; + ManagementObjectMap managementObjects; + + static shared_ptr agent; + static bool enabled; + + framing::Uuid uuid; + sys::Mutex userLock; + broker::Timer timer; + broker::Exchange::shared_ptr mExchange; + broker::Exchange::shared_ptr dExchange; + std::string dataDir; + uint16_t interval; + Manageable* broker; + uint16_t bootSequence; + uint32_t localBank; + uint32_t nextObjectId; + uint32_t nextRemoteBank; + +# define MA_BUFFER_SIZE 65536 + char inputBuffer[MA_BUFFER_SIZE]; + char outputBuffer[MA_BUFFER_SIZE]; + + void writeData (); + void PeriodicProcessing (void); + void EncodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + bool CheckHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); + void SendBuffer (framing::Buffer& buf, + uint32_t length, + broker::Exchange::shared_ptr exchange, + std::string routingKey); + + void dispatchMethodLH (broker::Message& msg, + const std::string& routingKey, + size_t first); + void dispatchAgentCommandLH (broker::Message& msg); + + PackageMap::iterator FindOrAddPackage (std::string name); + void AddClassLocal (PackageMap::iterator pIter, + std::string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + void EncodePackageIndication (framing::Buffer& buf, + PackageMap::iterator pIter); + void EncodeClassIndication (framing::Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter); + bool bankInUse (uint32_t bank); + uint32_t allocateNewBank (); + uint32_t assignBankLH (uint32_t requestedPrefix); + void sendCommandComplete (std::string replyToKey, uint32_t sequence, + uint32_t code = 0, std::string text = std::string("OK")); + void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); +}; + +}} + +#endif /*!_ManagementBroker_*/ diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp index c589aefba0..28e6fb8d0a 100644 --- a/cpp/src/qpid/management/ManagementExchange.cpp +++ b/cpp/src/qpid/management/ManagementExchange.cpp @@ -53,7 +53,7 @@ void ManagementExchange::route (Deliverable& msg, TopicExchange::route (msg, routingKey, args); } -void ManagementExchange::setManagmentAgent (ManagementAgent::shared_ptr agent) +void ManagementExchange::setManagmentAgent (ManagementBroker* agent) { managementAgent = agent; } diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h index 7faec32b0f..28066b1e80 100644 --- a/cpp/src/qpid/management/ManagementExchange.h +++ b/cpp/src/qpid/management/ManagementExchange.h @@ -22,7 +22,7 @@ #define _ManagementExchange_ #include "qpid/broker/TopicExchange.h" -#include "ManagementAgent.h" +#include "ManagementBroker.h" namespace qpid { namespace broker { @@ -30,7 +30,7 @@ namespace broker { class ManagementExchange : public virtual TopicExchange { private: - management::ManagementAgent::shared_ptr managementAgent; + management::ManagementBroker* managementAgent; public: static const std::string typeName; @@ -46,7 +46,7 @@ class ManagementExchange : public virtual TopicExchange const string& routingKey, const qpid::framing::FieldTable* args); - void setManagmentAgent (management::ManagementAgent::shared_ptr agent); + void setManagmentAgent (management::ManagementBroker* agent); virtual ~ManagementExchange(); }; diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h index 48a3372d16..047f8c5754 100644 --- a/cpp/src/qpid/management/ManagementObject.h +++ b/cpp/src/qpid/management/ManagementObject.h @@ -22,7 +22,6 @@ * */ -#include "Manageable.h" #include "qpid/sys/Time.h" #include "qpid/sys/Mutex.h" #include <qpid/framing/Buffer.h> diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp index ca2bd7c93c..31974993bb 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -36,13 +36,14 @@ struct Buff : public AsynchIO::BufferBase { { delete [] bytes;} }; -AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) : +AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f, ProtocolAccess* a) : identifier(id), aio(0), factory(f), codec(0), readError(false), - isClient(false) + isClient(false), + access(a) {} AsynchIOHandler::~AsynchIOHandler() { @@ -152,7 +153,7 @@ void AsynchIOHandler::nobuffs(AsynchIO&) { void AsynchIOHandler::idle(AsynchIO&){ if (isClient && codec == 0) { - codec = factory->create(*this, identifier); + codec = factory->create(*this, identifier, access); write(framing::ProtocolInitiation(codec->getVersion())); return; } diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h index 530613367a..ece52f57c4 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.h +++ b/cpp/src/qpid/sys/AsynchIOHandler.h @@ -32,7 +32,7 @@ namespace framing { } namespace sys { - +class ProtocolAccess; class AsynchIOHandler : public OutputControl { std::string identifier; AsynchIO* aio; @@ -40,11 +40,12 @@ class AsynchIOHandler : public OutputControl { ConnectionCodec* codec; bool readError; bool isClient; + ProtocolAccess* access; void write(const framing::ProtocolInitiation&); public: - AsynchIOHandler(std::string id, ConnectionCodec::Factory* f); + AsynchIOHandler(std::string id, ConnectionCodec::Factory* f, ProtocolAccess* a =0); ~AsynchIOHandler(); void init(AsynchIO* a, int numBuffs); diff --git a/cpp/src/qpid/sys/ConnectionCodec.h b/cpp/src/qpid/sys/ConnectionCodec.h index 205596c709..4c5a68e576 100644 --- a/cpp/src/qpid/sys/ConnectionCodec.h +++ b/cpp/src/qpid/sys/ConnectionCodec.h @@ -28,9 +28,8 @@ namespace qpid { -namespace broker { class Broker; } - namespace sys { +class ProtocolAccess; /** * Interface of coder/decoder for a connection of a specific protocol @@ -70,7 +69,7 @@ class ConnectionCodec { /** Return "preferred" codec for outbound connections. */ virtual ConnectionCodec* create( - OutputControl&, const std::string& id + OutputControl&, const std::string& id, ProtocolAccess* a = 0 ) = 0; }; }; diff --git a/cpp/src/qpid/sys/ProtocolAccess.h b/cpp/src/qpid/sys/ProtocolAccess.h new file mode 100644 index 0000000000..433bf0ef97 --- /dev/null +++ b/cpp/src/qpid/sys/ProtocolAccess.h @@ -0,0 +1,65 @@ +#ifndef _sys_ProtocolAccess_h +#define _sys_ProtocolAccess_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "AsynchIO.h" +#include "AsynchIOHandler.h" +#include <boost/function.hpp> +#include <boost/shared_ptr.hpp> + +namespace qpid { + +namespace broker +{ +class Connection; +} + +namespace sys { + +class ProtocolAccess +{ +public: + typedef boost::function0<void> Callback; + typedef boost::function2<void, int, std::string> ClosedCallback; + typedef boost::function1<void, boost::shared_ptr<broker::Connection> > SetConnCallback; + + ProtocolAccess (Callback ecb, ClosedCallback ccb, SetConnCallback sccb) + : aio(0), establishedCb(ecb), closedCb(ccb), setConnCb(sccb) {} + ~ProtocolAccess() {} + inline void close() { if (aio) aio->queueWriteClose(); } + + inline void setAio(AsynchIO *_aio) { aio = _aio; establishedCb(); } + inline void closedEof(AsynchIOHandler* async) { async->eof(*aio); closedCb(-1, "Closed by Peer"); } + inline void closed(int err, std::string str) { closedCb(err, str); } + inline void callConnCb(boost::shared_ptr<broker::Connection> c) { setConnCb(c); } + +private: + AsynchIO* aio; + Callback establishedCb; + ClosedCallback closedCb; + SetConnCallback setConnCb; +}; + +}} + +#endif //!_sys_ProtocolAccess_h diff --git a/cpp/src/qpid/sys/ProtocolFactory.h b/cpp/src/qpid/sys/ProtocolFactory.h index 5f80771e49..e61a94b205 100644 --- a/cpp/src/qpid/sys/ProtocolFactory.h +++ b/cpp/src/qpid/sys/ProtocolFactory.h @@ -25,7 +25,7 @@ #include <stdint.h> #include "qpid/SharedObject.h" #include "ConnectionCodec.h" - +#include "ProtocolAccess.h" namespace qpid { namespace sys { @@ -42,7 +42,8 @@ class ProtocolFactory : public qpid::SharedObject<ProtocolFactory> virtual void connect( boost::shared_ptr<Poller>, const std::string& host, int16_t port, - ConnectionCodec::Factory* codec) = 0; + ConnectionCodec::Factory* codec, + ProtocolAccess* access = 0) = 0; }; inline ProtocolFactory::~ProtocolFactory() {} diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index 045bc56e90..5d2cadbe03 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -41,13 +41,15 @@ class AsynchIOProtocolFactory : public ProtocolFactory { public: AsynchIOProtocolFactory(int16_t port, int backlog); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*); + void connect(Poller::shared_ptr, const std::string& host, int16_t port, + ConnectionCodec::Factory*, ProtocolAccess*); uint16_t getPort() const; std::string getHost() const; private: - void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, bool isClient); + void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, + bool isClient, ProtocolAccess*); }; // Static instance to initialise plugin @@ -72,17 +74,32 @@ AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog) : {} void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, - ConnectionCodec::Factory* f, bool isClient) { - AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f); + ConnectionCodec::Factory* f, bool isClient, + ProtocolAccess* a) { + AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f, a); + AsynchIO* aio; + if (isClient) async->setClient(); - AsynchIO* aio = new AsynchIO(s, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); + if (a == 0) + aio = new AsynchIO(s, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); + else { + aio = new AsynchIO(s, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&ProtocolAccess::closedEof, a, async), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); + a->setAio(aio); + } + async->init(aio, 4); aio->start(poller); } @@ -95,26 +112,31 @@ std::string AsynchIOProtocolFactory::getHost() const { return listener.getSockname(); } -void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { +void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, + ConnectionCodec::Factory* fact) { acceptor.reset( new AsynchAcceptor(listener, - boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); + boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false, + (ProtocolAccess*) 0))); acceptor->start(poller); } void AsynchIOProtocolFactory::connect( Poller::shared_ptr poller, const std::string& host, int16_t port, - ConnectionCodec::Factory* f) + ConnectionCodec::Factory* fact, + ProtocolAccess* access) { // Note that the following logic does not cause a memory leak. // The allocated Socket is freed either by the AsynchConnector // upon connection failure or by the AsynchIO upon connection // shutdown. The allocated AsynchConnector frees itself when it // is no longer needed. + Socket* socket = new Socket(); - new AsynchConnector(*socket, poller, host, port, - boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, f, true)); + new AsynchConnector (*socket, poller, host, port, + boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, true, access), + boost::bind(&ProtocolAccess::closed, access, _1, _2)); } }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 9dcb841992..470db4c614 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -130,7 +130,7 @@ void AsynchConnector::connComplete(DispatchHandle& h) h.stopWatch(); if (errCode == 0) { connCallback(socket); - DispatchHandle::doDelete(); + DispatchHandle::doDelete(); } else { failure(errCode, std::string(strerror(errCode))); } @@ -148,6 +148,7 @@ void AsynchConnector::failure(int errCode, std::string message) } /* +>>>>>>> .r654667 * Asynch reader/writer */ AsynchIO::AsynchIO(const Socket& s, diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index ce9c4a8757..25654fe1c7 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -23,6 +23,7 @@ from qpid.testlib import TestBase010, testrunner from qpid.management import managementChannel, managementClient from qpid.datatypes import Message from qpid.queue import Empty +from time import sleep def add_module(args=sys.argv[1:]): for a in args: @@ -89,18 +90,18 @@ class FederationTests(TestBase010): mgmt = Helper(self) broker = mgmt.get_object("broker") - for i in range(10): - mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()}) - link = mgmt.get_object("link") - - mgmt.call_method(link, "bridge", {"src":"amq.direct", "dest":"amq.direct", "key":"my-key"}) - bridge = mgmt.get_object("bridge") + mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()}) + link = mgmt.get_object("link") - mgmt.call_method(bridge, "close") - self.assertEqual(len(mgmt.get_objects("bridge")), 0) + mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.direct", "key":"my-key"}) + bridge = mgmt.get_object("bridge") - mgmt.call_method(link, "close") - self.assertEqual(len(mgmt.get_objects("link")), 0) + mgmt.call_method(bridge, "close") + mgmt.call_method(link, "close") + + sleep(6) + self.assertEqual(len(mgmt.get_objects("bridge")), 0) + self.assertEqual(len(mgmt.get_objects("link")), 0) mgmt.shutdown () @@ -113,7 +114,7 @@ class FederationTests(TestBase010): mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()}) link = mgmt.get_object("link") - mgmt.call_method(link, "bridge", {"src":"amq.direct", "dest":"amq.fanout", "key":"my-key"}) + mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", "key":"my-key"}) bridge = mgmt.get_object("bridge") #setup queue to receive messages from local broker @@ -121,6 +122,7 @@ class FederationTests(TestBase010): session.exchange_bind(queue="fed1", exchange="amq.fanout") self.subscribe(queue="fed1", destination="f1") queue = session.incoming("f1") + sleep(6) #send messages to remote broker and confirm it is routed to local broker r_conn = self.connect(host=remote_host(), port=remote_port()) @@ -138,12 +140,8 @@ class FederationTests(TestBase010): self.fail("Got unexpected message in queue: " + extra.body) except Empty: None - mgmt.call_method(bridge, "close") - self.assertEqual(len(mgmt.get_objects("bridge")), 0) - mgmt.call_method(link, "close") - self.assertEqual(len(mgmt.get_objects("link")), 0) mgmt.shutdown() @@ -170,7 +168,8 @@ class FederationTests(TestBase010): mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()}) link = mgmt.get_object("link") - mgmt.call_method(link, "bridge", {"src":"my-bridge-queue", "dest":"amq.fanout", "key":"", "id":"", "excludes":"", "src_is_queue":1}) + mgmt.call_method(link, "bridge", {"durable":0, "src":"my-bridge-queue", "dest":"amq.fanout", "key":"", "id":"", "excludes":"", "src_is_queue":1}) + sleep(6) bridge = mgmt.get_object("bridge") #add some more messages (i.e. after bridge was created) @@ -191,10 +190,7 @@ class FederationTests(TestBase010): mgmt.call_method(bridge, "close") - self.assertEqual(len(mgmt.get_objects("bridge")), 0) - mgmt.call_method(link, "close") - self.assertEqual(len(mgmt.get_objects("link")), 0) mgmt.shutdown () @@ -207,8 +203,9 @@ class FederationTests(TestBase010): mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()}) link = mgmt.get_object("link") - mgmt.call_method(link, "bridge", {"src":"amq.direct", "dest":"amq.fanout", "key":"my-key", + mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", "key":"my-key", "id":"my-bridge-id", "excludes":"exclude-me,also-exclude-me"}) + sleep(6) bridge = mgmt.get_object("bridge") #setup queue to receive messages from local broker @@ -241,10 +238,7 @@ class FederationTests(TestBase010): except Empty: None mgmt.call_method(bridge, "close") - self.assertEqual(len(mgmt.get_objects("bridge")), 0) - mgmt.call_method(link, "close") - self.assertEqual(len(mgmt.get_objects("link")), 0) mgmt.shutdown () |