summaryrefslogtreecommitdiff
path: root/cpp/src/qmf
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qmf')
-rw-r--r--cpp/src/qmf/engine/BrokerProxyImpl.cpp19
-rw-r--r--cpp/src/qmf/engine/BrokerProxyImpl.h2
-rw-r--r--cpp/src/qmf/engine/QueryImpl.h2
3 files changed, 19 insertions, 4 deletions
diff --git a/cpp/src/qmf/engine/BrokerProxyImpl.cpp b/cpp/src/qmf/engine/BrokerProxyImpl.cpp
index 1a2b3e6555..0a1769f891 100644
--- a/cpp/src/qmf/engine/BrokerProxyImpl.cpp
+++ b/cpp/src/qmf/engine/BrokerProxyImpl.cpp
@@ -206,19 +206,31 @@ void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentPr
{
SequenceContext::Ptr queryContext(new QueryContext(*this, context));
Mutex::ScopedLock _lock(lock);
+ bool sent = false;
if (agent != 0) {
- sendGetRequestLH(queryContext, query, agent);
+ if (sendGetRequestLH(queryContext, query, agent))
+ sent = true;
} else {
// TODO (optimization) only send queries to agents that have the requested class+package
for (map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.begin();
iter != agentList.end(); iter++) {
- sendGetRequestLH(queryContext, query, iter->second.get());
+ if (sendGetRequestLH(queryContext, query, iter->second.get()))
+ sent = true;
}
}
+
+ if (!sent) {
+ queryContext->reserve();
+ queryContext->release();
+ }
}
-void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent)
+bool BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent)
{
+ if (query.impl->singleAgent()) {
+ if (query.impl->agentBank() != agent->getAgentBank())
+ return false;
+ }
stringstream key;
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t sequence(seqMgr.reserve(queryContext));
@@ -229,6 +241,7 @@ void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const
key << "agent.1." << agent->impl->agentBank;
sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str());
+ return true;
}
string BrokerProxyImpl::encodeMethodArguments(const SchemaMethod* schema, const Value* argmap, Buffer& buffer)
diff --git a/cpp/src/qmf/engine/BrokerProxyImpl.h b/cpp/src/qmf/engine/BrokerProxyImpl.h
index 798a5fdc76..b651b52345 100644
--- a/cpp/src/qmf/engine/BrokerProxyImpl.h
+++ b/cpp/src/qmf/engine/BrokerProxyImpl.h
@@ -139,7 +139,7 @@ namespace engine {
uint32_t agentCount() const;
const AgentProxy* getAgent(uint32_t idx) const;
void sendQuery(const Query& query, void* context, const AgentProxy* agent);
- void sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent);
+ bool sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent);
std::string encodeMethodArguments(const SchemaMethod* schema, const Value* args, qpid::framing::Buffer& buffer);
void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const std::string& method, const Value* args, void* context);
diff --git a/cpp/src/qmf/engine/QueryImpl.h b/cpp/src/qmf/engine/QueryImpl.h
index 2c64c6739c..8ebe0d932f 100644
--- a/cpp/src/qmf/engine/QueryImpl.h
+++ b/cpp/src/qmf/engine/QueryImpl.h
@@ -85,6 +85,8 @@ namespace engine {
bool getDecreasing() const { return orderDecreasing; }
void encode(qpid::framing::Buffer& buffer) const;
+ bool singleAgent() const { return oid.get() != 0; }
+ uint32_t agentBank() const { return singleAgent() ? oid->getAgentBank() : 0; }
std::string packageName;
std::string className;