diff options
author | Ted Ross <tross@apache.org> | 2008-07-31 13:15:16 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-07-31 13:15:16 +0000 |
commit | 12b33f499c8a33d5010fedecdb267c721483f0a5 (patch) | |
tree | 0c60fb8918c0ca3ac2ca8cf020c9fa8f4c796022 /qpid/cpp | |
parent | 5ec77e5be4feca03b2c13866286a6bec911ab4fc (diff) | |
download | qpid-python-12b33f499c8a33d5010fedecdb267c721483f0a5.tar.gz |
QPID-1174 - Management updates for remote agents
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@681362 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/examples/qmf-agent/Makefile | 16 | ||||
-rw-r--r-- | qpid/cpp/examples/qmf-agent/example.cpp | 96 | ||||
-rwxr-xr-x | qpid/cpp/managementgen/schema.py | 6 | ||||
-rw-r--r-- | qpid/cpp/managementgen/templates/Class.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 97 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementBroker.cpp | 176 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementBroker.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementExchange.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementObject.h | 4 |
11 files changed, 252 insertions, 179 deletions
diff --git a/qpid/cpp/examples/qmf-agent/Makefile b/qpid/cpp/examples/qmf-agent/Makefile index cc33dd1dc6..0b37f1a295 100644 --- a/qpid/cpp/examples/qmf-agent/Makefile +++ b/qpid/cpp/examples/qmf-agent/Makefile @@ -26,13 +26,13 @@ OUT_FILE = $(SRC_DIR)/qmf-agent CC = gcc LIB_DIR = $(QPID_DIR)/cpp/src/.libs CC_INCLUDES = -I$(SRC_DIR) -I$(QPID_DIR)/cpp/src -I$(QPID_DIR)/cpp/src/gen -I$(GEN_DIR) -CC_FLAGS = -g -O2 +CC_FLAGS = -g -O3 LD_FLAGS = -lqpidclient -lqpidcommon -L$(LIB_DIR) SPEC_DIR = $(QPID_DIR)/specs +TYPE_FILE = $(SPEC_DIR)/management-types.xml MGEN_DIR = $(QPID_DIR)/cpp/managementgen TEMPLATE_DIR = $(MGEN_DIR)/templates MGEN = $(MGEN_DIR)/main.py -OBJ_DIR = $(SRC_DIR)/.libs vpath %.cpp $(SRC_DIR):$(GEN_DIR) vpath %.d $(OBJ_DIR) @@ -43,15 +43,17 @@ cpps += $(wildcard $(GEN_DIR)/*.cpp) deps = $(addsuffix .d, $(basename $(cpps))) objects = $(addsuffix .o, $(basename $(cpps))) -.PHONY: all clean +.PHONY: all clean gen #========================================================== # Pass 0: generate source files from schema ifeq ($(MAKELEVEL), 0) -all: - $(MGEN) $(SCHEMA_FILE) $(SPEC_DIR)/management-types.xml $(TEMPLATE_DIR) $(GEN_DIR) - $(MAKE) +all: gen + @$(MAKE) + +gen: + $(MGEN) $(SCHEMA_FILE) $(TYPE_FILE) $(TEMPLATE_DIR) $(GEN_DIR) clean: rm -rf $(GEN_DIR) $(OUT_FILE) *.d *.o @@ -62,7 +64,7 @@ clean: else ifeq ($(MAKELEVEL), 1) all: $(deps) - $(MAKE) + @$(MAKE) %.d : %.cpp $(CC) -M $(CC_FLAGS) $(CC_INCLUDES) $< > $@ diff --git a/qpid/cpp/examples/qmf-agent/example.cpp b/qpid/cpp/examples/qmf-agent/example.cpp index 62d2909410..35ea97d4c0 100644 --- a/qpid/cpp/examples/qmf-agent/example.cpp +++ b/qpid/cpp/examples/qmf-agent/example.cpp @@ -22,7 +22,10 @@ #include <qpid/management/Manageable.h> #include <qpid/management/ManagementObject.h> #include <qpid/agent/ManagementAgent.h> +#include <qpid/sys/Mutex.h> #include "Parent.h" +#include "Child.h" +#include "ArgsParentCreate_child.h" #include "PackageQmf_example.h" #include <unistd.h> @@ -32,7 +35,14 @@ #include <sstream> using namespace qpid::management; +using namespace qpid::sys; using namespace std; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; +using qpid::sys::Mutex; + +class ChildClass; //============================================================== // CoreClass is the operational class that corresponds to the @@ -40,21 +50,44 @@ using namespace std; //============================================================== class CoreClass : public Manageable { - string name; + string name; + ManagementAgent* agent; Parent* mgmtObject; + std::vector<ChildClass*> children; + Mutex vectorLock; public: CoreClass(ManagementAgent* agent, string _name); - ~CoreClass() {} + ~CoreClass() { mgmtObject->resourceDestroy(); } + + ManagementObject* GetManagementObject(void) const + { return mgmtObject; } + + void doLoop(); + status_t ManagementMethod (uint32_t methodId, Args& args); +}; + +class ChildClass : public Manageable +{ + string name; + Child* mgmtObject; + +public: - void bumpCounter() { mgmtObject->inc_count(); } + ChildClass(ManagementAgent* agent, CoreClass* parent, string name); + ~ChildClass() { mgmtObject->resourceDestroy(); } ManagementObject* GetManagementObject(void) const { return mgmtObject; } + + void doWork() + { + mgmtObject->inc_count(2); + } }; -CoreClass::CoreClass(ManagementAgent* agent, string _name) : name(_name) +CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent(_agent) { mgmtObject = new Parent(agent, this, name); @@ -62,6 +95,52 @@ CoreClass::CoreClass(ManagementAgent* agent, string _name) : name(_name) mgmtObject->set_state("IDLE"); } +void CoreClass::doLoop() +{ + // Periodically bump a counter to provide a changing statistical value + while (1) { + sleep(1); + mgmtObject->inc_count(); + mgmtObject->set_state("IN LOOP"); + + { + Mutex::ScopedLock _lock(vectorLock); + + for (std::vector<ChildClass*>::iterator iter = children.begin(); + iter != children.end(); + iter++) { + (*iter)->doWork(); + } + } + } +} + +Manageable::status_t CoreClass::ManagementMethod(uint32_t methodId, Args& args) +{ + Mutex::ScopedLock _lock(vectorLock); + + switch (methodId) { + case Parent::METHOD_CREATE_CHILD: + ArgsParentCreate_child& ioArgs = (ArgsParentCreate_child&) args; + + ChildClass *child = new ChildClass(agent, this, ioArgs.i_name); + ioArgs.o_childRef = child->GetManagementObject()->getObjectId(); + + children.push_back(child); + + return STATUS_OK; + } + + return STATUS_NOT_IMPLEMENTED; +} + +ChildClass::ChildClass(ManagementAgent* agent, CoreClass* parent, string name) +{ + mgmtObject = new Child(agent, this, parent, name); + + agent->addObject(mgmtObject); +} + //============================================================== // Main program @@ -79,19 +158,14 @@ int main(int argc, char** argv) { // Start the agent. It will attempt to make a connection to the // management broker - agent->init (string(host), port); + agent->init(string(host), port); // Allocate some core objects CoreClass core1(agent, "Example Core Object #1"); CoreClass core2(agent, "Example Core Object #2"); CoreClass core3(agent, "Example Core Object #3"); - // Periodically bump a counter in core1 to provide a changing statistical value - while (1) - { - sleep(1); - core1.bumpCounter(); - } + core1.doLoop(); } diff --git a/qpid/cpp/managementgen/schema.py b/qpid/cpp/managementgen/schema.py index eb0b066b62..25284a14f7 100755 --- a/qpid/cpp/managementgen/schema.py +++ b/qpid/cpp/managementgen/schema.py @@ -577,7 +577,7 @@ class SchemaMethod: return self.name def getFullName (self): - return self.parent.getName().capitalize() + self.name[0:1].upper() +\ + return capitalize(self.parent.getName()) + self.name[0:1].upper() +\ self.name[1:] def getArgCount (self): @@ -644,7 +644,7 @@ class SchemaEvent: return self.name def getFullName (self): - return self.parent.getName ().capitalize() + self.name.capitalize () + return capitalize(self.parent.getName()) + capitalize(self.name) def getArgCount (self): return len (self.args) @@ -938,7 +938,7 @@ class SchemaClass: method.genSchema (stream, variables) def genNameCap (self, stream, variables): - stream.write (self.name.capitalize ()) + stream.write (capitalize(self.name)) def genNameLower (self, stream, variables): stream.write (self.name.lower ()) diff --git a/qpid/cpp/managementgen/templates/Class.h b/qpid/cpp/managementgen/templates/Class.h index 8a4dc1006a..fac63d5d55 100644 --- a/qpid/cpp/managementgen/templates/Class.h +++ b/qpid/cpp/managementgen/templates/Class.h @@ -86,9 +86,9 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject /*MGEN:Class.SetGeneralReferenceDeclaration*/ - std::string getPackageName (void) { return packageName; } - std::string getClassName (void) { return className; } - uint8_t* getMd5Sum (void) { return md5Sum; } + std::string& getPackageName (void) { return packageName; } + std::string& getClassName (void) { return className; } + uint8_t* getMd5Sum (void) { return md5Sum; } // Method IDs /*MGEN:Class.MethodIdDeclarations*/ diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 5cff0fcd3c..85f13ba15d 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -67,16 +67,21 @@ ManagementAgent* ManagementAgent::Singleton::getInstance() } ManagementAgentImpl::ManagementAgentImpl() : - clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread) + clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread), startupWait(false) { // TODO: Establish system ID } -void ManagementAgentImpl::init (std::string brokerHost, - uint16_t brokerPort, - uint16_t intervalSeconds, - bool useExternalThread) +void ManagementAgentImpl::init(std::string brokerHost, + uint16_t brokerPort, + uint16_t intervalSeconds, + bool useExternalThread) { + { + Mutex::ScopedLock lock(agentLock); + startupWait = true; + } + interval = intervalSeconds; extThread = useExternalThread; nextObjectId = 1; @@ -92,17 +97,17 @@ void ManagementAgentImpl::init (std::string brokerHost, session.queueDeclare (arg::queue=queueName.str()); session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(), - arg::bindingKey=queueName.str ()); + arg::bindingKey=queueName.str()); session.messageSubscribe (arg::queue=queueName.str(), arg::destination=dest); session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF); session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF); Message attachRequest; - char rawbuffer[512]; // TODO: Modify Buffer so it can use stringstream + char rawbuffer[512]; Buffer buffer (rawbuffer, 512); - attachRequest.getDeliveryProperties().setRoutingKey("agent"); + attachRequest.getDeliveryProperties().setRoutingKey("broker"); attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); EncodeHeader (buffer, 'A'); @@ -115,15 +120,22 @@ void ManagementAgentImpl::init (std::string brokerHost, string stringBuffer (rawbuffer, length); attachRequest.setData (stringBuffer); - session.messageTransfer (arg::content=attachRequest, arg::destination="qpid.management"); + session.messageTransfer(arg::content=attachRequest, arg::destination="qpid.management"); - dispatcher->listen (dest, this); - dispatcher->start (); + dispatcher->listen(dest, this); + dispatcher->start(); + + { + Mutex::ScopedLock lock(agentLock); + if (startupWait) + startupCond.wait(agentLock); + } } -ManagementAgentImpl::~ManagementAgentImpl () +ManagementAgentImpl::~ManagementAgentImpl() { - dispatcher->stop (); + dispatcher->stop(); + session.close(); delete dispatcher; } @@ -151,24 +163,33 @@ uint64_t ManagementAgentImpl::addObject (ManagementObject* object, return objectId; } -uint32_t ManagementAgentImpl::pollCallbacks (uint32_t /*callLimit*/) +uint32_t ManagementAgentImpl::pollCallbacks(uint32_t /*callLimit*/) { return 0; } -int ManagementAgentImpl::getSignalFd (void) +int ManagementAgentImpl::getSignalFd(void) { return -1; } -void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer) +void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) { Mutex::ScopedLock lock(agentLock); uint32_t assigned; + stringstream key; assigned = inBuffer.getLong(); objIdPrefix = ((uint64_t) assigned) << 24; + startupWait = false; + startupCond.notify(); + + // Bind to qpid.management to receive commands + key << "agent." << assigned; + session.exchangeBind (arg::exchange="qpid.management", arg::queue=queueName.str(), + arg::bindingKey=key.str()); + // Send package indications for all local packages for (PackageMap::iterator pIter = packages.begin(); pIter != packages.end(); @@ -180,7 +201,7 @@ void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer) EncodePackageIndication(outBuffer, pIter); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + SendBuffer(outBuffer, outLen, "qpid.management", "broker"); // Send class indications for all local classes ClassMap cMap = pIter->second; @@ -190,7 +211,7 @@ void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer) EncodeClassIndication(outBuffer, pIter, cIter); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + SendBuffer(outBuffer, outLen, "qpid.management", "broker"); } } } @@ -218,7 +239,7 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc schema.writeSchemaCall(outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + SendBuffer(outBuffer, outLen, "qpid.management", "broker"); } } } @@ -229,18 +250,50 @@ void ManagementAgentImpl::handleConsoleAddedIndication() clientWasAdded = true; } -void ManagementAgentImpl::received (Message& msg) +void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +{ + string methodName; + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + uint64_t objId = inBuffer.getLongLong(); + inBuffer.getShortString(methodName); + + EncodeHeader(outBuffer, 'm', sequence); + + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter == managementObjects.end() || iter->second->isDeleted()) { + outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); + outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); + } else { + iter->second->doMethod(methodName, inBuffer, outBuffer); + } + + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, "amq.direct", replyTo); +} + +void ManagementAgentImpl::received(Message& msg) { - string data = msg.getData (); - Buffer inBuffer (const_cast<char*>(data.c_str()), data.size()); + string data = msg.getData(); + Buffer inBuffer(const_cast<char*>(data.c_str()), data.size()); uint8_t opcode; uint32_t sequence; + string replyToKey; + + framing::MessageProperties p = msg.getMessageProperties(); + if (p.hasReplyTo()) { + const framing::ReplyTo& rt = p.getReplyTo(); + replyToKey = rt.getRoutingKey(); + } if (CheckHeader (inBuffer, &opcode, &sequence)) { if (opcode == 'a') handleAttachResponse(inBuffer); else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); else if (opcode == 'x') handleConsoleAddedIndication(); + else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey); } } diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h index 2ecf63cd5d..f7f19e145d 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -30,6 +30,7 @@ #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Mutex.h" +#include "qpid/sys/Condition.h" #include "qpid/framing/Uuid.h" #include <iostream> #include <sstream> @@ -127,6 +128,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen BackgroundThread bgThread; sys::Thread thread; + sys::Condition startupCond; + bool startupWait; PackageMap::iterator FindOrAddPackage (std::string name); void moveNewObjectsLH(); @@ -149,6 +152,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void handlePackageRequest (qpid::framing::Buffer& inBuffer); void handleClassQuery (qpid::framing::Buffer& inBuffer); void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence); + void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); void handleConsoleAddedIndication(); }; diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 9cbf86ebd3..05b759f695 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -335,8 +335,7 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args return Manageable::STATUS_OK; case management::Link::METHOD_BRIDGE : - management::ArgsLinkBridge iargs = - dynamic_cast<const management::ArgsLinkBridge&>(args); + management::ArgsLinkBridge& iargs = (management::ArgsLinkBridge&) args; // Durable bridges are only valid on durable links if (iargs.i_durable && !durable) diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/qpid/cpp/src/qpid/management/ManagementBroker.cpp index f66b34c43c..223811ebc2 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.cpp +++ b/qpid/cpp/src/qpid/management/ManagementBroker.cpp @@ -97,6 +97,7 @@ ManagementBroker::~ManagementBroker () // objects that will be invalid. dExchange.reset(); mExchange.reset(); + timer.stop(); moveNewObjectsLH(); for (ManagementObjectMap::iterator iter = managementObjects.begin (); @@ -117,33 +118,33 @@ void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); // Get from file or generate and save to file. - if (dataDir.empty ()) + if (dataDir.empty()) { - uuid.generate (); + uuid.generate(); QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: " << uuid); } else { - string filename (dataDir + "/.mbrokerdata"); - ifstream inFile (filename.c_str ()); + string filename(dataDir + "/.mbrokerdata"); + ifstream inFile(filename.c_str ()); - if (inFile.good ()) + if (inFile.good()) { inFile >> uuid; inFile >> bootSequence; inFile >> nextRemoteBank; - inFile.close (); + inFile.close(); QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid); bootSequence++; - writeData (); + writeData(); } else { - uuid.generate (); + uuid.generate(); QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid); - writeData (); + writeData(); } QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence); @@ -155,10 +156,10 @@ void ManagementBroker::writeData () string filename (dataDir + "/.mbrokerdata"); ofstream outFile (filename.c_str ()); - if (outFile.good ()) + if (outFile.good()) { outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl; - outFile.close (); + outFile.close(); } } @@ -174,7 +175,7 @@ void ManagementBroker::RegisterClass (string packageName, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) { - Mutex::ScopedLock lock (userLock); + Mutex::ScopedLock lock(userLock); PackageMap::iterator pIter = FindOrAddPackageLH(packageName); AddClass(pIter, className, md5Sum, schemaCall); } @@ -391,124 +392,64 @@ void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence SendBuffer (outBuffer, outLen, dExchange, replyToKey); } -void ManagementBroker::dispatchCommand (Deliverable& deliverable, +bool ManagementBroker::dispatchCommand (Deliverable& deliverable, const string& routingKey, const FieldTable* /*args*/) { Mutex::ScopedLock lock (userLock); Message& msg = ((DeliverableMessage&) deliverable).getMessage (); - if (routingKey.compare (0, 13, "agent.method.") == 0) - dispatchMethodLH (msg, routingKey, 13); + // Parse the routing key. This management broker should act as though it + // is bound to the exchange to match the following keys: + // + // agent.<X>.# + // broker.# + // + // where <X> is any non-negative decimal integer less than the lowest remote + // object-id bank. - else if (routingKey.length () == 5 && - routingKey.compare (0, 5, "agent") == 0) + if (routingKey == "broker") { dispatchAgentCommandLH (msg); - - else - { - QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey); - return; - } -} - -void ManagementBroker::dispatchMethodLH (Message& msg, - const string& routingKey, - size_t first) -{ - size_t pos, start = first; - uint32_t contentSize; - - if (routingKey.length () == start) - { - QPID_LOG (debug, "Missing package-name in routing key: " << routingKey); - return; - } - - pos = routingKey.find ('.', start); - if (pos == string::npos || routingKey.length () == pos + 1) - { - QPID_LOG (debug, "Missing class-name in routing key: " << routingKey); - return; + return false; } - string packageName = routingKey.substr (start, pos - start); - - start = pos + 1; - pos = routingKey.find ('.', start); - if (pos == string::npos || routingKey.length () == pos + 1) - { - QPID_LOG (debug, "Missing method-name in routing key: " << routingKey); - return; + else if (routingKey.compare(0, 6, "agent.") == 0) { + uint32_t delim = routingKey.find('.', 6); + if (delim == string::npos) + delim = routingKey.length(); + string bank = routingKey.substr(6, delim - 6); + if ((uint32_t) atoi(bank.c_str()) <= localBank) { + dispatchAgentCommandLH (msg); + return false; + } } - string className = routingKey.substr (start, pos - start); - - start = pos + 1; - string methodName = routingKey.substr (start, routingKey.length () - start); - - contentSize = msg.encodedContentSize (); - if (contentSize < 8 || contentSize > MA_BUFFER_SIZE) - return; + return true; +} - Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); +void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + string methodName; Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen, sequence; - uint8_t opcode; - - if (msg.encodedSize() > MA_BUFFER_SIZE) { - QPID_LOG(debug, "ManagementBroker::dispatchMethodLH: Message too large: " << - msg.encodedSize()); - return; - } - - msg.encodeContent (inBuffer); - inBuffer.reset (); - - if (!CheckHeader (inBuffer, &opcode, &sequence)) - { - QPID_LOG (debug, " Invalid content header"); - return; - } - - if (opcode != 'M') - { - QPID_LOG (debug, " Unexpected opcode " << opcode); - return; - } - - uint64_t objId = inBuffer.getLongLong (); - string replyToKey; + uint32_t outLen; - const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); - if (p && p->hasReplyTo()) - { - const framing::ReplyTo& rt = p->getReplyTo (); - replyToKey = rt.getRoutingKey (); - } - else - { - QPID_LOG (debug, " Reply-to missing"); - return; - } + uint64_t objId = inBuffer.getLongLong(); + inBuffer.getShortString(methodName); - EncodeHeader (outBuffer, 'm', sequence); + std::cout << "ManagementBroker::handleMethodRequest (" << objId << ", " << methodName << ")" << std::endl; + EncodeHeader(outBuffer, 'm', sequence); - ManagementObjectMap::iterator iter = managementObjects.find (objId); - if (iter == managementObjects.end () || iter->second->isDeleted ()) - { + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter == managementObjects.end() || iter->second->isDeleted()) { outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); - } - else - { - iter->second->doMethod (methodName, inBuffer, outBuffer); + } else { + iter->second->doMethod(methodName, inBuffer, outBuffer); } - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); } void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) @@ -737,6 +678,8 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe requestedBank = inBuffer.getLong (); assignedBank = assignBankLH (requestedBank); + // TODO: Make a pass over the agents and delete any that no longer have a session. + RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName); if (aIter != remoteAgents.end()) { @@ -755,6 +698,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); agent->mgmtObject->set_systemId (systemId); + agent->mgmtObject->set_objectIdBank (assignedBank); addObject (agent->mgmtObject); remoteAgents[sessionName] = agent; @@ -818,10 +762,9 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) const framing::MessageProperties* p = msg.getFrames().getHeaders()->get<framing::MessageProperties>(); - if (p && p->hasReplyTo()) - { - const framing::ReplyTo& rt = p->getReplyTo (); - replyToKey = rt.getRoutingKey (); + if (p && p->hasReplyTo()) { + const framing::ReplyTo& rt = p->getReplyTo(); + replyToKey = rt.getRoutingKey(); } else return; @@ -832,10 +775,10 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) return; } - msg.encodeContent (inBuffer); - inBuffer.reset (); + msg.encodeContent(inBuffer); + inBuffer.reset(); - if (!CheckHeader (inBuffer, &opcode, &sequence)) + if (!CheckHeader(inBuffer, &opcode, &sequence)) return; if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); @@ -847,6 +790,7 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence); else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence); } ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name) diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.h b/qpid/cpp/src/qpid/management/ManagementBroker.h index 447720fb5e..89ea80b3b2 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.h +++ b/qpid/cpp/src/qpid/management/ManagementBroker.h @@ -59,7 +59,7 @@ class ManagementBroker : public ManagementAgent uint32_t persistId = 0, uint32_t persistBank = 4); void clientAdded (void); - void dispatchCommand (broker::Deliverable& msg, + bool dispatchCommand (broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); @@ -177,9 +177,6 @@ class ManagementBroker : public ManagementAgent std::string routingKey); void moveNewObjectsLH(); - void dispatchMethodLH (broker::Message& msg, - const std::string& routingKey, - size_t first); void dispatchAgentCommandLH (broker::Message& msg); PackageMap::iterator FindOrAddPackageLH(std::string name); @@ -206,6 +203,7 @@ class ManagementBroker : public ManagementAgent void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); size_t ValidateSchema(framing::Buffer&); }; diff --git a/qpid/cpp/src/qpid/management/ManagementExchange.cpp b/qpid/cpp/src/qpid/management/ManagementExchange.cpp index b4824549ed..4ccf8e68c9 100644 --- a/qpid/cpp/src/qpid/management/ManagementExchange.cpp +++ b/qpid/cpp/src/qpid/management/ManagementExchange.cpp @@ -40,17 +40,16 @@ void ManagementExchange::route (Deliverable& msg, const string& routingKey, const FieldTable* args) { + bool routeIt = true; + // Intercept management agent commands - if ((routingKey.length () > 6 && - routingKey.substr (0, 6).compare ("agent.") == 0) || - (routingKey.length () == 5 && - routingKey.substr (0, 5).compare ("agent") == 0)) - { - managementAgent->dispatchCommand (msg, routingKey, args); - return; - } + if ((routingKey.length() > 6 && + routingKey.substr(0, 6).compare("agent.") == 0) || + (routingKey == "broker")) + routeIt = managementAgent->dispatchCommand(msg, routingKey, args); - TopicExchange::route (msg, routingKey, args); + if (routeIt) + TopicExchange::route(msg, routingKey, args); } bool ManagementExchange::bind (Queue::shared_ptr queue, diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h index 66adabf035..ce3051367d 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.h +++ b/qpid/cpp/src/qpid/management/ManagementObject.h @@ -98,8 +98,8 @@ class ManagementObject qpid::framing::Buffer& outBuf) = 0; virtual void setReference (uint64_t objectId); - virtual std::string getClassName (void) = 0; - virtual std::string getPackageName (void) = 0; + virtual std::string& getClassName (void) = 0; + virtual std::string& getPackageName (void) = 0; virtual uint8_t* getMd5Sum (void) = 0; void setObjectId (uint64_t oid) { objectId = oid; } |