diff options
| author | Ted Ross <tross@apache.org> | 2010-09-21 21:48:41 +0000 | 
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2010-09-21 21:48:41 +0000 | 
| commit | 3cfbdf0e60c94733c0a79e94bdf8627afc6bb2a4 (patch) | |
| tree | e33d57bca9a2c2275e76f882484ac3ea913e83fd /cpp/src/qmf/ConsoleSession.cpp | |
| parent | 449ab0f1062c0eac0234f84556de60436ba2ee9d (diff) | |
| download | qpid-python-3cfbdf0e60c94733c0a79e94bdf8627afc6bb2a4.tar.gz | |
QMFv2 Additions:
  - QMFv2 schema encoding completed
  - Schema queries handled by the agent and initiated by the console by user request
  - Full query support with predicates evaluated on the agent (regex not yet implemented)
  - Agent filtering in the console
  - Agent aging in the console
  - Unit tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@999662 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/ConsoleSession.cpp')
| -rw-r--r-- | cpp/src/qmf/ConsoleSession.cpp | 172 | 
1 files changed, 136 insertions, 36 deletions
| diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp index 18986222c1..868df302ce 100644 --- a/cpp/src/qmf/ConsoleSession.cpp +++ b/cpp/src/qmf/ConsoleSession.cpp @@ -57,8 +57,9 @@ Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnecte  //========================================================================================  ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : -    connection(c), domain("default"), opened(false), thread(0), threadCanceled(false), -    lastVisit(0), schemaCache(new SchemaCache()) +    connection(c), domain("default"), maxAgentAgeMinutes(5), opened(false), +    thread(0), threadCanceled(false), +    lastVisit(0), lastAgePass(0), schemaCache(new SchemaCache())  {      if (!options.empty()) {          qpid::messaging::AddressParser parser(options); @@ -70,6 +71,10 @@ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :          iter = optMap.find("domain");          if (iter != optMap.end())              domain = iter->second.asString(); + +        iter = optMap.find("max-agent-age"); +        if (iter != optMap.end()) +            maxAgentAgeMinutes = iter->second.asUint32();      }  } @@ -81,13 +86,35 @@ ConsoleSessionImpl::~ConsoleSessionImpl()  } -void ConsoleSessionImpl::setAgentFilter(const string&) +void ConsoleSessionImpl::setAgentFilter(const string& predicate)  { +    agentQuery = Query(QUERY_OBJECT, predicate); +      // -    // TODO: Setup the new agent filter -    // TODO: Purge the agent list of any agents that don't match the filter -    // TODO: Send an agent locate with the new filter +    // Purge the agent list of any agents that don't match the filter.      // +    { +        qpid::sys::Mutex::ScopedLock l(lock); +        map<string, Agent> toDelete; +        for (map<string, Agent>::iterator iter = agents.begin(); iter != agents.end(); iter++) +            if ((iter->second.getName() != connectedBrokerAgent.getName()) && +                (!agentQuery.matchesPredicate(iter->second.getAttributes()))) { +                toDelete[iter->first] = iter->second; +            } + +        for (map<string, Agent>::iterator iter = toDelete.begin(); iter != toDelete.end(); iter++) { +            agents.erase(iter->first); +            auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_DEL, AGENT_DEL_FILTER)); +            eventImpl->setAgent(iter->second); +            enqueueEventLH(eventImpl.release()); +        } +    } + +    // +    // Broadcast an agent locate request with our new criteria. +    // +    if (opened) +        sendAgentLocate();  } @@ -111,15 +138,23 @@ void ConsoleSessionImpl::open()      Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}");      directRx.setCapacity(64); -    topicRx.setCapacity(64); +    topicRx.setCapacity(128);      legacyRx.setCapacity(64); +    directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}"); +    topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}"); + +    directSender.setCapacity(64); +    topicSender.setCapacity(128); +      // Start the receiver thread      threadCanceled = false;      thread = new qpid::sys::Thread(*this);      // Send an agent_locate to direct address 'broker' to identify the connected-broker-agent.      sendBrokerLocate(); +    if (agentQuery) +        sendAgentLocate();      opened = true;  } @@ -198,18 +233,17 @@ void ConsoleSessionImpl::dispatch(Message msg)  {      const Variant::Map& properties(msg.getProperties());      Variant::Map::const_iterator iter; +    Variant::Map::const_iterator oiter; +    oiter = properties.find("qmf.opcode");      iter = properties.find("x-amqp-0-10.app-id"); -    if (iter != properties.end() && iter->second.asString() == "qmf2") { +    if (iter == properties.end()) +        iter = properties.find("app_id"); +    if (iter != properties.end() && iter->second.asString() == "qmf2" && oiter != properties.end()) {          //          // Dispatch a QMFv2 formatted message          // -        iter = properties.find("qmf.opcode"); -        if (iter == properties.end()) { -            QPID_LOG(trace, "Message received with no 'qmf.opcode' header"); -            return; -        } -        const string& opcode = iter->second.asString(); +        const string& opcode = oiter->second.asString();          iter = properties.find("qmf.agent");          if (iter == properties.end()) { @@ -240,11 +274,8 @@ void ConsoleSessionImpl::dispatch(Message msg)              return;          } -        if (!agent.isValid()) { -            QPID_LOG(trace, "Received a QMFv2 message with opcode=" << opcode << -                     " from an unknown agent " << agentName); +        if (!agent.isValid())              return; -        }          AgentImpl& agentImpl(AgentImplAccess::get(agent)); @@ -305,14 +336,34 @@ void ConsoleSessionImpl::sendBrokerLocate()      msg.setReplyTo(replyAddress);      msg.setCorrelationId("broker-locate"); -    Sender sender(session.createSender(directBase + "/broker")); -    sender.send(msg); -    sender.close(); +    msg.setSubject("broker"); + +    directSender.send(msg);      QPID_LOG(trace, "SENT AgentLocate to broker");  } +void ConsoleSessionImpl::sendAgentLocate() +{ +    Message msg; +    Variant::Map& headers(msg.getProperties()); + +    headers["method"] = "request"; +    headers["qmf.opcode"] = "_agent_locate_request"; +    headers["x-amqp-0-10.app-id"] = "qmf2"; + +    msg.setReplyTo(replyAddress); +    msg.setCorrelationId("agent-locate"); +    msg.setSubject("console.request.agent_locate"); +    encode(agentQuery.getPredicate(), msg); + +    topicSender.send(msg); + +    QPID_LOG(trace, "SENT AgentLocate to topic"); +} + +  void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Variant::Map& content, const Message& msg)  {      Variant::Map::const_iterator iter; @@ -326,18 +377,25 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian      Variant::Map attrs(iter->second.asMap());      // -    // TODO: Check this agent against the agent filter.  Exit if it doesn't match. -    //       (only if this isn't the connected broker agent) +    // Check this agent against the agent filter.  Exit if it doesn't match. +    // (only if this isn't the connected broker agent)      // +    if ((cid != "broker-locate") && agentQuery && (!agentQuery.matchesPredicate(attrs))) +        return; + +    QPID_LOG(trace, "RCVD AgentHeartbeat from an agent matching our filter: " << agentName); -    iter = content.find("epoch"); -    if (iter != content.end()) +    iter = attrs.find("epoch"); +    if (iter != attrs.end())          epoch = iter->second.asUint32();      {          qpid::sys::Mutex::ScopedLock l(lock);          map<string, Agent>::iterator aIter = agents.find(agentName);          if (aIter == agents.end()) { +            // +            // This is a new agent.  We have no current record of its existence. +            //              auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this));              for (iter = attrs.begin(); iter != attrs.end(); iter++)                  if (iter->first != "epoch") @@ -345,24 +403,47 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian              agent = Agent(impl.release());              agents[agentName] = agent; +            // +            // Enqueue a notification of the new agent. +            //              auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD));              eventImpl->setAgent(agent);              enqueueEventLH(ConsoleEvent(eventImpl.release())); -        } else +        } else { +            // +            // This is a refresh of an agent we are already tracking. +            //              agent = aIter->second; +            AgentImpl& impl(AgentImplAccess::get(agent)); +            impl.touch(); +            if (impl.getEpoch() != epoch) { +                // +                // The agent has restarted since the last time we heard from it. +                // Enqueue a notification. +                // +                auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_RESTART)); +                eventImpl->setAgent(agent); +                enqueueEventLH(ConsoleEvent(eventImpl.release())); +            } + +            iter = attrs.find("schemaUpdated"); +            if (iter != attrs.end()) { +                uint64_t ts(iter->second.asUint64()); +                if (ts > impl.getAttribute("schemaUpdated").asUint64()) { +                    // +                    // The agent has added new schema entries since we last heard from it. +                    // Enqueue a notification. +                    // +                    auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE)); +                    eventImpl->setAgent(agent); +                    enqueueEventLH(ConsoleEvent(eventImpl.release())); +                } +            } +        }          if (cid == "broker-locate")              connectedBrokerAgent = agent;      } - -    AgentImplAccess::get(agent).touch(); - -    // -    // Changes we are interested in: -    // -    //   agentEpoch    - indicates that the agent restarted since we last heard from it -    //   schemaUpdated - indicates that the agent has registered new schemata -    //  } @@ -385,8 +466,27 @@ void ConsoleSessionImpl::periodicProcessing(uint64_t seconds)      lastVisit = seconds;      // -    // TODO: Handle the aging of agent records +    // Handle the aging of agent records      // +    if (lastAgePass == 0) +        lastAgePass = seconds; +    if (seconds - lastAgePass >= 60) { +        lastAgePass = seconds; +        map<string, Agent> toDelete; +        qpid::sys::Mutex::ScopedLock l(lock); + +        for (map<string, Agent>::iterator iter = agents.begin(); iter != agents.end(); iter++) +            if ((iter->second.getName() != connectedBrokerAgent.getName()) && +                (AgentImplAccess::get(iter->second).age() > maxAgentAgeMinutes)) +                toDelete[iter->first] = iter->second; + +        for (map<string, Agent>::iterator iter = toDelete.begin(); iter != toDelete.end(); iter++) { +            agents.erase(iter->first); +            auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_DEL, AGENT_DEL_AGED)); +            eventImpl->setAgent(iter->second); +            enqueueEventLH(eventImpl.release()); +        } +    }  } | 
