diff options
-rw-r--r-- | qpid/cpp/examples/qmf-agent/Makefile | 16 | ||||
-rw-r--r-- | qpid/cpp/examples/qmf-agent/example.cpp | 96 | ||||
-rwxr-xr-x | qpid/cpp/managementgen/schema.py | 6 | ||||
-rw-r--r-- | qpid/cpp/managementgen/templates/Class.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 97 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementBroker.cpp | 176 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementBroker.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementExchange.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementObject.h | 4 | ||||
-rwxr-xr-x | qpid/python/commands/qpid-tool | 4 | ||||
-rw-r--r-- | qpid/python/qpid/management.py | 15 | ||||
-rw-r--r-- | qpid/python/qpid/managementdata.py | 92 | ||||
-rw-r--r-- | qpid/specs/management-schema.xml | 9 |
15 files changed, 344 insertions, 207 deletions
diff --git a/qpid/cpp/examples/qmf-agent/Makefile b/qpid/cpp/examples/qmf-agent/Makefile index cc33dd1dc6..0b37f1a295 100644 --- a/qpid/cpp/examples/qmf-agent/Makefile +++ b/qpid/cpp/examples/qmf-agent/Makefile @@ -26,13 +26,13 @@ OUT_FILE = $(SRC_DIR)/qmf-agent CC = gcc LIB_DIR = $(QPID_DIR)/cpp/src/.libs CC_INCLUDES = -I$(SRC_DIR) -I$(QPID_DIR)/cpp/src -I$(QPID_DIR)/cpp/src/gen -I$(GEN_DIR) -CC_FLAGS = -g -O2 +CC_FLAGS = -g -O3 LD_FLAGS = -lqpidclient -lqpidcommon -L$(LIB_DIR) SPEC_DIR = $(QPID_DIR)/specs +TYPE_FILE = $(SPEC_DIR)/management-types.xml MGEN_DIR = $(QPID_DIR)/cpp/managementgen TEMPLATE_DIR = $(MGEN_DIR)/templates MGEN = $(MGEN_DIR)/main.py -OBJ_DIR = $(SRC_DIR)/.libs vpath %.cpp $(SRC_DIR):$(GEN_DIR) vpath %.d $(OBJ_DIR) @@ -43,15 +43,17 @@ cpps += $(wildcard $(GEN_DIR)/*.cpp) deps = $(addsuffix .d, $(basename $(cpps))) objects = $(addsuffix .o, $(basename $(cpps))) -.PHONY: all clean +.PHONY: all clean gen #========================================================== # Pass 0: generate source files from schema ifeq ($(MAKELEVEL), 0) -all: - $(MGEN) $(SCHEMA_FILE) $(SPEC_DIR)/management-types.xml $(TEMPLATE_DIR) $(GEN_DIR) - $(MAKE) +all: gen + @$(MAKE) + +gen: + $(MGEN) $(SCHEMA_FILE) $(TYPE_FILE) $(TEMPLATE_DIR) $(GEN_DIR) clean: rm -rf $(GEN_DIR) $(OUT_FILE) *.d *.o @@ -62,7 +64,7 @@ clean: else ifeq ($(MAKELEVEL), 1) all: $(deps) - $(MAKE) + @$(MAKE) %.d : %.cpp $(CC) -M $(CC_FLAGS) $(CC_INCLUDES) $< > $@ diff --git a/qpid/cpp/examples/qmf-agent/example.cpp b/qpid/cpp/examples/qmf-agent/example.cpp index 62d2909410..35ea97d4c0 100644 --- a/qpid/cpp/examples/qmf-agent/example.cpp +++ b/qpid/cpp/examples/qmf-agent/example.cpp @@ -22,7 +22,10 @@ #include <qpid/management/Manageable.h> #include <qpid/management/ManagementObject.h> #include <qpid/agent/ManagementAgent.h> +#include <qpid/sys/Mutex.h> #include "Parent.h" +#include "Child.h" +#include "ArgsParentCreate_child.h" #include "PackageQmf_example.h" #include <unistd.h> @@ -32,7 +35,14 @@ #include <sstream> using namespace qpid::management; +using namespace qpid::sys; using namespace std; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; +using qpid::sys::Mutex; + +class ChildClass; //============================================================== // CoreClass is the operational class that corresponds to the @@ -40,21 +50,44 @@ using namespace std; //============================================================== class CoreClass : public Manageable { - string name; + string name; + ManagementAgent* agent; Parent* mgmtObject; + std::vector<ChildClass*> children; + Mutex vectorLock; public: CoreClass(ManagementAgent* agent, string _name); - ~CoreClass() {} + ~CoreClass() { mgmtObject->resourceDestroy(); } + + ManagementObject* GetManagementObject(void) const + { return mgmtObject; } + + void doLoop(); + status_t ManagementMethod (uint32_t methodId, Args& args); +}; + +class ChildClass : public Manageable +{ + string name; + Child* mgmtObject; + +public: - void bumpCounter() { mgmtObject->inc_count(); } + ChildClass(ManagementAgent* agent, CoreClass* parent, string name); + ~ChildClass() { mgmtObject->resourceDestroy(); } ManagementObject* GetManagementObject(void) const { return mgmtObject; } + + void doWork() + { + mgmtObject->inc_count(2); + } }; -CoreClass::CoreClass(ManagementAgent* agent, string _name) : name(_name) +CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent(_agent) { mgmtObject = new Parent(agent, this, name); @@ -62,6 +95,52 @@ CoreClass::CoreClass(ManagementAgent* agent, string _name) : name(_name) mgmtObject->set_state("IDLE"); } +void CoreClass::doLoop() +{ + // Periodically bump a counter to provide a changing statistical value + while (1) { + sleep(1); + mgmtObject->inc_count(); + mgmtObject->set_state("IN LOOP"); + + { + Mutex::ScopedLock _lock(vectorLock); + + for (std::vector<ChildClass*>::iterator iter = children.begin(); + iter != children.end(); + iter++) { + (*iter)->doWork(); + } + } + } +} + +Manageable::status_t CoreClass::ManagementMethod(uint32_t methodId, Args& args) +{ + Mutex::ScopedLock _lock(vectorLock); + + switch (methodId) { + case Parent::METHOD_CREATE_CHILD: + ArgsParentCreate_child& ioArgs = (ArgsParentCreate_child&) args; + + ChildClass *child = new ChildClass(agent, this, ioArgs.i_name); + ioArgs.o_childRef = child->GetManagementObject()->getObjectId(); + + children.push_back(child); + + return STATUS_OK; + } + + return STATUS_NOT_IMPLEMENTED; +} + +ChildClass::ChildClass(ManagementAgent* agent, CoreClass* parent, string name) +{ + mgmtObject = new Child(agent, this, parent, name); + + agent->addObject(mgmtObject); +} + //============================================================== // Main program @@ -79,19 +158,14 @@ int main(int argc, char** argv) { // Start the agent. It will attempt to make a connection to the // management broker - agent->init (string(host), port); + agent->init(string(host), port); // Allocate some core objects CoreClass core1(agent, "Example Core Object #1"); CoreClass core2(agent, "Example Core Object #2"); CoreClass core3(agent, "Example Core Object #3"); - // Periodically bump a counter in core1 to provide a changing statistical value - while (1) - { - sleep(1); - core1.bumpCounter(); - } + core1.doLoop(); } diff --git a/qpid/cpp/managementgen/schema.py b/qpid/cpp/managementgen/schema.py index eb0b066b62..25284a14f7 100755 --- a/qpid/cpp/managementgen/schema.py +++ b/qpid/cpp/managementgen/schema.py @@ -577,7 +577,7 @@ class SchemaMethod: return self.name def getFullName (self): - return self.parent.getName().capitalize() + self.name[0:1].upper() +\ + return capitalize(self.parent.getName()) + self.name[0:1].upper() +\ self.name[1:] def getArgCount (self): @@ -644,7 +644,7 @@ class SchemaEvent: return self.name def getFullName (self): - return self.parent.getName ().capitalize() + self.name.capitalize () + return capitalize(self.parent.getName()) + capitalize(self.name) def getArgCount (self): return len (self.args) @@ -938,7 +938,7 @@ class SchemaClass: method.genSchema (stream, variables) def genNameCap (self, stream, variables): - stream.write (self.name.capitalize ()) + stream.write (capitalize(self.name)) def genNameLower (self, stream, variables): stream.write (self.name.lower ()) diff --git a/qpid/cpp/managementgen/templates/Class.h b/qpid/cpp/managementgen/templates/Class.h index 8a4dc1006a..fac63d5d55 100644 --- a/qpid/cpp/managementgen/templates/Class.h +++ b/qpid/cpp/managementgen/templates/Class.h @@ -86,9 +86,9 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject /*MGEN:Class.SetGeneralReferenceDeclaration*/ - std::string getPackageName (void) { return packageName; } - std::string getClassName (void) { return className; } - uint8_t* getMd5Sum (void) { return md5Sum; } + std::string& getPackageName (void) { return packageName; } + std::string& getClassName (void) { return className; } + uint8_t* getMd5Sum (void) { return md5Sum; } // Method IDs /*MGEN:Class.MethodIdDeclarations*/ diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 5cff0fcd3c..85f13ba15d 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -67,16 +67,21 @@ ManagementAgent* ManagementAgent::Singleton::getInstance() } ManagementAgentImpl::ManagementAgentImpl() : - clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread) + clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread), startupWait(false) { // TODO: Establish system ID } -void ManagementAgentImpl::init (std::string brokerHost, - uint16_t brokerPort, - uint16_t intervalSeconds, - bool useExternalThread) +void ManagementAgentImpl::init(std::string brokerHost, + uint16_t brokerPort, + uint16_t intervalSeconds, + bool useExternalThread) { + { + Mutex::ScopedLock lock(agentLock); + startupWait = true; + } + interval = intervalSeconds; extThread = useExternalThread; nextObjectId = 1; @@ -92,17 +97,17 @@ void ManagementAgentImpl::init (std::string brokerHost, session.queueDeclare (arg::queue=queueName.str()); session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(), - arg::bindingKey=queueName.str ()); + arg::bindingKey=queueName.str()); session.messageSubscribe (arg::queue=queueName.str(), arg::destination=dest); session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF); session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF); Message attachRequest; - char rawbuffer[512]; // TODO: Modify Buffer so it can use stringstream + char rawbuffer[512]; Buffer buffer (rawbuffer, 512); - attachRequest.getDeliveryProperties().setRoutingKey("agent"); + attachRequest.getDeliveryProperties().setRoutingKey("broker"); attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); EncodeHeader (buffer, 'A'); @@ -115,15 +120,22 @@ void ManagementAgentImpl::init (std::string brokerHost, string stringBuffer (rawbuffer, length); attachRequest.setData (stringBuffer); - session.messageTransfer (arg::content=attachRequest, arg::destination="qpid.management"); + session.messageTransfer(arg::content=attachRequest, arg::destination="qpid.management"); - dispatcher->listen (dest, this); - dispatcher->start (); + dispatcher->listen(dest, this); + dispatcher->start(); + + { + Mutex::ScopedLock lock(agentLock); + if (startupWait) + startupCond.wait(agentLock); + } } -ManagementAgentImpl::~ManagementAgentImpl () +ManagementAgentImpl::~ManagementAgentImpl() { - dispatcher->stop (); + dispatcher->stop(); + session.close(); delete dispatcher; } @@ -151,24 +163,33 @@ uint64_t ManagementAgentImpl::addObject (ManagementObject* object, return objectId; } -uint32_t ManagementAgentImpl::pollCallbacks (uint32_t /*callLimit*/) +uint32_t ManagementAgentImpl::pollCallbacks(uint32_t /*callLimit*/) { return 0; } -int ManagementAgentImpl::getSignalFd (void) +int ManagementAgentImpl::getSignalFd(void) { return -1; } -void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer) +void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) { Mutex::ScopedLock lock(agentLock); uint32_t assigned; + stringstream key; assigned = inBuffer.getLong(); objIdPrefix = ((uint64_t) assigned) << 24; + startupWait = false; + startupCond.notify(); + + // Bind to qpid.management to receive commands + key << "agent." << assigned; + session.exchangeBind (arg::exchange="qpid.management", arg::queue=queueName.str(), + arg::bindingKey=key.str()); + // Send package indications for all local packages for (PackageMap::iterator pIter = packages.begin(); pIter != packages.end(); @@ -180,7 +201,7 @@ void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer) EncodePackageIndication(outBuffer, pIter); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + SendBuffer(outBuffer, outLen, "qpid.management", "broker"); // Send class indications for all local classes ClassMap cMap = pIter->second; @@ -190,7 +211,7 @@ void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer) EncodeClassIndication(outBuffer, pIter, cIter); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + SendBuffer(outBuffer, outLen, "qpid.management", "broker"); } } } @@ -218,7 +239,7 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc schema.writeSchemaCall(outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + SendBuffer(outBuffer, outLen, "qpid.management", "broker"); } } } @@ -229,18 +250,50 @@ void ManagementAgentImpl::handleConsoleAddedIndication() clientWasAdded = true; } -void ManagementAgentImpl::received (Message& msg) +void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +{ + string methodName; + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + uint64_t objId = inBuffer.getLongLong(); + inBuffer.getShortString(methodName); + + EncodeHeader(outBuffer, 'm', sequence); + + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter == managementObjects.end() || iter->second->isDeleted()) { + outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); + outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); + } else { + iter->second->doMethod(methodName, inBuffer, outBuffer); + } + + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, "amq.direct", replyTo); +} + +void ManagementAgentImpl::received(Message& msg) { - string data = msg.getData (); - Buffer inBuffer (const_cast<char*>(data.c_str()), data.size()); + string data = msg.getData(); + Buffer inBuffer(const_cast<char*>(data.c_str()), data.size()); uint8_t opcode; uint32_t sequence; + string replyToKey; + + framing::MessageProperties p = msg.getMessageProperties(); + if (p.hasReplyTo()) { + const framing::ReplyTo& rt = p.getReplyTo(); + replyToKey = rt.getRoutingKey(); + } if (CheckHeader (inBuffer, &opcode, &sequence)) { if (opcode == 'a') handleAttachResponse(inBuffer); else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); else if (opcode == 'x') handleConsoleAddedIndication(); + else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey); } } diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h index 2ecf63cd5d..f7f19e145d 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -30,6 +30,7 @@ #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Mutex.h" +#include "qpid/sys/Condition.h" #include "qpid/framing/Uuid.h" #include <iostream> #include <sstream> @@ -127,6 +128,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen BackgroundThread bgThread; sys::Thread thread; + sys::Condition startupCond; + bool startupWait; PackageMap::iterator FindOrAddPackage (std::string name); void moveNewObjectsLH(); @@ -149,6 +152,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void handlePackageRequest (qpid::framing::Buffer& inBuffer); void handleClassQuery (qpid::framing::Buffer& inBuffer); void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence); + void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); void handleConsoleAddedIndication(); }; diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 9cbf86ebd3..05b759f695 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -335,8 +335,7 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args return Manageable::STATUS_OK; case management::Link::METHOD_BRIDGE : - management::ArgsLinkBridge iargs = - dynamic_cast<const management::ArgsLinkBridge&>(args); + management::ArgsLinkBridge& iargs = (management::ArgsLinkBridge&) args; // Durable bridges are only valid on durable links if (iargs.i_durable && !durable) diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/qpid/cpp/src/qpid/management/ManagementBroker.cpp index f66b34c43c..223811ebc2 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.cpp +++ b/qpid/cpp/src/qpid/management/ManagementBroker.cpp @@ -97,6 +97,7 @@ ManagementBroker::~ManagementBroker () // objects that will be invalid. dExchange.reset(); mExchange.reset(); + timer.stop(); moveNewObjectsLH(); for (ManagementObjectMap::iterator iter = managementObjects.begin (); @@ -117,33 +118,33 @@ void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); // Get from file or generate and save to file. - if (dataDir.empty ()) + if (dataDir.empty()) { - uuid.generate (); + uuid.generate(); QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: " << uuid); } else { - string filename (dataDir + "/.mbrokerdata"); - ifstream inFile (filename.c_str ()); + string filename(dataDir + "/.mbrokerdata"); + ifstream inFile(filename.c_str ()); - if (inFile.good ()) + if (inFile.good()) { inFile >> uuid; inFile >> bootSequence; inFile >> nextRemoteBank; - inFile.close (); + inFile.close(); QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid); bootSequence++; - writeData (); + writeData(); } else { - uuid.generate (); + uuid.generate(); QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid); - writeData (); + writeData(); } QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence); @@ -155,10 +156,10 @@ void ManagementBroker::writeData () string filename (dataDir + "/.mbrokerdata"); ofstream outFile (filename.c_str ()); - if (outFile.good ()) + if (outFile.good()) { outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl; - outFile.close (); + outFile.close(); } } @@ -174,7 +175,7 @@ void ManagementBroker::RegisterClass (string packageName, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) { - Mutex::ScopedLock lock (userLock); + Mutex::ScopedLock lock(userLock); PackageMap::iterator pIter = FindOrAddPackageLH(packageName); AddClass(pIter, className, md5Sum, schemaCall); } @@ -391,124 +392,64 @@ void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence SendBuffer (outBuffer, outLen, dExchange, replyToKey); } -void ManagementBroker::dispatchCommand (Deliverable& deliverable, +bool ManagementBroker::dispatchCommand (Deliverable& deliverable, const string& routingKey, const FieldTable* /*args*/) { Mutex::ScopedLock lock (userLock); Message& msg = ((DeliverableMessage&) deliverable).getMessage (); - if (routingKey.compare (0, 13, "agent.method.") == 0) - dispatchMethodLH (msg, routingKey, 13); + // Parse the routing key. This management broker should act as though it + // is bound to the exchange to match the following keys: + // + // agent.<X>.# + // broker.# + // + // where <X> is any non-negative decimal integer less than the lowest remote + // object-id bank. - else if (routingKey.length () == 5 && - routingKey.compare (0, 5, "agent") == 0) + if (routingKey == "broker") { dispatchAgentCommandLH (msg); - - else - { - QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey); - return; - } -} - -void ManagementBroker::dispatchMethodLH (Message& msg, - const string& routingKey, - size_t first) -{ - size_t pos, start = first; - uint32_t contentSize; - - if (routingKey.length () == start) - { - QPID_LOG (debug, "Missing package-name in routing key: " << routingKey); - return; - } - - pos = routingKey.find ('.', start); - if (pos == string::npos || routingKey.length () == pos + 1) - { - QPID_LOG (debug, "Missing class-name in routing key: " << routingKey); - return; + return false; } - string packageName = routingKey.substr (start, pos - start); - - start = pos + 1; - pos = routingKey.find ('.', start); - if (pos == string::npos || routingKey.length () == pos + 1) - { - QPID_LOG (debug, "Missing method-name in routing key: " << routingKey); - return; + else if (routingKey.compare(0, 6, "agent.") == 0) { + uint32_t delim = routingKey.find('.', 6); + if (delim == string::npos) + delim = routingKey.length(); + string bank = routingKey.substr(6, delim - 6); + if ((uint32_t) atoi(bank.c_str()) <= localBank) { + dispatchAgentCommandLH (msg); + return false; + } } - string className = routingKey.substr (start, pos - start); - - start = pos + 1; - string methodName = routingKey.substr (start, routingKey.length () - start); - - contentSize = msg.encodedContentSize (); - if (contentSize < 8 || contentSize > MA_BUFFER_SIZE) - return; + return true; +} - Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); +void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + string methodName; Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen, sequence; - uint8_t opcode; - - if (msg.encodedSize() > MA_BUFFER_SIZE) { - QPID_LOG(debug, "ManagementBroker::dispatchMethodLH: Message too large: " << - msg.encodedSize()); - return; - } - - msg.encodeContent (inBuffer); - inBuffer.reset (); - - if (!CheckHeader (inBuffer, &opcode, &sequence)) - { - QPID_LOG (debug, " Invalid content header"); - return; - } - - if (opcode != 'M') - { - QPID_LOG (debug, " Unexpected opcode " << opcode); - return; - } - - uint64_t objId = inBuffer.getLongLong (); - string replyToKey; + uint32_t outLen; - const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); - if (p && p->hasReplyTo()) - { - const framing::ReplyTo& rt = p->getReplyTo (); - replyToKey = rt.getRoutingKey (); - } - else - { - QPID_LOG (debug, " Reply-to missing"); - return; - } + uint64_t objId = inBuffer.getLongLong(); + inBuffer.getShortString(methodName); - EncodeHeader (outBuffer, 'm', sequence); + std::cout << "ManagementBroker::handleMethodRequest (" << objId << ", " << methodName << ")" << std::endl; + EncodeHeader(outBuffer, 'm', sequence); - ManagementObjectMap::iterator iter = managementObjects.find (objId); - if (iter == managementObjects.end () || iter->second->isDeleted ()) - { + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter == managementObjects.end() || iter->second->isDeleted()) { outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); - } - else - { - iter->second->doMethod (methodName, inBuffer, outBuffer); + } else { + iter->second->doMethod(methodName, inBuffer, outBuffer); } - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); } void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) @@ -737,6 +678,8 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe requestedBank = inBuffer.getLong (); assignedBank = assignBankLH (requestedBank); + // TODO: Make a pass over the agents and delete any that no longer have a session. + RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName); if (aIter != remoteAgents.end()) { @@ -755,6 +698,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); agent->mgmtObject->set_systemId (systemId); + agent->mgmtObject->set_objectIdBank (assignedBank); addObject (agent->mgmtObject); remoteAgents[sessionName] = agent; @@ -818,10 +762,9 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) const framing::MessageProperties* p = msg.getFrames().getHeaders()->get<framing::MessageProperties>(); - if (p && p->hasReplyTo()) - { - const framing::ReplyTo& rt = p->getReplyTo (); - replyToKey = rt.getRoutingKey (); + if (p && p->hasReplyTo()) { + const framing::ReplyTo& rt = p->getReplyTo(); + replyToKey = rt.getRoutingKey(); } else return; @@ -832,10 +775,10 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) return; } - msg.encodeContent (inBuffer); - inBuffer.reset (); + msg.encodeContent(inBuffer); + inBuffer.reset(); - if (!CheckHeader (inBuffer, &opcode, &sequence)) + if (!CheckHeader(inBuffer, &opcode, &sequence)) return; if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); @@ -847,6 +790,7 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence); else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence); } ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name) diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.h b/qpid/cpp/src/qpid/management/ManagementBroker.h index 447720fb5e..89ea80b3b2 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.h +++ b/qpid/cpp/src/qpid/management/ManagementBroker.h @@ -59,7 +59,7 @@ class ManagementBroker : public ManagementAgent uint32_t persistId = 0, uint32_t persistBank = 4); void clientAdded (void); - void dispatchCommand (broker::Deliverable& msg, + bool dispatchCommand (broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); @@ -177,9 +177,6 @@ class ManagementBroker : public ManagementAgent std::string routingKey); void moveNewObjectsLH(); - void dispatchMethodLH (broker::Message& msg, - const std::string& routingKey, - size_t first); void dispatchAgentCommandLH (broker::Message& msg); PackageMap::iterator FindOrAddPackageLH(std::string name); @@ -206,6 +203,7 @@ class ManagementBroker : public ManagementAgent void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); size_t ValidateSchema(framing::Buffer&); }; diff --git a/qpid/cpp/src/qpid/management/ManagementExchange.cpp b/qpid/cpp/src/qpid/management/ManagementExchange.cpp index b4824549ed..4ccf8e68c9 100644 --- a/qpid/cpp/src/qpid/management/ManagementExchange.cpp +++ b/qpid/cpp/src/qpid/management/ManagementExchange.cpp @@ -40,17 +40,16 @@ void ManagementExchange::route (Deliverable& msg, const string& routingKey, const FieldTable* args) { + bool routeIt = true; + // Intercept management agent commands - if ((routingKey.length () > 6 && - routingKey.substr (0, 6).compare ("agent.") == 0) || - (routingKey.length () == 5 && - routingKey.substr (0, 5).compare ("agent") == 0)) - { - managementAgent->dispatchCommand (msg, routingKey, args); - return; - } + if ((routingKey.length() > 6 && + routingKey.substr(0, 6).compare("agent.") == 0) || + (routingKey == "broker")) + routeIt = managementAgent->dispatchCommand(msg, routingKey, args); - TopicExchange::route (msg, routingKey, args); + if (routeIt) + TopicExchange::route(msg, routingKey, args); } bool ManagementExchange::bind (Queue::shared_ptr queue, diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h index 66adabf035..ce3051367d 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.h +++ b/qpid/cpp/src/qpid/management/ManagementObject.h @@ -98,8 +98,8 @@ class ManagementObject qpid::framing::Buffer& outBuf) = 0; virtual void setReference (uint64_t objectId); - virtual std::string getClassName (void) = 0; - virtual std::string getPackageName (void) = 0; + virtual std::string& getClassName (void) = 0; + virtual std::string& getPackageName (void) = 0; virtual uint8_t* getMd5Sum (void) = 0; void setObjectId (uint64_t oid) { objectId = oid; } diff --git a/qpid/python/commands/qpid-tool b/qpid/python/commands/qpid-tool index 0ab47a01e7..7301dad6e4 100755 --- a/qpid/python/commands/qpid-tool +++ b/qpid/python/commands/qpid-tool @@ -65,6 +65,7 @@ class Mcli (Cmd): print " schema <className> - Print details of an object class" print " set time-format short - Select short timestamp format (default)" print " set time-format long - Select long timestamp format" + print " id [<ID>] - Display translations of display object ids" print " quit or ^D - Exit the program" print @@ -93,6 +94,9 @@ class Mcli (Cmd): except: pass + def do_id (self, data): + self.dataObject.do_id(data) + def complete_schema (self, text, line, begidx, endidx): tokens = split (line) if len (tokens) > 2: diff --git a/qpid/python/qpid/management.py b/qpid/python/qpid/management.py index 55479de0e6..1059c70ada 100644 --- a/qpid/python/qpid/management.py +++ b/qpid/python/qpid/management.py @@ -154,7 +154,7 @@ class managementChannel: def accept (self, msg): self.qpidChannel.message_accept(RangedSet(msg.id)) - def message (self, body, routing_key="agent"): + def message (self, body, routing_key="broker"): dp = self.qpidChannel.delivery_properties() dp.routing_key = routing_key mp = self.qpidChannel.message_properties() @@ -227,14 +227,14 @@ class managementClient: """ Invoke a method on a managed object. """ self.method (channel, userSequence, objId, className, methodName, args) - def getObjects (self, channel, userSequence, className): + def getObjects (self, channel, userSequence, className, bank=0): """ Request immediate content from broker """ codec = Codec (self.spec) self.setHeader (codec, ord ('G'), userSequence) ft = {} ft["_class"] = className codec.write_map (ft) - msg = channel.message(codec.encoded) + msg = channel.message(codec.encoded, routing_key="agent.%d" % bank) channel.send ("qpid.management", msg) def syncWaitForStable (self, channel): @@ -273,14 +273,14 @@ class managementClient: self.cv.release () return result - def syncGetObjects (self, channel, className): + def syncGetObjects (self, channel, className, bank=0): """ Synchronous (blocking) get call """ self.cv.acquire () self.syncInFlight = True self.syncResult = [] self.syncSequence = self.seqMgr.reserve ("sync") self.cv.release () - self.getObjects (channel, self.syncSequence, className) + self.getObjects (channel, self.syncSequence, className, bank) self.cv.acquire () starttime = time () while self.syncInFlight: @@ -748,6 +748,8 @@ class managementClient: sequence = self.seqMgr.reserve ((userSequence, classId, methodName)) self.setHeader (codec, ord ('M'), sequence) codec.write_uint64 (objId) # ID of object + codec.write_str8 (methodName) + bank = (objId & 0x0000FFFFFF000000) >> 24 # Encode args according to schema if classId not in self.schema: @@ -777,6 +779,5 @@ class managementClient: packageName = classId[0] className = classId[1] - msg = channel.message(codec.encoded, "agent.method." + packageName + "." + \ - className + "." + methodName) + msg = channel.message(codec.encoded, "agent." + str(bank)) channel.send ("qpid.management", msg) diff --git a/qpid/python/qpid/managementdata.py b/qpid/python/qpid/managementdata.py index 4c34b514d4..f6ebf4a381 100644 --- a/qpid/python/qpid/managementdata.py +++ b/qpid/python/qpid/managementdata.py @@ -167,10 +167,14 @@ class ManagementData: if self.cli != None: self.cli.setPromptMessage ("Broker Disconnected") - def schemaHandler (self, context, className, configs, insts, methods, events): + def schemaHandler (self, context, classKey, configs, insts, methods, events): """ Callback for schema updates """ - if className not in self.schema: - self.schema[className] = (configs, insts, methods, events) + if classKey not in self.schema: + schemaRev = 0 + for key in self.schema: + if classKey[0] == key[0] and classKey[1] == key[1]: + schemaRev += 1 + self.schema[classKey] = (configs, insts, methods, events, schemaRev) def setCli (self, cliobj): self.cli = cliobj @@ -248,17 +252,17 @@ class ManagementData: return str (value) return "*type-error*" - def getObjIndex (self, className, config): + def getObjIndex (self, classKey, config): """ Concatenate the values from index columns to form a unique object name """ result = "" - schemaConfig = self.schema[className][0] + schemaConfig = self.schema[classKey][0] for item in schemaConfig: if item[5] == 1 and item[0] != "id": if result != "": result = result + "." for key,val in config: if key == item[0]: - result = result + self.valueDisplay (className, key, val) + result = result + self.valueDisplay (classKey, key, val) return result def getClassKey (self, className): @@ -268,11 +272,17 @@ class ManagementData: if key[1] == className: return key else: - package = className[0:dotPos] - name = className[dotPos + 1:] + package = className[0:dotPos] + name = className[dotPos + 1:] + schemaRev = 0 + delim = name.find(".") + if delim != -1: + schemaRev = int(name[delim + 1:]) + name = name[0:delim] for key in self.schema: if key[0] == package and key[1] == name: - return key + if self.schema[key][4] == schemaRev: + return key return None def classCompletions (self, prefix): @@ -508,7 +518,11 @@ class ManagementData: sorted.sort () for classKey in sorted: tuple = self.schema[classKey] - className = classKey[0] + "." + classKey[1] + if tuple[4] == 0: + suffix = "" + else: + suffix = ".%d" % tuple[4] + className = classKey[0] + "." + classKey[1] + suffix row = (className, len (tuple[0]), len (tuple[1]), len (tuple[2]), len (tuple[3])) rows.append (row) self.disp.table ("Classes in Schema:", @@ -527,6 +541,7 @@ class ManagementData: raise ValueError () rows = [] + schemaRev = self.schema[classKey][4] for config in self.schema[classKey][0]: name = config[0] if name != "id": @@ -554,7 +569,7 @@ class ManagementData: rows.append ((name, typename, unit, "", "", desc)) titles = ("Element", "Type", "Unit", "Access", "Notes", "Description") - self.disp.table ("Schema for class '%s.%s':" % (classKey[0], classKey[1]), titles, rows) + self.disp.table ("Schema for class '%s.%s.%d':" % (classKey[0], classKey[1], schemaRev), titles, rows) for mname in self.schema[classKey][2]: (mdesc, args) = self.schema[classKey][2][mname] @@ -603,13 +618,20 @@ class ManagementData: raise ValueError () schemaMethod = self.schema[classKey][2][methodName] - if len (args) != len (schemaMethod[1]): - print "Wrong number of method args: Need %d, Got %d" % (len (schemaMethod[1]), len (args)) + count = 0 + for arg in range(len(schemaMethod[1])): + if schemaMethod[1][arg][2].find("I") != -1: + count += 1 + if len (args) != count: + print "Wrong number of method args: Need %d, Got %d" % (count, len (args)) raise ValueError () namedArgs = {} - for idx in range (len (args)): - namedArgs[schemaMethod[1][idx][0]] = args[idx] + idx = 0 + for arg in range(len(schemaMethod[1])): + if schemaMethod[1][arg][2].find("I") != -1: + namedArgs[schemaMethod[1][arg][0]] = args[idx] + idx += 1 self.methodSeq = self.methodSeq + 1 self.methodsPending[self.methodSeq] = methodName @@ -623,6 +645,35 @@ class ManagementData: # except ValueError, e: # print "Error invoking method:", e + def makeIdRow (self, displayId): + if displayId in self.idMap: + rawId = self.idMap[displayId] + else: + return None + return (displayId, + rawId, + (rawId & 0x7FFF000000000000) >> 48, + (rawId & 0x0000FFFFFF000000) >> 24, + (rawId & 0x0000000000FFFFFF)) + + def listIds (self, select): + rows = [] + if select == 0: + sorted = self.idMap.keys() + sorted.sort() + for displayId in sorted: + row = self.makeIdRow (displayId) + rows.append(row) + else: + row = self.makeIdRow (select) + if row == None: + print "Display Id %d not known" % select + return + rows.append(row) + self.disp.table("Translation of Display IDs:", + ("DisplayID", "RawID", "BootSequence", "Bank", "Object"), + rows) + def do_list (self, data): tokens = data.split () if len (tokens) == 0: @@ -644,10 +695,17 @@ class ManagementData: print "Not enough arguments supplied" return - userOid = long (tokens[0]) + displayId = long (tokens[0]) methodName = tokens[1] args = tokens[2:] - self.callMethod (userOid, methodName, args) + self.callMethod (displayId, methodName, args) + + def do_id (self, data): + if data == "": + select = 0 + else: + select = int(data) + self.listIds(select) def do_exit (self): self.mclient.removeChannel (self.mch) diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index 2f231e9c8d..75c4dffd8e 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -102,10 +102,11 @@ =============================================================== --> <class name="Agent"> - <property name="sessionName" type="sstr" access="RO" index="y" desc="Session ID for Agent"/> - <property name="label" type="sstr" access="RO" desc="Label for agent"/> - <property name="registeredTo" type="objId" references="Broker" access="RO" desc="Broker agent is registered to"/> - <property name="systemId" type="uuid" access="RO" desc="Identifier of system where agent resides"/> + <property name="sessionName" type="sstr" access="RO" index="y" desc="Session ID for Agent"/> + <property name="label" type="sstr" access="RO" desc="Label for agent"/> + <property name="registeredTo" type="objId" references="Broker" access="RO" desc="Broker agent is registered to"/> + <property name="systemId" type="uuid" access="RO" desc="Identifier of system where agent resides"/> + <property name="objectIdBank" type="uint32" access="RO" desc="Assigned object-id bank"/> </class> <!-- |