summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf/ConsoleSession.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qmf/ConsoleSession.cpp')
-rw-r--r--qpid/cpp/src/qmf/ConsoleSession.cpp47
1 files changed, 32 insertions, 15 deletions
diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp
index f327170c5e..bb4458a0b9 100644
--- a/qpid/cpp/src/qmf/ConsoleSession.cpp
+++ b/qpid/cpp/src/qmf/ConsoleSession.cpp
@@ -36,6 +36,7 @@ using namespace qmf;
using qpid::messaging::Address;
using qpid::messaging::Connection;
using qpid::messaging::Receiver;
+using qpid::messaging::Sender;
using qpid::messaging::Duration;
using qpid::messaging::Message;
using qpid::types::Variant;
@@ -64,9 +65,9 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s
//========================================================================================
ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
- connection(c), domain("default"), maxAgentAgeMinutes(5), opened(false),
- thread(0), threadCanceled(false),
- lastVisit(0), lastAgePass(0), connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
+ connection(c), domain("default"), authUser(c.getAuthenticatedUsername()), maxAgentAgeMinutes(5),
+ opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
+ connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
{
if (!options.empty()) {
qpid::messaging::AddressParser parser(options);
@@ -82,6 +83,14 @@ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
iter = optMap.find("max-agent-age");
if (iter != optMap.end())
maxAgentAgeMinutes = iter->second.asUint32();
+
+ iter = optMap.find("listen-on-direct");
+ if (iter != optMap.end())
+ listenOnDirect = iter->second.asBool();
+
+ iter = optMap.find("strict-security");
+ if (iter != optMap.end())
+ strictSecurity = iter->second.asBool();
}
}
@@ -148,24 +157,26 @@ void ConsoleSessionImpl::open()
directBase = "qmf." + domain + ".direct";
topicBase = "qmf." + domain + ".topic";
- string myKey("qmf-console-" + qpid::types::Uuid(true).str());
+ string myKey("direct-console." + qpid::types::Uuid(true).str());
- replyAddress = Address(directBase + "/" + myKey + ";{node:{type:topic}}");
+ replyAddress = Address(topicBase + "/" + myKey + ";{node:{type:topic}}");
// Create AMQP session, receivers, and senders
session = connection.createSession();
Receiver directRx = session.createReceiver(replyAddress);
Receiver topicRx = session.createReceiver(topicBase + "/agent.#"); // TODO: be more discriminating
- Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}");
+ if (!strictSecurity) {
+ Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}");
+ legacyRx.setCapacity(64);
+ directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+ directSender.setCapacity(128);
+ }
directRx.setCapacity(64);
topicRx.setCapacity(128);
- legacyRx.setCapacity(64);
- directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}");
- directSender.setCapacity(64);
topicSender.setCapacity(128);
// Start the receiver thread
@@ -371,7 +382,9 @@ void ConsoleSessionImpl::sendBrokerLocate()
msg.setCorrelationId("broker-locate");
msg.setSubject("broker");
- directSender.send(msg);
+ Sender sender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+ sender.send(msg);
+ sender.close();
QPID_LOG(trace, "SENT AgentLocate to broker");
}
@@ -468,6 +481,7 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian
//
// This is a refresh of an agent we are already tracking.
//
+ bool detectedRestart(false);
agent = aIter->second;
AgentImpl& impl(AgentImplAccess::get(agent));
impl.touch();
@@ -480,6 +494,7 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian
auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_RESTART));
eventImpl->setAgent(agent);
enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ detectedRestart = true;
}
iter = attrs.find(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP);
@@ -488,12 +503,14 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian
if (ts > impl.getAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP).asUint64()) {
//
// The agent has added new schema entries since we last heard from it.
- // Enqueue a notification.
+ // Update the attribute and, if this doesn't accompany a restart, enqueue a notification.
//
- auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE));
- eventImpl->setAgent(agent);
- impl.setAttribute(iter->first, iter->second);
- enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ if (!detectedRestart) {
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE));
+ eventImpl->setAgent(agent);
+ enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ }
+ impl.setAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP, iter->second);
}
}
}