diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2008-01-02 15:56:20 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2008-01-02 15:56:20 +0000 |
commit | e336559c82f22ecd0a013b8ea787bb4946ab2fdc (patch) | |
tree | 7ffb2bc4ff702ac50e872529b6a07e041a88df0a | |
parent | 4e1fabd16161480d352d3813a6e41c5a97ab8c57 (diff) | |
download | qpid-python-e336559c82f22ecd0a013b8ea787bb4946ab2fdc.tar.gz |
patch-715 (tross)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@608135 13f79535-47bb-0310-9956-ffa450edef68
22 files changed, 446 insertions, 128 deletions
diff --git a/qpid/cpp/managementgen/main.py b/qpid/cpp/managementgen/main.py index 2f70639482..ddf18ef873 100755 --- a/qpid/cpp/managementgen/main.py +++ b/qpid/cpp/managementgen/main.py @@ -41,8 +41,7 @@ parser.add_option ("-m", "--makefile", dest="makefile", metavar="FILE", if opts.outdir == None or \ opts.typefile == None or \ opts.schemafile == None or \ - opts.templatedir == None or \ - opts.makefile == None: + opts.templatedir == None: parser.error ("Incorrect options, see --help for help") gen = Generator (opts.outdir, opts.templatedir) @@ -51,4 +50,6 @@ schema = PackageSchema (opts.typefile, opts.schemafile) gen.makeClassFiles ("Class.h", schema) gen.makeClassFiles ("Class.cpp", schema) gen.makeMethodFiles ("Args.h", schema) -gen.makeMakeFile (opts.makefile) + +if opts.makefile != None: + gen.makeMakeFile (opts.makefile) diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index d83eabee06..5607d22498 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -231,6 +231,7 @@ void Broker::run() { void Broker::shutdown() { if (acceptor) acceptor->shutdown(); + ManagementAgent::shutdown (); } Broker::~Broker() { @@ -270,6 +271,11 @@ ManagementObject::shared_ptr Broker::GetManagementObject(void) const return dynamic_pointer_cast<ManagementObject> (mgmtObject); } +Manageable* Broker::GetVhostObject(void) const +{ + return vhostObject.get(); +} + Manageable::status_t Broker::ManagementMethod (uint32_t methodId, Args& /*_args*/) { diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 1c1c303be8..aaa0be72bb 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -119,6 +119,7 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M SessionManager& getSessionManager() { return sessionManager; } management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::Manageable* GetVhostObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index f2cd1c11e4..fbe018e8ae 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -26,6 +26,7 @@ #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" #include "qpid/framing/AMQP_ClientProxy.h" +#include "qpid/management/ManagementAgent.h" #include <boost/bind.hpp> @@ -38,11 +39,15 @@ using namespace qpid::sys; using namespace qpid::framing; using namespace qpid::sys; using namespace qpid::ptr_map; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; namespace qpid { namespace broker { -Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) : + Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const Socket& s) : broker(broker_), outputTasks(*out_), out(out_), @@ -50,15 +55,45 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) : heartbeat(0), client(0), stagingThreshold(broker.getStagingThreshold()), - adapter(*this) -{} + adapter(*this), + mgmtClosing(0) +{ + Manageable* parent = broker.GetVhostObject (); + + if (parent != 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + + if (agent.get () != 0) + { + mgmtObject = management::Client::shared_ptr + (new management::Client (this, parent, s.getPeerAddress ())); + agent->addObject (mgmtObject); + } + } +} + +Connection::~Connection () +{ + if (mgmtObject.get () != 0) + mgmtObject->resourceDestroy (); +} void Connection::received(framing::AMQFrame& frame){ + if (mgmtClosing) + close (403, "Closed by Management Request", 0, 0); + if (frame.getChannel() == 0) { adapter.handle(frame); } else { getChannel(frame.getChannel()).in(frame); } + + if (mgmtObject.get () != 0) + { + mgmtObject->inc_framesFromClient (); + mgmtObject->inc_bytesFromClient (frame.size ()); + } } void Connection::close( @@ -122,5 +157,30 @@ SessionHandler& Connection::getChannel(ChannelId id) { return *get_pointer(i); } +ManagementObject::shared_ptr Connection::GetManagementObject (void) const +{ + return dynamic_pointer_cast<ManagementObject> (mgmtObject); +} + +Manageable::status_t Connection::ManagementMethod (uint32_t methodId, + Args& /*args*/) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + QPID_LOG (debug, "Connection::ManagementMethod [id=" << methodId << "]"); + + switch (methodId) + { + case management::Client::METHOD_CLOSE : + mgmtClosing = 1; + mgmtObject->set_closing (1); + status = Manageable::STATUS_OK; + break; + } + + return status; +} + + }} diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index 395aa7b0bd..cbe7addec2 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -35,9 +35,12 @@ #include "qpid/sys/TimeoutHandler.h" #include "qpid/framing/ProtocolVersion.h" #include "Broker.h" +#include "qpid/sys/Socket.h" #include "qpid/Exception.h" #include "ConnectionHandler.h" #include "SessionHandler.h" +#include "qpid/management/Manageable.h" +#include "qpid/management/Client.h" #include <boost/ptr_container/ptr_map.hpp> @@ -45,10 +48,12 @@ namespace qpid { namespace broker { class Connection : public sys::ConnectionInputHandler, - public ConnectionToken + public ConnectionToken, + public management::Manageable { public: - Connection(sys::ConnectionOutputHandler* out, Broker& broker); + Connection(sys::ConnectionOutputHandler* out, Broker& broker, const sys::Socket& s); + ~Connection (); /** Get the SessionHandler for channel. Create if it does not already exist */ SessionHandler& getChannel(framing::ChannelId channel); @@ -85,6 +90,11 @@ class Connection : public sys::ConnectionInputHandler, void closeChannel(framing::ChannelId channel); + // Manageable entry points + management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::Manageable::status_t + ManagementMethod (uint32_t methodId, management::Args& args); + private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; @@ -97,6 +107,8 @@ class Connection : public sys::ConnectionInputHandler, framing::AMQP_ClientProxy::Connection* client; uint64_t stagingThreshold; ConnectionHandler adapter; + management::Client::shared_ptr mgmtObject; + bool mgmtClosing; }; }} diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp index 4429a2583c..c925c4be2f 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp @@ -35,9 +35,10 @@ ConnectionFactory::~ConnectionFactory() } qpid::sys::ConnectionInputHandler* -ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out) +ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out, + const qpid::sys::Socket& s) { - return new Connection(out, broker); + return new Connection(out, broker, s); } }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.h b/qpid/cpp/src/qpid/broker/ConnectionFactory.h index 6af3dda7a6..23fba5c1ab 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionFactory.h +++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.h @@ -32,8 +32,8 @@ class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory public: ConnectionFactory(Broker& b); - virtual qpid::sys::ConnectionInputHandler* create( - qpid::sys::ConnectionOutputHandler* ctxt); + virtual qpid::sys::ConnectionInputHandler* create + (qpid::sys::ConnectionOutputHandler* ctxt, const sys::Socket& s); virtual ~ConnectionFactory(); diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index b68a7db8b0..f753e7ef75 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -616,10 +616,6 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, purge (); status = Manageable::STATUS_OK; break; - - case management::Queue::METHOD_INCREASEJOURNALSIZE : - status = Manageable::STATUS_NOT_IMPLEMENTED; - break; } return status; diff --git a/qpid/cpp/src/qpid/broker/SessionManager.cpp b/qpid/cpp/src/qpid/broker/SessionManager.cpp index f12ebc6db1..1d5f9ebada 100644 --- a/qpid/cpp/src/qpid/broker/SessionManager.cpp +++ b/qpid/cpp/src/qpid/broker/SessionManager.cpp @@ -58,6 +58,8 @@ void SessionManager::suspend(std::auto_ptr<SessionState> session) { active.erase(session->getId()); session->suspend(); session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC); + if (session->mgmtObject.get() != 0) + session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry)); suspended.push_back(session.release()); // In expiry order eraseExpired(); } diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index bea1eaedcf..a75b32cbb5 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -31,6 +31,10 @@ namespace broker { using namespace framing; using sys::Mutex; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); } @@ -50,11 +54,34 @@ SessionState::SessionState( // TODO aconway 2007-09-20: SessionManager may add plugin // handlers to the chain. getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); + + Manageable* parent = broker.GetVhostObject (); + + if (parent != 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + + if (agent.get () != 0) + { + mgmtObject = management::Session::shared_ptr + (new management::Session (this, parent, id.str ())); + mgmtObject->set_attached (1); + mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_channelId (h.getChannel()); + mgmtObject->set_detachedLifespan (getTimeout()); + agent->addObject (mgmtObject); + } + } } SessionState::~SessionState() { // Remove ID from active session list. factory.erase(getId()); + + if (mgmtObject.get () != 0) + { + mgmtObject->resourceDestroy (); + } } SessionHandler* SessionState::getHandler() { @@ -75,12 +102,22 @@ void SessionState::detach() { getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState()); Mutex::ScopedLock l(lock); handler = 0; + if (mgmtObject.get() != 0) + { + mgmtObject->set_attached (0); + } } void SessionState::attach(SessionHandler& h) { { Mutex::ScopedLock l(lock); handler = &h; + if (mgmtObject.get() != 0) + { + mgmtObject->set_attached (1); + mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_channelId (h.getChannel()); + } } h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); } @@ -96,4 +133,42 @@ void SessionState::activateOutput() //if not attached, it can simply ignore the callback, else pass it //on to the connection +ManagementObject::shared_ptr SessionState::GetManagementObject (void) const +{ + return dynamic_pointer_cast<ManagementObject> (mgmtObject); +} + +Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, + Args& /*args*/) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + switch (methodId) + { + case management::Session::METHOD_DETACH : + if (handler != 0) + { + handler->localSuspend (); + } + status = Manageable::STATUS_OK; + break; + + case management::Session::METHOD_CLOSE : + if (handler != 0) + { + handler->getConnection().closeChannel(handler->getChannel()); + } + status = Manageable::STATUS_OK; + break; + + case management::Session::METHOD_SOLICITACK : + case management::Session::METHOD_RESETLIFESPAN : + status = Manageable::STATUS_NOT_IMPLEMENTED; + break; + } + + return status; +} + + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index ac2a33442a..c8c32a046d 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -29,6 +29,8 @@ #include "qpid/sys/Mutex.h" #include "qpid/sys/OutputControl.h" #include "qpid/sys/Time.h" +#include "qpid/management/Manageable.h" +#include "qpid/management/Session.h" #include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> @@ -57,7 +59,8 @@ class Connection; */ class SessionState : public framing::SessionState, public framing::FrameHandler::InOutHandler, - public sys::OutputControl + public sys::OutputControl, + public management::Manageable { public: ~SessionState(); @@ -82,6 +85,11 @@ class SessionState : public framing::SessionState, /** OutputControl **/ void activateOutput(); + // Manageable entry points + management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::Manageable::status_t + ManagementMethod (uint32_t methodId, management::Args& args); + protected: void handleIn(framing::AMQFrame&); void handleOut(framing::AMQFrame&); @@ -102,6 +110,7 @@ class SessionState : public framing::SessionState, framing::ProtocolVersion version; sys::Mutex lock; boost::scoped_ptr<SemanticHandler> semanticHandler; + management::Session::shared_ptr mgmtObject; friend class SessionManager; }; diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 90da74404b..d3c5d7c266 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -41,6 +41,8 @@ ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval) nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ())); } +ManagementAgent::~ManagementAgent () {} + void ManagementAgent::enableManagement (void) { enabled = 1; @@ -54,6 +56,16 @@ ManagementAgent::shared_ptr ManagementAgent::getAgent (void) return agent; } +void ManagementAgent::shutdown (void) +{ + if (agent.get () != 0) + { + agent->mExchange.reset (); + agent->dExchange.reset (); + agent.reset (); + } +} + void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange, broker::Exchange::shared_ptr _dexchange) { @@ -73,6 +85,8 @@ void ManagementAgent::addObject (ManagementObject::shared_ptr object) ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {} +ManagementAgent::Periodic::~Periodic () {} + void ManagementAgent::Periodic::fire () { agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval))); @@ -93,12 +107,27 @@ void ManagementAgent::clientAdded (void) } } -void ManagementAgent::EncodeHeader (Buffer& buf) +void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint8_t cls) { buf.putOctet ('A'); buf.putOctet ('M'); buf.putOctet ('0'); buf.putOctet ('1'); + buf.putOctet (opcode); + buf.putOctet (cls); +} + +bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint8_t *cls) +{ + uint8_t h1 = buf.getOctet (); + uint8_t h2 = buf.getOctet (); + uint8_t h3 = buf.getOctet (); + uint8_t h4 = buf.getOctet (); + + *opcode = buf.getOctet (); + *cls = buf.getOctet (); + + return h1 == 'A' && h2 == 'M' && h3 == '0' && h4 == '1'; } void ManagementAgent::SendBuffer (Buffer& buf, @@ -112,9 +141,6 @@ void ManagementAgent::SendBuffer (Buffer& buf, AMQFrame header (in_place<AMQHeaderBody>()); AMQFrame content(in_place<AMQContentBody>()); - QPID_LOG (debug, "ManagementAgent::SendBuffer - key=" - << routingKey << " len=" << length); - content.castBody<AMQContentBody>()->decode(buf, length); method.setEof (false); @@ -156,9 +182,7 @@ void ManagementAgent::PeriodicProcessing (void) if (object->getSchemaNeeded ()) { Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer); - msgBuffer.putOctet ('S'); // opcode = Schema Record - msgBuffer.putOctet (0); // content-class = N/A + EncodeHeader (msgBuffer, 'S'); object->writeSchema (msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); @@ -170,9 +194,7 @@ void ManagementAgent::PeriodicProcessing (void) if (object->getConfigChanged () || object->isDeleted ()) { Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer); - msgBuffer.putOctet ('C'); // opcode = Content Record - msgBuffer.putOctet ('C'); // content-class = Configuration + EncodeHeader (msgBuffer, 'C', 'C'); object->writeConfig (msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); @@ -184,9 +206,7 @@ void ManagementAgent::PeriodicProcessing (void) if (object->getInstChanged ()) { Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer); - msgBuffer.putOctet ('C'); // opcode = Content Record - msgBuffer.putOctet ('I'); // content-class = Instrumentation + EncodeHeader (msgBuffer, 'C', 'I'); object->writeInstrumentation (msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); @@ -251,9 +271,6 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable, start = pos + 1; string methodName = routingKey.substr (start, routingKey.length () - start); - QPID_LOG (debug, "Dispatch package: " << packageName << ", class: " - << className << ", method: " << methodName); - contentSize = msg.encodedContentSize (); if (contentSize < 8 || contentSize > 65536) return; @@ -263,19 +280,41 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable, Buffer inBuffer (inMem, contentSize); Buffer outBuffer (outMem, 4096); uint32_t outLen; + uint8_t opcode, unused; msg.encodeContent (inBuffer); inBuffer.reset (); - uint32_t methodId = inBuffer.getLong (); - uint64_t objId = inBuffer.getLongLong (); - string replyTo; + if (!CheckHeader (inBuffer, &opcode, &unused)) + { + QPID_LOG (debug, " Invalid content header"); + return; + } + + if (opcode != 'M') + { + QPID_LOG (debug, " Unexpected opcode " << opcode); + return; + } - inBuffer.getShortString (replyTo); + uint32_t methodId = inBuffer.getLong (); + uint64_t objId = inBuffer.getLongLong (); + string replyToKey; - QPID_LOG (debug, " len = " << contentSize << ", methodId = " << - methodId << ", objId = " << objId); + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + if (p && p->hasReplyTo()) + { + const framing::ReplyTo& rt = p->getReplyTo (); + replyToKey = rt.getRoutingKey (); + } + else + { + QPID_LOG (debug, " Reply-to missing"); + return; + } + EncodeHeader (outBuffer, 'R'); outBuffer.putLong (methodId); ManagementObjectMap::iterator iter = managementObjects.find (objId); @@ -291,7 +330,7 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable, outLen = 4096 - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyTo); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); free (inMem); } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index a4f10632da..36ba1f0542 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -40,10 +40,13 @@ class ManagementAgent public: + virtual ~ManagementAgent (); + typedef boost::shared_ptr<ManagementAgent> shared_ptr; static void enableManagement (void); static shared_ptr getAgent (void); + static void shutdown (void); void setInterval (uint16_t _interval) { interval = _interval; } void setExchange (broker::Exchange::shared_ptr mgmtExchange, @@ -61,7 +64,7 @@ class ManagementAgent ManagementAgent& agent; Periodic (ManagementAgent& agent, uint32_t seconds); - ~Periodic () {} + virtual ~Periodic (); void fire (); }; @@ -77,7 +80,8 @@ class ManagementAgent uint64_t nextObjectId; void PeriodicProcessing (void); - void EncodeHeader (qpid::framing::Buffer& buf); + void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint8_t cls = 0); + bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint8_t *cls); void SendBuffer (qpid::framing::Buffer& buf, uint32_t length, broker::Exchange::shared_ptr exchange, diff --git a/qpid/cpp/src/qpid/management/ManagementExchange.h b/qpid/cpp/src/qpid/management/ManagementExchange.h index 0fcd65b092..1a79482c9d 100644 --- a/qpid/cpp/src/qpid/management/ManagementExchange.h +++ b/qpid/cpp/src/qpid/management/ManagementExchange.h @@ -40,7 +40,7 @@ class ManagementExchange : public virtual TopicExchange const qpid::framing::FieldTable& _args, Manageable* _parent = 0); - virtual std::string getType() const { return typeName; } + virtual std::string getType() const { return typeName; } virtual bool bind (Queue::shared_ptr queue, const string& routingKey, diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h index ff136c397d..a32055721d 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.h +++ b/qpid/cpp/src/qpid/management/ManagementObject.h @@ -46,12 +46,16 @@ class ManagementObject Manageable* coreObject; std::string className; - static const uint8_t TYPE_U8 = 1; - static const uint8_t TYPE_U16 = 2; - static const uint8_t TYPE_U32 = 3; - static const uint8_t TYPE_U64 = 4; - static const uint8_t TYPE_SSTR = 6; - static const uint8_t TYPE_LSTR = 7; + static const uint8_t TYPE_U8 = 1; + static const uint8_t TYPE_U16 = 2; + static const uint8_t TYPE_U32 = 3; + static const uint8_t TYPE_U64 = 4; + static const uint8_t TYPE_SSTR = 6; + static const uint8_t TYPE_LSTR = 7; + static const uint8_t TYPE_ABSTIME = 8; + static const uint8_t TYPE_DELTATIME = 9; + static const uint8_t TYPE_REF = 10; + static const uint8_t TYPE_BOOL = 11; static const uint8_t ACCESS_RC = 1; static const uint8_t ACCESS_RW = 2; diff --git a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 51ec7f718a..9ca083354b 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -131,7 +131,7 @@ public: void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) { AsynchIOHandler* async = new AsynchIOHandler; - ConnectionInputHandler* handler = f->create(async); + ConnectionInputHandler* handler = f->create(async, s); AsynchIO* aio = new AsynchIO(s, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), diff --git a/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h b/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h index af7d411928..4a90f8b736 100644 --- a/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h +++ b/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h @@ -22,6 +22,7 @@ #define _ConnectionInputHandlerFactory_ #include <boost/noncopyable.hpp> +#include "qpid/sys/Socket.h" namespace qpid { namespace sys { @@ -36,7 +37,7 @@ class ConnectionInputHandler; class ConnectionInputHandlerFactory : private boost::noncopyable { public: - virtual ConnectionInputHandler* create(ConnectionOutputHandler* ctxt) = 0; + virtual ConnectionInputHandler* create(ConnectionOutputHandler* ctxt, const Socket& s) = 0; virtual ~ConnectionInputHandlerFactory(){} }; diff --git a/qpid/python/mgmt-cli/main.py b/qpid/python/mgmt-cli/main.py index 4d7c03d1f2..76e1f25c14 100755 --- a/qpid/python/mgmt-cli/main.py +++ b/qpid/python/mgmt-cli/main.py @@ -49,6 +49,7 @@ class Mcli (Cmd): print " list <className> - Print list of objects of the specified class" print " list <className> all - Print contents of all objects of specified class" print " list <className> active - Print contents of all non-deleted objects of specified class" + print " list <list-of-IDs> - Print contents of one or more objects (infer className)" print " list <className> <list-of-IDs> - Print contents of one or more objects" print " list is space-separated, ranges may be specified (i.e. 1004-1010)" print " call <ID> <methodName> [<args>] - Invoke a method on an object" diff --git a/qpid/python/mgmt-cli/managementdata.py b/qpid/python/mgmt-cli/managementdata.py index 2adb962b54..419cbd13c0 100644 --- a/qpid/python/mgmt-cli/managementdata.py +++ b/qpid/python/mgmt-cli/managementdata.py @@ -118,6 +118,7 @@ class ManagementData: self.schema = {} self.baseId = 0 self.disp = disp + self.lastUnit = None self.methodSeq = 1 self.methodsPending = {} self.broker.start () @@ -125,6 +126,44 @@ class ManagementData: def close (self): self.broker.stop () + def refName (self, oid): + if oid == 0: + return "NULL" + return str (oid - self.baseId) + + def valueDisplay (self, className, key, value): + for kind in range (2): + schema = self.schema[className][kind] + for item in schema: + if item[0] == key: + typecode = item[1] + unit = item[2] + if typecode >= 1 and typecode <= 5: # numerics + if unit == None or unit == self.lastUnit: + return str (value) + else: + self.lastUnit = unit + suffix = "" + if value != 1: + suffix = "s" + return str (value) + " " + unit + suffix + elif typecode == 6 or typecode == 7: # strings + return value + elif typecode == 8: + if value == 0: + return "--" + return self.disp.timestamp (value) + elif typecode == 9: + return str (value) + elif typecode == 10: + return self.refName (value) + elif typecode == 11: + if value == 0: + return "False" + else: + return "True" + return "*type-error*" + def getObjIndex (self, className, config): """ Concatenate the values from index columns to form a unique object name """ result = "" @@ -135,9 +174,7 @@ class ManagementData: result = result + "." for key,val in config: if key == item[0]: - if key.find ("Ref") != -1: - val = val - self.baseId - result = result + str (val) + result = result + self.valueDisplay (className, key, val) return result def classCompletions (self, prefix): @@ -168,6 +205,14 @@ class ManagementData: return "short-string" elif typecode == 7: return "long-string" + elif typecode == 8: + return "abs-time" + elif typecode == 9: + return "delta-time" + elif typecode == 10: + return "reference" + elif typecode == 11: + return "boolean" else: raise ValueError ("Invalid type code: %d" % typecode) @@ -180,7 +225,7 @@ class ManagementData: elif code == 3: return "ReadOnly" else: - raise ValueErrir ("Invalid access code: %d" %code) + raise ValueError ("Invalid access code: %d" %code) def notNone (self, text): if text == None: @@ -188,6 +233,12 @@ class ManagementData: else: return text + def isOid (self, id): + for char in str (id): + if not char.isdigit () and not char == '-': + return False + return True + def listOfIds (self, className, tokens): """ Generate a tuple of object ids for a classname based on command tokens. """ list = [] @@ -202,13 +253,14 @@ class ManagementData: else: for token in tokens: - if token.find ("-") != -1: - ids = token.split("-", 2) - for id in range (int (ids[0]), int (ids[1]) + 1): - if self.getClassForId (long (id) + self.baseId) == className: - list.append (id) - else: - list.append (token) + if self.isOid (token): + if token.find ("-") != -1: + ids = token.split("-", 2) + for id in range (int (ids[0]), int (ids[1]) + 1): + if self.getClassForId (long (id) + self.baseId) == className: + list.append (id) + else: + list.append (token) list.sort () result = () @@ -258,7 +310,7 @@ class ManagementData: if ts[2] > 0: destroyTime = self.disp.timestamp (ts[2]) objIndex = self.getObjIndex (className, config) - row = (objId - self.baseId, createTime, destroyTime, objIndex) + row = (self.refName (objId), createTime, destroyTime, objIndex) rows.append (row) self.disp.table ("Objects of type %s" % className, ("ID", "Created", "Destroyed", "Index"), @@ -270,12 +322,26 @@ class ManagementData: """ Generate a display of object data for a particular class """ self.lock.acquire () try: - className = tokens[0] - if className not in self.tables: - print "Class not known: %s" % className - raise ValueError () - - userIds = self.listOfIds (className, tokens[1:]) + self.lastUnit = None + if self.isOid (tokens[0]): + if tokens[0].find ("-") != -1: + rootId = int (tokens[0][0:tokens[0].find ("-")]) + else: + rootId = int (tokens[0]) + + className = self.getClassForId (rootId + self.baseId) + remaining = tokens + if className == None: + print "Id not known: %d" % int (tokens[0]) + raise ValueError () + else: + className = tokens[0] + remaining = tokens[1:] + if className not in self.tables: + print "Class not known: %s" % className + raise ValueError () + + userIds = self.listOfIds (className, remaining) if len (userIds) == 0: print "No object IDs supplied" raise ValueError () @@ -286,36 +352,37 @@ class ManagementData: ids.append (long (id) + self.baseId) rows = [] + timestamp = None config = self.tables[className][ids[0]][1] for eIdx in range (len (config)): key = config[eIdx][0] if key != "id": - isRef = key.find ("Ref") == len (key) - 3 row = ("config", key) for id in ids: - value = self.tables[className][id][1][eIdx][1] - if isRef: - value = value - self.baseId - row = row + (value,) + if timestamp == None or \ + timestamp < self.tables[className][id][0][0]: + timestamp = self.tables[className][id][0][0] + (key, value) = self.tables[className][id][1][eIdx] + row = row + (self.valueDisplay (className, key, value),) rows.append (row) inst = self.tables[className][ids[0]][2] for eIdx in range (len (inst)): key = inst[eIdx][0] if key != "id": - isRef = key.find ("Ref") == len (key) - 3 row = ("inst", key) for id in ids: - value = self.tables[className][id][2][eIdx][1] - if isRef: - value = value - self.baseId - row = row + (value,) + (key, value) = self.tables[className][id][2][eIdx] + row = row + (self.valueDisplay (className, key, value),) rows.append (row) titleRow = ("Type", "Element") for id in ids: - titleRow = titleRow + (str (id - self.baseId),) - self.disp.table ("Object of type %s:" % className, titleRow, rows) + titleRow = titleRow + (self.refName (id),) + caption = "Object of type %s:" % className + if timestamp != None: + caption = caption + " (last sample time: " + self.disp.timestamp (timestamp) + ")" + self.disp.table (caption, titleRow, rows) except: pass @@ -418,6 +485,10 @@ class ManagementData: if className == None: raise ValueError () + if methodName not in self.schema[className][2]: + print "Method '%s' not valid for class '%s'" % (methodName, className) + raise ValueError () + schemaMethod = self.schema[className][2][methodName] if len (args) != len (schemaMethod[1]): print "Wrong number of method args: Need %d, Got %d" % (len (schemaMethod[1]), len (args)) @@ -431,17 +502,19 @@ class ManagementData: self.methodsPending[self.methodSeq] = methodName except: methodOk = False - print "Error in call syntax" self.lock.release () if methodOk: - self.broker.method (self.methodSeq, userOid + self.baseId, className, - methodName, namedArgs) +# try: + self.broker.method (self.methodSeq, userOid + self.baseId, className, + methodName, namedArgs) +# except ValueError, e: +# print "Error invoking method:", e def do_list (self, data): tokens = data.split () if len (tokens) == 0: self.listClasses () - elif len (tokens) == 1: + elif len (tokens) == 1 and not self.isOid (tokens[0]): self.listObjects (data) else: self.showObjects (tokens) diff --git a/qpid/python/qpid/management.py b/qpid/python/qpid/management.py index 1c8b3cd840..40de2a5298 100644 --- a/qpid/python/qpid/management.py +++ b/qpid/python/qpid/management.py @@ -77,6 +77,14 @@ class ManagementMetadata: codec.encode_shortstr (value) elif typecode == 7: codec.encode_longstr (value) + elif typecode == 8: # ABSTIME + codec.encode_longlong (long (value)) + elif typecode == 9: # DELTATIME + codec.encode_longlong (long (value)) + elif typecode == 10: # REF + codec.encode_longlong (long (value)) + elif typecode == 11: # BOOL + codec.encode_octet (int (value)) else: raise ValueError ("Invalid type code: %d" % typecode) @@ -95,6 +103,14 @@ class ManagementMetadata: data = codec.decode_shortstr () elif typecode == 7: data = codec.decode_longstr () + elif typecode == 8: # ABSTIME + data = codec.decode_longlong () + elif typecode == 9: # DELTATIME + data = codec.decode_longlong () + elif typecode == 10: # REF + data = codec.decode_longlong () + elif typecode == 11: # BOOL + data = codec.decode_octet () else: raise ValueError ("Invalid type code: %d" % typecode) return data @@ -236,10 +252,7 @@ class ManagementMetadata: elif cls == 'I': self.broker.inst_cb[1] (self.broker.inst_cb[0], className, row, timestamps) - def parse (self, codec): - opcode = chr (codec.decode_octet ()) - cls = chr (codec.decode_octet ()) - + def parse (self, codec, opcode, cls): if opcode == 'S': self.parseSchema (cls, codec) @@ -261,34 +274,53 @@ class ManagedBroker: mExchange = "qpid.management" dExchange = "amq.direct" + def setHeader (self, codec, opcode, cls = 0): + codec.encode_octet (ord ('A')) + codec.encode_octet (ord ('M')) + codec.encode_octet (ord ('0')) + codec.encode_octet (ord ('1')) + codec.encode_octet (opcode) + codec.encode_octet (cls) + def checkHeader (self, codec): octet = chr (codec.decode_octet ()) if octet != 'A': - return 0 + return None octet = chr (codec.decode_octet ()) if octet != 'M': - return 0 + return None octet = chr (codec.decode_octet ()) if octet != '0': - return 0 + return None octet = chr (codec.decode_octet ()) if octet != '1': - return 0 - return 1 + return None + opcode = chr (codec.decode_octet ()) + cls = chr (codec.decode_octet ()) + return (opcode, cls) def publish_cb (self, msg): codec = Codec (StringIO (msg.content.body), self.spec) - if self.checkHeader (codec) == 0: + hdr = self.checkHeader (codec) + if hdr == None: raise ValueError ("outer header invalid"); - self.metadata.parse (codec) + self.metadata.parse (codec, hdr[0], hdr[1]) msg.complete () def reply_cb (self, msg): codec = Codec (StringIO (msg.content.body), self.spec) - sequence = codec.decode_long () - status = codec.decode_long () + hdr = self.checkHeader (codec) + if hdr == None: + msg.complete () + return + if hdr[0] != 'R': + msg.complete () + return + + sequence = codec.decode_long () + status = codec.decode_long () sText = codec.decode_shortstr () data = self.sequenceManager.release (sequence) @@ -369,9 +401,10 @@ class ManagedBroker: methodName, args=None, packageName="qpid"): codec = Codec (StringIO (), self.spec); sequence = self.sequenceManager.reserve ((userSequence, className, methodName)) + self.setHeader (codec, ord ('M')) codec.encode_long (sequence) # Method sequence id codec.encode_longlong (objId) # ID of object - codec.encode_shortstr (self.rqname) # name of reply queue + #codec.encode_shortstr (self.rqname) # name of reply queue # Encode args according to schema if (className,'M') not in self.metadata.schema: @@ -402,6 +435,8 @@ class ManagedBroker: msg["content_type"] = "application/octet-stream" msg["routing_key"] = "method." + packageName + "." + className + "." + methodName msg["reply_to"] = self.spec.struct ("reply_to") + msg["reply_to"]["exchange_name"] = "amq.direct" + msg["reply_to"]["routing_key"] = self.rqname self.channel.message_transfer (destination="qpid.management", content=msg) def isConnected (self): @@ -414,7 +449,7 @@ class ManagedBroker: self.client = Client (self.host, self.port, self.spec) self.client.start ({"LOGIN": self.username, "PASSWORD": self.password}) self.channel = self.client.channel (1) - response = self.channel.session_open (detached_lifetime=10) + response = self.channel.session_open (detached_lifetime=300) self.qname = "mgmt-" + base64.urlsafe_b64encode (response.session_id) self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id) diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index 2147122f0a..db28f098bf 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -65,26 +65,23 @@ <configElement name="stagingThreshold" type="uint32" access="RO" desc="Broker stages messages over this size to disk"/> <configElement name="storeLib" type="sstr" access="RO" desc="Name of persistent storage library"/> <configElement name="asyncStore" type="bool" access="RO" desc="Use async persistent store"/> - <configElement name="mgmtPubInterval" type="uint16" min="1" access="RW" unit="second" desc="Interval for management broadcasts"/> + <configElement name="mgmtPubInterval" type="uint16" access="RW" unit="second" min="1" desc="Interval for management broadcasts"/> <configElement name="initialDiskPageSize" type="uint32" access="RO" desc="Number of disk pages allocated for storage"/> <configElement name="initialPagesPerQueue" type="uint32" access="RO" desc="Number of disk pages allocated per queue"/> <configElement name="clusterName" type="sstr" access="RO" desc="Name of cluster this server is a member of, zero-length for standalone server"/> <configElement name="version" type="sstr" access="RO" desc="Running software version"/> - - <method name="joinCluster"> <arg name="clusterName" dir="I" type="sstr"/> </method> <method name="leaveCluster"/> - <method name="echo"> + <method name="echo" desc="Request a response to test the path to the management agent"> <arg name="sequence" dir="IO" type="uint32" default="0"/> <arg name="body" dir="IO" type="lstr" default=""/> </method> -<!--<method name="crash" desc="Temporary test method to crash the broker"/>--> </class> <!-- @@ -135,12 +132,11 @@ <instElement name="consumers" type="hilo32" unit="consumer" desc="Current consumers on queue"/> <instElement name="bindings" type="hilo32" unit="binding" desc="Current bindings"/> <instElement name="unackedMessages" type="hilo32" unit="message" desc="Messages consumed but not yet acked"/> + <instElement name="messageLatencyMin" type="uint64" unit="nanosecond" desc="Minimum broker latency through this queue"/> + <instElement name="messageLatencyMax" type="uint64" unit="nanosecond" desc="Maximum broker latency through this queue"/> + <instElement name="messageLatencyAvg" type="uint64" unit="nanosecond" desc="Average broker latency through this queue"/> - <method name="purge" desc="Discard all messages on queue"/> - <method name="increaseJournalSize" desc="Increase number of disk pages allocated for this queue"> - <arg name="pages" type="uint32" dir="I" desc="New total page allocation"/> - </method> - + <method name="purge" desc="Discard all messages on queue"/> </class> <!-- @@ -184,17 +180,16 @@ --> <class name="client"> <configElement name="vhostRef" type="objId" access="RC" index="y" parentRef="y"/> - <configElement name="ipAddr" type="uint32" access="RC" index="y"/> - <configElement name="port" type="uint16" access="RC" index="y"/> + <configElement name="address" type="sstr" access="RC" index="y"/> - <instElement name="authIdentity" type="sstr"/> - <instElement name="msgsProduced" type="count64"/> - <instElement name="msgsConsumed" type="count64"/> - <instElement name="bytesProduced" type="count64"/> - <instElement name="bytesConsumed" type="count64"/> + <instElement name="closing" type="bool" desc="This client is closing by management request"/> + <instElement name="authIdentity" type="sstr"/> + <instElement name="framesFromClient" type="count64"/> + <instElement name="framesToClient" type="count64"/> + <instElement name="bytesFromClient" type="count64"/> + <instElement name="bytesToClient" type="count64"/> <method name="close"/> - <method name="detach"/> </class> <!-- @@ -205,11 +200,12 @@ <class name="session"> <configElement name="vhostRef" type="objId" access="RC" index="y" parentRef="y"/> <configElement name="name" type="sstr" access="RC" index="y"/> + <configElement name="channelId" type="uint16" access="RO"/> <configElement name="clientRef" type="objId" access="RO"/> - <configElement name="detachedLifespan" type="uint32" access="RO"/> + <configElement name="detachedLifespan" type="uint32" access="RO" unit="second"/> <instElement name="attached" type="bool"/> - <instElement name="remainingLifespan" type="count32"/> + <instElement name="expireTime" type="absTime"/> <instElement name="framesOutstanding" type="count32"/> <method name="solicitAck"/> diff --git a/qpid/specs/management-types.xml b/qpid/specs/management-types.xml index 9f251f032b..0929d965a6 100644 --- a/qpid/specs/management-types.xml +++ b/qpid/specs/management-types.xml @@ -19,14 +19,16 @@ under the License. --> -<type name="objId" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/> -<type name="uint8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" accessor="direct" init="0"/> -<type name="uint16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" accessor="direct" init="0"/> -<type name="uint32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" accessor="direct" init="0"/> -<type name="uint64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/> -<type name="bool" base="U8" cpp="bool" encode="@.putOctet (#?1:0)" decode="# = @.getOctet ()==1" accessor="direct" init="0"/> -<type name="sstr" base="SSTR" cpp="std::string" encode="@.putShortString (#)" decode="@.getShortString (#)" accessor="direct" init='""'/> -<type name="lstr" base="LSTR" cpp="std::string" encode="@.putLongString (#)" decode="@.getLongString (#)" accessor="direct" init='""'/> +<type name="objId" base="REF" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/> +<type name="uint8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" accessor="direct" init="0"/> +<type name="uint16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" accessor="direct" init="0"/> +<type name="uint32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" accessor="direct" init="0"/> +<type name="uint64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/> +<type name="bool" base="BOOL" cpp="uint8_t" encode="@.putOctet (#?1:0)" decode="# = @.getOctet ()==1" accessor="direct" init="0"/> +<type name="sstr" base="SSTR" cpp="std::string" encode="@.putShortString (#)" decode="@.getShortString (#)" accessor="direct" init='""'/> +<type name="lstr" base="LSTR" cpp="std::string" encode="@.putLongString (#)" decode="@.getLongString (#)" accessor="direct" init='""'/> +<type name="absTime" base="ABSTIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/> +<type name="deltaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/> <type name="hilo8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" style="wm" accessor="counter" init="0"/> <type name="hilo16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" style="wm" accessor="counter" init="0"/> |