summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-04-16 19:03:36 +0000
committerTed Ross <tross@apache.org>2010-04-16 19:03:36 +0000
commit982a0d216f6f7da64c2e614672ecc09a2dcd4d4f (patch)
tree8c964cb97ce3a1ed3772cfd77d74808d888a7d4d /cpp/src
parentb6d073ef81e8fef3dd1319136ec3bb12c4da73fc (diff)
downloadqpid-python-982a0d216f6f7da64c2e614672ecc09a2dcd4d4f.tar.gz
Fixed problems with the broker's QMFv2 mode:
- app_id is a message property, not an application header - even in v2 mode, the agent must respond to v1 schema requests - missing object_id was added to the periodic data updates - epoch/boot-sequence added to agent information (locate-response, heartbeat) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@935044 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.cpp2
-rw-r--r--cpp/src/qpid/broker/Message.cpp5
-rw-r--r--cpp/src/qpid/broker/Message.h1
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.cpp6
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.h2
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp23
6 files changed, 27 insertions, 12 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 076a34d1bf..23cd284a2f 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -374,6 +374,7 @@ void ManagementAgentImpl::sendHeartbeat()
map["_values"] = attrMap;
map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
map["_values"].asMap()["heartbeat_interval"] = interval;
+ map["_values"].asMap()["epoch"] = bootSequence;
MapCodec::encode(map, content);
connThreadBody.sendBuffer(content, "", headers, addr_exchange, addr_key);
@@ -673,6 +674,7 @@ void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid,
map["_values"] = attrMap;
map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
map["_values"].asMap()["heartbeat_interval"] = interval;
+ map["_values"].asMap()["epoch"] = bootSequence;
MapCodec::encode(map, content);
connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyTo);
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 65106cb99b..b086d59ca5 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -99,6 +99,11 @@ const FieldTable* Message::getApplicationHeaders() const
return getAdapter().getApplicationHeaders(frames);
}
+std::string Message::getAppId() const
+{
+ return getAdapter().getAppId(frames);
+}
+
bool Message::isPersistent() const
{
return (getAdapter().isPersistent(frames) || forcePersistentPolicy);
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 353044c577..4330a03469 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -73,6 +73,7 @@ public:
QPID_BROKER_EXTERN std::string getExchangeName() const;
bool isImmediate() const;
QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const;
+ QPID_BROKER_EXTERN std::string getAppId() const;
framing::FieldTable& getOrInsertHeaders();
QPID_BROKER_EXTERN bool isPersistent() const;
bool requiresAccept();
diff --git a/cpp/src/qpid/broker/MessageAdapter.cpp b/cpp/src/qpid/broker/MessageAdapter.cpp
index acec2b2af6..0eb4a6fa22 100644
--- a/cpp/src/qpid/broker/MessageAdapter.cpp
+++ b/cpp/src/qpid/broker/MessageAdapter.cpp
@@ -72,4 +72,10 @@ namespace broker{
const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
return p ? p->getPriority() : 0;
}
+
+ std::string TransferAdapter::getAppId(const framing::FrameSet& f)
+ {
+ const framing::MessageProperties* p = f.getHeaders()->get<framing::MessageProperties>();
+ return p ? p->getAppId() : empty;
+ }
}}
diff --git a/cpp/src/qpid/broker/MessageAdapter.h b/cpp/src/qpid/broker/MessageAdapter.h
index cbc75ab6e2..df50db4063 100644
--- a/cpp/src/qpid/broker/MessageAdapter.h
+++ b/cpp/src/qpid/broker/MessageAdapter.h
@@ -41,6 +41,7 @@ struct MessageAdapter
virtual bool isPersistent(const framing::FrameSet& f) = 0;
virtual bool requiresAccept(const framing::FrameSet& f) = 0;
virtual uint8_t getPriority(const framing::FrameSet& f) = 0;
+ virtual std::string getAppId(const framing::FrameSet& f) = 0;
};
struct TransferAdapter : MessageAdapter
@@ -52,6 +53,7 @@ struct TransferAdapter : MessageAdapter
bool isImmediate(const framing::FrameSet&);
bool requiresAccept(const framing::FrameSet& f);
uint8_t getPriority(const framing::FrameSet& f);
+ virtual std::string getAppId(const framing::FrameSet& f);
};
}}
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index efc3f182e3..7131207d7d 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -667,7 +667,10 @@ void ManagementAgent::periodicProcessing (void)
if ((send_stats || send_props) && qmf2Support) {
Variant::Map map_;
Variant::Map values;
+ Variant::Map oid;
+ object->getObjectId().mapEncode(oid);
+ map_["_object_id"] = oid;
map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
object->getClassName(),
"_data",
@@ -675,7 +678,6 @@ void ManagementAgent::periodicProcessing (void)
object->mapEncodeValues(values, send_props, send_stats);
map_["_values"] = values;
list_.push_back(map_);
-
}
if (send_props) pcount++;
@@ -698,7 +700,7 @@ void ManagementAgent::periodicProcessing (void)
stringstream key;
key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
sendBufferLH(msgBuffer, contentSize, mExchange, key.str());
- QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
+ QPID_LOG(trace, "SEND V1 Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
}
}
@@ -815,6 +817,7 @@ void ManagementAgent::periodicProcessing (void)
map["_values"] = attrMap;
map["_values"].asMap()["timestamp"] = uint64_t(sys::Duration(sys::now()));
map["_values"].asMap()["heartbeat_interval"] = interval;
+ map["_values"].asMap()["epoch"] = bootSequence;
string content;
MapCodec::encode(map, content);
@@ -906,8 +909,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
{
sys::Mutex::ScopedLock lock (userLock);
Message& msg = ((DeliverableMessage&) deliverable).getMessage ();
-
- if (qmf1Support && topic) {
+ if (topic) {
// qmf1 is bound only to the topic management exchange.
// Parse the routing key. This management broker should act as though it
@@ -943,11 +945,9 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
if (qmf2Support) {
if (topic) {
-
// Intercept messages bound to:
// "console.ind.locate.# - process these messages, and also allow them to be forwarded.
-
- if (routingKey.compare(0, 18, "console.ind.locate") == 0) {
+ if (routingKey == "console.request.agent_locate") {
dispatchAgentCommandLH(msg);
return true;
}
@@ -1704,10 +1704,12 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo
map["_values"] = attrMap;
map["_values"].asMap()["timestamp"] = uint64_t(sys::Duration(sys::now()));
map["_values"].asMap()["heartbeat_interval"] = interval;
+ map["_values"].asMap()["epoch"] = bootSequence;
string content;
MapCodec::encode(map, content);
sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
+ clientWasAdded = true;
QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
}
@@ -1736,7 +1738,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
const framing::FieldTable *headers = msg.getApplicationHeaders();
- if (headers && headers->getAsString("app_id") == "qmf2")
+ if (headers && msg.getAppId() == "qmf2")
{
mapMsg = true;
@@ -1874,7 +1876,6 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
void ManagementAgent::dispatchAgentCommandLH(Message& msg)
{
string replyToKey;
-
const framing::MessageProperties* p =
msg.getFrames().getHeaders()->get<framing::MessageProperties>();
if (p && p->hasReplyTo()) {
@@ -1898,14 +1899,12 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg)
inBuffer.reset();
const framing::FieldTable *headers = msg.getApplicationHeaders();
-
- if (headers && headers->getAsString("app_id") == "qmf2")
+ if (headers && msg.getAppId() == "qmf2")
{
std::string opcode = headers->getAsString("qmf.opcode");
std::string contentType = headers->getAsString("qmf.content");
std::string body;
std::string cid;
-
inBuffer.getRawData(body, bufferLen);
if (p && p->hasCorrelationId()) {