diff options
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 97 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementBroker.cpp | 176 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementBroker.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementExchange.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementObject.h | 4 |
7 files changed, 152 insertions, 155 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 5cff0fcd3c..85f13ba15d 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -67,16 +67,21 @@ ManagementAgent* ManagementAgent::Singleton::getInstance() } ManagementAgentImpl::ManagementAgentImpl() : - clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread) + clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread), startupWait(false) { // TODO: Establish system ID } -void ManagementAgentImpl::init (std::string brokerHost, - uint16_t brokerPort, - uint16_t intervalSeconds, - bool useExternalThread) +void ManagementAgentImpl::init(std::string brokerHost, + uint16_t brokerPort, + uint16_t intervalSeconds, + bool useExternalThread) { + { + Mutex::ScopedLock lock(agentLock); + startupWait = true; + } + interval = intervalSeconds; extThread = useExternalThread; nextObjectId = 1; @@ -92,17 +97,17 @@ void ManagementAgentImpl::init (std::string brokerHost, session.queueDeclare (arg::queue=queueName.str()); session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(), - arg::bindingKey=queueName.str ()); + arg::bindingKey=queueName.str()); session.messageSubscribe (arg::queue=queueName.str(), arg::destination=dest); session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF); session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF); Message attachRequest; - char rawbuffer[512]; // TODO: Modify Buffer so it can use stringstream + char rawbuffer[512]; Buffer buffer (rawbuffer, 512); - attachRequest.getDeliveryProperties().setRoutingKey("agent"); + attachRequest.getDeliveryProperties().setRoutingKey("broker"); attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); EncodeHeader (buffer, 'A'); @@ -115,15 +120,22 @@ void ManagementAgentImpl::init (std::string brokerHost, string stringBuffer (rawbuffer, length); attachRequest.setData (stringBuffer); - session.messageTransfer (arg::content=attachRequest, arg::destination="qpid.management"); + session.messageTransfer(arg::content=attachRequest, arg::destination="qpid.management"); - dispatcher->listen (dest, this); - dispatcher->start (); + dispatcher->listen(dest, this); + dispatcher->start(); + + { + Mutex::ScopedLock lock(agentLock); + if (startupWait) + startupCond.wait(agentLock); + } } -ManagementAgentImpl::~ManagementAgentImpl () +ManagementAgentImpl::~ManagementAgentImpl() { - dispatcher->stop (); + dispatcher->stop(); + session.close(); delete dispatcher; } @@ -151,24 +163,33 @@ uint64_t ManagementAgentImpl::addObject (ManagementObject* object, return objectId; } -uint32_t ManagementAgentImpl::pollCallbacks (uint32_t /*callLimit*/) +uint32_t ManagementAgentImpl::pollCallbacks(uint32_t /*callLimit*/) { return 0; } -int ManagementAgentImpl::getSignalFd (void) +int ManagementAgentImpl::getSignalFd(void) { return -1; } -void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer) +void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) { Mutex::ScopedLock lock(agentLock); uint32_t assigned; + stringstream key; assigned = inBuffer.getLong(); objIdPrefix = ((uint64_t) assigned) << 24; + startupWait = false; + startupCond.notify(); + + // Bind to qpid.management to receive commands + key << "agent." << assigned; + session.exchangeBind (arg::exchange="qpid.management", arg::queue=queueName.str(), + arg::bindingKey=key.str()); + // Send package indications for all local packages for (PackageMap::iterator pIter = packages.begin(); pIter != packages.end(); @@ -180,7 +201,7 @@ void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer) EncodePackageIndication(outBuffer, pIter); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + SendBuffer(outBuffer, outLen, "qpid.management", "broker"); // Send class indications for all local classes ClassMap cMap = pIter->second; @@ -190,7 +211,7 @@ void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer) EncodeClassIndication(outBuffer, pIter, cIter); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + SendBuffer(outBuffer, outLen, "qpid.management", "broker"); } } } @@ -218,7 +239,7 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc schema.writeSchemaCall(outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + SendBuffer(outBuffer, outLen, "qpid.management", "broker"); } } } @@ -229,18 +250,50 @@ void ManagementAgentImpl::handleConsoleAddedIndication() clientWasAdded = true; } -void ManagementAgentImpl::received (Message& msg) +void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +{ + string methodName; + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + uint64_t objId = inBuffer.getLongLong(); + inBuffer.getShortString(methodName); + + EncodeHeader(outBuffer, 'm', sequence); + + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter == managementObjects.end() || iter->second->isDeleted()) { + outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); + outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); + } else { + iter->second->doMethod(methodName, inBuffer, outBuffer); + } + + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, "amq.direct", replyTo); +} + +void ManagementAgentImpl::received(Message& msg) { - string data = msg.getData (); - Buffer inBuffer (const_cast<char*>(data.c_str()), data.size()); + string data = msg.getData(); + Buffer inBuffer(const_cast<char*>(data.c_str()), data.size()); uint8_t opcode; uint32_t sequence; + string replyToKey; + + framing::MessageProperties p = msg.getMessageProperties(); + if (p.hasReplyTo()) { + const framing::ReplyTo& rt = p.getReplyTo(); + replyToKey = rt.getRoutingKey(); + } if (CheckHeader (inBuffer, &opcode, &sequence)) { if (opcode == 'a') handleAttachResponse(inBuffer); else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); else if (opcode == 'x') handleConsoleAddedIndication(); + else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey); } } diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h index 2ecf63cd5d..f7f19e145d 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -30,6 +30,7 @@ #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Mutex.h" +#include "qpid/sys/Condition.h" #include "qpid/framing/Uuid.h" #include <iostream> #include <sstream> @@ -127,6 +128,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen BackgroundThread bgThread; sys::Thread thread; + sys::Condition startupCond; + bool startupWait; PackageMap::iterator FindOrAddPackage (std::string name); void moveNewObjectsLH(); @@ -149,6 +152,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void handlePackageRequest (qpid::framing::Buffer& inBuffer); void handleClassQuery (qpid::framing::Buffer& inBuffer); void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence); + void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); void handleConsoleAddedIndication(); }; diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 9cbf86ebd3..05b759f695 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -335,8 +335,7 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args return Manageable::STATUS_OK; case management::Link::METHOD_BRIDGE : - management::ArgsLinkBridge iargs = - dynamic_cast<const management::ArgsLinkBridge&>(args); + management::ArgsLinkBridge& iargs = (management::ArgsLinkBridge&) args; // Durable bridges are only valid on durable links if (iargs.i_durable && !durable) diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/qpid/cpp/src/qpid/management/ManagementBroker.cpp index f66b34c43c..223811ebc2 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.cpp +++ b/qpid/cpp/src/qpid/management/ManagementBroker.cpp @@ -97,6 +97,7 @@ ManagementBroker::~ManagementBroker () // objects that will be invalid. dExchange.reset(); mExchange.reset(); + timer.stop(); moveNewObjectsLH(); for (ManagementObjectMap::iterator iter = managementObjects.begin (); @@ -117,33 +118,33 @@ void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); // Get from file or generate and save to file. - if (dataDir.empty ()) + if (dataDir.empty()) { - uuid.generate (); + uuid.generate(); QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: " << uuid); } else { - string filename (dataDir + "/.mbrokerdata"); - ifstream inFile (filename.c_str ()); + string filename(dataDir + "/.mbrokerdata"); + ifstream inFile(filename.c_str ()); - if (inFile.good ()) + if (inFile.good()) { inFile >> uuid; inFile >> bootSequence; inFile >> nextRemoteBank; - inFile.close (); + inFile.close(); QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid); bootSequence++; - writeData (); + writeData(); } else { - uuid.generate (); + uuid.generate(); QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid); - writeData (); + writeData(); } QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence); @@ -155,10 +156,10 @@ void ManagementBroker::writeData () string filename (dataDir + "/.mbrokerdata"); ofstream outFile (filename.c_str ()); - if (outFile.good ()) + if (outFile.good()) { outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl; - outFile.close (); + outFile.close(); } } @@ -174,7 +175,7 @@ void ManagementBroker::RegisterClass (string packageName, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) { - Mutex::ScopedLock lock (userLock); + Mutex::ScopedLock lock(userLock); PackageMap::iterator pIter = FindOrAddPackageLH(packageName); AddClass(pIter, className, md5Sum, schemaCall); } @@ -391,124 +392,64 @@ void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence SendBuffer (outBuffer, outLen, dExchange, replyToKey); } -void ManagementBroker::dispatchCommand (Deliverable& deliverable, +bool ManagementBroker::dispatchCommand (Deliverable& deliverable, const string& routingKey, const FieldTable* /*args*/) { Mutex::ScopedLock lock (userLock); Message& msg = ((DeliverableMessage&) deliverable).getMessage (); - if (routingKey.compare (0, 13, "agent.method.") == 0) - dispatchMethodLH (msg, routingKey, 13); + // Parse the routing key. This management broker should act as though it + // is bound to the exchange to match the following keys: + // + // agent.<X>.# + // broker.# + // + // where <X> is any non-negative decimal integer less than the lowest remote + // object-id bank. - else if (routingKey.length () == 5 && - routingKey.compare (0, 5, "agent") == 0) + if (routingKey == "broker") { dispatchAgentCommandLH (msg); - - else - { - QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey); - return; - } -} - -void ManagementBroker::dispatchMethodLH (Message& msg, - const string& routingKey, - size_t first) -{ - size_t pos, start = first; - uint32_t contentSize; - - if (routingKey.length () == start) - { - QPID_LOG (debug, "Missing package-name in routing key: " << routingKey); - return; - } - - pos = routingKey.find ('.', start); - if (pos == string::npos || routingKey.length () == pos + 1) - { - QPID_LOG (debug, "Missing class-name in routing key: " << routingKey); - return; + return false; } - string packageName = routingKey.substr (start, pos - start); - - start = pos + 1; - pos = routingKey.find ('.', start); - if (pos == string::npos || routingKey.length () == pos + 1) - { - QPID_LOG (debug, "Missing method-name in routing key: " << routingKey); - return; + else if (routingKey.compare(0, 6, "agent.") == 0) { + uint32_t delim = routingKey.find('.', 6); + if (delim == string::npos) + delim = routingKey.length(); + string bank = routingKey.substr(6, delim - 6); + if ((uint32_t) atoi(bank.c_str()) <= localBank) { + dispatchAgentCommandLH (msg); + return false; + } } - string className = routingKey.substr (start, pos - start); - - start = pos + 1; - string methodName = routingKey.substr (start, routingKey.length () - start); - - contentSize = msg.encodedContentSize (); - if (contentSize < 8 || contentSize > MA_BUFFER_SIZE) - return; + return true; +} - Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); +void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + string methodName; Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen, sequence; - uint8_t opcode; - - if (msg.encodedSize() > MA_BUFFER_SIZE) { - QPID_LOG(debug, "ManagementBroker::dispatchMethodLH: Message too large: " << - msg.encodedSize()); - return; - } - - msg.encodeContent (inBuffer); - inBuffer.reset (); - - if (!CheckHeader (inBuffer, &opcode, &sequence)) - { - QPID_LOG (debug, " Invalid content header"); - return; - } - - if (opcode != 'M') - { - QPID_LOG (debug, " Unexpected opcode " << opcode); - return; - } - - uint64_t objId = inBuffer.getLongLong (); - string replyToKey; + uint32_t outLen; - const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); - if (p && p->hasReplyTo()) - { - const framing::ReplyTo& rt = p->getReplyTo (); - replyToKey = rt.getRoutingKey (); - } - else - { - QPID_LOG (debug, " Reply-to missing"); - return; - } + uint64_t objId = inBuffer.getLongLong(); + inBuffer.getShortString(methodName); - EncodeHeader (outBuffer, 'm', sequence); + std::cout << "ManagementBroker::handleMethodRequest (" << objId << ", " << methodName << ")" << std::endl; + EncodeHeader(outBuffer, 'm', sequence); - ManagementObjectMap::iterator iter = managementObjects.find (objId); - if (iter == managementObjects.end () || iter->second->isDeleted ()) - { + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter == managementObjects.end() || iter->second->isDeleted()) { outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); - } - else - { - iter->second->doMethod (methodName, inBuffer, outBuffer); + } else { + iter->second->doMethod(methodName, inBuffer, outBuffer); } - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); } void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) @@ -737,6 +678,8 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe requestedBank = inBuffer.getLong (); assignedBank = assignBankLH (requestedBank); + // TODO: Make a pass over the agents and delete any that no longer have a session. + RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName); if (aIter != remoteAgents.end()) { @@ -755,6 +698,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); agent->mgmtObject->set_systemId (systemId); + agent->mgmtObject->set_objectIdBank (assignedBank); addObject (agent->mgmtObject); remoteAgents[sessionName] = agent; @@ -818,10 +762,9 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) const framing::MessageProperties* p = msg.getFrames().getHeaders()->get<framing::MessageProperties>(); - if (p && p->hasReplyTo()) - { - const framing::ReplyTo& rt = p->getReplyTo (); - replyToKey = rt.getRoutingKey (); + if (p && p->hasReplyTo()) { + const framing::ReplyTo& rt = p->getReplyTo(); + replyToKey = rt.getRoutingKey(); } else return; @@ -832,10 +775,10 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) return; } - msg.encodeContent (inBuffer); - inBuffer.reset (); + msg.encodeContent(inBuffer); + inBuffer.reset(); - if (!CheckHeader (inBuffer, &opcode, &sequence)) + if (!CheckHeader(inBuffer, &opcode, &sequence)) return; if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); @@ -847,6 +790,7 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence); else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence); } ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name) diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.h b/qpid/cpp/src/qpid/management/ManagementBroker.h index 447720fb5e..89ea80b3b2 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.h +++ b/qpid/cpp/src/qpid/management/ManagementBroker.h @@ -59,7 +59,7 @@ class ManagementBroker : public ManagementAgent uint32_t persistId = 0, uint32_t persistBank = 4); void clientAdded (void); - void dispatchCommand (broker::Deliverable& msg, + bool dispatchCommand (broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); @@ -177,9 +177,6 @@ class ManagementBroker : public ManagementAgent std::string routingKey); void moveNewObjectsLH(); - void dispatchMethodLH (broker::Message& msg, - const std::string& routingKey, - size_t first); void dispatchAgentCommandLH (broker::Message& msg); PackageMap::iterator FindOrAddPackageLH(std::string name); @@ -206,6 +203,7 @@ class ManagementBroker : public ManagementAgent void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); size_t ValidateSchema(framing::Buffer&); }; diff --git a/qpid/cpp/src/qpid/management/ManagementExchange.cpp b/qpid/cpp/src/qpid/management/ManagementExchange.cpp index b4824549ed..4ccf8e68c9 100644 --- a/qpid/cpp/src/qpid/management/ManagementExchange.cpp +++ b/qpid/cpp/src/qpid/management/ManagementExchange.cpp @@ -40,17 +40,16 @@ void ManagementExchange::route (Deliverable& msg, const string& routingKey, const FieldTable* args) { + bool routeIt = true; + // Intercept management agent commands - if ((routingKey.length () > 6 && - routingKey.substr (0, 6).compare ("agent.") == 0) || - (routingKey.length () == 5 && - routingKey.substr (0, 5).compare ("agent") == 0)) - { - managementAgent->dispatchCommand (msg, routingKey, args); - return; - } + if ((routingKey.length() > 6 && + routingKey.substr(0, 6).compare("agent.") == 0) || + (routingKey == "broker")) + routeIt = managementAgent->dispatchCommand(msg, routingKey, args); - TopicExchange::route (msg, routingKey, args); + if (routeIt) + TopicExchange::route(msg, routingKey, args); } bool ManagementExchange::bind (Queue::shared_ptr queue, diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h index 66adabf035..ce3051367d 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.h +++ b/qpid/cpp/src/qpid/management/ManagementObject.h @@ -98,8 +98,8 @@ class ManagementObject qpid::framing::Buffer& outBuf) = 0; virtual void setReference (uint64_t objectId); - virtual std::string getClassName (void) = 0; - virtual std::string getPackageName (void) = 0; + virtual std::string& getClassName (void) = 0; + virtual std::string& getPackageName (void) = 0; virtual uint8_t* getMd5Sum (void) = 0; void setObjectId (uint64_t oid) { objectId = oid; } |