diff options
author | Ted Ross <tross@apache.org> | 2011-07-12 11:49:32 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2011-07-12 11:49:32 +0000 |
commit | 6bc2dfcb32f4b6af4c24a51a664c4ab838d8a3fa (patch) | |
tree | 4a6dd7d44d0a4755e84abaceea5c0016b56d9b7c /qpid/cpp | |
parent | 95868c8ad1a191fa806f9fc4ddadbfabf90e15ab (diff) | |
download | qpid-python-6bc2dfcb32f4b6af4c24a51a664c4ab838d8a3fa.tar.gz |
QPID-3275 - QMF Console asynchronous correlation-id should be scoped to the session, not the specific agent
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1145557 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/src/qmf/Agent.cpp | 29 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/AgentImpl.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/ConsoleSession.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/ConsoleSessionImpl.h | 3 |
4 files changed, 10 insertions, 25 deletions
diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp index 915f2a1c88..684f8e4fba 100644 --- a/qpid/cpp/src/qmf/Agent.cpp +++ b/qpid/cpp/src/qmf/Agent.cpp @@ -72,7 +72,7 @@ Schema Agent::getSchema(const SchemaId& s, Duration t) { return impl->getSchema( AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) : name(n), directSubject(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0), - sender(session.directSender), nextCorrelator(1), schemaCache(s.schemaCache) + sender(session.directSender), schemaCache(s.schemaCache) { } @@ -102,12 +102,11 @@ const Variant& AgentImpl::getAttribute(const string& k) const ConsoleEvent AgentImpl::query(const Query& query, Duration timeout) { boost::shared_ptr<SyncContext> context(new SyncContext()); - uint32_t correlator; + uint32_t correlator(session.correlator()); ConsoleEvent result; { qpid::sys::Mutex::ScopedLock l(lock); - correlator = nextCorrelator++; contextMap[correlator] = context; } try { @@ -151,12 +150,7 @@ ConsoleEvent AgentImpl::query(const string& text, Duration timeout) uint32_t AgentImpl::queryAsync(const Query& query) { - uint32_t correlator; - - { - qpid::sys::Mutex::ScopedLock l(lock); - correlator = nextCorrelator++; - } + uint32_t correlator(session.correlator()); sendQuery(query, correlator); return correlator; @@ -172,12 +166,11 @@ uint32_t AgentImpl::queryAsync(const string& text) ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& args, const DataAddr& addr, Duration timeout) { boost::shared_ptr<SyncContext> context(new SyncContext()); - uint32_t correlator; + uint32_t correlator(session.correlator()); ConsoleEvent result; { qpid::sys::Mutex::ScopedLock l(lock); - correlator = nextCorrelator++; contextMap[correlator] = context; } try { @@ -213,12 +206,7 @@ ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& arg uint32_t AgentImpl::callMethodAsync(const string& method, const Variant::Map& args, const DataAddr& addr) { - uint32_t correlator; - - { - qpid::sys::Mutex::ScopedLock l(lock); - correlator = nextCorrelator++; - } + uint32_t correlator(session.correlator()); sendMethod(method, args, addr, correlator); return correlator; @@ -596,12 +584,7 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const void AgentImpl::sendSchemaRequest(const SchemaId& id) { - uint32_t correlator; - - { - qpid::sys::Mutex::ScopedLock l(lock); - correlator = nextCorrelator++; - } + uint32_t correlator(session.correlator()); if (capability >= AGENT_CAPABILITY_V2_SCHEMA) { Query query(QUERY_SCHEMA, id); diff --git a/qpid/cpp/src/qmf/AgentImpl.h b/qpid/cpp/src/qmf/AgentImpl.h index 7fa4f4373a..09754a3a7e 100644 --- a/qpid/cpp/src/qmf/AgentImpl.h +++ b/qpid/cpp/src/qmf/AgentImpl.h @@ -99,7 +99,6 @@ namespace qmf { uint32_t capability; qpid::messaging::Sender sender; qpid::types::Variant::Map attributes; - uint32_t nextCorrelator; std::map<uint32_t, boost::shared_ptr<SyncContext> > contextMap; boost::shared_ptr<SchemaCache> schemaCache; mutable std::set<std::string> packageSet; diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp index 5df0d83f12..7b51d80032 100644 --- a/qpid/cpp/src/qmf/ConsoleSession.cpp +++ b/qpid/cpp/src/qmf/ConsoleSession.cpp @@ -68,7 +68,7 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0), - connectedBrokerInAgentList(false), schemaCache(new SchemaCache()) + connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1) { if (!options.empty()) { qpid::messaging::AddressParser parser(options); diff --git a/qpid/cpp/src/qmf/ConsoleSessionImpl.h b/qpid/cpp/src/qmf/ConsoleSessionImpl.h index 411b3f016a..429dfc4881 100644 --- a/qpid/cpp/src/qmf/ConsoleSessionImpl.h +++ b/qpid/cpp/src/qmf/ConsoleSessionImpl.h @@ -90,6 +90,8 @@ namespace qmf { std::string directBase; std::string topicBase; boost::shared_ptr<SchemaCache> schemaCache; + qpid::sys::Mutex corrlock; + uint32_t nextCorrelator; void enqueueEvent(const ConsoleEvent&); void enqueueEventLH(const ConsoleEvent&); @@ -100,6 +102,7 @@ namespace qmf { void handleV1SchemaResponse(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&); void periodicProcessing(uint64_t); void run(); + uint32_t correlator() { qpid::sys::Mutex::ScopedLock l(corrlock); return nextCorrelator++; } friend class AgentImpl; }; |