diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2008-01-02 15:56:20 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2008-01-02 15:56:20 +0000 |
commit | 3fe6853a7029e48f693c0853e51af33be5c79aec (patch) | |
tree | 6139a715591aabc91370350aa26f854639a2aa11 /cpp | |
parent | 8bc0b992a0e67259a7d9c525bbbbbc32fbc60a20 (diff) | |
download | qpid-python-3fe6853a7029e48f693c0853e51af33be5c79aec.tar.gz |
patch-715 (tross)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@608135 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rwxr-xr-x | cpp/managementgen/main.py | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 66 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 16 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionFactory.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionFactory.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionManager.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 75 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 11 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 85 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementExchange.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementObject.h | 16 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ConnectionInputHandlerFactory.h | 3 |
17 files changed, 262 insertions, 51 deletions
diff --git a/cpp/managementgen/main.py b/cpp/managementgen/main.py index 2f70639482..ddf18ef873 100755 --- a/cpp/managementgen/main.py +++ b/cpp/managementgen/main.py @@ -41,8 +41,7 @@ parser.add_option ("-m", "--makefile", dest="makefile", metavar="FILE", if opts.outdir == None or \ opts.typefile == None or \ opts.schemafile == None or \ - opts.templatedir == None or \ - opts.makefile == None: + opts.templatedir == None: parser.error ("Incorrect options, see --help for help") gen = Generator (opts.outdir, opts.templatedir) @@ -51,4 +50,6 @@ schema = PackageSchema (opts.typefile, opts.schemafile) gen.makeClassFiles ("Class.h", schema) gen.makeClassFiles ("Class.cpp", schema) gen.makeMethodFiles ("Args.h", schema) -gen.makeMakeFile (opts.makefile) + +if opts.makefile != None: + gen.makeMakeFile (opts.makefile) diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index d83eabee06..5607d22498 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -231,6 +231,7 @@ void Broker::run() { void Broker::shutdown() { if (acceptor) acceptor->shutdown(); + ManagementAgent::shutdown (); } Broker::~Broker() { @@ -270,6 +271,11 @@ ManagementObject::shared_ptr Broker::GetManagementObject(void) const return dynamic_pointer_cast<ManagementObject> (mgmtObject); } +Manageable* Broker::GetVhostObject(void) const +{ + return vhostObject.get(); +} + Manageable::status_t Broker::ManagementMethod (uint32_t methodId, Args& /*_args*/) { diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 1c1c303be8..aaa0be72bb 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -119,6 +119,7 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M SessionManager& getSessionManager() { return sessionManager; } management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::Manageable* GetVhostObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index f2cd1c11e4..fbe018e8ae 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -26,6 +26,7 @@ #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" #include "qpid/framing/AMQP_ClientProxy.h" +#include "qpid/management/ManagementAgent.h" #include <boost/bind.hpp> @@ -38,11 +39,15 @@ using namespace qpid::sys; using namespace qpid::framing; using namespace qpid::sys; using namespace qpid::ptr_map; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; namespace qpid { namespace broker { -Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) : + Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const Socket& s) : broker(broker_), outputTasks(*out_), out(out_), @@ -50,15 +55,45 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) : heartbeat(0), client(0), stagingThreshold(broker.getStagingThreshold()), - adapter(*this) -{} + adapter(*this), + mgmtClosing(0) +{ + Manageable* parent = broker.GetVhostObject (); + + if (parent != 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + + if (agent.get () != 0) + { + mgmtObject = management::Client::shared_ptr + (new management::Client (this, parent, s.getPeerAddress ())); + agent->addObject (mgmtObject); + } + } +} + +Connection::~Connection () +{ + if (mgmtObject.get () != 0) + mgmtObject->resourceDestroy (); +} void Connection::received(framing::AMQFrame& frame){ + if (mgmtClosing) + close (403, "Closed by Management Request", 0, 0); + if (frame.getChannel() == 0) { adapter.handle(frame); } else { getChannel(frame.getChannel()).in(frame); } + + if (mgmtObject.get () != 0) + { + mgmtObject->inc_framesFromClient (); + mgmtObject->inc_bytesFromClient (frame.size ()); + } } void Connection::close( @@ -122,5 +157,30 @@ SessionHandler& Connection::getChannel(ChannelId id) { return *get_pointer(i); } +ManagementObject::shared_ptr Connection::GetManagementObject (void) const +{ + return dynamic_pointer_cast<ManagementObject> (mgmtObject); +} + +Manageable::status_t Connection::ManagementMethod (uint32_t methodId, + Args& /*args*/) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + QPID_LOG (debug, "Connection::ManagementMethod [id=" << methodId << "]"); + + switch (methodId) + { + case management::Client::METHOD_CLOSE : + mgmtClosing = 1; + mgmtObject->set_closing (1); + status = Manageable::STATUS_OK; + break; + } + + return status; +} + + }} diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 395aa7b0bd..cbe7addec2 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -35,9 +35,12 @@ #include "qpid/sys/TimeoutHandler.h" #include "qpid/framing/ProtocolVersion.h" #include "Broker.h" +#include "qpid/sys/Socket.h" #include "qpid/Exception.h" #include "ConnectionHandler.h" #include "SessionHandler.h" +#include "qpid/management/Manageable.h" +#include "qpid/management/Client.h" #include <boost/ptr_container/ptr_map.hpp> @@ -45,10 +48,12 @@ namespace qpid { namespace broker { class Connection : public sys::ConnectionInputHandler, - public ConnectionToken + public ConnectionToken, + public management::Manageable { public: - Connection(sys::ConnectionOutputHandler* out, Broker& broker); + Connection(sys::ConnectionOutputHandler* out, Broker& broker, const sys::Socket& s); + ~Connection (); /** Get the SessionHandler for channel. Create if it does not already exist */ SessionHandler& getChannel(framing::ChannelId channel); @@ -85,6 +90,11 @@ class Connection : public sys::ConnectionInputHandler, void closeChannel(framing::ChannelId channel); + // Manageable entry points + management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::Manageable::status_t + ManagementMethod (uint32_t methodId, management::Args& args); + private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; @@ -97,6 +107,8 @@ class Connection : public sys::ConnectionInputHandler, framing::AMQP_ClientProxy::Connection* client; uint64_t stagingThreshold; ConnectionHandler adapter; + management::Client::shared_ptr mgmtObject; + bool mgmtClosing; }; }} diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp index 4429a2583c..c925c4be2f 100644 --- a/cpp/src/qpid/broker/ConnectionFactory.cpp +++ b/cpp/src/qpid/broker/ConnectionFactory.cpp @@ -35,9 +35,10 @@ ConnectionFactory::~ConnectionFactory() } qpid::sys::ConnectionInputHandler* -ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out) +ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out, + const qpid::sys::Socket& s) { - return new Connection(out, broker); + return new Connection(out, broker, s); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/ConnectionFactory.h b/cpp/src/qpid/broker/ConnectionFactory.h index 6af3dda7a6..23fba5c1ab 100644 --- a/cpp/src/qpid/broker/ConnectionFactory.h +++ b/cpp/src/qpid/broker/ConnectionFactory.h @@ -32,8 +32,8 @@ class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory public: ConnectionFactory(Broker& b); - virtual qpid::sys::ConnectionInputHandler* create( - qpid::sys::ConnectionOutputHandler* ctxt); + virtual qpid::sys::ConnectionInputHandler* create + (qpid::sys::ConnectionOutputHandler* ctxt, const sys::Socket& s); virtual ~ConnectionFactory(); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index b68a7db8b0..f753e7ef75 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -616,10 +616,6 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, purge (); status = Manageable::STATUS_OK; break; - - case management::Queue::METHOD_INCREASEJOURNALSIZE : - status = Manageable::STATUS_NOT_IMPLEMENTED; - break; } return status; diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp index f12ebc6db1..1d5f9ebada 100644 --- a/cpp/src/qpid/broker/SessionManager.cpp +++ b/cpp/src/qpid/broker/SessionManager.cpp @@ -58,6 +58,8 @@ void SessionManager::suspend(std::auto_ptr<SessionState> session) { active.erase(session->getId()); session->suspend(); session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC); + if (session->mgmtObject.get() != 0) + session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry)); suspended.push_back(session.release()); // In expiry order eraseExpired(); } diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index bea1eaedcf..a75b32cbb5 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -31,6 +31,10 @@ namespace broker { using namespace framing; using sys::Mutex; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); } @@ -50,11 +54,34 @@ SessionState::SessionState( // TODO aconway 2007-09-20: SessionManager may add plugin // handlers to the chain. getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); + + Manageable* parent = broker.GetVhostObject (); + + if (parent != 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + + if (agent.get () != 0) + { + mgmtObject = management::Session::shared_ptr + (new management::Session (this, parent, id.str ())); + mgmtObject->set_attached (1); + mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_channelId (h.getChannel()); + mgmtObject->set_detachedLifespan (getTimeout()); + agent->addObject (mgmtObject); + } + } } SessionState::~SessionState() { // Remove ID from active session list. factory.erase(getId()); + + if (mgmtObject.get () != 0) + { + mgmtObject->resourceDestroy (); + } } SessionHandler* SessionState::getHandler() { @@ -75,12 +102,22 @@ void SessionState::detach() { getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState()); Mutex::ScopedLock l(lock); handler = 0; + if (mgmtObject.get() != 0) + { + mgmtObject->set_attached (0); + } } void SessionState::attach(SessionHandler& h) { { Mutex::ScopedLock l(lock); handler = &h; + if (mgmtObject.get() != 0) + { + mgmtObject->set_attached (1); + mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_channelId (h.getChannel()); + } } h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); } @@ -96,4 +133,42 @@ void SessionState::activateOutput() //if not attached, it can simply ignore the callback, else pass it //on to the connection +ManagementObject::shared_ptr SessionState::GetManagementObject (void) const +{ + return dynamic_pointer_cast<ManagementObject> (mgmtObject); +} + +Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, + Args& /*args*/) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + switch (methodId) + { + case management::Session::METHOD_DETACH : + if (handler != 0) + { + handler->localSuspend (); + } + status = Manageable::STATUS_OK; + break; + + case management::Session::METHOD_CLOSE : + if (handler != 0) + { + handler->getConnection().closeChannel(handler->getChannel()); + } + status = Manageable::STATUS_OK; + break; + + case management::Session::METHOD_SOLICITACK : + case management::Session::METHOD_RESETLIFESPAN : + status = Manageable::STATUS_NOT_IMPLEMENTED; + break; + } + + return status; +} + + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index ac2a33442a..c8c32a046d 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -29,6 +29,8 @@ #include "qpid/sys/Mutex.h" #include "qpid/sys/OutputControl.h" #include "qpid/sys/Time.h" +#include "qpid/management/Manageable.h" +#include "qpid/management/Session.h" #include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> @@ -57,7 +59,8 @@ class Connection; */ class SessionState : public framing::SessionState, public framing::FrameHandler::InOutHandler, - public sys::OutputControl + public sys::OutputControl, + public management::Manageable { public: ~SessionState(); @@ -82,6 +85,11 @@ class SessionState : public framing::SessionState, /** OutputControl **/ void activateOutput(); + // Manageable entry points + management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::Manageable::status_t + ManagementMethod (uint32_t methodId, management::Args& args); + protected: void handleIn(framing::AMQFrame&); void handleOut(framing::AMQFrame&); @@ -102,6 +110,7 @@ class SessionState : public framing::SessionState, framing::ProtocolVersion version; sys::Mutex lock; boost::scoped_ptr<SemanticHandler> semanticHandler; + management::Session::shared_ptr mgmtObject; friend class SessionManager; }; diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 90da74404b..d3c5d7c266 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -41,6 +41,8 @@ ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval) nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ())); } +ManagementAgent::~ManagementAgent () {} + void ManagementAgent::enableManagement (void) { enabled = 1; @@ -54,6 +56,16 @@ ManagementAgent::shared_ptr ManagementAgent::getAgent (void) return agent; } +void ManagementAgent::shutdown (void) +{ + if (agent.get () != 0) + { + agent->mExchange.reset (); + agent->dExchange.reset (); + agent.reset (); + } +} + void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange, broker::Exchange::shared_ptr _dexchange) { @@ -73,6 +85,8 @@ void ManagementAgent::addObject (ManagementObject::shared_ptr object) ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {} +ManagementAgent::Periodic::~Periodic () {} + void ManagementAgent::Periodic::fire () { agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval))); @@ -93,12 +107,27 @@ void ManagementAgent::clientAdded (void) } } -void ManagementAgent::EncodeHeader (Buffer& buf) +void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint8_t cls) { buf.putOctet ('A'); buf.putOctet ('M'); buf.putOctet ('0'); buf.putOctet ('1'); + buf.putOctet (opcode); + buf.putOctet (cls); +} + +bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint8_t *cls) +{ + uint8_t h1 = buf.getOctet (); + uint8_t h2 = buf.getOctet (); + uint8_t h3 = buf.getOctet (); + uint8_t h4 = buf.getOctet (); + + *opcode = buf.getOctet (); + *cls = buf.getOctet (); + + return h1 == 'A' && h2 == 'M' && h3 == '0' && h4 == '1'; } void ManagementAgent::SendBuffer (Buffer& buf, @@ -112,9 +141,6 @@ void ManagementAgent::SendBuffer (Buffer& buf, AMQFrame header (in_place<AMQHeaderBody>()); AMQFrame content(in_place<AMQContentBody>()); - QPID_LOG (debug, "ManagementAgent::SendBuffer - key=" - << routingKey << " len=" << length); - content.castBody<AMQContentBody>()->decode(buf, length); method.setEof (false); @@ -156,9 +182,7 @@ void ManagementAgent::PeriodicProcessing (void) if (object->getSchemaNeeded ()) { Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer); - msgBuffer.putOctet ('S'); // opcode = Schema Record - msgBuffer.putOctet (0); // content-class = N/A + EncodeHeader (msgBuffer, 'S'); object->writeSchema (msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); @@ -170,9 +194,7 @@ void ManagementAgent::PeriodicProcessing (void) if (object->getConfigChanged () || object->isDeleted ()) { Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer); - msgBuffer.putOctet ('C'); // opcode = Content Record - msgBuffer.putOctet ('C'); // content-class = Configuration + EncodeHeader (msgBuffer, 'C', 'C'); object->writeConfig (msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); @@ -184,9 +206,7 @@ void ManagementAgent::PeriodicProcessing (void) if (object->getInstChanged ()) { Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer); - msgBuffer.putOctet ('C'); // opcode = Content Record - msgBuffer.putOctet ('I'); // content-class = Instrumentation + EncodeHeader (msgBuffer, 'C', 'I'); object->writeInstrumentation (msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); @@ -251,9 +271,6 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable, start = pos + 1; string methodName = routingKey.substr (start, routingKey.length () - start); - QPID_LOG (debug, "Dispatch package: " << packageName << ", class: " - << className << ", method: " << methodName); - contentSize = msg.encodedContentSize (); if (contentSize < 8 || contentSize > 65536) return; @@ -263,19 +280,41 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable, Buffer inBuffer (inMem, contentSize); Buffer outBuffer (outMem, 4096); uint32_t outLen; + uint8_t opcode, unused; msg.encodeContent (inBuffer); inBuffer.reset (); - uint32_t methodId = inBuffer.getLong (); - uint64_t objId = inBuffer.getLongLong (); - string replyTo; + if (!CheckHeader (inBuffer, &opcode, &unused)) + { + QPID_LOG (debug, " Invalid content header"); + return; + } + + if (opcode != 'M') + { + QPID_LOG (debug, " Unexpected opcode " << opcode); + return; + } - inBuffer.getShortString (replyTo); + uint32_t methodId = inBuffer.getLong (); + uint64_t objId = inBuffer.getLongLong (); + string replyToKey; - QPID_LOG (debug, " len = " << contentSize << ", methodId = " << - methodId << ", objId = " << objId); + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + if (p && p->hasReplyTo()) + { + const framing::ReplyTo& rt = p->getReplyTo (); + replyToKey = rt.getRoutingKey (); + } + else + { + QPID_LOG (debug, " Reply-to missing"); + return; + } + EncodeHeader (outBuffer, 'R'); outBuffer.putLong (methodId); ManagementObjectMap::iterator iter = managementObjects.find (objId); @@ -291,7 +330,7 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable, outLen = 4096 - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyTo); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); free (inMem); } diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index a4f10632da..36ba1f0542 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -40,10 +40,13 @@ class ManagementAgent public: + virtual ~ManagementAgent (); + typedef boost::shared_ptr<ManagementAgent> shared_ptr; static void enableManagement (void); static shared_ptr getAgent (void); + static void shutdown (void); void setInterval (uint16_t _interval) { interval = _interval; } void setExchange (broker::Exchange::shared_ptr mgmtExchange, @@ -61,7 +64,7 @@ class ManagementAgent ManagementAgent& agent; Periodic (ManagementAgent& agent, uint32_t seconds); - ~Periodic () {} + virtual ~Periodic (); void fire (); }; @@ -77,7 +80,8 @@ class ManagementAgent uint64_t nextObjectId; void PeriodicProcessing (void); - void EncodeHeader (qpid::framing::Buffer& buf); + void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint8_t cls = 0); + bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint8_t *cls); void SendBuffer (qpid::framing::Buffer& buf, uint32_t length, broker::Exchange::shared_ptr exchange, diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h index 0fcd65b092..1a79482c9d 100644 --- a/cpp/src/qpid/management/ManagementExchange.h +++ b/cpp/src/qpid/management/ManagementExchange.h @@ -40,7 +40,7 @@ class ManagementExchange : public virtual TopicExchange const qpid::framing::FieldTable& _args, Manageable* _parent = 0); - virtual std::string getType() const { return typeName; } + virtual std::string getType() const { return typeName; } virtual bool bind (Queue::shared_ptr queue, const string& routingKey, diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h index ff136c397d..a32055721d 100644 --- a/cpp/src/qpid/management/ManagementObject.h +++ b/cpp/src/qpid/management/ManagementObject.h @@ -46,12 +46,16 @@ class ManagementObject Manageable* coreObject; std::string className; - static const uint8_t TYPE_U8 = 1; - static const uint8_t TYPE_U16 = 2; - static const uint8_t TYPE_U32 = 3; - static const uint8_t TYPE_U64 = 4; - static const uint8_t TYPE_SSTR = 6; - static const uint8_t TYPE_LSTR = 7; + static const uint8_t TYPE_U8 = 1; + static const uint8_t TYPE_U16 = 2; + static const uint8_t TYPE_U32 = 3; + static const uint8_t TYPE_U64 = 4; + static const uint8_t TYPE_SSTR = 6; + static const uint8_t TYPE_LSTR = 7; + static const uint8_t TYPE_ABSTIME = 8; + static const uint8_t TYPE_DELTATIME = 9; + static const uint8_t TYPE_REF = 10; + static const uint8_t TYPE_BOOL = 11; static const uint8_t ACCESS_RC = 1; static const uint8_t ACCESS_RW = 2; diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 51ec7f718a..9ca083354b 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -131,7 +131,7 @@ public: void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) { AsynchIOHandler* async = new AsynchIOHandler; - ConnectionInputHandler* handler = f->create(async); + ConnectionInputHandler* handler = f->create(async, s); AsynchIO* aio = new AsynchIO(s, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), diff --git a/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h b/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h index af7d411928..4a90f8b736 100644 --- a/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h +++ b/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h @@ -22,6 +22,7 @@ #define _ConnectionInputHandlerFactory_ #include <boost/noncopyable.hpp> +#include "qpid/sys/Socket.h" namespace qpid { namespace sys { @@ -36,7 +37,7 @@ class ConnectionInputHandler; class ConnectionInputHandlerFactory : private boost::noncopyable { public: - virtual ConnectionInputHandler* create(ConnectionOutputHandler* ctxt) = 0; + virtual ConnectionInputHandler* create(ConnectionOutputHandler* ctxt, const Socket& s) = 0; virtual ~ConnectionInputHandlerFactory(){} }; |