From 6bc2dfcb32f4b6af4c24a51a664c4ab838d8a3fa Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Tue, 12 Jul 2011 11:49:32 +0000 Subject: QPID-3275 - QMF Console asynchronous correlation-id should be scoped to the session, not the specific agent git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1145557 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qmf/Agent.cpp | 29 ++++++----------------------- qpid/cpp/src/qmf/AgentImpl.h | 1 - qpid/cpp/src/qmf/ConsoleSession.cpp | 2 +- qpid/cpp/src/qmf/ConsoleSessionImpl.h | 3 +++ 4 files changed, 10 insertions(+), 25 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp index 915f2a1c88..684f8e4fba 100644 --- a/qpid/cpp/src/qmf/Agent.cpp +++ b/qpid/cpp/src/qmf/Agent.cpp @@ -72,7 +72,7 @@ Schema Agent::getSchema(const SchemaId& s, Duration t) { return impl->getSchema( AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) : name(n), directSubject(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0), - sender(session.directSender), nextCorrelator(1), schemaCache(s.schemaCache) + sender(session.directSender), schemaCache(s.schemaCache) { } @@ -102,12 +102,11 @@ const Variant& AgentImpl::getAttribute(const string& k) const ConsoleEvent AgentImpl::query(const Query& query, Duration timeout) { boost::shared_ptr context(new SyncContext()); - uint32_t correlator; + uint32_t correlator(session.correlator()); ConsoleEvent result; { qpid::sys::Mutex::ScopedLock l(lock); - correlator = nextCorrelator++; contextMap[correlator] = context; } try { @@ -151,12 +150,7 @@ ConsoleEvent AgentImpl::query(const string& text, Duration timeout) uint32_t AgentImpl::queryAsync(const Query& query) { - uint32_t correlator; - - { - qpid::sys::Mutex::ScopedLock l(lock); - correlator = nextCorrelator++; - } + uint32_t correlator(session.correlator()); sendQuery(query, correlator); return correlator; @@ -172,12 +166,11 @@ uint32_t AgentImpl::queryAsync(const string& text) ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& args, const DataAddr& addr, Duration timeout) { boost::shared_ptr context(new SyncContext()); - uint32_t correlator; + uint32_t correlator(session.correlator()); ConsoleEvent result; { qpid::sys::Mutex::ScopedLock l(lock); - correlator = nextCorrelator++; contextMap[correlator] = context; } try { @@ -213,12 +206,7 @@ ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& arg uint32_t AgentImpl::callMethodAsync(const string& method, const Variant::Map& args, const DataAddr& addr) { - uint32_t correlator; - - { - qpid::sys::Mutex::ScopedLock l(lock); - correlator = nextCorrelator++; - } + uint32_t correlator(session.correlator()); sendMethod(method, args, addr, correlator); return correlator; @@ -596,12 +584,7 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const void AgentImpl::sendSchemaRequest(const SchemaId& id) { - uint32_t correlator; - - { - qpid::sys::Mutex::ScopedLock l(lock); - correlator = nextCorrelator++; - } + uint32_t correlator(session.correlator()); if (capability >= AGENT_CAPABILITY_V2_SCHEMA) { Query query(QUERY_SCHEMA, id); diff --git a/qpid/cpp/src/qmf/AgentImpl.h b/qpid/cpp/src/qmf/AgentImpl.h index 7fa4f4373a..09754a3a7e 100644 --- a/qpid/cpp/src/qmf/AgentImpl.h +++ b/qpid/cpp/src/qmf/AgentImpl.h @@ -99,7 +99,6 @@ namespace qmf { uint32_t capability; qpid::messaging::Sender sender; qpid::types::Variant::Map attributes; - uint32_t nextCorrelator; std::map > contextMap; boost::shared_ptr schemaCache; mutable std::set packageSet; diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp index 5df0d83f12..7b51d80032 100644 --- a/qpid/cpp/src/qmf/ConsoleSession.cpp +++ b/qpid/cpp/src/qmf/ConsoleSession.cpp @@ -68,7 +68,7 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0), - connectedBrokerInAgentList(false), schemaCache(new SchemaCache()) + connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1) { if (!options.empty()) { qpid::messaging::AddressParser parser(options); diff --git a/qpid/cpp/src/qmf/ConsoleSessionImpl.h b/qpid/cpp/src/qmf/ConsoleSessionImpl.h index 411b3f016a..429dfc4881 100644 --- a/qpid/cpp/src/qmf/ConsoleSessionImpl.h +++ b/qpid/cpp/src/qmf/ConsoleSessionImpl.h @@ -90,6 +90,8 @@ namespace qmf { std::string directBase; std::string topicBase; boost::shared_ptr schemaCache; + qpid::sys::Mutex corrlock; + uint32_t nextCorrelator; void enqueueEvent(const ConsoleEvent&); void enqueueEventLH(const ConsoleEvent&); @@ -100,6 +102,7 @@ namespace qmf { void handleV1SchemaResponse(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&); void periodicProcessing(uint64_t); void run(); + uint32_t correlator() { qpid::sys::Mutex::ScopedLock l(corrlock); return nextCorrelator++; } friend class AgentImpl; }; -- cgit v1.2.1