diff options
author | Ted Ross <tross@apache.org> | 2010-04-16 19:03:36 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2010-04-16 19:03:36 +0000 |
commit | 982a0d216f6f7da64c2e614672ecc09a2dcd4d4f (patch) | |
tree | 8c964cb97ce3a1ed3772cfd77d74808d888a7d4d /cpp/src | |
parent | b6d073ef81e8fef3dd1319136ec3bb12c4da73fc (diff) | |
download | qpid-python-982a0d216f6f7da64c2e614672ecc09a2dcd4d4f.tar.gz |
Fixed problems with the broker's QMFv2 mode:
- app_id is a message property, not an application header
- even in v2 mode, the agent must respond to v1 schema requests
- missing object_id was added to the periodic data updates
- epoch/boot-sequence added to agent information (locate-response, heartbeat)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@935044 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageAdapter.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageAdapter.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 23 |
6 files changed, 27 insertions, 12 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 076a34d1bf..23cd284a2f 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -374,6 +374,7 @@ void ManagementAgentImpl::sendHeartbeat() map["_values"] = attrMap; map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); map["_values"].asMap()["heartbeat_interval"] = interval; + map["_values"].asMap()["epoch"] = bootSequence; MapCodec::encode(map, content); connThreadBody.sendBuffer(content, "", headers, addr_exchange, addr_key); @@ -673,6 +674,7 @@ void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, map["_values"] = attrMap; map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); map["_values"].asMap()["heartbeat_interval"] = interval; + map["_values"].asMap()["epoch"] = bootSequence; MapCodec::encode(map, content); connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyTo); diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 65106cb99b..b086d59ca5 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -99,6 +99,11 @@ const FieldTable* Message::getApplicationHeaders() const return getAdapter().getApplicationHeaders(frames); } +std::string Message::getAppId() const +{ + return getAdapter().getAppId(frames); +} + bool Message::isPersistent() const { return (getAdapter().isPersistent(frames) || forcePersistentPolicy); diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 353044c577..4330a03469 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -73,6 +73,7 @@ public: QPID_BROKER_EXTERN std::string getExchangeName() const; bool isImmediate() const; QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const; + QPID_BROKER_EXTERN std::string getAppId() const; framing::FieldTable& getOrInsertHeaders(); QPID_BROKER_EXTERN bool isPersistent() const; bool requiresAccept(); diff --git a/cpp/src/qpid/broker/MessageAdapter.cpp b/cpp/src/qpid/broker/MessageAdapter.cpp index acec2b2af6..0eb4a6fa22 100644 --- a/cpp/src/qpid/broker/MessageAdapter.cpp +++ b/cpp/src/qpid/broker/MessageAdapter.cpp @@ -72,4 +72,10 @@ namespace broker{ const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>(); return p ? p->getPriority() : 0; } + + std::string TransferAdapter::getAppId(const framing::FrameSet& f) + { + const framing::MessageProperties* p = f.getHeaders()->get<framing::MessageProperties>(); + return p ? p->getAppId() : empty; + } }} diff --git a/cpp/src/qpid/broker/MessageAdapter.h b/cpp/src/qpid/broker/MessageAdapter.h index cbc75ab6e2..df50db4063 100644 --- a/cpp/src/qpid/broker/MessageAdapter.h +++ b/cpp/src/qpid/broker/MessageAdapter.h @@ -41,6 +41,7 @@ struct MessageAdapter virtual bool isPersistent(const framing::FrameSet& f) = 0; virtual bool requiresAccept(const framing::FrameSet& f) = 0; virtual uint8_t getPriority(const framing::FrameSet& f) = 0; + virtual std::string getAppId(const framing::FrameSet& f) = 0; }; struct TransferAdapter : MessageAdapter @@ -52,6 +53,7 @@ struct TransferAdapter : MessageAdapter bool isImmediate(const framing::FrameSet&); bool requiresAccept(const framing::FrameSet& f); uint8_t getPriority(const framing::FrameSet& f); + virtual std::string getAppId(const framing::FrameSet& f); }; }} diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index efc3f182e3..7131207d7d 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -667,7 +667,10 @@ void ManagementAgent::periodicProcessing (void) if ((send_stats || send_props) && qmf2Support) { Variant::Map map_; Variant::Map values; + Variant::Map oid; + object->getObjectId().mapEncode(oid); + map_["_object_id"] = oid; map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), object->getClassName(), "_data", @@ -675,7 +678,6 @@ void ManagementAgent::periodicProcessing (void) object->mapEncodeValues(values, send_props, send_stats); map_["_values"] = values; list_.push_back(map_); - } if (send_props) pcount++; @@ -698,7 +700,7 @@ void ManagementAgent::periodicProcessing (void) stringstream key; key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); - QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); + QPID_LOG(trace, "SEND V1 Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); } } @@ -815,6 +817,7 @@ void ManagementAgent::periodicProcessing (void) map["_values"] = attrMap; map["_values"].asMap()["timestamp"] = uint64_t(sys::Duration(sys::now())); map["_values"].asMap()["heartbeat_interval"] = interval; + map["_values"].asMap()["epoch"] = bootSequence; string content; MapCodec::encode(map, content); @@ -906,8 +909,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, { sys::Mutex::ScopedLock lock (userLock); Message& msg = ((DeliverableMessage&) deliverable).getMessage (); - - if (qmf1Support && topic) { + if (topic) { // qmf1 is bound only to the topic management exchange. // Parse the routing key. This management broker should act as though it @@ -943,11 +945,9 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, if (qmf2Support) { if (topic) { - // Intercept messages bound to: // "console.ind.locate.# - process these messages, and also allow them to be forwarded. - - if (routingKey.compare(0, 18, "console.ind.locate") == 0) { + if (routingKey == "console.request.agent_locate") { dispatchAgentCommandLH(msg); return true; } @@ -1704,10 +1704,12 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo map["_values"] = attrMap; map["_values"].asMap()["timestamp"] = uint64_t(sys::Duration(sys::now())); map["_values"].asMap()["heartbeat_interval"] = interval; + map["_values"].asMap()["epoch"] = bootSequence; string content; MapCodec::encode(map, content); sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo); + clientWasAdded = true; QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo); } @@ -1736,7 +1738,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) const framing::FieldTable *headers = msg.getApplicationHeaders(); - if (headers && headers->getAsString("app_id") == "qmf2") + if (headers && msg.getAppId() == "qmf2") { mapMsg = true; @@ -1874,7 +1876,6 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) void ManagementAgent::dispatchAgentCommandLH(Message& msg) { string replyToKey; - const framing::MessageProperties* p = msg.getFrames().getHeaders()->get<framing::MessageProperties>(); if (p && p->hasReplyTo()) { @@ -1898,14 +1899,12 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg) inBuffer.reset(); const framing::FieldTable *headers = msg.getApplicationHeaders(); - - if (headers && headers->getAsString("app_id") == "qmf2") + if (headers && msg.getAppId() == "qmf2") { std::string opcode = headers->getAsString("qmf.opcode"); std::string contentType = headers->getAsString("qmf.content"); std::string body; std::string cid; - inBuffer.getRawData(body, bufferLen); if (p && p->hasCorrelationId()) { |