/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include "qmf/AgentSessionImpl.h"

#include <memory>
#include <sstream>

namespace qmf {

using std::string;
using std::map;
using qpid::messaging::Address;
using qpid::messaging::Connection;
using qpid::messaging::Duration;
using qpid::messaging::Message;
using qpid::messaging::Receiver;
using qpid::messaging::Sender;
using qpid::types::Variant;

//========================================================================================
// Public AgentSession handle
//
// Construction, copy, assignment and destruction are delegated to PI; every other
// method forwards directly to the AgentSessionImpl held in 'impl'.
//========================================================================================

AgentSession::AgentSession(AgentSessionImpl* impl) { PI::ctor(*this, impl); }
AgentSession::AgentSession(const AgentSession& s) : qmf::Handle<AgentSessionImpl>() { PI::copy(*this, s); }
AgentSession::~AgentSession() { PI::dtor(*this); }
AgentSession& AgentSession::operator=(const AgentSession& s) { return PI::assign(*this, s); }

AgentSession::AgentSession(Connection& c, const string& o) { PI::ctor(*this, new AgentSessionImpl(c, o)); }
void AgentSession::setDomain(const string& d) { impl->setDomain(d); }
void AgentSession::setVendor(const string& v) { impl->setVendor(v); }
void AgentSession::setProduct(const string& p) { impl->setProduct(p); }
void AgentSession::setInstance(const string& i) { impl->setInstance(i); }
void AgentSession::setAttribute(const string& k, const qpid::types::Variant& v) { impl->setAttribute(k, v); }
const string& AgentSession::getName() const { return impl->getName(); }
void AgentSession::open() { impl->open(); }
void AgentSession::close() { impl->close(); }
bool AgentSession::nextEvent(AgentEvent& e, Duration t) { return impl->nextEvent(e, t); }
int AgentSession::pendingEvents() const { return impl->pendingEvents(); }
void AgentSession::registerSchema(Schema& s) { impl->registerSchema(s); }
DataAddr AgentSession::addData(Data& d, const string& n, bool p) { return impl->addData(d, n, p); }
void AgentSession::delData(const DataAddr& a) { impl->delData(a); }
void AgentSession::authAccept(AgentEvent& e) { impl->authAccept(e); }
void AgentSession::authReject(AgentEvent& e, const string& m) { impl->authReject(e, m); }
void AgentSession::raiseException(AgentEvent& e, const string& s) { impl->raiseException(e, s); }
void AgentSession::raiseException(AgentEvent& e, const Data& d) { impl->raiseException(e, d); }
void AgentSession::response(AgentEvent& e, const Data& d) { impl->response(e, d); }
void AgentSession::complete(AgentEvent& e) { impl->complete(e); }
void AgentSession::methodSuccess(AgentEvent& e) { impl->methodSuccess(e); }
void AgentSession::raiseEvent(const Data& d) { impl->raiseEvent(d); }
void AgentSession::raiseEvent(const Data& d, int s) { impl->raiseEvent(d, s); }

//========================================================================================
// Impl Method Bodies
//========================================================================================

//
// Build a closed session on connection 'c'.
//
// 'options' is an address-style option map; when non-empty it is parsed and the
// following keys override the defaults set in the initializer list:
//   domain, interval (clamped to >= 1), external, allow-queries, allow-methods,
//   max-subscriptions, min-sub-interval, sub-lifetime, public-events,
//   listen-on-direct, strict-security, max-thread-wait-time.
// maxThreadWaitTime is finally capped at 'interval'.
//
AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
    connection(c), domain("default"), opened(false), eventNotifier(0), thread(0), threadCanceled(false),
    bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
    externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
    maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true),
    listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
    schemaUpdateTime(uint64_t(qpid::sys::Duration::FromEpoch()))
{
    //
    // Set Agent Capability Level
    //
    attributes["qmf.agent_capability"] = AGENT_CAPABILITY_0_8;

    if (!options.empty()) {
        qpid::messaging::AddressParser parser(options);
        Variant::Map optMap;
        Variant::Map::const_iterator iter;

        parser.parseMap(optMap);

        iter = optMap.find("domain");
        if (iter != optMap.end())
            domain = iter->second.asString();

        iter = optMap.find("interval");
        if (iter != optMap.end()) {
            interval = iter->second.asUint32();
            if (interval < 1)
                interval = 1;
        }

        iter = optMap.find("external");
        if (iter != optMap.end())
            externalStorage = iter->second.asBool();

        iter = optMap.find("allow-queries");
        if (iter != optMap.end())
            autoAllowQueries = iter->second.asBool();

        iter = optMap.find("allow-methods");
        if (iter != optMap.end())
            autoAllowMethods = iter->second.asBool();

        iter = optMap.find("max-subscriptions");
        if (iter != optMap.end())
            maxSubscriptions = iter->second.asUint32();

        iter = optMap.find("min-sub-interval");
        if (iter != optMap.end())
            minSubInterval = iter->second.asUint32();

        iter = optMap.find("sub-lifetime");
        if (iter != optMap.end())
            subLifetime = iter->second.asUint32();

        iter = optMap.find("public-events");
        if (iter != optMap.end())
            publicEvents = iter->second.asBool();

        iter = optMap.find("listen-on-direct");
        if (iter != optMap.end())
            listenOnDirect = iter->second.asBool();

        iter = optMap.find("strict-security");
        if (iter != optMap.end())
            strictSecurity = iter->second.asBool();

        iter = optMap.find("max-thread-wait-time");
        if (iter != optMap.end())
            maxThreadWaitTime = iter->second.asUint32();
    }

    if (maxThreadWaitTime > interval)
        maxThreadWaitTime = interval;
}


//
// Close the session if it is still open and reap the receiver thread.
//
AgentSessionImpl::~AgentSessionImpl()
{
    if (opened)
        close();

    if (thread) {
        thread->join();
        delete thread;
        thread = 0;
    }
}


//
// Open the session: create the AMQP session, receivers and senders, start the
// receiver thread and send an initial heartbeat.
//
// Throws QmfException if the session is already open.
//
void AgentSessionImpl::open()
{
    if (opened)
        throw QmfException("The session is already open");

    // If the thread exists, join and delete it before creating a new one.
    if (thread) {
        thread->join();
        delete thread;
        // Clear the pointer: if anything below throws, the destructor must not
        // join/delete the same thread object a second time.
        thread = 0;
    }

    const string addrArgs(";{create:never,node:{type:topic}}");
    const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str());
    attributes["_direct_subject"] = routableAddr;

    // Establish messaging addresses
    setAgentName();
    directBase = "qmf." + domain + ".direct";
    topicBase = "qmf." + domain + ".topic";

    // Create AMQP session, receivers, and senders
    session = connection.createSession();
    Receiver directRx;
    Receiver routableDirectRx = session.createReceiver(topicBase + "/" + routableAddr + addrArgs);
    Receiver topicRx = session.createReceiver(topicBase + "/console.#" + addrArgs);

    if (listenOnDirect && !strictSecurity) {
        directRx = session.createReceiver(directBase + "/" + agentName + addrArgs);
        directRx.setCapacity(64);
    }

    routableDirectRx.setCapacity(64);
    topicRx.setCapacity(64);

    if (!strictSecurity)
        directSender = session.createSender(directBase + addrArgs);
    topicSender = session.createSender(topicBase + addrArgs);

    // Start the receiver thread
    threadCanceled = false;
    opened = true;
    thread = new qpid::sys::Thread(*this);

    // Send an initial agent heartbeat message
    sendHeartbeat();
}


//
// Request shutdown without waiting for the receiver thread to finish.
// No-op when the session is not open.
//
void AgentSessionImpl::closeAsync()
{
    if (!opened)
        return;

    // Stop the receiver thread.  Don't join it until the destructor is called or open() is called.
    threadCanceled = true;
    opened = false;
}


//
// Request shutdown and wait for the receiver thread to exit.
//
void AgentSessionImpl::close()
{
    closeAsync();

    if (thread) {
        thread->join();
        delete thread;
        thread = 0;
    }
}


//
// Pop the next queued event into 'event'.
//
// If the queue is empty and 'timeout' is non-zero, waits once on the condition
// (bounded by the timeout) before re-checking.  Returns true when an event was
// delivered, false otherwise.
//
bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout)
{
    const uint64_t milliseconds = timeout.getMilliseconds();
    qpid::sys::Mutex::ScopedLock l(lock);

    if (eventQueue.empty() && milliseconds > 0) {
        // Convert to nanoseconds, saturating at TIME_INFINITE to avoid overflow.
        int64_t nsecs(qpid::sys::TIME_INFINITE);
        if (static_cast<uint64_t>(nsecs / 1000000) > milliseconds)
            nsecs = static_cast<int64_t>(milliseconds) * 1000000;
        qpid::sys::Duration then(nsecs);
        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
    }

    if (!eventQueue.empty()) {
        event = eventQueue.front();
        eventQueue.pop();
        if (eventQueue.empty())
            alertEventNotifierLH(false);
        return true;
    }

    return false;
}


//
// Number of events currently waiting in the queue.
//
int AgentSessionImpl::pendingEvents() const
{
    qpid::sys::Mutex::ScopedLock l(lock);
    return static_cast<int>(eventQueue.size());
}


//
// Install (or clear, with 0) the event notifier.
//
void AgentSessionImpl::setEventNotifier(EventNotifierImpl* notifier)
{
    qpid::sys::Mutex::ScopedLock l(lock);
    eventNotifier = notifier;
}


//
// Currently installed event notifier, or 0 if none.
//
EventNotifierImpl* AgentSessionImpl::getEventNotifier() const
{
    qpid::sys::Mutex::ScopedLock l(lock);
    return eventNotifier;
}


//
// Register 'schema' with this agent, finalizing it first if necessary, and
// create an empty per-schema data index for it.
//
void AgentSessionImpl::registerSchema(Schema& schema)
{
    if (!schema.isFinalized())
        schema.finalize();
    const SchemaId& schemaId(schema.getSchemaId());

    qpid::sys::Mutex::ScopedLock l(lock);
    schemata[schemaId] = schema;
    schemaIndex[schemaId] = DataIndex();

    //
    // Get the news out at the next periodic interval that there is new schema information.
    //
    schemaUpdateTime = uint64_t(qpid::sys::Duration::FromEpoch());
    forceHeartbeat = true;
}


//
// Add 'data' to the agent's indexes and return the address assigned to it.
//
// An empty 'name' is replaced by a generated UUID string.  Persistent data is
// addressed with epoch 0, non-persistent data with the current bootSequence.
//
// Throws QmfException when the 'external' option is enabled or when the
// resulting address is already present.
//
DataAddr AgentSessionImpl::addData(Data& data, const string& name, bool persistent)
{
    if (externalStorage)
        throw QmfException("addData() must not be called when the 'external' option is enabled.");

    string dataName;
    if (name.empty())
        dataName = qpid::types::Uuid(true).str();
    else
        dataName = name;

    DataAddr addr(dataName, agentName, persistent ? 0 : bootSequence);
    data.setAddr(addr);

    {
        qpid::sys::Mutex::ScopedLock l(lock);
        DataIndex::const_iterator iter = globalIndex.find(addr);
        if (iter != globalIndex.end())
            throw QmfException("Duplicate Data Address");

        globalIndex[addr] = data;
        if (data.hasSchema())
            schemaIndex[data.getSchemaId()][addr] = data;
    }

    //
    // TODO: Survey active subscriptions to see if they need to hear about this new data.
    //

    return addr;
}


//
// Remove the data at 'addr' from the global and per-schema indexes.
// Unknown addresses are ignored.
//
void AgentSessionImpl::delData(const DataAddr& addr)
{
    {
        qpid::sys::Mutex::ScopedLock l(lock);
        DataIndex::iterator iter = globalIndex.find(addr);
        if (iter == globalIndex.end())
            return;
        if (iter->second.hasSchema()) {
            const SchemaId& schemaId(iter->second.getSchemaId());
            schemaIndex[schemaId].erase(addr);
        }
        globalIndex.erase(iter);
    }

    //
    // TODO: Survey active subscriptions to see if they need to hear about this deleted data.
    //
}


//
// Accept an authorization event for a query.
//
// Builds an AGENT_QUERY event carrying the same query, user id, reply-to and
// correlation id.  With external storage the event is queued for the
// application; otherwise the query is answered here from the local indexes
// (by data address first, then by schema id), or rejected as invalid.
//
void AgentSessionImpl::authAccept(AgentEvent& authEvent)
{
    std::auto_ptr<AgentEventImpl> eventImpl(new AgentEventImpl(AGENT_QUERY));
    eventImpl->setQuery(authEvent.getQuery());
    eventImpl->setUserId(authEvent.getUserId());
    eventImpl->setReplyTo(AgentEventImplAccess::get(authEvent).getReplyTo());
    eventImpl->setCorrelationId(AgentEventImplAccess::get(authEvent).getCorrelationId());
    AgentEvent event(eventImpl.release());

    if (externalStorage) {
        enqueueEvent(event);
        return;
    }

    const Query& query(authEvent.getQuery());
    if (query.getDataAddr().isValid()) {
        {
            qpid::sys::Mutex::ScopedLock l(lock);
            DataIndex::const_iterator iter = globalIndex.find(query.getDataAddr());
            if (iter != globalIndex.end())
                response(event, iter->second);
        }
        complete(event);
        return;
    }

    if (query.getSchemaId().isValid()) {
        {
            qpid::sys::Mutex::ScopedLock l(lock);
            // Look up without inserting; bind the per-schema index by reference.
            if (schemaIndex.count(query.getSchemaId()) != 0) {
                const DataIndex& matches(schemaIndex.find(query.getSchemaId())->second);
                for (DataIndex::const_iterator dIter = matches.begin(); dIter != matches.end(); ++dIter)
                    if (query.matchesPredicate(dIter->second.getProperties()))
                        response(event, dIter->second);
            }
        }
        complete(event);
        return;
    }

    raiseException(event, "Query is Invalid");
}


//
// Reject an authorization event by sending an exception to the requester.
//
void AgentSessionImpl::authReject(AgentEvent& event, const string& error)
{
    raiseException(event, "Action Forbidden - " + error);
}


//
// Send an exception whose payload is a data object with a single
// "error_text" property.
//
void AgentSessionImpl::raiseException(AgentEvent& event, const string& error)
{
    Data exception(new DataImpl());
    exception.setProperty("error_text", error);
    raiseException(event, exception);
}


//
// Send 'data' as an exception response to the event's reply-to address,
// echoing the event's correlation id.
//
void AgentSessionImpl::raiseException(AgentEvent& event, const Data& data)
{
    Message msg;
    Variant::Map& headers(msg.getProperties());

    headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
    headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_EXCEPTION;
    headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_DATA;
    headers[protocol::HEADER_KEY_AGENT] = agentName;
    headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;

    AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));
    const DataImpl& dataImpl(DataImplAccess::get(data));

    msg.setCorrelationId(eventImpl.getCorrelationId());
    encode(dataImpl.asMap(), msg);
    send(msg, eventImpl.getReplyTo());

    QPID_LOG(trace, "SENT Exception to=" << eventImpl.getReplyTo());
}


//
// Queue 'data' as part of the response to 'event'; a partial response is
// flushed once at least 8 items have accumulated.
//
void AgentSessionImpl::response(AgentEvent& event, const Data& data)
{
    AgentEventImpl& impl(AgentEventImplAccess::get(event));
    uint32_t count = impl.enqueueData(data);
    if (count >= 8)
        flushResponses(event, false);
}


//
// Send whatever response data remains for 'event' as the final response.
//
void AgentSessionImpl::complete(AgentEvent& event)
{
    flushResponses(event, true);
}


//
// Send a method response carrying the event's return arguments (and their
// subtypes, when any are set) to the event's reply-to address.
//
void AgentSessionImpl::methodSuccess(AgentEvent& event)
{
    Message msg;
    Variant::Map map;
    Variant::Map& headers(msg.getProperties());

    headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
    headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_METHOD_RESPONSE;
    headers[protocol::HEADER_KEY_AGENT] = agentName;
    headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;

    AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));

    const Variant::Map& outArgs(eventImpl.getReturnArguments());
    const Variant::Map& outSubtypes(eventImpl.getReturnArgumentSubtypes());

    map["_arguments"] = outArgs;
    if (!outSubtypes.empty())
        map["_subtypes"] = outSubtypes;

    msg.setCorrelationId(eventImpl.getCorrelationId());
    encode(map, msg);
    send(msg, eventImpl.getReplyTo());

    QPID_LOG(trace, "SENT MethodResponse to=" << eventImpl.getReplyTo());
}


//
// Raise an event using the schema's default severity when the data has a
// valid schema, SEV_NOTICE otherwise.
//
void AgentSessionImpl::raiseEvent(const Data& data)
{
    int severity(SEV_NOTICE);
    if (data.hasSchema()) {
        const Schema& schema(DataImplAccess::get(data).getSchema());
        if (schema.isValid())
            severity = schema.getDefaultSeverity();
    }

    raiseEvent(data, severity);
}


//
// Publish 'data' as an event indication on the topic sender.
//
// The subject is "agent.ind.event", extended with the schema's package and
// class name when the data has a schema.
//
// Throws QmfException if the data's schema is not an event schema or if
// 'severity' is outside [SEV_EMERG, SEV_DEBUG].
//
void AgentSessionImpl::raiseEvent(const Data& data, int severity)
{
    Message msg;
    Variant::Map& headers(msg.getProperties());
    string subject("agent.ind.event");

    if (data.hasSchema()) {
        const SchemaId& schemaId(data.getSchemaId());
        if (schemaId.getType() != SCHEMA_TYPE_EVENT)
            throw QmfException("Cannot call raiseEvent on data that is not an Event");
        subject = subject + "." + schemaId.getPackageName() + "." + schemaId.getName();
    }

    if (severity < SEV_EMERG || severity > SEV_DEBUG)
        throw QmfException("Invalid severity value");

    headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION;
    headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_DATA_INDICATION;
    headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_EVENT;
    headers[protocol::HEADER_KEY_AGENT] = agentName;
    headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
    msg.setSubject(subject);

    Variant::List list;
    Variant::Map dataAsMap(DataImplAccess::get(data).asMap());
    dataAsMap["_severity"] = severity;
    dataAsMap["_timestamp"] = uint64_t(qpid::sys::Duration::FromEpoch());
    list.push_back(dataAsMap);
    encode(list, msg);
    topicSender.send(msg);

    QPID_LOG(trace, "SENT EventIndication to=" << topicSender.getName() << "/" << subject);
}


//
// Guard for operations that are only legal before open().
// Throws QmfException if the session is already open.
//
void AgentSessionImpl::checkOpen()
{
    if (opened)
        throw QmfException("Operation must be performed before calling open()");
}


//
// Append 'event' to the queue.  On the empty -> non-empty transition, wake a
// waiter in nextEvent() and mark the event notifier readable.
//
void AgentSessionImpl::enqueueEvent(const AgentEvent& event)
{
    qpid::sys::Mutex::ScopedLock l(lock);
    const bool notify = eventQueue.empty();
    eventQueue.push(event);
    if (notify) {
        cond.notify();
        alertEventNotifierLH(true);
    }
}


//
// Compute agentName as "vendor:product:instance" from the attribute map.
//
// Missing "_vendor"/"_product" attributes are stored as empty strings; a
// missing "_instance" is replaced by a generated UUID.  The result is also
// stored in the "_name" attribute.
//
void AgentSessionImpl::setAgentName()
{
    Variant::Map::iterator iter;
    string vendor;
    string product;
    string instance;

    iter = attributes.find("_vendor");
    if (iter == attributes.end())
        attributes["_vendor"] = vendor;
    else
        vendor = iter->second.asString();

    iter = attributes.find("_product");
    if (iter == attributes.end())
        attributes["_product"] = product;
    else
        product = iter->second.asString();

    iter = attributes.find("_instance");
    if (iter == attributes.end()) {
        instance = qpid::types::Uuid(true).str();
        attributes["_instance"] = instance;
    } else
        instance = iter->second.asString();

    agentName = vendor + ":" + product + ":" + instance;
    attributes["_name"] = agentName;
}


//
// Answer an agent-locate request.
//
// A non-empty predicate is matched against this agent's attributes; when it
// does not match the request is ignored.  Otherwise a locate response carrying
// the attributes plus timestamp, heartbeat interval, epoch and schema-update
// timestamp is sent to the request's reply-to address.
//
void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const Message& msg)
{
    QPID_LOG(trace, "RCVD AgentLocateRequest from=" << msg.getReplyTo());

    if (!predicate.empty()) {
        Query agentQuery(QUERY_OBJECT);
        agentQuery.setPredicate(predicate);
        if (!agentQuery.matchesPredicate(attributes)) {
            QPID_LOG(trace, "AgentLocate predicate does not match this agent, ignoring");
            return;
        }
    }

    Message reply;
    Variant::Map map;
    Variant::Map& headers(reply.getProperties());

    headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION;
    headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_RESPONSE;
    headers[protocol::HEADER_KEY_AGENT] = agentName;
    headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;

    map["_values"] = attributes;
    Variant::Map& values(map["_values"].asMap());
    values[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration::FromEpoch());
    values[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = interval;
    values[protocol::AGENT_ATTR_EPOCH] = bootSequence;
    values[protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP] = schemaUpdateTime;

    encode(map, reply);
    send(reply, msg.getReplyTo());
    QPID_LOG(trace, "SENT AgentLocateResponse to=" << msg.getReplyTo());
}


//
// Turn a method request into an AGENT_METHOD event for the application.
//
// An exception is sent back to the requester (and no event is queued) when
// "_method_name" is missing, when "_object_id" names data unknown to a
// locally-stored agent, or when an argument fails schema validation.
//
void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Message& msg)
{
    QPID_LOG(trace, "RCVD MethodRequest map=" << content << " from=" << msg.getReplyTo() << " cid=" << msg.getCorrelationId());

    //
    // Construct an AgentEvent to be sent to the application.
    //
    std::auto_ptr<AgentEventImpl> eventImpl(new AgentEventImpl(AGENT_METHOD));
    eventImpl->setUserId(msg.getUserId());
    eventImpl->setReplyTo(msg.getReplyTo());
    eventImpl->setCorrelationId(msg.getCorrelationId());

    Variant::Map::const_iterator iter;

    iter = content.find("_method_name");
    if (iter == content.end()) {
        AgentEvent event(eventImpl.release());
        raiseException(event, "Malformed MethodRequest: missing _method_name field");
        return;
    }
    eventImpl->setMethodName(iter->second.asString());

    iter = content.find("_arguments");
    if (iter != content.end())
        eventImpl->setArguments(iter->second.asMap());

    iter = content.find("_subtypes");
    if (iter != content.end())
        eventImpl->setArgumentSubtypes(iter->second.asMap());

    iter = content.find("_object_id");
    if (iter != content.end()) {
        DataAddr addr(new DataAddrImpl(iter->second.asMap()));
        eventImpl->setDataAddr(addr);

        if (!externalStorage) {
            // globalIndex is modified under 'lock' by addData()/delData(), so
            // hold the same lock while looking up and reading the data entry.
            // The lock is released before enqueueEvent() below re-acquires it.
            qpid::sys::Mutex::ScopedLock l(lock);

            DataIndex::const_iterator dataIter(globalIndex.find(addr));
            if (dataIter == globalIndex.end()) {
                AgentEvent event(eventImpl.release());
                raiseException(event, "No data object found with the specified address");
                return;
            }

            const Schema& schema(DataImplAccess::get(dataIter->second).getSchema());
            if (schema.isValid()) {
                eventImpl->setSchema(schema);

                // Validate every supplied argument against the method's schema.
                const Variant::Map& args(eventImpl->getArguments());
                for (Variant::Map::const_iterator aIter = args.begin(); aIter != args.end(); ++aIter) {
                    if (!SchemaImplAccess::get(schema).isValidMethodInArg(eventImpl->getMethodName(), aIter->first, aIter->second)) {
                        // Build the message before releasing ownership of eventImpl.
                        const string error("Invalid argument: " + aIter->first);
                        AgentEvent event(eventImpl.release());
                        raiseException(event, error);
                        return;
                    }
                }
            }
        }
    }

    enqueueEvent(AgentEvent(eventImpl.release()));
}


//
// Turn a query request into an AGENT_AUTH_QUERY event.
//
// Schema queries (QUERY_SCHEMA_ID / QUERY_SCHEMA) are answered directly.
// Other queries are auto-accepted when autoAllowQueries is set, otherwise
// queued for the application to authorize.
//
void AgentSessionImpl::handleQueryRequest(const Variant::Map& content, const Message& msg)
{
    QPID_LOG(trace, "RCVD QueryRequest query=" << content << " from=" << msg.getReplyTo() << " cid=" << msg.getCorrelationId());

    //
    // Construct an AgentEvent to be sent to the application or directly handled by the agent.
    //
    std::auto_ptr<QueryImpl> queryImpl(new QueryImpl(content));
    std::auto_ptr<AgentEventImpl> eventImpl(new AgentEventImpl(AGENT_AUTH_QUERY));
    eventImpl->setUserId(msg.getUserId());
    eventImpl->setReplyTo(msg.getReplyTo());
    eventImpl->setCorrelationId(msg.getCorrelationId());
    eventImpl->setQuery(queryImpl.release());
    AgentEvent ae(eventImpl.release());

    if (ae.getQuery().getTarget() == QUERY_SCHEMA_ID || ae.getQuery().getTarget() == QUERY_SCHEMA) {
        handleSchemaRequest(ae);
        return;
    }

    if (autoAllowQueries)
        authAccept(ae);
    else
        enqueueEvent(ae);
}


//
// Answer a schema query.
//
// QUERY_SCHEMA_ID returns the ids of all registered schemata.  Any other
// target must carry a valid schema id and returns that schema if it is
// registered (an empty list otherwise); without a valid id an exception is
// sent to the requester.
//
void AgentSessionImpl::handleSchemaRequest(AgentEvent& event)
{
    SchemaMap::const_iterator iter;
    string error;
    const Query& query(event.getQuery());

    Message msg;
    Variant::List content;
    Variant::Map& headers(msg.getProperties());

    headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
    headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_RESPONSE;
    headers[protocol::HEADER_KEY_AGENT] = agentName;
    headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;

    {
        qpid::sys::Mutex::ScopedLock l(lock);
        if (query.getTarget() == QUERY_SCHEMA_ID) {
            headers[protocol::HEADER_KEY_CONTENT] = "_schema_id";
            for (iter = schemata.begin(); iter != schemata.end(); ++iter)
                content.push_back(SchemaIdImplAccess::get(iter->first).asMap());
        } else if (query.getSchemaId().isValid()) {
            headers[protocol::HEADER_KEY_CONTENT] = "_schema";
            iter = schemata.find(query.getSchemaId());
            if (iter != schemata.end())
                content.push_back(SchemaImplAccess::get(iter->second).asMap());
        } else {
            error = "Invalid Schema Query: Requests for SCHEMA must supply a valid schema ID.";
        }
    }

    // The exception is raised only after the lock scope above has ended.
    if (!error.empty()) {
        raiseException(event, error);
        return;
    }

    AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));

    msg.setCorrelationId(eventImpl.getCorrelationId());
    encode(content, msg);
    send(msg, eventImpl.getReplyTo());

    QPID_LOG(trace, "SENT QueryResponse(Schema) to=" << eventImpl.getReplyTo());
}


//
// Answer a QMFv1 schema request read from 'buffer'.
//
// The package name, class name and 128-bit hash are looked up first as a data
// schema and then as an event schema.  If neither is registered the request is
// silently ignored.
//
void AgentSessionImpl::handleV1SchemaRequest(qpid::management::Buffer& buffer, uint32_t seq, const Message& msg)
{
    string packageName;
    string className;
    uint8_t hashBits[16];

    buffer.getShortString(packageName);
    buffer.getShortString(className);
    buffer.getBin128(hashBits);

    QPID_LOG(trace, "RCVD QMFv1 SchemaRequest for " << packageName << ":" << className);

    qpid::types::Uuid hash(hashBits);
    SchemaMap::const_iterator iter;
    string replyContent;

    SchemaId dataId(SCHEMA_TYPE_DATA, packageName, className);
    dataId.setHash(hash);

    {
        qpid::sys::Mutex::ScopedLock l(lock);
        iter = schemata.find(dataId);
        if (iter == schemata.end()) {
            // Not a data schema: retry with the event-typed id.
            SchemaId eventId(SCHEMA_TYPE_EVENT, packageName, className);
            eventId.setHash(hash);
            iter = schemata.find(eventId);
            if (iter == schemata.end())
                return;
        }
        replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq);
    }

    Message reply;
    Variant::Map& headers(reply.getProperties());

    headers[protocol::HEADER_KEY_AGENT] = agentName;
    reply.setContent(replyContent);

    send(reply, msg.getReplyTo());
    QPID_LOG(trace, "SENT QMFv1 SchemaResponse to=" << msg.getReplyTo());
}


//
// Route an incoming message to the matching handler.
//
// Messages whose app-id header marks them as QMF are dispatched by opcode and
// content type; everything else is treated as a QMFv1 binary message.
//
void AgentSessionImpl::dispatch(Message msg)
{
    const Variant::Map& properties(msg.getProperties());
    Variant::Map::const_iterator iter;

    //
    // If strict-security is enabled, make sure that reply-to address complies with the
    // strict-security addressing pattern (i.e. start with 'qmf.<domain>.topic/direct-console.').
    //
    if (strictSecurity && msg.getReplyTo()) {
        if (msg.getReplyTo().getName() != topicBase || msg.getReplyTo().getSubject().find("direct-console.") != 0) {
            QPID_LOG(warning, "Reply-to violates strict-security policy: " << msg.getReplyTo().str());
            return;
        }
    }

    iter = properties.find(protocol::HEADER_KEY_APP_ID);
    if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF) {
        //
        // Dispatch a QMFv2 formatted message
        //
        iter = properties.find(protocol::HEADER_KEY_OPCODE);
        if (iter == properties.end()) {
            QPID_LOG(trace, "Message received with no 'qmf.opcode' header");
            return;
        }

        const string& opcode = iter->second.asString();

        if (msg.getContentType() == "amqp/list") {
            Variant::List content;
            decode(msg, content);

            if (opcode == protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST) handleLocateRequest(content, msg);
            else {
                QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/list' content: " << opcode);
            }

        } else if (msg.getContentType() == "amqp/map") {
            Variant::Map content;
            decode(msg, content);

            if      (opcode == protocol::HEADER_OPCODE_METHOD_REQUEST) handleMethodRequest(content, msg);
            else if (opcode == protocol::HEADER_OPCODE_QUERY_REQUEST)  handleQueryRequest(content, msg);
            else {
                QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/map' content: " << opcode);
            }
        } else {
            QPID_LOG(trace, "Unexpected QMFv2 content type.  Expected amqp/list or amqp/map");
        }

    } else {
        //
        // Dispatch a QMFv1 formatted message
        //
        const string& body(msg.getContent());
        // The fixed header read below is 8 bytes: 'A' 'M' '2', opcode, 32-bit sequence.
        if (body.size() < 8)
            return;
        qpid::management::Buffer buffer(const_cast<char*>(body.c_str()), body.size());

        if (buffer.getOctet() != 'A') return;
        if (buffer.getOctet() != 'M') return;
        if (buffer.getOctet() != '2') return;

        char v1Opcode(buffer.getOctet());
        uint32_t seq(buffer.getLong());

        if (v1Opcode == 'S') handleV1SchemaRequest(buffer, seq, msg);
        else {
            QPID_LOG(trace, "Unknown or Unsupported QMFv1 opcode: " << v1Opcode);
        }
    }
}


//
// Publish an agent heartbeat indication on the topic sender.
//
void AgentSessionImpl::sendHeartbeat()
{
    Message msg;
    Variant::Map map;
    Variant::Map& headers(msg.getProperties());
    std::stringstream address;

    address << "agent.ind.heartbeat";

    // Append ".vendor" and then ".product" to the subject when they are set and
    // non-empty (the product is only considered when a vendor is present).
    Variant::Map::const_iterator v;
    if ((v = attributes.find("_vendor")) != attributes.end() && !v->second.getString().empty()) {
        address << "." << v->second.getString();
        if ((v = attributes.find("_product")) != attributes.end() && !v->second.getString().empty()) {
            address << "." << v->second.getString();
        }
    }

    headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION;
    headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION;
    headers[protocol::HEADER_KEY_AGENT] = agentName;
    headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
    msg.setSubject(address.str());

    map["_values"] = attributes;
    Variant::Map& values(map["_values"].asMap());
    values[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration::FromEpoch());
    values[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = interval;
    values[protocol::AGENT_ATTR_EPOCH] = bootSequence;
    values[protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP] = schemaUpdateTime;

    encode(map, msg);
    topicSender.send(msg);
    QPID_LOG(trace, "SENT AgentHeartbeat name=" << agentName);
}


//
// Send 'msg' to address 'to'.
//
// Addresses on the direct or topic base reuse the corresponding long-lived
// sender with the address subject copied onto the message; any other address
// gets a sender created for it.  Under strict security only topic-base
// addresses are permitted; others are logged and dropped.
//
void AgentSessionImpl::send(Message msg, const Address& to)
{
    Sender sender;

    if (strictSecurity && to.getName() != topicBase) {
        QPID_LOG(warning, "Address violates strict-security policy: " << to);
        return;
    }

    if (to.getName() == directBase) {
        msg.setSubject(to.getSubject());
        sender = directSender;
    } else if (to.getName() == topicBase) {
        msg.setSubject(to.getSubject());
        sender = topicSender;
    } else
        sender = session.createSender(to);

    sender.send(msg);
}


//
// Drain the data queued on 'event' into one query-response message and send
// it.  Non-final responses carry the "partial" header.
//
void AgentSessionImpl::flushResponses(AgentEvent& event, bool final)
{
    Message msg;
    Variant::Map& headers(msg.getProperties());

    headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
    headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_RESPONSE;
    headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_DATA;
    headers[protocol::HEADER_KEY_AGENT] = agentName;
    headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
    if (!final)
        headers[protocol::HEADER_KEY_PARTIAL] = Variant();

    Variant::List body;
    AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));
    Data data(eventImpl.dequeueData());
    while (data.isValid()) {
        DataImpl& dataImpl(DataImplAccess::get(data));
        body.push_back(dataImpl.asMap());
        data = eventImpl.dequeueData();
    }

    msg.setCorrelationId(eventImpl.getCorrelationId());
    encode(body, msg);
    send(msg, eventImpl.getReplyTo());

    QPID_LOG(trace, "SENT QueryResponse to=" << eventImpl.getReplyTo());
}


//
// Per-second housekeeping: sends a heartbeat when one is forced or when the
// heartbeat interval has elapsed.  'seconds' is the current time in seconds.
//
void AgentSessionImpl::periodicProcessing(uint64_t seconds)
{
    //
    // The granularity of this timer is seconds.  Don't waste time looking for work if
    // it's been less than a second since we last visited.
    //
    if (seconds == lastVisit)
        return;
    //uint64_t thisInterval(seconds - lastVisit);
    lastVisit = seconds;

    //
    // First time through, set lastHeartbeat to the current time.
    //
    if (lastHeartbeat == 0)
        lastHeartbeat = seconds;

    //
    // If the heartbeat interval has elapsed, send a heartbeat.
    //
    if (forceHeartbeat || (seconds - lastHeartbeat >= interval)) {
        lastHeartbeat = seconds;
        forceHeartbeat = false;
        sendHeartbeat();
    }

    //
    // TODO: process any active subscriptions on their intervals.
    //
}


//
// Forward the readable state to the event notifier, if one is installed.
// NOTE(review): the LH suffix and the call sites suggest 'lock' must already
// be held by the caller - confirm against the header.
//
void AgentSessionImpl::alertEventNotifierLH(bool readable)
{
    if (eventNotifier)
        eventNotifier->setReadable(readable);
}


//
// Receiver loop (presumably the entry point of the qpid::sys::Thread created
// in open()).
//
// Until cancelled: run periodic processing, wait up to maxThreadWaitTime
// seconds for a receiver with a message, dispatch it and acknowledge.
// Exceptions from dispatch are logged and the loop continues; any other
// qpid exception ends the loop and queues an AGENT_THREAD_FAILED event.
// The AMQP session is closed on exit.
//
void AgentSessionImpl::run()
{
    QPID_LOG(debug, "AgentSession thread started for agent " << agentName);

    try {
        while (!threadCanceled) {
            periodicProcessing(static_cast<uint64_t>(qpid::sys::Duration::FromEpoch()) / qpid::sys::TIME_SEC);

            Receiver rx;
            bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
            if (threadCanceled)
                break;
            if (valid) {
                try {
                    dispatch(rx.fetch());
                } catch (qpid::types::Exception& e) {
                    QPID_LOG(error, "Exception caught in message dispatch: " << e.what());
                }
                session.acknowledge();
            }
        }
    } catch (qpid::types::Exception& e) {
        QPID_LOG(error, "Exception caught in message thread - exiting: " << e.what());
        enqueueEvent(AgentEvent(new AgentEventImpl(AGENT_THREAD_FAILED)));
    }

    session.close();
    QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName);
}


//
// Access the implementation object behind an AgentSession handle.
//
AgentSessionImpl& AgentSessionImplAccess::get(AgentSession& session)
{
    return *session.impl;
}


const AgentSessionImpl& AgentSessionImplAccess::get(const AgentSession& session)
{
    return *session.impl;
}

}