From ba714fe9b24642659a9ea1d788cdc9c00bbc2501 Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Thu, 18 Mar 2010 15:58:57 +0000 Subject: checkpoint prior to schema revert git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@924873 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 93 ++++++++++++++++++------- qpid/cpp/src/qpid/agent/ManagementAgentImpl.h | 4 +- 2 files changed, 68 insertions(+), 29 deletions(-) diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index b154ef73b0..c4cf841148 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -244,7 +244,7 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se content.encode(); connThreadBody.sendBuffer(msg.getContent(), 0, headers, - "qpid.management", key.str()); + "qmf.default.topic", key.str()); } uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) @@ -264,8 +264,7 @@ uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) methodQueue.pop_front(); { Mutex::ScopedUnlock unlock(agentLock); - Buffer inBuffer(const_cast(item->body.c_str()), item->body.size()); - invokeMethodRequest(inBuffer, item->sequence, item->replyTo); + invokeMethodRequest(item->body, item->sequence, item->replyTo); delete item; } } @@ -446,17 +445,20 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc ClassMap& cMap = pIter->second; ClassMap::iterator cIter = cMap.find(key); if (cIter != cMap.end()) { - //SchemaClass& schema = cIter->second; - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + SchemaClass& schema = cIter->second; + ::qpid::messaging::Message m; + ::qpid::messaging::MapContent content(m); - encodeHeader(outBuffer, 's', sequence); - //schema.writeSchemaCall(outBuffer); - assert(false); // TODO FIX ABOVE - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); + schema.writeSchemaCall(content.asMap()); + + ::qpid::messaging::VariantMap headers; + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.content"] = "_schema_class"; + headers["qmf.agent"] = name_address; + content.encode(); + connThreadBody.sendBuffer(m.getContent(), sequence, headers, "qpid.management", "broker"); QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name); } } @@ -470,18 +472,29 @@ void ManagementAgentImpl::handleConsoleAddedIndication() QPID_LOG(trace, "RCVD ConsoleAddedInd"); } -void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +void ManagementAgentImpl::invokeMethodRequest(const std::string& body, uint32_t sequence, string replyTo) { +#if 1 + (void)body; + (void)sequence; + (void)replyTo; +#else string methodName; string packageName; string className; uint8_t hash[16]; Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; + qpid::messaging::Message inMsg(body); + qpid::messaging::MapView inMap(inMsg); + qpid::messaging::MapView::const_iterator i; - assert(false); // TODO FIX OBJ ID!! + if ((i = inMap.find("_object_id")) == _map.end()) { + // KAG TODO: TBD!! + } //ObjectId objId(inBuffer); ObjectId objId(std::string("foobag?")); + inBuffer.getShortString(packageName); inBuffer.getShortString(className); inBuffer.getBin128(hash); @@ -514,6 +527,7 @@ void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequenc outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); +#endif } void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo) @@ -557,7 +571,7 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st headers["qmf.agent"] = name_address; content.encode(); - connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo); + connThreadBody.sendBuffer(m.getContent(), sequence, headers, "qmf.default.direct", replyTo); QPID_LOG(trace, "SENT ObjectInd"); } @@ -592,7 +606,7 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st headers["qmf.agent"] = name_address; content.encode(); - connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo); + connThreadBody.sendBuffer(m.getContent(), sequence, headers, "qmf.default.direct", replyTo); QPID_LOG(trace, "SENT ObjectInd"); } @@ -601,13 +615,11 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st sendCommandComplete(replyTo, sequence); } -void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +void ManagementAgentImpl::handleMethodRequest(const std::string& body, uint32_t sequence, string replyTo) { if (extThread) { Mutex::ScopedLock lock(agentLock); - string body; - inBuffer.getRawData(body, inBuffer.available()); methodQueue.push_back(new QueuedMethod(sequence, replyTo, body)); if (pipeHandle != 0) { pipeHandle->write("X", 1); @@ -627,7 +639,7 @@ void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequenc inCallback = false; } } else { - invokeMethodRequest(inBuffer, sequence, replyTo); + invokeMethodRequest(body, sequence, replyTo); } QPID_LOG(trace, "RCVD MethodRequest"); @@ -635,25 +647,52 @@ void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequenc void ManagementAgentImpl::received(Message& msg) { - string data = msg.getData(); - Buffer inBuffer(const_cast(data.c_str()), data.size()); - uint8_t opcode; - uint32_t sequence; string replyToKey; - framing::MessageProperties p = msg.getMessageProperties(); if (p.hasReplyTo()) { const framing::ReplyTo& rt = p.getReplyTo(); replyToKey = rt.getRoutingKey(); } + if (msg.getHeaders().getAsString("app_id") == "qmf2") + { + uint32_t sequence = 0; + std::string opcode = msg.getHeaders().getAsString("qmf.opcode"); + std::string cid = msg.getMessageProperties().getCorrelationId(); + if (!cid.empty()) { + try { + sequence = boost::lexical_cast(cid); + } catch(const boost::bad_lexical_cast&) { + QPID_LOG(warning, "Bad correlation Id for received QMF request."); + return; + } + } + + if (opcode == "_method_request") { + handleMethodRequest(msg.getData(), sequence, replyToKey); + return; + } + + QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!"); + return; + } + + // old preV2 binary messages + + uint32_t sequence; + string data = msg.getData(); + Buffer inBuffer(const_cast(data.c_str()), data.size()); + uint8_t opcode; + + if (checkHeader(inBuffer, &opcode, &sequence)) { if (opcode == 'a') handleAttachResponse(inBuffer); else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); else if (opcode == 'x') handleConsoleAddedIndication(); else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey); - else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey); + else if (opcode == 'M') + QPID_LOG(warning, "Ignoring old-format QMF Method Request!!!"); } } @@ -862,7 +901,7 @@ void ManagementAgentImpl::periodicProcessing() headers["qmf.content"] = "_data"; headers["qmf.agent"] = name_address; - connThreadBody.sendBuffer(str, 0, headers, "qpid.management", key.str(), "amqp/list"); + connThreadBody.sendBuffer(str, 0, headers, "qmf.default.topic", key.str(), "amqp/list"); QPID_LOG(trace, "SENT DataIndication key=" << key.str()); } } diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h index 35ace3910f..f753395415 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -263,9 +263,9 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void handlePackageRequest (qpid::framing::Buffer& inBuffer); void handleClassQuery (qpid::framing::Buffer& inBuffer); void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence); - void invokeMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); + void invokeMethodRequest (const std::string& body, uint32_t sequence, std::string replyTo); void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); - void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); + void handleMethodRequest (const std::string& body, uint32_t sequence, std::string replyTo); void handleConsoleAddedIndication(); }; -- cgit v1.2.1