summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf/BrokerProxyImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qmf/BrokerProxyImpl.cpp')
-rw-r--r--qpid/cpp/src/qmf/BrokerProxyImpl.cpp44
1 files changed, 37 insertions, 7 deletions
diff --git a/qpid/cpp/src/qmf/BrokerProxyImpl.cpp b/qpid/cpp/src/qmf/BrokerProxyImpl.cpp
index 29e51566b3..a66fa24834 100644
--- a/qpid/cpp/src/qmf/BrokerProxyImpl.cpp
+++ b/qpid/cpp/src/qmf/BrokerProxyImpl.cpp
@@ -113,7 +113,7 @@ void BrokerProxyImpl::startProtocol()
char rawbuffer[512];
Buffer buffer(rawbuffer, 512);
- agentList.push_back(AgentProxyPtr(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker")));
+ agentList[0] = AgentProxyPtr(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker"));
requestsOutstanding = 1;
topicBound = false;
@@ -189,10 +189,10 @@ uint32_t BrokerProxyImpl::agentCount() const
const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const
{
Mutex::ScopedLock _lock(lock);
- for (vector<AgentProxyPtr>::const_iterator iter = agentList.begin();
+ for (map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.begin();
iter != agentList.end(); iter++)
if (idx-- == 0)
- return iter->get();
+ return iter->second.get();
return 0;
}
@@ -204,9 +204,9 @@ void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentPr
sendGetRequestLH(queryContext, query, agent);
} else {
// TODO (optimization) only send queries to agents that have the requested class+package
- for (vector<AgentProxyPtr>::const_iterator iter = agentList.begin();
+ for (map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.begin();
iter != agentList.end(); iter++) {
- sendGetRequestLH(queryContext, query, (*iter).get());
+ sendGetRequestLH(queryContext, query, iter->second.get());
}
}
}
@@ -317,6 +317,7 @@ BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete()
BrokerEventImpl::Ptr BrokerProxyImpl::eventStable()
{
+ QPID_LOG(trace, "Console Link to Broker Stable");
BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::STABLE));
return event;
}
@@ -432,7 +433,7 @@ void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
// request for the current list of agents so we can have it on-hand before we declare
// this session "stable".
//
- if (key->getClassName() == AGENT_CLASS && key->getPackageName() == BROKER_PACKAGE) {
+ if (key->impl->getClassName() == AGENT_CLASS && key->impl->getPackageName() == BROKER_PACKAGE) {
Mutex::ScopedLock _lock(lock);
incOutstandingLH();
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
@@ -467,7 +468,36 @@ ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq
return ObjectPtr();
}
- return ObjectPtr(ObjectImpl::factory(schema, this, inBuffer, prop, stat, true));
+ ObjectPtr optr(ObjectImpl::factory(schema, this, inBuffer, prop, stat, true));
+ if (prop && classKey->impl->getPackageName() == BROKER_PACKAGE && classKey->impl->getClassName() == AGENT_CLASS) {
+ //
+ // We've intercepted information about a remote agent... update the agent list accordingly
+ //
+ updateAgentList(optr);
+ }
+ return optr;
+}
+
+void BrokerProxyImpl::updateAgentList(ObjectPtr obj)
+{
+ Value* value = obj->getValue("agentBank");
+ if (value != 0 && value->isUint()) {
+ uint32_t agentBank = value->asUint();
+ if (obj->isDeleted()) {
+ agentList.erase(agentBank);
+ QPID_LOG(trace, "Agent at bank " << agentBank << " removed from agent list");
+ } else {
+ Value* str = obj->getValue("label");
+ string label;
+ if (str != 0 && str->isString())
+ label = str->asString();
+ map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank);
+ if (iter == agentList.end()) {
+ agentList[agentBank] = AgentProxyPtr(AgentProxyImpl::factory(console, publicObject, agentBank, label));
+ QPID_LOG(trace, "Agent '" << label << "' found at bank " << agentBank);
+ }
+ }
+ }
}
void BrokerProxyImpl::incOutstandingLH()