diff options
Diffstat (limited to 'cpp/src/qmf')
-rw-r--r-- | cpp/src/qmf/engine/BrokerProxyImpl.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qmf/engine/BrokerProxyImpl.h | 2 | ||||
-rw-r--r-- | cpp/src/qmf/engine/QueryImpl.h | 2 |
3 files changed, 19 insertions, 4 deletions
diff --git a/cpp/src/qmf/engine/BrokerProxyImpl.cpp b/cpp/src/qmf/engine/BrokerProxyImpl.cpp index 1a2b3e6555..0a1769f891 100644 --- a/cpp/src/qmf/engine/BrokerProxyImpl.cpp +++ b/cpp/src/qmf/engine/BrokerProxyImpl.cpp @@ -206,19 +206,31 @@ void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentPr { SequenceContext::Ptr queryContext(new QueryContext(*this, context)); Mutex::ScopedLock _lock(lock); + bool sent = false; if (agent != 0) { - sendGetRequestLH(queryContext, query, agent); + if (sendGetRequestLH(queryContext, query, agent)) + sent = true; } else { // TODO (optimization) only send queries to agents that have the requested class+package for (map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.begin(); iter != agentList.end(); iter++) { - sendGetRequestLH(queryContext, query, iter->second.get()); + if (sendGetRequestLH(queryContext, query, iter->second.get())) + sent = true; } } + + if (!sent) { + queryContext->reserve(); + queryContext->release(); + } } -void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent) +bool BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent) { + if (query.impl->singleAgent()) { + if (query.impl->agentBank() != agent->getAgentBank()) + return false; + } stringstream key; Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t sequence(seqMgr.reserve(queryContext)); @@ -229,6 +241,7 @@ void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const key << "agent.1." << agent->impl->agentBank; sendBufferLH(outBuffer, QMF_EXCHANGE, key.str()); QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str()); + return true; } string BrokerProxyImpl::encodeMethodArguments(const SchemaMethod* schema, const Value* argmap, Buffer& buffer) diff --git a/cpp/src/qmf/engine/BrokerProxyImpl.h b/cpp/src/qmf/engine/BrokerProxyImpl.h index 798a5fdc76..b651b52345 100644 --- a/cpp/src/qmf/engine/BrokerProxyImpl.h +++ b/cpp/src/qmf/engine/BrokerProxyImpl.h @@ -139,7 +139,7 @@ namespace engine { uint32_t agentCount() const; const AgentProxy* getAgent(uint32_t idx) const; void sendQuery(const Query& query, void* context, const AgentProxy* agent); - void sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent); + bool sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent); std::string encodeMethodArguments(const SchemaMethod* schema, const Value* args, qpid::framing::Buffer& buffer); void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const std::string& method, const Value* args, void* context); diff --git a/cpp/src/qmf/engine/QueryImpl.h b/cpp/src/qmf/engine/QueryImpl.h index 2c64c6739c..8ebe0d932f 100644 --- a/cpp/src/qmf/engine/QueryImpl.h +++ b/cpp/src/qmf/engine/QueryImpl.h @@ -85,6 +85,8 @@ namespace engine { bool getDecreasing() const { return orderDecreasing; } void encode(qpid::framing::Buffer& buffer) const; + bool singleAgent() const { return oid.get() != 0; } + uint32_t agentBank() const { return singleAgent() ? oid->getAgentBank() : 0; } std::string packageName; std::string className; |