/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include "qmf/PrivateImplRef.h"
#include "qmf/ConsoleSessionImpl.h"
#include "qmf/AgentImpl.h"
#include "qmf/SchemaId.h"
#include "qmf/SchemaImpl.h"
#include "qmf/ConsoleEventImpl.h"
#include "qmf/constants.h"
#include "qpid/log/Statement.h"
#include "qpid/messaging/AddressParser.h"
#include "qpid/messaging/Sender.h"
#include "qpid/messaging/Receiver.h"

using namespace std;
using namespace qmf;
using qpid::messaging::Address;
using qpid::messaging::Connection;
using qpid::messaging::Receiver;
using qpid::messaging::Sender;
using qpid::messaging::Duration;
using qpid::messaging::Message;
using qpid::types::Variant;

typedef qmf::PrivateImplRef<ConsoleSession> PI;

//========================================================================================
// Handle Method Bodies
//
// ConsoleSession is a thin handle: every call below is forwarded to the
// ConsoleSessionImpl it refers to.
//========================================================================================

ConsoleSession::ConsoleSession(ConsoleSessionImpl* impl) { PI::ctor(*this, impl); }
ConsoleSession::ConsoleSession(const ConsoleSession& s) : qmf::Handle<ConsoleSessionImpl>() { PI::copy(*this, s); }
ConsoleSession::~ConsoleSession() { PI::dtor(*this); }
ConsoleSession& ConsoleSession::operator=(const ConsoleSession& s) { return PI::assign(*this, s); }

ConsoleSession::ConsoleSession(Connection& c, const string& o) { PI::ctor(*this, new ConsoleSessionImpl(c, o)); }
void ConsoleSession::setDomain(const string& d) { impl->setDomain(d); }
void ConsoleSession::setAgentFilter(const string& f) { impl->setAgentFilter(f); }
void ConsoleSession::open() { impl->open(); }
void ConsoleSession::close() { impl->close(); }
bool ConsoleSession::nextEvent(ConsoleEvent& e, Duration t) { return impl->nextEvent(e, t); }
int ConsoleSession::pendingEvents() const { return impl->pendingEvents(); }
uint32_t ConsoleSession::getAgentCount() const { return impl->getAgentCount(); }
Agent ConsoleSession::getAgent(uint32_t i) const { return impl->getAgent(i); }
Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnectedBrokerAgent(); }
Subscription ConsoleSession::subscribe(const Query& q, const string& f, const string& o) { return impl->subscribe(q, f, o); }
Subscription ConsoleSession::subscribe(const string& q, const string& f, const string& o) { return impl->subscribe(q, f, o); }

//========================================================================================
// Impl Method Bodies
//========================================================================================

//
// Construct a (not yet opened) console session on connection 'c'.
//
// 'options' is an optional address-style map string.  Recognized keys:
//   "domain"           - QMF domain name used to build the addresses in open()
//   "max-agent-age"    - age threshold used by periodicProcessing()
//   "listen-on-direct" - stored in listenOnDirect
//   "strict-security"  - when true, open() skips the legacy direct-exchange
//                        receiver and sender
//
ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
    connection(c), domain("default"), maxAgentAgeMinutes(5),
    opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
    connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
{
    //
    // Give the boolean options well-defined defaults.  They are only overwritten
    // below when the corresponding key is present, and open() reads
    // strictSecurity unconditionally.  They are assigned here rather than in the
    // initializer list so that this does not depend on member declaration order.
    //
    listenOnDirect = true;
    strictSecurity = false;

    if (!options.empty()) {
        qpid::messaging::AddressParser parser(options);
        Variant::Map optMap;
        Variant::Map::const_iterator iter;

        parser.parseMap(optMap);

        iter = optMap.find("domain");
        if (iter != optMap.end())
            domain = iter->second.asString();

        iter = optMap.find("max-agent-age");
        if (iter != optMap.end())
            maxAgentAgeMinutes = iter->second.asUint32();

        iter = optMap.find("listen-on-direct");
        if (iter != optMap.end())
            listenOnDirect = iter->second.asBool();

        iter = optMap.find("strict-security");
        if (iter != optMap.end())
            strictSecurity = iter->second.asBool();
    }
}


//
// Close the session if it is still open.  Exceptions raised while closing are
// logged rather than propagated, because a destructor must not throw.
//
ConsoleSessionImpl::~ConsoleSessionImpl()
{
    if (opened) {
        try {
            close();
        } catch (const qpid::types::Exception& e) {
            QPID_LOG(error, "Exception caught while closing ConsoleSession: " << e.what());
        }
    }
}


//
// Replace the agent filter with 'predicate', drop tracked agents that no longer
// match (queuing a CONSOLE_AGENT_DEL event for each), re-admit the connected
// broker agent if it now matches, and, if the session is open, broadcast a new
// agent-locate request.
//
void ConsoleSessionImpl::setAgentFilter(const string& predicate)
{
    agentQuery = Query(QUERY_OBJECT, predicate);

    //
    // Purge the agent list of any agents that don't match the filter.
    //
    {
        qpid::sys::Mutex::ScopedLock l(lock);
        map<string, Agent> toDelete;

        // Collect first, erase afterwards, so 'agents' is not modified while iterating it.
        for (map<string, Agent>::iterator iter = agents.begin(); iter != agents.end(); ++iter)
            if (!agentQuery.matchesPredicate(iter->second.getAttributes())) {
                toDelete[iter->first] = iter->second;
                if (iter->second.getName() == connectedBrokerAgent.getName())
                    connectedBrokerInAgentList = false;
            }

        for (map<string, Agent>::iterator iter = toDelete.begin(); iter != toDelete.end(); ++iter) {
            agents.erase(iter->first);
            auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_DEL, AGENT_DEL_FILTER));
            eventImpl->setAgent(iter->second);
            enqueueEventLH(ConsoleEvent(eventImpl.release()));
        }

        if (!connectedBrokerInAgentList && connectedBrokerAgent.isValid() &&
            agentQuery.matchesPredicate(connectedBrokerAgent.getAttributes())) {
            agents[connectedBrokerAgent.getName()] = connectedBrokerAgent;
            connectedBrokerInAgentList = true;

            //
            // Enqueue a notification of the new agent.
            //
            auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD));
            eventImpl->setAgent(connectedBrokerAgent);
            enqueueEventLH(ConsoleEvent(eventImpl.release()));
        }
    }

    //
    // Broadcast an agent locate request with our new criteria.
    //
    if (opened)
        sendAgentLocate();
}


//
// Open the session: build the messaging addresses from 'domain', create the
// AMQP session with its receivers and senders, start the receiver thread and
// send the initial locate requests.
//
// Throws QmfException if the session is already open.
//
void ConsoleSessionImpl::open()
{
    if (opened)
        throw QmfException("The session is already open");

    // Establish messaging addresses
    directBase = "qmf." + domain + ".direct";
    topicBase = "qmf." + domain + ".topic";

    string myKey("direct-console." + qpid::types::Uuid(true).str());
    replyAddress = Address(topicBase + "/" + myKey + ";{node:{type:topic}}");

    // Create AMQP session, receivers, and senders
    session = connection.createSession();
    Receiver directRx = session.createReceiver(replyAddress);
    Receiver topicRx = session.createReceiver(topicBase + "/agent.#"); // TODO: be more discriminating
    if (!strictSecurity) {
        Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}");
        legacyRx.setCapacity(64);
        directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
        directSender.setCapacity(128);
    }

    directRx.setCapacity(64);
    topicRx.setCapacity(128);

    topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}");

    topicSender.setCapacity(128);

    // Start the receiver thread
    threadCanceled = false;
    thread = new qpid::sys::Thread(*this);

    // Send an agent_locate to direct address 'broker' to identify the connected-broker-agent.
    sendBrokerLocate();
    if (agentQuery)
        sendAgentLocate();

    opened = true;
}


//
// Close the session: stop and join the receiver thread, then close the AMQP
// session.
//
// Throws QmfException if the session is not open.
//
void ConsoleSessionImpl::close()
{
    if (!opened)
        throw QmfException("The session is already closed");

    // Stop and join the receiver thread
    threadCanceled = true;
    thread->join();
    delete thread;
    thread = 0;

    //
    // The thread is gone, so the session can no longer be considered open even
    // if closing the AMQP session below throws.  Clearing the flag first keeps a
    // later close() (or the destructor) from touching the deleted thread.
    //
    opened = false;

    // Close the AMQP session
    session.close();
}


//
// Fetch the next queued event into 'event'.
//
// If the queue is empty and 'timeout' is non-zero, wait (once) for an event to
// arrive or for the timeout to expire.  Returns true if an event was dequeued,
// false otherwise.
//
bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout)
{
    uint64_t milliseconds = timeout.getMilliseconds();
    qpid::sys::Mutex::ScopedLock l(lock);

    if (eventQueue.empty() && milliseconds > 0) {
        //
        // Convert to nanoseconds without overflowing: a very large timeout
        // multiplied by the nanoseconds-per-millisecond factor would wrap and
        // produce a bogus deadline, so clamp to TIME_INFINITE instead.
        //
        const int64_t nsecPerMsec(qpid::sys::TIME_MSEC);
        int64_t nsecs(qpid::sys::TIME_INFINITE);
        if (static_cast<uint64_t>(nsecs / nsecPerMsec) > milliseconds)
            nsecs = static_cast<int64_t>(milliseconds) * nsecPerMsec;
        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), qpid::sys::Duration(nsecs)));
    }

    if (!eventQueue.empty()) {
        event = eventQueue.front();
        eventQueue.pop();
        return true;
    }

    return false;
}


//
// Return the number of events currently queued.
//
int ConsoleSessionImpl::pendingEvents() const
{
    qpid::sys::Mutex::ScopedLock l(lock);
    return eventQueue.size();
}


//
// Return the number of agents currently tracked.
//
uint32_t ConsoleSessionImpl::getAgentCount() const
{
    qpid::sys::Mutex::ScopedLock l(lock);
    return agents.size();
}


//
// Return the i-th tracked agent, in the iteration order of the agent map.
// Throws IndexOutOfRange if 'i' is not less than the number of agents.
//
Agent ConsoleSessionImpl::getAgent(uint32_t i) const
{
    qpid::sys::Mutex::ScopedLock l(lock);
    uint32_t count = 0;
    for (map<string, Agent>::const_iterator iter = agents.begin(); iter != agents.end(); ++iter)
        if (count++ == i)
            return iter->second;
    throw IndexOutOfRange();
}


//
// Subscriptions are not implemented here: both overloads ignore their
// arguments and return a default-constructed Subscription.
//
Subscription ConsoleSessionImpl::subscribe(const Query&, const string&, const string&)
{
    return Subscription();
}


Subscription ConsoleSessionImpl::subscribe(const string&, const string&, const string&)
{
    return Subscription();
}


//
// Queue 'event' for delivery through nextEvent(), taking the lock.
//
void ConsoleSessionImpl::enqueueEvent(const ConsoleEvent& event)
{
    qpid::sys::Mutex::ScopedLock l(lock);
    enqueueEventLH(event);
}


//
// Queue 'event' without taking the lock; every caller in this file holds
// 'lock' when calling.  The condition is only signalled on the
// empty -> non-empty transition.
//
void ConsoleSessionImpl::enqueueEventLH(const ConsoleEvent& event)
{
    bool notify = eventQueue.empty();
    eventQueue.push(event);
    if (notify)
        cond.notify();
}


//
// Route one received message.
//
// A message carrying the QMF app-id and an opcode header is handled as QMFv2:
// agent heartbeats / locate responses go to handleAgentUpdate(), everything
// else is handed to the AgentImpl of the (already known) sending agent.  Any
// other message is treated as a QMFv1 binary frame, of which only the schema
// response ('s') is handled.
//
void ConsoleSessionImpl::dispatch(Message msg)
{
    const Variant::Map& properties(msg.getProperties());
    Variant::Map::const_iterator iter;
    Variant::Map::const_iterator oiter;

    oiter = properties.find(protocol::HEADER_KEY_OPCODE);
    iter = properties.find(protocol::HEADER_KEY_APP_ID);
    if (iter == properties.end())
        iter = properties.find("app_id");
    if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF && oiter != properties.end()) {
        //
        // Dispatch a QMFv2 formatted message
        //
        const string& opcode = oiter->second.asString();

        iter = properties.find(protocol::HEADER_KEY_AGENT);
        if (iter == properties.end()) {
            QPID_LOG(trace, "Message received with no 'qmf.agent' header");
            return;
        }

        const string& agentName = iter->second.asString();

        // Look up the sending agent; 'agent' stays invalid if it is unknown.
        Agent agent;
        {
            qpid::sys::Mutex::ScopedLock l(lock);
            map<string, Agent>::iterator aIter = agents.find(agentName);
            if (aIter != agents.end()) {
                agent = aIter->second;
                AgentImplAccess::get(agent).touch();
            }
        }

        if (msg.getContentType() == "amqp/map" &&
            (opcode == protocol::HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION || opcode == protocol::HEADER_OPCODE_AGENT_LOCATE_RESPONSE)) {
            //
            // This is the one case where it's ok (necessary actually) to receive a QMFv2
            // message from an unknown agent (how else are they going to get known?)
            //
            Variant::Map content;
            decode(msg, content);
            handleAgentUpdate(agentName, content, msg);
            return;
        }

        if (!agent.isValid())
            return;

        AgentImpl& agentImpl(AgentImplAccess::get(agent));

        if (msg.getContentType() == "amqp/map") {
            Variant::Map content;
            decode(msg, content);

            if      (opcode == protocol::HEADER_OPCODE_EXCEPTION)       agentImpl.handleException(content, msg);
            else if (opcode == protocol::HEADER_OPCODE_METHOD_RESPONSE) agentImpl.handleMethodResponse(content, msg);
            else
                QPID_LOG(error, "Received a map-formatted QMFv2 message with opcode=" << opcode);

            return;
        }

        if (msg.getContentType() == "amqp/list") {
            Variant::List content;
            decode(msg, content);

            if      (opcode == protocol::HEADER_OPCODE_QUERY_RESPONSE)  agentImpl.handleQueryResponse(content, msg);
            else if (opcode == protocol::HEADER_OPCODE_DATA_INDICATION) agentImpl.handleDataIndication(content, msg);
            else
                QPID_LOG(error, "Received a list-formatted QMFv2 message with opcode=" << opcode);

            return;
        }
    } else {
        //
        // Dispatch a QMFv1 formatted message
        //
        const string& body(msg.getContent());
        if (body.size() < 8)
            return;
        // The buffer is constructed from a non-const pointer, hence the const_cast.
        qpid::management::Buffer buffer(const_cast<char*>(body.c_str()), body.size());

        // Frame header: the three octets 'A' 'M' '2', an opcode octet, then a sequence number.
        if (buffer.getOctet() != 'A') return;
        if (buffer.getOctet() != 'M') return;
        if (buffer.getOctet() != '2') return;

        char v1Opcode(buffer.getOctet());
        uint32_t seq(buffer.getLong());

        if (v1Opcode == 's') handleV1SchemaResponse(buffer, seq, msg);
        else {
            QPID_LOG(trace, "Unknown or Unsupported QMFv1 opcode: " << v1Opcode);
        }
    }
}


//
// Send an agent-locate request with subject "broker" on the direct address,
// tagged with correlation id "broker-locate" so that handleAgentUpdate() can
// recognize the reply.  A temporary sender is created and closed for this
// single message.
//
void ConsoleSessionImpl::sendBrokerLocate()
{
    Message msg;
    Variant::Map& headers(msg.getProperties());

    headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST;
    headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST;
    headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;

    msg.setReplyTo(replyAddress);
    msg.setCorrelationId("broker-locate");
    msg.setSubject("broker");

    Sender sender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
    sender.send(msg);
    sender.close();

    QPID_LOG(trace, "SENT AgentLocate to broker");
}


//
// Broadcast an agent-locate request on the topic sender, carrying the current
// agent filter predicate as the message body.
//
void ConsoleSessionImpl::sendAgentLocate()
{
    Message msg;
    Variant::Map& headers(msg.getProperties());
    static const string subject("console.request.agent_locate");

    headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST;
    headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST;
    headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;

    msg.setReplyTo(replyAddress);
    msg.setCorrelationId("agent-locate");
    msg.setSubject(subject);
    encode(agentQuery.getPredicate(), msg);

    topicSender.send(msg);

    QPID_LOG(trace, "SENT AgentLocate to=" << topicSender.getName() << "/" << subject);
}


//
// Process an agent heartbeat or locate response from 'agentName'.
//
// 'content' must contain a "_values" map of agent attributes; otherwise the
// message is ignored.  A reply to the broker-locate request records the
// connected broker agent.  Any other update either registers a new agent
// (CONSOLE_AGENT_ADD) or refreshes a tracked one, raising CONSOLE_AGENT_RESTART
// when its epoch changed and CONSOLE_AGENT_SCHEMA_UPDATE when its schema
// timestamp advanced.
//
void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Variant::Map& content, const Message& msg)
{
    Variant::Map::const_iterator iter;
    Agent agent;
    uint32_t epoch(0);
    string cid(msg.getCorrelationId());

    iter = content.find("_values");
    if (iter == content.end())
        return;
    const Variant::Map& in_attrs(iter->second.asMap());
    Variant::Map attrs;

    //
    // Copy the map from the message to "attrs".  Translate any old-style
    // keys to their new key values in the process.
    //
    for (iter = in_attrs.begin(); iter != in_attrs.end(); ++iter) {
        if      (iter->first == "epoch")
            attrs[protocol::AGENT_ATTR_EPOCH] = iter->second;
        else if (iter->first == "timestamp")
            attrs[protocol::AGENT_ATTR_TIMESTAMP] = iter->second;
        else if (iter->first == "heartbeat_interval")
            attrs[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = iter->second;
        else
            attrs[iter->first] = iter->second;
    }

    iter = attrs.find(protocol::AGENT_ATTR_EPOCH);
    if (iter != attrs.end())
        epoch = iter->second.asUint32();

    if (cid == "broker-locate") {
        //
        // This is the reply to sendBrokerLocate(): the sender is the agent of
        // the broker we are connected to.  It is always remembered, but only
        // added to the agent list if it passes the filter.
        //
        qpid::sys::Mutex::ScopedLock l(lock);
        auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this));
        for (iter = attrs.begin(); iter != attrs.end(); ++iter)
            if (iter->first != protocol::AGENT_ATTR_EPOCH)
                impl->setAttribute(iter->first, iter->second);
        agent = Agent(impl.release());
        connectedBrokerAgent = agent;
        if (!agentQuery || agentQuery.matchesPredicate(attrs)) {
            connectedBrokerInAgentList = true;
            agents[agentName] = agent;

            //
            // Enqueue a notification of the new agent.
            //
            auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD));
            eventImpl->setAgent(agent);
            enqueueEventLH(ConsoleEvent(eventImpl.release()));
        }
        return;
    }

    //
    // Check this agent against the agent filter.  Exit if it doesn't match.
    // (only if this isn't the connected broker agent)
    //
    if (agentQuery && (!agentQuery.matchesPredicate(attrs)))
        return;

    QPID_LOG(trace, "RCVD AgentHeartbeat from an agent matching our filter: " << agentName);

    {
        qpid::sys::Mutex::ScopedLock l(lock);
        map<string, Agent>::iterator aIter = agents.find(agentName);
        if (aIter == agents.end()) {
            //
            // This is a new agent.  We have no current record of its existence.
            //
            auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this));
            for (iter = attrs.begin(); iter != attrs.end(); ++iter)
                if (iter->first != protocol::AGENT_ATTR_EPOCH)
                    impl->setAttribute(iter->first, iter->second);
            agent = Agent(impl.release());
            agents[agentName] = agent;

            //
            // Enqueue a notification of the new agent.
            //
            auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD));
            eventImpl->setAgent(agent);
            enqueueEventLH(ConsoleEvent(eventImpl.release()));
        } else {
            //
            // This is a refresh of an agent we are already tracking.
            //
            bool detectedRestart(false);
            agent = aIter->second;
            AgentImpl& impl(AgentImplAccess::get(agent));
            impl.touch();
            if (impl.getEpoch() != epoch) {
                //
                // The agent has restarted since the last time we heard from it.
                // Enqueue a notification.
                //
                impl.setEpoch(epoch);
                auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_RESTART));
                eventImpl->setAgent(agent);
                enqueueEventLH(ConsoleEvent(eventImpl.release()));
                detectedRestart = true;
            }

            iter = attrs.find(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP);
            if (iter != attrs.end()) {
                uint64_t ts(iter->second.asUint64());
                if (ts > impl.getAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP).asUint64()) {
                    //
                    // The agent has added new schema entries since we last heard from it.
                    // Update the attribute and, if this doesn't accompany a restart, enqueue a notification.
                    //
                    if (!detectedRestart) {
                        auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE));
                        eventImpl->setAgent(agent);
                        enqueueEventLH(ConsoleEvent(eventImpl.release()));
                    }
                    impl.setAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP, iter->second);
                }
            }
        }
    }
}


//
// Decode a QMFv1 schema response from 'buffer' and declare the resulting
// schema in the schema cache.  The sequence number and message are unused.
//
void ConsoleSessionImpl::handleV1SchemaResponse(qpid::management::Buffer& buffer, uint32_t, const Message&)
{
    QPID_LOG(trace, "RCVD V1SchemaResponse");
    Schema schema(new SchemaImpl(buffer));
    schemaCache->declareSchema(schema);
}


//
// Housekeeping driven by the receiver thread.  'seconds' is the current time
// in whole seconds.  At most once every 60 seconds, agents other than the
// connected broker agent whose age() exceeds maxAgentAgeMinutes are removed
// and a CONSOLE_AGENT_DEL event is queued for each.
//
void ConsoleSessionImpl::periodicProcessing(uint64_t seconds)
{
    //
    // The granularity of this timer is seconds.  Don't waste time looking for work if
    // it's been less than a second since we last visited.
    //
    if (seconds == lastVisit)
        return;
    lastVisit = seconds;

    //
    // Handle the aging of agent records
    //
    if (lastAgePass == 0)
        lastAgePass = seconds;
    if (seconds - lastAgePass >= 60) {
        lastAgePass = seconds;
        map<string, Agent> toDelete;
        qpid::sys::Mutex::ScopedLock l(lock);

        // Collect first, erase afterwards, so 'agents' is not modified while iterating it.
        for (map<string, Agent>::iterator iter = agents.begin(); iter != agents.end(); ++iter)
            if ((iter->second.getName() != connectedBrokerAgent.getName()) &&
                (AgentImplAccess::get(iter->second).age() > maxAgentAgeMinutes))
                toDelete[iter->first] = iter->second;

        for (map<string, Agent>::iterator iter = toDelete.begin(); iter != toDelete.end(); ++iter) {
            agents.erase(iter->first);
            auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_DEL, AGENT_DEL_AGED));
            eventImpl->setAgent(iter->second);
            enqueueEventLH(ConsoleEvent(eventImpl.release()));
        }
    }
}


//
// Receiver thread body.  Until cancelled: run periodic housekeeping, wait up
// to one second for a receiver with a pending message, dispatch that message
// and acknowledge it.  An exception thrown by dispatch() is logged and the
// loop continues; an exception thrown by the session itself ends the thread
// and queues a CONSOLE_THREAD_FAILED event.
//
void ConsoleSessionImpl::run()
{
    QPID_LOG(debug, "ConsoleSession thread started");

    try {
        while (!threadCanceled) {
            periodicProcessing(static_cast<uint64_t>(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())) /
                               qpid::sys::TIME_SEC);

            Receiver rx;
            bool valid = session.nextReceiver(rx, Duration::SECOND);
            if (threadCanceled)
                break;
            if (valid) {
                try {
                    dispatch(rx.fetch());
                } catch (const qpid::types::Exception& e) {
                    QPID_LOG(error, "Exception caught in message dispatch: " << e.what());
                }
                session.acknowledge();
            }
        }
    } catch (const qpid::types::Exception& e) {
        QPID_LOG(error, "Exception caught in message thread - exiting: " << e.what());
        enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED)));
    }

    QPID_LOG(debug, "ConsoleSession thread exiting");
}