diff options
author | Ted Ross <tross@apache.org> | 2008-08-01 18:36:25 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-08-01 18:36:25 +0000 |
commit | b272cc92fa7a4bcd49fb8da50b93bfb4d015fda7 (patch) | |
tree | 1fbdc79f022c79ed5972fdf4b8994ce092f3b181 /cpp/src | |
parent | 978db5706dc7930325362fc662c2ae6941b1faee (diff) | |
download | qpid-python-b272cc92fa7a4bcd49fb8da50b93bfb4d015fda7.tar.gz |
QPID-1174 - Clean up agent objects when the remote agent disconnects
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@681773 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 70 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.h | 9 |
3 files changed, 57 insertions, 23 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 85f13ba15d..ebdc71e3b1 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -112,7 +112,6 @@ void ManagementAgentImpl::init(std::string brokerHost, EncodeHeader (buffer, 'A'); buffer.putShortString ("RemoteAgent [C++]"); - buffer.putShortString (queueName.str()); systemId.encode (buffer); buffer.putLong (11); diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index 15263b5f2a..ec8e6fe436 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -26,6 +26,7 @@ #include <qpid/broker/MessageDelivery.h> #include "qpid/framing/MessageTransferBody.h" #include "qpid/sys/Time.h" +#include "qpid/broker/ConnectionState.h" #include <list> #include <iostream> #include <fstream> @@ -375,7 +376,10 @@ void ManagementBroker::PeriodicProcessing (void) iter++) managementObjects.erase (*iter); - deleteList.clear (); + if (!deleteList.empty()) { + deleteList.clear(); + deleteOrphanedAgentsLH(); + } } void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence, @@ -664,44 +668,72 @@ uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank) return requestedBank; } -void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementBroker::deleteOrphanedAgentsLH() +{ + vector<uint64_t> deleteList; + + for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) { + uint64_t connectionRef = aIter->first; + bool found = false; + + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) { + if (iter->first == connectionRef && !iter->second->isDeleted()) { + found = true; + break; + } + } + + if (!found) { + deleteList.push_back(connectionRef); + delete aIter->second; + } + } + + for (vector<uint64_t>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) { + + remoteAgents.erase(*dIter); + } + + deleteList.clear(); +} + +void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken) { string label; uint32_t requestedBank; uint32_t assignedBank; - string sessionName; + uint64_t connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); Uuid systemId; - inBuffer.getShortString (label); - inBuffer.getShortString (sessionName); - systemId.decode (inBuffer); - requestedBank = inBuffer.getLong (); - assignedBank = assignBankLH (requestedBank); - - // TODO: Make a pass over the agents and delete any that no longer have a session. - - RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName); - if (aIter != remoteAgents.end()) - { + moveNewObjectsLH(); + deleteOrphanedAgentsLH(); + RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef); + if (aIter != remoteAgents.end()) { // There already exists an agent on this session. Reject the request. - sendCommandComplete (replyToKey, sequence, 1, "Session already has remote agent"); + sendCommandComplete (replyToKey, sequence, 1, "Connection already has remote agent"); return; } - // TODO: Reject requests for which the session name does not match an existing session. + inBuffer.getShortString (label); + systemId.decode (inBuffer); + requestedBank = inBuffer.getLong (); + assignedBank = assignBankLH (requestedBank); RemoteAgent* agent = new RemoteAgent; agent->objIdBank = assignedBank; agent->routingKey = replyToKey; + agent->connectionRef = connectionRef; agent->mgmtObject = new management::Agent (this, agent); - agent->mgmtObject->set_sessionName (sessionName); + agent->mgmtObject->set_connectionRef(agent->connectionRef); agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); agent->mgmtObject->set_systemId (systemId); agent->mgmtObject->set_objectIdBank (assignedBank); addObject (agent->mgmtObject); - remoteAgents[sessionName] = agent; + remoteAgents[connectionRef] = agent; // Send an Attach Response Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); @@ -788,7 +820,7 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence); else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence); else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); - else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence); } diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h index 89ea80b3b2..151926f526 100644 --- a/cpp/src/qpid/management/ManagementBroker.h +++ b/cpp/src/qpid/management/ManagementBroker.h @@ -26,6 +26,7 @@ #include "qpid/broker/Timer.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/Mutex.h" +#include "qpid/broker/ConnectionToken.h" #include "qpid/agent/ManagementAgent.h" #include "ManagementObject.h" #include "Manageable.h" @@ -87,6 +88,7 @@ class ManagementBroker : public ManagementAgent { uint32_t objIdBank; std::string routingKey; + uint64_t connectionRef; Agent* mgmtObject; ManagementObject* GetManagementObject (void) const { return mgmtObject; } virtual ~RemoteAgent (); @@ -95,8 +97,8 @@ class ManagementBroker : public ManagementAgent // TODO: Eventually replace string with entire reply-to structure. reply-to // currently assumes that the exchange is "amq.direct" even though it could // in theory be specified differently. - typedef std::map<std::string, RemoteAgent*> RemoteAgentMap; - typedef std::vector<std::string> ReplyToVector; + typedef std::map<uint64_t, RemoteAgent*> RemoteAgentMap; + typedef std::vector<std::string> ReplyToVector; // Storage for known schema classes: // @@ -192,6 +194,7 @@ class ManagementBroker : public ManagementAgent bool bankInUse (uint32_t bank); uint32_t allocateNewBank (); uint32_t assignBankLH (uint32_t requestedPrefix); + void deleteOrphanedAgentsLH(); void sendCommandComplete (std::string replyToKey, uint32_t sequence, uint32_t code = 0, std::string text = std::string("OK")); void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); @@ -201,7 +204,7 @@ class ManagementBroker : public ManagementAgent void handleClassIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const broker::ConnectionToken* connToken); void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); |