summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf/ConsoleSession.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qmf/ConsoleSession.cpp')
-rw-r--r--qpid/cpp/src/qmf/ConsoleSession.cpp683
1 files changed, 683 insertions, 0 deletions
diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp
new file mode 100644
index 0000000000..c74d4de8db
--- /dev/null
+++ b/qpid/cpp/src/qmf/ConsoleSession.cpp
@@ -0,0 +1,683 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/PrivateImplRef.h"
+#include "qmf/ConsoleSessionImpl.h"
+#include "qmf/AgentImpl.h"
+#include "qmf/SchemaId.h"
+#include "qmf/SchemaImpl.h"
+#include "qmf/ConsoleEventImpl.h"
+#include "qmf/constants.h"
+#include "qpid/log/Statement.h"
+#include "qpid/messaging/AddressParser.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Receiver.h"
+
+using namespace std;
+using namespace qmf;
+using qpid::messaging::Address;
+using qpid::messaging::Connection;
+using qpid::messaging::Receiver;
+using qpid::messaging::Sender;
+using qpid::messaging::Duration;
+using qpid::messaging::Message;
+using qpid::types::Variant;
+
+typedef qmf::PrivateImplRef<ConsoleSession> PI;
+
+ConsoleSession::ConsoleSession(ConsoleSessionImpl* impl) { PI::ctor(*this, impl); }
+ConsoleSession::ConsoleSession(const ConsoleSession& s) : qmf::Handle<ConsoleSessionImpl>() { PI::copy(*this, s); }
+ConsoleSession::~ConsoleSession() { PI::dtor(*this); }
+ConsoleSession& ConsoleSession::operator=(const ConsoleSession& s) { return PI::assign(*this, s); }
+
+ConsoleSession::ConsoleSession(Connection& c, const string& o) { PI::ctor(*this, new ConsoleSessionImpl(c, o)); }
+void ConsoleSession::setDomain(const string& d) { impl->setDomain(d); }
+void ConsoleSession::setAgentFilter(const string& f) { impl->setAgentFilter(f); }
+void ConsoleSession::open() { impl->open(); }
+void ConsoleSession::close() { impl->close(); }
+bool ConsoleSession::nextEvent(ConsoleEvent& e, Duration t) { return impl->nextEvent(e, t); }
+int ConsoleSession::pendingEvents() const { return impl->pendingEvents(); }
+uint32_t ConsoleSession::getAgentCount() const { return impl->getAgentCount(); }
+Agent ConsoleSession::getAgent(uint32_t i) const { return impl->getAgent(i); }
+Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnectedBrokerAgent(); }
+Subscription ConsoleSession::subscribe(const Query& q, const string& f, const string& o) { return impl->subscribe(q, f, o); }
+Subscription ConsoleSession::subscribe(const string& q, const string& f, const string& o) { return impl->subscribe(q, f, o); }
+
+//========================================================================================
+// Impl Method Bodies
+//========================================================================================
+
+ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
+ connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
+ opened(false), eventNotifier(0), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
+ connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1)
+{
+ if (!options.empty()) {
+ qpid::messaging::AddressParser parser(options);
+ Variant::Map optMap;
+ Variant::Map::const_iterator iter;
+
+ parser.parseMap(optMap);
+
+ iter = optMap.find("domain");
+ if (iter != optMap.end())
+ domain = iter->second.asString();
+
+ iter = optMap.find("max-agent-age");
+ if (iter != optMap.end())
+ maxAgentAgeMinutes = iter->second.asUint32();
+
+ iter = optMap.find("listen-on-direct");
+ if (iter != optMap.end())
+ listenOnDirect = iter->second.asBool();
+
+ iter = optMap.find("strict-security");
+ if (iter != optMap.end())
+ strictSecurity = iter->second.asBool();
+
+ iter = optMap.find("max-thread-wait-time");
+ if (iter != optMap.end())
+ maxThreadWaitTime = iter->second.asUint32();
+ }
+
+ if (maxThreadWaitTime > 60)
+ maxThreadWaitTime = 60;
+}
+
+
+ConsoleSessionImpl::~ConsoleSessionImpl()
+{
+ if (opened)
+ close();
+
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
+}
+
+
+void ConsoleSessionImpl::setAgentFilter(const string& predicate)
+{
+ agentQuery = Query(QUERY_OBJECT, predicate);
+
+ //
+ // Purge the agent list of any agents that don't match the filter.
+ //
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ map<string, Agent> toDelete;
+ for (map<string, Agent>::iterator iter = agents.begin(); iter != agents.end(); iter++)
+ if (!agentQuery.matchesPredicate(iter->second.getAttributes())) {
+ toDelete[iter->first] = iter->second;
+ if (iter->second.getName() == connectedBrokerAgent.getName())
+ connectedBrokerInAgentList = false;
+ }
+
+ for (map<string, Agent>::iterator iter = toDelete.begin(); iter != toDelete.end(); iter++) {
+ agents.erase(iter->first);
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_DEL, AGENT_DEL_FILTER));
+ eventImpl->setAgent(iter->second);
+ enqueueEventLH(eventImpl.release());
+ }
+
+ if (!connectedBrokerInAgentList && connectedBrokerAgent.isValid() &&
+ agentQuery.matchesPredicate(connectedBrokerAgent.getAttributes())) {
+ agents[connectedBrokerAgent.getName()] = connectedBrokerAgent;
+ connectedBrokerInAgentList = true;
+
+ //
+ // Enqueue a notification of the new agent.
+ //
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD));
+ eventImpl->setAgent(connectedBrokerAgent);
+ enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ }
+ }
+
+ //
+ // Broadcast an agent locate request with our new criteria.
+ //
+ if (opened)
+ sendAgentLocate();
+}
+
+
+void ConsoleSessionImpl::open()
+{
+ if (opened)
+ throw QmfException("The session is already open");
+
+ // If the thread exists, join and delete it before creating a new one.
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
+
+ // Establish messaging addresses
+ directBase = "qmf." + domain + ".direct";
+ topicBase = "qmf." + domain + ".topic";
+
+ string myKey("direct-console." + qpid::types::Uuid(true).str());
+
+ replyAddress = Address(topicBase + "/" + myKey + ";{node:{type:topic}}");
+
+ // Create AMQP session, receivers, and senders
+ session = connection.createSession();
+ Receiver directRx = session.createReceiver(replyAddress);
+ Receiver topicRx = session.createReceiver(topicBase + "/agent.#"); // TODO: be more discriminating
+ if (!strictSecurity) {
+ Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}");
+ legacyRx.setCapacity(64);
+ directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+ directSender.setCapacity(128);
+ }
+
+ directRx.setCapacity(64);
+ topicRx.setCapacity(128);
+
+ topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}");
+
+ topicSender.setCapacity(128);
+
+ // Start the receiver thread
+ threadCanceled = false;
+ opened = true;
+ thread = new qpid::sys::Thread(*this);
+
+ // Send an agent_locate to direct address 'broker' to identify the connected-broker-agent.
+ sendBrokerLocate();
+ if (agentQuery)
+ sendAgentLocate();
+}
+
+
+void ConsoleSessionImpl::closeAsync()
+{
+ if (!opened)
+ throw QmfException("The session is already closed");
+
+ // Stop the receiver thread. Don't join it until the destructor is called or open() is called.
+ threadCanceled = true;
+ opened = false;
+}
+
+
+void ConsoleSessionImpl::close()
+{
+ closeAsync();
+
+ if (thread) {
+ thread->join();
+ delete thread;
+ thread = 0;
+ }
+}
+
+
+bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout)
+{
+ uint64_t milliseconds = timeout.getMilliseconds();
+ qpid::sys::Mutex::ScopedLock l(lock);
+
+ if (eventQueue.empty() && milliseconds > 0) {
+ int64_t nsecs(qpid::sys::TIME_INFINITE);
+ if ((uint64_t)(nsecs / 1000000) > milliseconds)
+ nsecs = (int64_t) milliseconds * 1000000;
+ qpid::sys::Duration then(nsecs);
+ cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
+ }
+
+ if (!eventQueue.empty()) {
+ event = eventQueue.front();
+ eventQueue.pop();
+ if (eventQueue.empty())
+ alertEventNotifierLH(false);
+ return true;
+ }
+
+ return false;
+}
+
+
+int ConsoleSessionImpl::pendingEvents() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return eventQueue.size();
+}
+
+
+void ConsoleSessionImpl::setEventNotifier(EventNotifierImpl* notifier)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ eventNotifier = notifier;
+}
+
+
+EventNotifierImpl* ConsoleSessionImpl::getEventNotifier() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return eventNotifier;
+}
+
+
+uint32_t ConsoleSessionImpl::getAgentCount() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return agents.size();
+}
+
+
+Agent ConsoleSessionImpl::getAgent(uint32_t i) const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ uint32_t count = 0;
+ for (map<string, Agent>::const_iterator iter = agents.begin(); iter != agents.end(); iter++)
+ if (count++ == i)
+ return iter->second;
+ throw IndexOutOfRange();
+}
+
+
+Subscription ConsoleSessionImpl::subscribe(const Query&, const string&, const string&)
+{
+ return Subscription();
+}
+
+
+Subscription ConsoleSessionImpl::subscribe(const string&, const string&, const string&)
+{
+ return Subscription();
+}
+
+
+void ConsoleSessionImpl::enqueueEvent(const ConsoleEvent& event)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ enqueueEventLH(event);
+}
+
+
+void ConsoleSessionImpl::enqueueEventLH(const ConsoleEvent& event)
+{
+ bool notify = eventQueue.empty();
+ eventQueue.push(event);
+ if (notify) {
+ cond.notify();
+ alertEventNotifierLH(true);
+ }
+}
+
+
+void ConsoleSessionImpl::dispatch(Message msg)
+{
+ const Variant::Map& properties(msg.getProperties());
+ Variant::Map::const_iterator iter;
+ Variant::Map::const_iterator oiter;
+
+ oiter = properties.find(protocol::HEADER_KEY_OPCODE);
+ iter = properties.find(protocol::HEADER_KEY_APP_ID);
+ if (iter == properties.end())
+ iter = properties.find("app_id");
+ if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF && oiter != properties.end()) {
+ //
+ // Dispatch a QMFv2 formatted message
+ //
+ const string& opcode = oiter->second.asString();
+
+ iter = properties.find(protocol::HEADER_KEY_AGENT);
+ if (iter == properties.end()) {
+ QPID_LOG(trace, "Message received with no 'qmf.agent' header");
+ return;
+ }
+ const string& agentName = iter->second.asString();
+
+ Agent agent;
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ map<string, Agent>::iterator aIter = agents.find(agentName);
+ if (aIter != agents.end()) {
+ agent = aIter->second;
+ AgentImplAccess::get(agent).touch();
+ }
+ }
+
+ if (msg.getContentType() == "amqp/map" &&
+ (opcode == protocol::HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION || opcode == protocol::HEADER_OPCODE_AGENT_LOCATE_RESPONSE)) {
+ //
+ // This is the one case where it's ok (necessary actually) to receive a QMFv2
+ // message from an unknown agent (how else are they going to get known?)
+ //
+ Variant::Map content;
+ decode(msg, content);
+ handleAgentUpdate(agentName, content, msg);
+ return;
+ }
+
+ if (!agent.isValid())
+ return;
+
+ AgentImpl& agentImpl(AgentImplAccess::get(agent));
+
+ if (msg.getContentType() == "amqp/map") {
+ Variant::Map content;
+ decode(msg, content);
+
+ if (opcode == protocol::HEADER_OPCODE_EXCEPTION) agentImpl.handleException(content, msg);
+ else if (opcode == protocol::HEADER_OPCODE_METHOD_RESPONSE) agentImpl.handleMethodResponse(content, msg);
+ else
+ QPID_LOG(error, "Received a map-formatted QMFv2 message with opcode=" << opcode);
+
+ return;
+ }
+
+ if (msg.getContentType() == "amqp/list") {
+ Variant::List content;
+ decode(msg, content);
+
+ if (opcode == protocol::HEADER_OPCODE_QUERY_RESPONSE) agentImpl.handleQueryResponse(content, msg);
+ else if (opcode == protocol::HEADER_OPCODE_DATA_INDICATION) agentImpl.handleDataIndication(content, msg);
+ else
+ QPID_LOG(error, "Received a list-formatted QMFv2 message with opcode=" << opcode);
+
+ return;
+ }
+ } else {
+ //
+ // Dispatch a QMFv1 formatted message
+ //
+ const string& body(msg.getContent());
+ if (body.size() < 8)
+ return;
+ qpid::management::Buffer buffer(const_cast<char*>(body.c_str()), body.size());
+
+ if (buffer.getOctet() != 'A') return;
+ if (buffer.getOctet() != 'M') return;
+ if (buffer.getOctet() != '2') return;
+ char v1Opcode(buffer.getOctet());
+ uint32_t seq(buffer.getLong());
+
+ if (v1Opcode == 's') handleV1SchemaResponse(buffer, seq, msg);
+ else {
+ QPID_LOG(trace, "Unknown or Unsupported QMFv1 opcode: " << v1Opcode);
+ }
+ }
+}
+
+
+void ConsoleSessionImpl::sendBrokerLocate()
+{
+ Message msg;
+ Variant::Map& headers(msg.getProperties());
+
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
+
+ msg.setReplyTo(replyAddress);
+ msg.setCorrelationId("broker-locate");
+ msg.setSubject("broker");
+
+ Sender sender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+ sender.send(msg);
+ sender.close();
+
+ QPID_LOG(trace, "SENT AgentLocate to broker");
+}
+
+
+void ConsoleSessionImpl::sendAgentLocate()
+{
+ Message msg;
+ Variant::Map& headers(msg.getProperties());
+ static const string subject("console.request.agent_locate");
+
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
+
+ msg.setReplyTo(replyAddress);
+ msg.setCorrelationId("agent-locate");
+ msg.setSubject(subject);
+ encode(agentQuery.getPredicate(), msg);
+
+ topicSender.send(msg);
+
+ QPID_LOG(trace, "SENT AgentLocate to=" << topicSender.getName() << "/" << subject);
+}
+
+
+void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Variant::Map& content, const Message& msg)
+{
+ Variant::Map::const_iterator iter;
+ Agent agent;
+ uint32_t epoch(0);
+ string cid(msg.getCorrelationId());
+
+ iter = content.find("_values");
+ if (iter == content.end())
+ return;
+ const Variant::Map& in_attrs(iter->second.asMap());
+ Variant::Map attrs;
+
+ //
+ // Copy the map from the message to "attrs". Translate any old-style
+ // keys to their new key values in the process.
+ //
+ for (iter = in_attrs.begin(); iter != in_attrs.end(); iter++) {
+ if (iter->first == "epoch")
+ attrs[protocol::AGENT_ATTR_EPOCH] = iter->second;
+ else if (iter->first == "timestamp")
+ attrs[protocol::AGENT_ATTR_TIMESTAMP] = iter->second;
+ else if (iter->first == "heartbeat_interval")
+ attrs[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = iter->second;
+ else
+ attrs[iter->first] = iter->second;
+ }
+
+ iter = attrs.find(protocol::AGENT_ATTR_EPOCH);
+ if (iter != attrs.end())
+ epoch = iter->second.asUint32();
+
+ if (cid == "broker-locate") {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this));
+ for (iter = attrs.begin(); iter != attrs.end(); iter++)
+ if (iter->first != protocol::AGENT_ATTR_EPOCH)
+ impl->setAttribute(iter->first, iter->second);
+ agent = Agent(impl.release());
+ connectedBrokerAgent = agent;
+ if (!agentQuery || agentQuery.matchesPredicate(attrs)) {
+ connectedBrokerInAgentList = true;
+ agents[agentName] = agent;
+
+ //
+ // Enqueue a notification of the new agent.
+ //
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD));
+ eventImpl->setAgent(agent);
+ enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ }
+ return;
+ }
+
+ //
+ // Check this agent against the agent filter. Exit if it doesn't match.
+ // (only if this isn't the connected broker agent)
+ //
+ if (agentQuery && (!agentQuery.matchesPredicate(attrs)))
+ return;
+
+ QPID_LOG(trace, "RCVD AgentHeartbeat from an agent matching our filter: " << agentName);
+
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ map<string, Agent>::iterator aIter = agents.find(agentName);
+ if (aIter == agents.end()) {
+ //
+ // This is a new agent. We have no current record of its existence.
+ //
+ auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this));
+ for (iter = attrs.begin(); iter != attrs.end(); iter++)
+ if (iter->first != protocol::AGENT_ATTR_EPOCH)
+ impl->setAttribute(iter->first, iter->second);
+ agent = Agent(impl.release());
+ agents[agentName] = agent;
+
+ //
+ // Enqueue a notification of the new agent.
+ //
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD));
+ eventImpl->setAgent(agent);
+ enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ } else {
+ //
+ // This is a refresh of an agent we are already tracking.
+ //
+ bool detectedRestart(false);
+ agent = aIter->second;
+ AgentImpl& impl(AgentImplAccess::get(agent));
+ impl.touch();
+ if (impl.getEpoch() != epoch) {
+ //
+ // The agent has restarted since the last time we heard from it.
+ // Enqueue a notification.
+ //
+ impl.setEpoch(epoch);
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_RESTART));
+ eventImpl->setAgent(agent);
+ enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ detectedRestart = true;
+ }
+
+ iter = attrs.find(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP);
+ if (iter != attrs.end()) {
+ uint64_t ts(iter->second.asUint64());
+ if (ts > impl.getAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP).asUint64()) {
+ //
+ // The agent has added new schema entries since we last heard from it.
+ // Update the attribute and, if this doesn't accompany a restart, enqueue a notification.
+ //
+ if (!detectedRestart) {
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE));
+ eventImpl->setAgent(agent);
+ enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ }
+ impl.setAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP, iter->second);
+ }
+ }
+ }
+ }
+}
+
+
+void ConsoleSessionImpl::handleV1SchemaResponse(qpid::management::Buffer& buffer, uint32_t, const Message&)
+{
+ QPID_LOG(trace, "RCVD V1SchemaResponse");
+ Schema schema(new SchemaImpl(buffer));
+ schemaCache->declareSchema(schema);
+}
+
+
+void ConsoleSessionImpl::periodicProcessing(uint64_t seconds)
+{
+ //
+ // The granularity of this timer is seconds. Don't waste time looking for work if
+ // it's been less than a second since we last visited.
+ //
+ if (seconds == lastVisit)
+ return;
+ lastVisit = seconds;
+
+ //
+ // Handle the aging of agent records
+ //
+ if (lastAgePass == 0)
+ lastAgePass = seconds;
+ if (seconds - lastAgePass >= 60) {
+ lastAgePass = seconds;
+ map<string, Agent> toDelete;
+ qpid::sys::Mutex::ScopedLock l(lock);
+
+ for (map<string, Agent>::iterator iter = agents.begin(); iter != agents.end(); iter++)
+ if ((iter->second.getName() != connectedBrokerAgent.getName()) &&
+ (AgentImplAccess::get(iter->second).age() > maxAgentAgeMinutes))
+ toDelete[iter->first] = iter->second;
+
+ for (map<string, Agent>::iterator iter = toDelete.begin(); iter != toDelete.end(); iter++) {
+ agents.erase(iter->first);
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_DEL, AGENT_DEL_AGED));
+ eventImpl->setAgent(iter->second);
+ enqueueEventLH(eventImpl.release());
+ }
+ }
+}
+
+
+void ConsoleSessionImpl::alertEventNotifierLH(bool readable)
+{
+ if (eventNotifier)
+ eventNotifier->setReadable(readable);
+}
+
+
+void ConsoleSessionImpl::run()
+{
+ QPID_LOG(debug, "ConsoleSession thread started");
+
+ try {
+ while (!threadCanceled) {
+ periodicProcessing((uint64_t) qpid::sys::Duration::FromEpoch() /
+ qpid::sys::TIME_SEC);
+
+ Receiver rx;
+ bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
+ if (threadCanceled)
+ break;
+ if (valid) {
+ try {
+ dispatch(rx.fetch());
+ } catch (qpid::types::Exception& e) {
+ QPID_LOG(error, "Exception caught in message dispatch: " << e.what());
+ }
+ session.acknowledge();
+ }
+ }
+ } catch (qpid::types::Exception& e) {
+ QPID_LOG(error, "Exception caught in message thread - exiting: " << e.what());
+ enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED)));
+ }
+
+ session.close();
+ QPID_LOG(debug, "ConsoleSession thread exiting");
+}
+
+
+ConsoleSessionImpl& ConsoleSessionImplAccess::get(ConsoleSession& session)
+{
+ return *session.impl;
+}
+
+
+const ConsoleSessionImpl& ConsoleSessionImplAccess::get(const ConsoleSession& session)
+{
+ return *session.impl;
+}