summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-08-01 18:36:25 +0000
committerTed Ross <tross@apache.org>2008-08-01 18:36:25 +0000
commitb272cc92fa7a4bcd49fb8da50b93bfb4d015fda7 (patch)
tree1fbdc79f022c79ed5972fdf4b8994ce092f3b181 /cpp/src
parent978db5706dc7930325362fc662c2ae6941b1faee (diff)
downloadqpid-python-b272cc92fa7a4bcd49fb8da50b93bfb4d015fda7.tar.gz
QPID-1174 - Clean up agent objects when the remote agent disconnects
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@681773 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.cpp1
-rw-r--r--cpp/src/qpid/management/ManagementBroker.cpp70
-rw-r--r--cpp/src/qpid/management/ManagementBroker.h9
3 files changed, 57 insertions, 23 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 85f13ba15d..ebdc71e3b1 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -112,7 +112,6 @@ void ManagementAgentImpl::init(std::string brokerHost,
EncodeHeader (buffer, 'A');
buffer.putShortString ("RemoteAgent [C++]");
- buffer.putShortString (queueName.str());
systemId.encode (buffer);
buffer.putLong (11);
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp
index 15263b5f2a..ec8e6fe436 100644
--- a/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/cpp/src/qpid/management/ManagementBroker.cpp
@@ -26,6 +26,7 @@
#include <qpid/broker/MessageDelivery.h>
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/sys/Time.h"
+#include "qpid/broker/ConnectionState.h"
#include <list>
#include <iostream>
#include <fstream>
@@ -375,7 +376,10 @@ void ManagementBroker::PeriodicProcessing (void)
iter++)
managementObjects.erase (*iter);
- deleteList.clear ();
+ if (!deleteList.empty()) {
+ deleteList.clear();
+ deleteOrphanedAgentsLH();
+ }
}
void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence,
@@ -664,44 +668,72 @@ uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank)
return requestedBank;
}
-void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementBroker::deleteOrphanedAgentsLH()
+{
+ vector<uint64_t> deleteList;
+
+ for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) {
+ uint64_t connectionRef = aIter->first;
+ bool found = false;
+
+ for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++) {
+ if (iter->first == connectionRef && !iter->second->isDeleted()) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ deleteList.push_back(connectionRef);
+ delete aIter->second;
+ }
+ }
+
+ for (vector<uint64_t>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) {
+
+ remoteAgents.erase(*dIter);
+ }
+
+ deleteList.clear();
+}
+
+void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken)
{
string label;
uint32_t requestedBank;
uint32_t assignedBank;
- string sessionName;
+ uint64_t connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
Uuid systemId;
- inBuffer.getShortString (label);
- inBuffer.getShortString (sessionName);
- systemId.decode (inBuffer);
- requestedBank = inBuffer.getLong ();
- assignedBank = assignBankLH (requestedBank);
-
- // TODO: Make a pass over the agents and delete any that no longer have a session.
-
- RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName);
- if (aIter != remoteAgents.end())
- {
+ moveNewObjectsLH();
+ deleteOrphanedAgentsLH();
+ RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef);
+ if (aIter != remoteAgents.end()) {
// There already exists an agent on this session. Reject the request.
- sendCommandComplete (replyToKey, sequence, 1, "Session already has remote agent");
+ sendCommandComplete (replyToKey, sequence, 1, "Connection already has remote agent");
return;
}
- // TODO: Reject requests for which the session name does not match an existing session.
+ inBuffer.getShortString (label);
+ systemId.decode (inBuffer);
+ requestedBank = inBuffer.getLong ();
+ assignedBank = assignBankLH (requestedBank);
RemoteAgent* agent = new RemoteAgent;
agent->objIdBank = assignedBank;
agent->routingKey = replyToKey;
+ agent->connectionRef = connectionRef;
agent->mgmtObject = new management::Agent (this, agent);
- agent->mgmtObject->set_sessionName (sessionName);
+ agent->mgmtObject->set_connectionRef(agent->connectionRef);
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
agent->mgmtObject->set_systemId (systemId);
agent->mgmtObject->set_objectIdBank (assignedBank);
addObject (agent->mgmtObject);
- remoteAgents[sessionName] = agent;
+ remoteAgents[connectionRef] = agent;
// Send an Attach Response
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -788,7 +820,7 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg)
else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence);
else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence);
else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher());
else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence);
}
diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h
index 89ea80b3b2..151926f526 100644
--- a/cpp/src/qpid/management/ManagementBroker.h
+++ b/cpp/src/qpid/management/ManagementBroker.h
@@ -26,6 +26,7 @@
#include "qpid/broker/Timer.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/broker/ConnectionToken.h"
#include "qpid/agent/ManagementAgent.h"
#include "ManagementObject.h"
#include "Manageable.h"
@@ -87,6 +88,7 @@ class ManagementBroker : public ManagementAgent
{
uint32_t objIdBank;
std::string routingKey;
+ uint64_t connectionRef;
Agent* mgmtObject;
ManagementObject* GetManagementObject (void) const { return mgmtObject; }
virtual ~RemoteAgent ();
@@ -95,8 +97,8 @@ class ManagementBroker : public ManagementAgent
// TODO: Eventually replace string with entire reply-to structure. reply-to
// currently assumes that the exchange is "amq.direct" even though it could
// in theory be specified differently.
- typedef std::map<std::string, RemoteAgent*> RemoteAgentMap;
- typedef std::vector<std::string> ReplyToVector;
+ typedef std::map<uint64_t, RemoteAgent*> RemoteAgentMap;
+ typedef std::vector<std::string> ReplyToVector;
// Storage for known schema classes:
//
@@ -192,6 +194,7 @@ class ManagementBroker : public ManagementAgent
bool bankInUse (uint32_t bank);
uint32_t allocateNewBank ();
uint32_t assignBankLH (uint32_t requestedPrefix);
+ void deleteOrphanedAgentsLH();
void sendCommandComplete (std::string replyToKey, uint32_t sequence,
uint32_t code = 0, std::string text = std::string("OK"));
void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
@@ -201,7 +204,7 @@ class ManagementBroker : public ManagementAgent
void handleClassIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const broker::ConnectionToken* connToken);
void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);