diff options
Diffstat (limited to 'qpid/cpp/src/qmf/ConsoleSession.cpp')
-rw-r--r-- | qpid/cpp/src/qmf/ConsoleSession.cpp | 47 |
1 files changed, 32 insertions, 15 deletions
diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp index f327170c5e..bb4458a0b9 100644 --- a/qpid/cpp/src/qmf/ConsoleSession.cpp +++ b/qpid/cpp/src/qmf/ConsoleSession.cpp @@ -36,6 +36,7 @@ using namespace qmf; using qpid::messaging::Address; using qpid::messaging::Connection; using qpid::messaging::Receiver; +using qpid::messaging::Sender; using qpid::messaging::Duration; using qpid::messaging::Message; using qpid::types::Variant; @@ -64,9 +65,9 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s //======================================================================================== ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : - connection(c), domain("default"), maxAgentAgeMinutes(5), opened(false), - thread(0), threadCanceled(false), - lastVisit(0), lastAgePass(0), connectedBrokerInAgentList(false), schemaCache(new SchemaCache()) + connection(c), domain("default"), authUser(c.getAuthenticatedUsername()), maxAgentAgeMinutes(5), + opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0), + connectedBrokerInAgentList(false), schemaCache(new SchemaCache()) { if (!options.empty()) { qpid::messaging::AddressParser parser(options); @@ -82,6 +83,14 @@ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : iter = optMap.find("max-agent-age"); if (iter != optMap.end()) maxAgentAgeMinutes = iter->second.asUint32(); + + iter = optMap.find("listen-on-direct"); + if (iter != optMap.end()) + listenOnDirect = iter->second.asBool(); + + iter = optMap.find("strict-security"); + if (iter != optMap.end()) + strictSecurity = iter->second.asBool(); } } @@ -148,24 +157,26 @@ void ConsoleSessionImpl::open() directBase = "qmf." + domain + ".direct"; topicBase = "qmf." + domain + ".topic"; - string myKey("qmf-console-" + qpid::types::Uuid(true).str()); + string myKey("direct-console." + qpid::types::Uuid(true).str()); - replyAddress = Address(directBase + "/" + myKey + ";{node:{type:topic}}"); + replyAddress = Address(topicBase + "/" + myKey + ";{node:{type:topic}}"); // Create AMQP session, receivers, and senders session = connection.createSession(); Receiver directRx = session.createReceiver(replyAddress); Receiver topicRx = session.createReceiver(topicBase + "/agent.#"); // TODO: be more discriminating - Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}"); + if (!strictSecurity) { + Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}"); + legacyRx.setCapacity(64); + directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}"); + directSender.setCapacity(128); + } directRx.setCapacity(64); topicRx.setCapacity(128); - legacyRx.setCapacity(64); - directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}"); topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}"); - directSender.setCapacity(64); topicSender.setCapacity(128); // Start the receiver thread @@ -371,7 +382,9 @@ void ConsoleSessionImpl::sendBrokerLocate() msg.setCorrelationId("broker-locate"); msg.setSubject("broker"); - directSender.send(msg); + Sender sender = session.createSender(directBase + ";{create:never,node:{type:topic}}"); + sender.send(msg); + sender.close(); QPID_LOG(trace, "SENT AgentLocate to broker"); } @@ -468,6 +481,7 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian // // This is a refresh of an agent we are already tracking. // + bool detectedRestart(false); agent = aIter->second; AgentImpl& impl(AgentImplAccess::get(agent)); impl.touch(); @@ -480,6 +494,7 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_RESTART)); eventImpl->setAgent(agent); enqueueEventLH(ConsoleEvent(eventImpl.release())); + detectedRestart = true; } iter = attrs.find(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP); @@ -488,12 +503,14 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian if (ts > impl.getAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP).asUint64()) { // // The agent has added new schema entries since we last heard from it. - // Enqueue a notification. + // Update the attribute and, if this doesn't accompany a restart, enqueue a notification. // - auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE)); - eventImpl->setAgent(agent); - impl.setAttribute(iter->first, iter->second); - enqueueEventLH(ConsoleEvent(eventImpl.release())); + if (!detectedRestart) { + auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE)); + eventImpl->setAgent(agent); + enqueueEventLH(ConsoleEvent(eventImpl.release())); + } + impl.setAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP, iter->second); } } } |