summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-03-18 15:58:57 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-03-18 15:58:57 +0000
commitba714fe9b24642659a9ea1d788cdc9c00bbc2501 (patch)
treeb72703a352e27d08b7e00944469321239ade184c
parentbeb42b51850a1c20fc7ac941dade108470db23e8 (diff)
downloadqpid-python-ba714fe9b24642659a9ea1d788cdc9c00bbc2501.tar.gz
checkpoint prior to schema revert
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@924873 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp93
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.h4
2 files changed, 68 insertions, 29 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index b154ef73b0..c4cf841148 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -244,7 +244,7 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
content.encode();
connThreadBody.sendBuffer(msg.getContent(), 0,
headers,
- "qpid.management", key.str());
+ "qmf.default.topic", key.str());
}
uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
@@ -264,8 +264,7 @@ uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
methodQueue.pop_front();
{
Mutex::ScopedUnlock unlock(agentLock);
- Buffer inBuffer(const_cast<char*>(item->body.c_str()), item->body.size());
- invokeMethodRequest(inBuffer, item->sequence, item->replyTo);
+ invokeMethodRequest(item->body, item->sequence, item->replyTo);
delete item;
}
}
@@ -446,17 +445,20 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc
ClassMap& cMap = pIter->second;
ClassMap::iterator cIter = cMap.find(key);
if (cIter != cMap.end()) {
- //SchemaClass& schema = cIter->second;
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ SchemaClass& schema = cIter->second;
+ ::qpid::messaging::Message m;
+ ::qpid::messaging::MapContent content(m);
- encodeHeader(outBuffer, 's', sequence);
- //schema.writeSchemaCall(outBuffer);
- assert(false); // TODO FIX ABOVE
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
+ schema.writeSchemaCall(content.asMap());
+
+ ::qpid::messaging::VariantMap headers;
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_query_response";
+ headers["qmf.content"] = "_schema_class";
+ headers["qmf.agent"] = name_address;
+ content.encode();
+ connThreadBody.sendBuffer(m.getContent(), sequence, headers, "qpid.management", "broker");
QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
}
}
@@ -470,18 +472,29 @@ void ManagementAgentImpl::handleConsoleAddedIndication()
QPID_LOG(trace, "RCVD ConsoleAddedInd");
}
-void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::invokeMethodRequest(const std::string& body, uint32_t sequence, string replyTo)
{
+#if 1
+ (void)body;
+ (void)sequence;
+ (void)replyTo;
+#else
string methodName;
string packageName;
string className;
uint8_t hash[16];
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
+ qpid::messaging::Message inMsg(body);
+ qpid::messaging::MapView inMap(inMsg);
+ qpid::messaging::MapView::const_iterator i;
- assert(false); // TODO FIX OBJ ID!!
+ if ((i = inMap.find("_object_id")) == _map.end()) {
+ // KAG TODO: TBD!!
+ }
//ObjectId objId(inBuffer);
ObjectId objId(std::string("foobag?"));
+
inBuffer.getShortString(packageName);
inBuffer.getShortString(className);
inBuffer.getBin128(hash);
@@ -514,6 +527,7 @@ void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequenc
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+#endif
}
void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo)
@@ -557,7 +571,7 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st
headers["qmf.agent"] = name_address;
content.encode();
- connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo);
+ connThreadBody.sendBuffer(m.getContent(), sequence, headers, "qmf.default.direct", replyTo);
QPID_LOG(trace, "SENT ObjectInd");
}
@@ -592,7 +606,7 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st
headers["qmf.agent"] = name_address;
content.encode();
- connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo);
+ connThreadBody.sendBuffer(m.getContent(), sequence, headers, "qmf.default.direct", replyTo);
QPID_LOG(trace, "SENT ObjectInd");
}
@@ -601,13 +615,11 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st
sendCommandComplete(replyTo, sequence);
}
-void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::handleMethodRequest(const std::string& body, uint32_t sequence, string replyTo)
{
if (extThread) {
Mutex::ScopedLock lock(agentLock);
- string body;
- inBuffer.getRawData(body, inBuffer.available());
methodQueue.push_back(new QueuedMethod(sequence, replyTo, body));
if (pipeHandle != 0) {
pipeHandle->write("X", 1);
@@ -627,7 +639,7 @@ void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequenc
inCallback = false;
}
} else {
- invokeMethodRequest(inBuffer, sequence, replyTo);
+ invokeMethodRequest(body, sequence, replyTo);
}
QPID_LOG(trace, "RCVD MethodRequest");
@@ -635,25 +647,52 @@ void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequenc
void ManagementAgentImpl::received(Message& msg)
{
- string data = msg.getData();
- Buffer inBuffer(const_cast<char*>(data.c_str()), data.size());
- uint8_t opcode;
- uint32_t sequence;
string replyToKey;
-
framing::MessageProperties p = msg.getMessageProperties();
if (p.hasReplyTo()) {
const framing::ReplyTo& rt = p.getReplyTo();
replyToKey = rt.getRoutingKey();
}
+ if (msg.getHeaders().getAsString("app_id") == "qmf2")
+ {
+ uint32_t sequence = 0;
+ std::string opcode = msg.getHeaders().getAsString("qmf.opcode");
+ std::string cid = msg.getMessageProperties().getCorrelationId();
+ if (!cid.empty()) {
+ try {
+ sequence = boost::lexical_cast<uint32_t>(cid);
+ } catch(const boost::bad_lexical_cast&) {
+ QPID_LOG(warning, "Bad correlation Id for received QMF request.");
+ return;
+ }
+ }
+
+ if (opcode == "_method_request") {
+ handleMethodRequest(msg.getData(), sequence, replyToKey);
+ return;
+ }
+
+ QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
+ return;
+ }
+
+ // old preV2 binary messages
+
+ uint32_t sequence;
+ string data = msg.getData();
+ Buffer inBuffer(const_cast<char*>(data.c_str()), data.size());
+ uint8_t opcode;
+
+
if (checkHeader(inBuffer, &opcode, &sequence))
{
if (opcode == 'a') handleAttachResponse(inBuffer);
else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
else if (opcode == 'x') handleConsoleAddedIndication();
else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey);
- else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey);
+ else if (opcode == 'M')
+ QPID_LOG(warning, "Ignoring old-format QMF Method Request!!!");
}
}
@@ -862,7 +901,7 @@ void ManagementAgentImpl::periodicProcessing()
headers["qmf.content"] = "_data";
headers["qmf.agent"] = name_address;
- connThreadBody.sendBuffer(str, 0, headers, "qpid.management", key.str(), "amqp/list");
+ connThreadBody.sendBuffer(str, 0, headers, "qmf.default.topic", key.str(), "amqp/list");
QPID_LOG(trace, "SENT DataIndication key=" << key.str());
}
}
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
index 35ace3910f..f753395415 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -263,9 +263,9 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void handlePackageRequest (qpid::framing::Buffer& inBuffer);
void handleClassQuery (qpid::framing::Buffer& inBuffer);
void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence);
- void invokeMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
+ void invokeMethodRequest (const std::string& body, uint32_t sequence, std::string replyTo);
void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
- void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
+ void handleMethodRequest (const std::string& body, uint32_t sequence, std::string replyTo);
void handleConsoleAddedIndication();
};