summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-03-22 20:40:49 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-03-22 20:40:49 +0000
commitd3c6291c047d0c954bce5d652ce3e6abec9466e5 (patch)
tree655f733e832250b53ff7ee0cf6308663f76fcfe4
parent77431e5a5d165975bf3e39c6b9ae9e0ed3228464 (diff)
downloadqpid-python-d3c6291c047d0c954bce5d652ce3e6abec9466e5.tar.gz
get query req handling
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@926323 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp155
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.h3
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp179
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h7
4 files changed, 231 insertions, 113 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 3d05e02bcc..f160bf004b 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -544,89 +544,102 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string&
connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);
}
-void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, const string& cid, const string& replyTo)
+void ManagementAgentImpl::handleGetQuery(const string& body, const string& contentType,
+ const string& cid, const string& replyTo)
{
- FieldTable ft;
- FieldTable::ValuePtr value;
-
moveNewObjectsLH();
- ft.decode(inBuffer);
+ if (contentType != "_query_v1") {
+ QPID_LOG(warning, "Support for QMF V2 Query format TBD!!!");
+ return;
+ }
- QPID_LOG(trace, "RCVD GetQuery: map=" << ft);
+ qpid::messaging::Message inMsg(body);
+ qpid::messaging::MapView inMap(inMsg);
+ qpid::messaging::MapView::const_iterator i;
+ ::qpid::messaging::Variant::Map headers;
- value = ft.get("_class");
- if (value.get() == 0 || !value->convertsTo<string>()) {
- value = ft.get("_objectid");
- if (value.get() == 0 || !value->convertsTo<string>())
- return;
+ QPID_LOG(trace, "RCVD GetQuery: map=" << inMap << " cid=" << cid);
- ObjectId selector(value->get<string>());
- ManagementObjectMap::iterator iter = managementObjects.find(selector);
- if (iter != managementObjects.end()) {
- ManagementObject* object = iter->second;
- ::qpid::messaging::Message m;
- ::qpid::messaging::ListContent content(m);
- ::qpid::messaging::Variant::List &list_ = content.asList();
- ::qpid::messaging::Variant::Map map_;
- ::qpid::messaging::Variant::Map values;
- ::qpid::messaging::Variant::Map headers;
-
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
-
- object->mapEncodeValues(values, true, true); // write both stats and properties
- map_["_values"] = values;
- list_.push_back(map_);
-
- headers["method"] = "response";
- headers["qmf.opcode"] = "_query_response";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = name_address;
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_query_response";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+ headers["partial"];
- content.encode();
- connThreadBody.sendBuffer(m.getContent(), cid, headers, "qmf.default.direct", replyTo);
+ ::qpid::messaging::Message outMsg;
+ ::qpid::messaging::ListContent content(outMsg);
+ ::qpid::messaging::Variant::List &list_ = content.asList();
+ ::qpid::messaging::Variant::Map map_;
+ ::qpid::messaging::Variant::Map values;
+ string className;
- QPID_LOG(trace, "SENT ObjectInd");
+ i = inMap.find("_class");
+ if (i != inMap.end())
+ try {
+ className = i->second.asString();
+ } catch(exception& e) {
+ className.clear();
+ QPID_LOG(trace, "RCVD GetQuery: invalid format - class target ignored.");
}
- //sendCommandComplete(replyTo, sequence);
- return;
- }
- string className(value->get<string>());
+ if (className.empty()) {
+ ObjectId objId;
+ i = inMap.find("_object_id");
+ if (i != inMap.end()) {
+
+ try {
+ objId = ObjectId(i->second.asMap());
+ } catch (exception &e) {
+ objId = ObjectId(); // empty object id - won't find a match (I hope).
+ QPID_LOG(trace, "RCVD GetQuery (invalid Object Id format) to=" << replyTo << " seq=" << cid);
+ }
- for (ManagementObjectMap::iterator iter = managementObjects.begin();
- iter != managementObjects.end();
- iter++) {
- ManagementObject* object = iter->second;
- if (object->getClassName() == className) {
- ::qpid::messaging::Message m;
- ::qpid::messaging::ListContent content(m);
- ::qpid::messaging::Variant::List &list_ = content.asList();
- ::qpid::messaging::Variant::Map map_;
- ::qpid::messaging::Variant::Map values;
- ::qpid::messaging::Variant::Map headers;
-
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
-
- object->mapEncodeValues(values, true, true); // write both stats and properties
- map_["_values"] = values;
- list_.push_back(map_);
-
- headers["method"] = "response";
- headers["qmf.opcode"] = "_query_response";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = name_address;
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter != managementObjects.end()) {
+ ManagementObject* object = iter->second;
+
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
- content.encode();
- connThreadBody.sendBuffer(m.getContent(), cid, headers, "qmf.default.direct", replyTo);
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ map_["_values"] = values;
+ list_.push_back(map_);
- QPID_LOG(trace, "SENT ObjectInd");
+ content.encode();
+ connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);
+ }
+ }
+ } else {
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second;
+ if (object->getClassName() == className) {
+
+ // @todo support multiple object reply per message
+ values.clear();
+ list_.clear();
+
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ content.encode();
+ connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);
+ }
}
}
- //sendCommandComplete(replyTo, sequence);
+ // end empty "non-partial" message to indicate CommandComplete
+ list_.clear();
+ headers.erase("partial");
+ content.encode();
+ connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);
+ QPID_LOG(trace, "SENT ObjectInd");
}
void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo)
@@ -698,8 +711,11 @@ void ManagementAgentImpl::received(Message& msg)
if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey);
else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToKey);
+ else if (opcode == "_query_request") handleGetQuery(msg.getData(),
+ mp.getApplicationHeaders().getAsString("qmf.content"),
+ cid, replyToKey);
else {
- QPID_LOG(trace, "Support for QMF Opcode [" << opcode << "] TBD!!!");
+ QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!");
}
return;
}
@@ -717,8 +733,7 @@ void ManagementAgentImpl::received(Message& msg)
if (opcode == 'a') handleAttachResponse(inBuffer);
else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
else if (opcode == 'x') handleConsoleAddedIndication();
- else if (opcode == 'M')
- QPID_LOG(warning, "Ignoring old-format QMF Method Request!!!");
+ QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode));
}
}
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
index 4f838d2317..399c9d7944 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -267,7 +267,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence);
void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& replyTo);
- void handleGetQuery (qpid::framing::Buffer& inBuffer, const std::string& cid, const std::string& replyTo);
+ void handleGetQuery (const std::string& body, const std::string& content_type,
+ const std::string& cid, const std::string& replyTo);
void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& replyTo);
void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& replyTo);
void handleConsoleAddedIndication();
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 0adafc7dd5..bbb14f98e9 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -422,8 +422,8 @@ void ManagementAgent::sendBuffer(Buffer& buf,
void ManagementAgent::sendBuffer(const std::string& data,
- const uint32_t sequence,
- const qpid::messaging::VariantMap headers,
+ const std::string& cid,
+ const qpid::messaging::VariantMap& headers,
qpid::broker::Exchange::shared_ptr exchange,
string routingKey)
{
@@ -451,8 +451,8 @@ void ManagementAgent::sendBuffer(const std::string& data,
MessageProperties* props =
msg->getFrames().getHeaders()->get<MessageProperties>(true);
props->setContentLength(data.length());
- if (sequence) {
- props->setCorrelationId(boost::lexical_cast<std::string>(sequence));
+ if (!cid.empty()) {
+ props->setCorrelationId(cid);
}
for (i = headers.begin(); i != headers.end(); ++i) {
@@ -854,7 +854,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
void ManagementAgent::handleMethodRequestLH (const std::string& body, string replyTo,
- uint32_t sequence, const ConnectionToken* connToken)
+ const std::string& cid, const ConnectionToken* connToken)
{
string methodName;
qpid::messaging::Message inMsg(body);
@@ -876,8 +876,8 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_PARAMETER_INVALID;
((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID);
outMap.encode();
- sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
- QPID_LOG(trace, "SEND MethodResponse (invalid param) to=" << replyTo << " seq=" << sequence);
+ sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse (invalid param) to=" << replyTo << " seq=" << cid);
return;
}
@@ -897,8 +897,8 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION;
((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = e.what();
outMap.encode();
- sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
- QPID_LOG(trace, "SEND MethodResponse (invalid format) to=" << replyTo << " seq=" << sequence);
+ sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse (invalid format) to=" << replyTo << " seq=" << cid);
return;
}
@@ -908,8 +908,8 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_UNKNOWN_OBJECT;
((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT);
outMap.encode();
- sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
- QPID_LOG(trace, "SEND MethodResponse (unknown object) to=" << replyTo << " seq=" << sequence);
+ sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse (unknown object) to=" << replyTo << " seq=" << cid);
return;
}
@@ -922,8 +922,8 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = i->second;
outMap.encode();
- sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
- QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);
+ sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << cid);
return;
}
@@ -937,8 +937,8 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN);
outMap.encode();
- sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
- QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
+ sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << cid);
return;
}
}
@@ -951,14 +951,14 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION;
((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = e.what();
outMap.encode();
- sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
- QPID_LOG(trace, "SEND MethodResponse (exception) to=" << replyTo << " seq=" << sequence);
+ sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse (exception) to=" << replyTo << " seq=" << cid);
return;
}
outMap.encode();
- sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
- QPID_LOG(trace, "SEND MethodResponse to=" << replyTo << " seq=" << sequence);
+ sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse to=" << replyTo << " seq=" << cid);
}
@@ -1333,7 +1333,8 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
content.encode();
- sendBuffer(m.getContent(), sequence, headers, dExchange, replyToKey);
+ sendBuffer(m.getContent(), boost::lexical_cast<std::string>(sequence),
+ headers, dExchange, replyToKey);
QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
}
}
@@ -1370,7 +1371,8 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
content.encode();
- sendBuffer(m.getContent(), sequence, headers, dExchange, replyToKey);
+ sendBuffer(m.getContent(), boost::lexical_cast<std::string>(sequence),
+ headers, dExchange, replyToKey);
QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
}
}
@@ -1379,6 +1381,110 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
sendCommandComplete(replyToKey, sequence);
}
+
+void ManagementAgent::handleGetQueryLH(const std::string& body, std::string& replyTo, const std::string& cid, const std::string& contentType)
+{
+ FieldTable ft;
+ FieldTable::ValuePtr value;
+
+ moveNewObjectsLH();
+
+ if (contentType != "_query_v1") {
+ QPID_LOG(warning, "Support for QMF V2 Query format TBD!!!");
+ return;
+ }
+
+ qpid::messaging::Message inMsg(body);
+ qpid::messaging::MapView inMap(inMsg);
+ qpid::messaging::MapView::const_iterator i;
+ ::qpid::messaging::Variant::Map headers;
+
+ QPID_LOG(trace, "RECV GetQuery: map=" << inMap << " seq=" << cid);
+
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_query_response";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = std::string(agentName);
+ headers["partial"];
+
+ ::qpid::messaging::Message outMsg;
+ ::qpid::messaging::ListContent content(outMsg);
+ ::qpid::messaging::Variant::List &list_ = content.asList();
+ ::qpid::messaging::Variant::Map map_;
+ ::qpid::messaging::Variant::Map values;
+ string className;
+
+ i = inMap.find("_class");
+ if (i != inMap.end())
+ try {
+ className = i->second.asString();
+ } catch(exception& e) {
+ className.clear();
+ QPID_LOG(trace, "RCVD GetQuery: invalid format - class target ignored.");
+ }
+
+ if (className.empty()) {
+ ObjectId objId;
+ i = inMap.find("_object_id");
+ if (i != inMap.end()) {
+
+ try {
+ objId = ObjectId(i->second.asMap());
+ } catch (exception &e) {
+ objId = ObjectId(); // empty object id - won't find a match (I hope).
+ QPID_LOG(trace, "RCVD GetQuery (invalid Object Id format) to=" << replyTo << " seq=" << cid);
+ }
+
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter != managementObjects.end()) {
+ ManagementObject* object = iter->second;
+
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ if (!object->isDeleted()) {
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ content.encode();
+ sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
+ }
+ }
+ }
+ } else {
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second;
+ if (object->getClassName () == className) {
+
+ // @todo: support multiple objects per message reply
+ values.clear();
+ list_.clear();
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ if (!object->isDeleted()) {
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ content.encode();
+ sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
+ }
+ }
+ }
+ }
+
+ // end empty "non-partial" message to indicate CommandComplete
+ list_.clear();
+ headers.erase("partial");
+ content.encode();
+ sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
+ QPID_LOG(trace, "SEND GetResponse to=" << replyTo << " seq=" << cid);
+}
+
bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
{
Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
@@ -1521,7 +1627,8 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN);
outMap.encode();
- sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyToKey);
+ sendBuffer(outMsg.getContent(), boost::lexical_cast<std::string>(sequence),
+ headers, dExchange, replyToKey);
} else {
@@ -1547,7 +1654,6 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
void ManagementAgent::dispatchAgentCommandLH(Message& msg)
{
- uint32_t sequence;
string replyToKey;
const framing::MessageProperties* p =
@@ -1577,26 +1683,20 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg)
if (headers && headers->getAsString("app_id") == "qmf2")
{
std::string opcode = headers->getAsString("qmf.opcode");
+ std::string contentType = headers->getAsString("qmf.content");
+ std::string body;
+ std::string cid;
+
+ inBuffer.getRawData(body, bufferLen);
- sequence = 0;
if (p && p->hasCorrelationId()) {
- std::string cid = p->getCorrelationId();
- if (!cid.empty()) {
- try {
- sequence = boost::lexical_cast<uint32_t>(cid);
- } catch(const boost::bad_lexical_cast&) {
- QPID_LOG(warning, "Bad correlation Id for received QMF request.");
- return;
- }
- }
+ cid = p->getCorrelationId();
}
- if (opcode == "_method_request") {
- std::string body;
- inBuffer.getRawData(body, bufferLen);
- handleMethodRequestLH(body, replyToKey, sequence, msg.getPublisher());
- return;
- }
+ if (opcode == "_method_request")
+ return handleMethodRequestLH(body, replyToKey, cid, msg.getPublisher());
+ else if (opcode == "_query_request")
+ return handleGetQueryLH(body, replyToKey, cid, contentType);
QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
return;
@@ -1606,6 +1706,7 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg)
while (inBuffer.getPosition() < bufferLen) {
+ uint32_t sequence;
if (!checkHeader(inBuffer, &opcode, &sequence))
return;
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index 47f78e48bd..9ec28500c4 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -278,8 +278,8 @@ private:
qpid::broker::Exchange::shared_ptr exchange,
std::string routingKey);
void sendBuffer(const std::string& data,
- const uint32_t sequence,
- const qpid::messaging::VariantMap headers,
+ const std::string& cid,
+ const qpid::messaging::VariantMap& headers,
qpid::broker::Exchange::shared_ptr exchange,
std::string routingKey);
void moveNewObjectsLH();
@@ -314,7 +314,8 @@ private:
void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
- void handleMethodRequestLH (const std::string& body, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
+ void handleGetQueryLH (const std::string& body, std::string& replyToKey, const std::string& cid, const std::string& contentType);
+ void handleMethodRequestLH (const std::string& body, std::string replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken);
size_t validateSchema(framing::Buffer&, uint8_t kind);
size_t validateTableSchema(framing::Buffer&);