summaryrefslogtreecommitdiff
path: root/cpp/src/qmf/ConsoleSession.cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-09-21 21:48:41 +0000
committerTed Ross <tross@apache.org>2010-09-21 21:48:41 +0000
commit3cfbdf0e60c94733c0a79e94bdf8627afc6bb2a4 (patch)
treee33d57bca9a2c2275e76f882484ac3ea913e83fd /cpp/src/qmf/ConsoleSession.cpp
parent449ab0f1062c0eac0234f84556de60436ba2ee9d (diff)
downloadqpid-python-3cfbdf0e60c94733c0a79e94bdf8627afc6bb2a4.tar.gz
QMFv2 Additions:
- QMFv2 schema encoding completed - Schema queries handled by the agent and initiated by the console by user request - Full query support with predicates evaluated on the agent (regex not yet implemented) - Agent filtering in the console - Agent aging in the console - Unit tests git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@999662 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/ConsoleSession.cpp')
-rw-r--r--cpp/src/qmf/ConsoleSession.cpp172
1 files changed, 136 insertions, 36 deletions
diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp
index 18986222c1..868df302ce 100644
--- a/cpp/src/qmf/ConsoleSession.cpp
+++ b/cpp/src/qmf/ConsoleSession.cpp
@@ -57,8 +57,9 @@ Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnecte
//========================================================================================
ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
- connection(c), domain("default"), opened(false), thread(0), threadCanceled(false),
- lastVisit(0), schemaCache(new SchemaCache())
+ connection(c), domain("default"), maxAgentAgeMinutes(5), opened(false),
+ thread(0), threadCanceled(false),
+ lastVisit(0), lastAgePass(0), schemaCache(new SchemaCache())
{
if (!options.empty()) {
qpid::messaging::AddressParser parser(options);
@@ -70,6 +71,10 @@ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
iter = optMap.find("domain");
if (iter != optMap.end())
domain = iter->second.asString();
+
+ iter = optMap.find("max-agent-age");
+ if (iter != optMap.end())
+ maxAgentAgeMinutes = iter->second.asUint32();
}
}
@@ -81,13 +86,35 @@ ConsoleSessionImpl::~ConsoleSessionImpl()
}
-void ConsoleSessionImpl::setAgentFilter(const string&)
+void ConsoleSessionImpl::setAgentFilter(const string& predicate)
{
+ agentQuery = Query(QUERY_OBJECT, predicate);
+
//
- // TODO: Setup the new agent filter
- // TODO: Purge the agent list of any agents that don't match the filter
- // TODO: Send an agent locate with the new filter
+ // Purge the agent list of any agents that don't match the filter.
//
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ map<string, Agent> toDelete;
+ for (map<string, Agent>::iterator iter = agents.begin(); iter != agents.end(); iter++)
+ if ((iter->second.getName() != connectedBrokerAgent.getName()) &&
+ (!agentQuery.matchesPredicate(iter->second.getAttributes()))) {
+ toDelete[iter->first] = iter->second;
+ }
+
+ for (map<string, Agent>::iterator iter = toDelete.begin(); iter != toDelete.end(); iter++) {
+ agents.erase(iter->first);
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_DEL, AGENT_DEL_FILTER));
+ eventImpl->setAgent(iter->second);
+ enqueueEventLH(eventImpl.release());
+ }
+ }
+
+ //
+ // Broadcast an agent locate request with our new criteria.
+ //
+ if (opened)
+ sendAgentLocate();
}
@@ -111,15 +138,23 @@ void ConsoleSessionImpl::open()
Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}");
directRx.setCapacity(64);
- topicRx.setCapacity(64);
+ topicRx.setCapacity(128);
legacyRx.setCapacity(64);
+ directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+ topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}");
+
+ directSender.setCapacity(64);
+ topicSender.setCapacity(128);
+
// Start the receiver thread
threadCanceled = false;
thread = new qpid::sys::Thread(*this);
// Send an agent_locate to direct address 'broker' to identify the connected-broker-agent.
sendBrokerLocate();
+ if (agentQuery)
+ sendAgentLocate();
opened = true;
}
@@ -198,18 +233,17 @@ void ConsoleSessionImpl::dispatch(Message msg)
{
const Variant::Map& properties(msg.getProperties());
Variant::Map::const_iterator iter;
+ Variant::Map::const_iterator oiter;
+ oiter = properties.find("qmf.opcode");
iter = properties.find("x-amqp-0-10.app-id");
- if (iter != properties.end() && iter->second.asString() == "qmf2") {
+ if (iter == properties.end())
+ iter = properties.find("app_id");
+ if (iter != properties.end() && iter->second.asString() == "qmf2" && oiter != properties.end()) {
//
// Dispatch a QMFv2 formatted message
//
- iter = properties.find("qmf.opcode");
- if (iter == properties.end()) {
- QPID_LOG(trace, "Message received with no 'qmf.opcode' header");
- return;
- }
- const string& opcode = iter->second.asString();
+ const string& opcode = oiter->second.asString();
iter = properties.find("qmf.agent");
if (iter == properties.end()) {
@@ -240,11 +274,8 @@ void ConsoleSessionImpl::dispatch(Message msg)
return;
}
- if (!agent.isValid()) {
- QPID_LOG(trace, "Received a QMFv2 message with opcode=" << opcode <<
- " from an unknown agent " << agentName);
+ if (!agent.isValid())
return;
- }
AgentImpl& agentImpl(AgentImplAccess::get(agent));
@@ -305,14 +336,34 @@ void ConsoleSessionImpl::sendBrokerLocate()
msg.setReplyTo(replyAddress);
msg.setCorrelationId("broker-locate");
- Sender sender(session.createSender(directBase + "/broker"));
- sender.send(msg);
- sender.close();
+ msg.setSubject("broker");
+
+ directSender.send(msg);
QPID_LOG(trace, "SENT AgentLocate to broker");
}
+void ConsoleSessionImpl::sendAgentLocate()
+{
+ Message msg;
+ Variant::Map& headers(msg.getProperties());
+
+ headers["method"] = "request";
+ headers["qmf.opcode"] = "_agent_locate_request";
+ headers["x-amqp-0-10.app-id"] = "qmf2";
+
+ msg.setReplyTo(replyAddress);
+ msg.setCorrelationId("agent-locate");
+ msg.setSubject("console.request.agent_locate");
+ encode(agentQuery.getPredicate(), msg);
+
+ topicSender.send(msg);
+
+ QPID_LOG(trace, "SENT AgentLocate to topic");
+}
+
+
void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Variant::Map& content, const Message& msg)
{
Variant::Map::const_iterator iter;
@@ -326,18 +377,25 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian
Variant::Map attrs(iter->second.asMap());
//
- // TODO: Check this agent against the agent filter. Exit if it doesn't match.
- // (only if this isn't the connected broker agent)
+ // Check this agent against the agent filter. Exit if it doesn't match.
+ // (only if this isn't the connected broker agent)
//
+ if ((cid != "broker-locate") && agentQuery && (!agentQuery.matchesPredicate(attrs)))
+ return;
+
+ QPID_LOG(trace, "RCVD AgentHeartbeat from an agent matching our filter: " << agentName);
- iter = content.find("epoch");
- if (iter != content.end())
+ iter = attrs.find("epoch");
+ if (iter != attrs.end())
epoch = iter->second.asUint32();
{
qpid::sys::Mutex::ScopedLock l(lock);
map<string, Agent>::iterator aIter = agents.find(agentName);
if (aIter == agents.end()) {
+ //
+ // This is a new agent. We have no current record of its existence.
+ //
auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this));
for (iter = attrs.begin(); iter != attrs.end(); iter++)
if (iter->first != "epoch")
@@ -345,24 +403,47 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian
agent = Agent(impl.release());
agents[agentName] = agent;
+ //
+ // Enqueue a notification of the new agent.
+ //
auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD));
eventImpl->setAgent(agent);
enqueueEventLH(ConsoleEvent(eventImpl.release()));
- } else
+ } else {
+ //
+ // This is a refresh of an agent we are already tracking.
+ //
agent = aIter->second;
+ AgentImpl& impl(AgentImplAccess::get(agent));
+ impl.touch();
+ if (impl.getEpoch() != epoch) {
+ //
+ // The agent has restarted since the last time we heard from it.
+ // Enqueue a notification.
+ //
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_RESTART));
+ eventImpl->setAgent(agent);
+ enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ }
+
+ iter = attrs.find("schemaUpdated");
+ if (iter != attrs.end()) {
+ uint64_t ts(iter->second.asUint64());
+ if (ts > impl.getAttribute("schemaUpdated").asUint64()) {
+ //
+ // The agent has added new schema entries since we last heard from it.
+ // Enqueue a notification.
+ //
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE));
+ eventImpl->setAgent(agent);
+ enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ }
+ }
+ }
if (cid == "broker-locate")
connectedBrokerAgent = agent;
}
-
- AgentImplAccess::get(agent).touch();
-
- //
- // Changes we are interested in:
- //
- // agentEpoch - indicates that the agent restarted since we last heard from it
- // schemaUpdated - indicates that the agent has registered new schemata
- //
}
@@ -385,8 +466,27 @@ void ConsoleSessionImpl::periodicProcessing(uint64_t seconds)
lastVisit = seconds;
//
- // TODO: Handle the aging of agent records
+ // Handle the aging of agent records
//
+ if (lastAgePass == 0)
+ lastAgePass = seconds;
+ if (seconds - lastAgePass >= 60) {
+ lastAgePass = seconds;
+ map<string, Agent> toDelete;
+ qpid::sys::Mutex::ScopedLock l(lock);
+
+ for (map<string, Agent>::iterator iter = agents.begin(); iter != agents.end(); iter++)
+ if ((iter->second.getName() != connectedBrokerAgent.getName()) &&
+ (AgentImplAccess::get(iter->second).age() > maxAgentAgeMinutes))
+ toDelete[iter->first] = iter->second;
+
+ for (map<string, Agent>::iterator iter = toDelete.begin(); iter != toDelete.end(); iter++) {
+ agents.erase(iter->first);
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_DEL, AGENT_DEL_AGED));
+ eventImpl->setAgent(iter->second);
+ enqueueEventLH(eventImpl.release());
+ }
+ }
}