summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2011-07-12 11:49:32 +0000
committerTed Ross <tross@apache.org>2011-07-12 11:49:32 +0000
commit6bc2dfcb32f4b6af4c24a51a664c4ab838d8a3fa (patch)
tree4a6dd7d44d0a4755e84abaceea5c0016b56d9b7c /qpid/cpp/src
parent95868c8ad1a191fa806f9fc4ddadbfabf90e15ab (diff)
downloadqpid-python-6bc2dfcb32f4b6af4c24a51a664c4ab838d8a3fa.tar.gz
QPID-3275 - QMF Console asynchronous correlation-id should be scoped to the session, not the specific agent
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1145557 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qmf/Agent.cpp29
-rw-r--r--qpid/cpp/src/qmf/AgentImpl.h1
-rw-r--r--qpid/cpp/src/qmf/ConsoleSession.cpp2
-rw-r--r--qpid/cpp/src/qmf/ConsoleSessionImpl.h3
4 files changed, 10 insertions, 25 deletions
diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp
index 915f2a1c88..684f8e4fba 100644
--- a/qpid/cpp/src/qmf/Agent.cpp
+++ b/qpid/cpp/src/qmf/Agent.cpp
@@ -72,7 +72,7 @@ Schema Agent::getSchema(const SchemaId& s, Duration t) { return impl->getSchema(
AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) :
name(n), directSubject(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0),
- sender(session.directSender), nextCorrelator(1), schemaCache(s.schemaCache)
+ sender(session.directSender), schemaCache(s.schemaCache)
{
}
@@ -102,12 +102,11 @@ const Variant& AgentImpl::getAttribute(const string& k) const
ConsoleEvent AgentImpl::query(const Query& query, Duration timeout)
{
boost::shared_ptr<SyncContext> context(new SyncContext());
- uint32_t correlator;
+ uint32_t correlator(session.correlator());
ConsoleEvent result;
{
qpid::sys::Mutex::ScopedLock l(lock);
- correlator = nextCorrelator++;
contextMap[correlator] = context;
}
try {
@@ -151,12 +150,7 @@ ConsoleEvent AgentImpl::query(const string& text, Duration timeout)
uint32_t AgentImpl::queryAsync(const Query& query)
{
- uint32_t correlator;
-
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- correlator = nextCorrelator++;
- }
+ uint32_t correlator(session.correlator());
sendQuery(query, correlator);
return correlator;
@@ -172,12 +166,11 @@ uint32_t AgentImpl::queryAsync(const string& text)
ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& args, const DataAddr& addr, Duration timeout)
{
boost::shared_ptr<SyncContext> context(new SyncContext());
- uint32_t correlator;
+ uint32_t correlator(session.correlator());
ConsoleEvent result;
{
qpid::sys::Mutex::ScopedLock l(lock);
- correlator = nextCorrelator++;
contextMap[correlator] = context;
}
try {
@@ -213,12 +206,7 @@ ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& arg
uint32_t AgentImpl::callMethodAsync(const string& method, const Variant::Map& args, const DataAddr& addr)
{
- uint32_t correlator;
-
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- correlator = nextCorrelator++;
- }
+ uint32_t correlator(session.correlator());
sendMethod(method, args, addr, correlator);
return correlator;
@@ -596,12 +584,7 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const
void AgentImpl::sendSchemaRequest(const SchemaId& id)
{
- uint32_t correlator;
-
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- correlator = nextCorrelator++;
- }
+ uint32_t correlator(session.correlator());
if (capability >= AGENT_CAPABILITY_V2_SCHEMA) {
Query query(QUERY_SCHEMA, id);
diff --git a/qpid/cpp/src/qmf/AgentImpl.h b/qpid/cpp/src/qmf/AgentImpl.h
index 7fa4f4373a..09754a3a7e 100644
--- a/qpid/cpp/src/qmf/AgentImpl.h
+++ b/qpid/cpp/src/qmf/AgentImpl.h
@@ -99,7 +99,6 @@ namespace qmf {
uint32_t capability;
qpid::messaging::Sender sender;
qpid::types::Variant::Map attributes;
- uint32_t nextCorrelator;
std::map<uint32_t, boost::shared_ptr<SyncContext> > contextMap;
boost::shared_ptr<SchemaCache> schemaCache;
mutable std::set<std::string> packageSet;
diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp
index 5df0d83f12..7b51d80032 100644
--- a/qpid/cpp/src/qmf/ConsoleSession.cpp
+++ b/qpid/cpp/src/qmf/ConsoleSession.cpp
@@ -68,7 +68,7 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s
ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false),
opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
- connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
+ connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1)
{
if (!options.empty()) {
qpid::messaging::AddressParser parser(options);
diff --git a/qpid/cpp/src/qmf/ConsoleSessionImpl.h b/qpid/cpp/src/qmf/ConsoleSessionImpl.h
index 411b3f016a..429dfc4881 100644
--- a/qpid/cpp/src/qmf/ConsoleSessionImpl.h
+++ b/qpid/cpp/src/qmf/ConsoleSessionImpl.h
@@ -90,6 +90,8 @@ namespace qmf {
std::string directBase;
std::string topicBase;
boost::shared_ptr<SchemaCache> schemaCache;
+ qpid::sys::Mutex corrlock;
+ uint32_t nextCorrelator;
void enqueueEvent(const ConsoleEvent&);
void enqueueEventLH(const ConsoleEvent&);
@@ -100,6 +102,7 @@ namespace qmf {
void handleV1SchemaResponse(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&);
void periodicProcessing(uint64_t);
void run();
+ uint32_t correlator() { qpid::sys::Mutex::ScopedLock l(corrlock); return nextCorrelator++; }
friend class AgentImpl;
};