diff options
| author | Ted Ross <tross@apache.org> | 2010-09-21 21:48:41 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2010-09-21 21:48:41 +0000 |
| commit | 3cfbdf0e60c94733c0a79e94bdf8627afc6bb2a4 (patch) | |
| tree | e33d57bca9a2c2275e76f882484ac3ea913e83fd /cpp/src/qmf/ConsoleSession.cpp | |
| parent | 449ab0f1062c0eac0234f84556de60436ba2ee9d (diff) | |
| download | qpid-python-3cfbdf0e60c94733c0a79e94bdf8627afc6bb2a4.tar.gz | |
QMFv2 Additions:
- QMFv2 schema encoding completed
- Schema queries handled by the agent and initiated by the console by user request
- Full query support with predicates evaluated on the agent (regex not yet implemented)
- Agent filtering in the console
- Agent aging in the console
- Unit tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@999662 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/ConsoleSession.cpp')
| -rw-r--r-- | cpp/src/qmf/ConsoleSession.cpp | 172 |
1 files changed, 136 insertions, 36 deletions
diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp index 18986222c1..868df302ce 100644 --- a/cpp/src/qmf/ConsoleSession.cpp +++ b/cpp/src/qmf/ConsoleSession.cpp @@ -57,8 +57,9 @@ Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnecte //======================================================================================== ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : - connection(c), domain("default"), opened(false), thread(0), threadCanceled(false), - lastVisit(0), schemaCache(new SchemaCache()) + connection(c), domain("default"), maxAgentAgeMinutes(5), opened(false), + thread(0), threadCanceled(false), + lastVisit(0), lastAgePass(0), schemaCache(new SchemaCache()) { if (!options.empty()) { qpid::messaging::AddressParser parser(options); @@ -70,6 +71,10 @@ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : iter = optMap.find("domain"); if (iter != optMap.end()) domain = iter->second.asString(); + + iter = optMap.find("max-agent-age"); + if (iter != optMap.end()) + maxAgentAgeMinutes = iter->second.asUint32(); } } @@ -81,13 +86,35 @@ ConsoleSessionImpl::~ConsoleSessionImpl() } -void ConsoleSessionImpl::setAgentFilter(const string&) +void ConsoleSessionImpl::setAgentFilter(const string& predicate) { + agentQuery = Query(QUERY_OBJECT, predicate); + // - // TODO: Setup the new agent filter - // TODO: Purge the agent list of any agents that don't match the filter - // TODO: Send an agent locate with the new filter + // Purge the agent list of any agents that don't match the filter. // + { + qpid::sys::Mutex::ScopedLock l(lock); + map<string, Agent> toDelete; + for (map<string, Agent>::iterator iter = agents.begin(); iter != agents.end(); iter++) + if ((iter->second.getName() != connectedBrokerAgent.getName()) && + (!agentQuery.matchesPredicate(iter->second.getAttributes()))) { + toDelete[iter->first] = iter->second; + } + + for (map<string, Agent>::iterator iter = toDelete.begin(); iter != toDelete.end(); iter++) { + agents.erase(iter->first); + auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_DEL, AGENT_DEL_FILTER)); + eventImpl->setAgent(iter->second); + enqueueEventLH(eventImpl.release()); + } + } + + // + // Broadcast an agent locate request with our new criteria. + // + if (opened) + sendAgentLocate(); } @@ -111,15 +138,23 @@ void ConsoleSessionImpl::open() Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}"); directRx.setCapacity(64); - topicRx.setCapacity(64); + topicRx.setCapacity(128); legacyRx.setCapacity(64); + directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}"); + topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}"); + + directSender.setCapacity(64); + topicSender.setCapacity(128); + // Start the receiver thread threadCanceled = false; thread = new qpid::sys::Thread(*this); // Send an agent_locate to direct address 'broker' to identify the connected-broker-agent. sendBrokerLocate(); + if (agentQuery) + sendAgentLocate(); opened = true; } @@ -198,18 +233,17 @@ void ConsoleSessionImpl::dispatch(Message msg) { const Variant::Map& properties(msg.getProperties()); Variant::Map::const_iterator iter; + Variant::Map::const_iterator oiter; + oiter = properties.find("qmf.opcode"); iter = properties.find("x-amqp-0-10.app-id"); - if (iter != properties.end() && iter->second.asString() == "qmf2") { + if (iter == properties.end()) + iter = properties.find("app_id"); + if (iter != properties.end() && iter->second.asString() == "qmf2" && oiter != properties.end()) { // // Dispatch a QMFv2 formatted message // - iter = properties.find("qmf.opcode"); - if (iter == properties.end()) { - QPID_LOG(trace, "Message received with no 'qmf.opcode' header"); - return; - } - const string& opcode = iter->second.asString(); + const string& opcode = oiter->second.asString(); iter = properties.find("qmf.agent"); if (iter == properties.end()) { @@ -240,11 +274,8 @@ void ConsoleSessionImpl::dispatch(Message msg) return; } - if (!agent.isValid()) { - QPID_LOG(trace, "Received a QMFv2 message with opcode=" << opcode << - " from an unknown agent " << agentName); + if (!agent.isValid()) return; - } AgentImpl& agentImpl(AgentImplAccess::get(agent)); @@ -305,14 +336,34 @@ void ConsoleSessionImpl::sendBrokerLocate() msg.setReplyTo(replyAddress); msg.setCorrelationId("broker-locate"); - Sender sender(session.createSender(directBase + "/broker")); - sender.send(msg); - sender.close(); + msg.setSubject("broker"); + + directSender.send(msg); QPID_LOG(trace, "SENT AgentLocate to broker"); } +void ConsoleSessionImpl::sendAgentLocate() +{ + Message msg; + Variant::Map& headers(msg.getProperties()); + + headers["method"] = "request"; + headers["qmf.opcode"] = "_agent_locate_request"; + headers["x-amqp-0-10.app-id"] = "qmf2"; + + msg.setReplyTo(replyAddress); + msg.setCorrelationId("agent-locate"); + msg.setSubject("console.request.agent_locate"); + encode(agentQuery.getPredicate(), msg); + + topicSender.send(msg); + + QPID_LOG(trace, "SENT AgentLocate to topic"); +} + + void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Variant::Map& content, const Message& msg) { Variant::Map::const_iterator iter; @@ -326,18 +377,25 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian Variant::Map attrs(iter->second.asMap()); // - // TODO: Check this agent against the agent filter. Exit if it doesn't match. - // (only if this isn't the connected broker agent) + // Check this agent against the agent filter. Exit if it doesn't match. + // (only if this isn't the connected broker agent) // + if ((cid != "broker-locate") && agentQuery && (!agentQuery.matchesPredicate(attrs))) + return; + + QPID_LOG(trace, "RCVD AgentHeartbeat from an agent matching our filter: " << agentName); - iter = content.find("epoch"); - if (iter != content.end()) + iter = attrs.find("epoch"); + if (iter != attrs.end()) epoch = iter->second.asUint32(); { qpid::sys::Mutex::ScopedLock l(lock); map<string, Agent>::iterator aIter = agents.find(agentName); if (aIter == agents.end()) { + // + // This is a new agent. We have no current record of its existence. + // auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this)); for (iter = attrs.begin(); iter != attrs.end(); iter++) if (iter->first != "epoch") @@ -345,24 +403,47 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian agent = Agent(impl.release()); agents[agentName] = agent; + // + // Enqueue a notification of the new agent. + // auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD)); eventImpl->setAgent(agent); enqueueEventLH(ConsoleEvent(eventImpl.release())); - } else + } else { + // + // This is a refresh of an agent we are already tracking. + // agent = aIter->second; + AgentImpl& impl(AgentImplAccess::get(agent)); + impl.touch(); + if (impl.getEpoch() != epoch) { + // + // The agent has restarted since the last time we heard from it. + // Enqueue a notification. + // + auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_RESTART)); + eventImpl->setAgent(agent); + enqueueEventLH(ConsoleEvent(eventImpl.release())); + } + + iter = attrs.find("schemaUpdated"); + if (iter != attrs.end()) { + uint64_t ts(iter->second.asUint64()); + if (ts > impl.getAttribute("schemaUpdated").asUint64()) { + // + // The agent has added new schema entries since we last heard from it. + // Enqueue a notification. + // + auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE)); + eventImpl->setAgent(agent); + enqueueEventLH(ConsoleEvent(eventImpl.release())); + } + } + } if (cid == "broker-locate") connectedBrokerAgent = agent; } - - AgentImplAccess::get(agent).touch(); - - // - // Changes we are interested in: - // - // agentEpoch - indicates that the agent restarted since we last heard from it - // schemaUpdated - indicates that the agent has registered new schemata - // } @@ -385,8 +466,27 @@ void ConsoleSessionImpl::periodicProcessing(uint64_t seconds) lastVisit = seconds; // - // TODO: Handle the aging of agent records + // Handle the aging of agent records // + if (lastAgePass == 0) + lastAgePass = seconds; + if (seconds - lastAgePass >= 60) { + lastAgePass = seconds; + map<string, Agent> toDelete; + qpid::sys::Mutex::ScopedLock l(lock); + + for (map<string, Agent>::iterator iter = agents.begin(); iter != agents.end(); iter++) + if ((iter->second.getName() != connectedBrokerAgent.getName()) && + (AgentImplAccess::get(iter->second).age() > maxAgentAgeMinutes)) + toDelete[iter->first] = iter->second; + + for (map<string, Agent>::iterator iter = toDelete.begin(); iter != toDelete.end(); iter++) { + agents.erase(iter->first); + auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_DEL, AGENT_DEL_AGED)); + eventImpl->setAgent(iter->second); + enqueueEventLH(eventImpl.release()); + } + } } |
