summaryrefslogtreecommitdiff
path: root/cpp/src/qmf/ConsoleSession.cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2011-01-10 14:06:16 +0000
committerTed Ross <tross@apache.org>2011-01-10 14:06:16 +0000
commit120ea440ef9d048d3bb31e6118027f5c9e890fca (patch)
tree34e14880765c6b79c77c4fb24e834b8d67260149 /cpp/src/qmf/ConsoleSession.cpp
parent598e5eacac716a9a3a812e9cf72b14bde57ed45a (diff)
downloadqpid-python-120ea440ef9d048d3bb31e6118027f5c9e890fca.tar.gz
Updates to the C++ implementation of QMFv2:
1) Consolidated string constants for the protocol into a definition file. 2) Added hooks for subscription handling. 3) Added checks to validate properties and arguments against the schema (if there is a schema). git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1057199 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/ConsoleSession.cpp')
-rw-r--r--cpp/src/qmf/ConsoleSession.cpp66
1 files changed, 45 insertions, 21 deletions
diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp
index dc2bbe34ee..f327170c5e 100644
--- a/cpp/src/qmf/ConsoleSession.cpp
+++ b/cpp/src/qmf/ConsoleSession.cpp
@@ -25,14 +25,19 @@
#include "qmf/SchemaId.h"
#include "qmf/SchemaImpl.h"
#include "qmf/ConsoleEventImpl.h"
+#include "qmf/constants.h"
#include "qpid/log/Statement.h"
#include "qpid/messaging/AddressParser.h"
#include "qpid/messaging/Sender.h"
#include "qpid/messaging/Receiver.h"
using namespace std;
-using namespace qpid::messaging;
using namespace qmf;
+using qpid::messaging::Address;
+using qpid::messaging::Connection;
+using qpid::messaging::Receiver;
+using qpid::messaging::Duration;
+using qpid::messaging::Message;
using qpid::types::Variant;
typedef qmf::PrivateImplRef<ConsoleSession> PI;
@@ -51,6 +56,8 @@ bool ConsoleSession::nextEvent(ConsoleEvent& e, Duration t) { return impl->nextE
uint32_t ConsoleSession::getAgentCount() const { return impl->getAgentCount(); }
Agent ConsoleSession::getAgent(uint32_t i) const { return impl->getAgent(i); }
Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnectedBrokerAgent(); }
+Subscription ConsoleSession::subscribe(const Query& q, const string& f, const string& o) { return impl->subscribe(q, f, o); }
+Subscription ConsoleSession::subscribe(const string& q, const string& f, const string& o) { return impl->subscribe(q, f, o); }
//========================================================================================
// Impl Method Bodies
@@ -227,6 +234,18 @@ Agent ConsoleSessionImpl::getAgent(uint32_t i) const
}
+Subscription ConsoleSessionImpl::subscribe(const Query&, const string&, const string&)
+{
+ return Subscription();
+}
+
+
+Subscription ConsoleSessionImpl::subscribe(const string&, const string&, const string&)
+{
+ return Subscription();
+}
+
+
void ConsoleSessionImpl::enqueueEvent(const ConsoleEvent& event)
{
qpid::sys::Mutex::ScopedLock l(lock);
@@ -249,17 +268,17 @@ void ConsoleSessionImpl::dispatch(Message msg)
Variant::Map::const_iterator iter;
Variant::Map::const_iterator oiter;
- oiter = properties.find("qmf.opcode");
- iter = properties.find("x-amqp-0-10.app-id");
+ oiter = properties.find(protocol::HEADER_KEY_OPCODE);
+ iter = properties.find(protocol::HEADER_KEY_APP_ID);
if (iter == properties.end())
iter = properties.find("app_id");
- if (iter != properties.end() && iter->second.asString() == "qmf2" && oiter != properties.end()) {
+ if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF && oiter != properties.end()) {
//
// Dispatch a QMFv2 formatted message
//
const string& opcode = oiter->second.asString();
- iter = properties.find("qmf.agent");
+ iter = properties.find(protocol::HEADER_KEY_AGENT);
if (iter == properties.end()) {
QPID_LOG(trace, "Message received with no 'qmf.agent' header");
return;
@@ -277,7 +296,7 @@ void ConsoleSessionImpl::dispatch(Message msg)
}
if (msg.getContentType() == "amqp/map" &&
- (opcode == "_agent_heartbeat_indication" || opcode == "_agent_locate_response")) {
+ (opcode == protocol::HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION || opcode == protocol::HEADER_OPCODE_AGENT_LOCATE_RESPONSE)) {
//
// This is the one case where it's ok (necessary actually) to receive a QMFv2
// message from an unknown agent (how else are they going to get known?)
@@ -297,8 +316,8 @@ void ConsoleSessionImpl::dispatch(Message msg)
Variant::Map content;
decode(msg, content);
- if (opcode == "_exception") agentImpl.handleException(content, msg);
- else if (opcode == "_method_response") agentImpl.handleMethodResponse(content, msg);
+ if (opcode == protocol::HEADER_OPCODE_EXCEPTION) agentImpl.handleException(content, msg);
+ else if (opcode == protocol::HEADER_OPCODE_METHOD_RESPONSE) agentImpl.handleMethodResponse(content, msg);
else
QPID_LOG(error, "Received a map-formatted QMFv2 message with opcode=" << opcode);
@@ -309,8 +328,8 @@ void ConsoleSessionImpl::dispatch(Message msg)
Variant::List content;
decode(msg, content);
- if (opcode == "_query_response") agentImpl.handleQueryResponse(content, msg);
- else if (opcode == "_data_indication") agentImpl.handleDataIndication(content, msg);
+ if (opcode == protocol::HEADER_OPCODE_QUERY_RESPONSE) agentImpl.handleQueryResponse(content, msg);
+ else if (opcode == protocol::HEADER_OPCODE_DATA_INDICATION) agentImpl.handleDataIndication(content, msg);
else
QPID_LOG(error, "Received a list-formatted QMFv2 message with opcode=" << opcode);
@@ -344,9 +363,9 @@ void ConsoleSessionImpl::sendBrokerLocate()
Message msg;
Variant::Map& headers(msg.getProperties());
- headers["method"] = "request";
- headers["qmf.opcode"] = "_agent_locate_request";
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
msg.setReplyTo(replyAddress);
msg.setCorrelationId("broker-locate");
@@ -363,9 +382,9 @@ void ConsoleSessionImpl::sendAgentLocate()
Message msg;
Variant::Map& headers(msg.getProperties());
- headers["method"] = "request";
- headers["qmf.opcode"] = "_agent_locate_request";
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
msg.setReplyTo(replyAddress);
msg.setCorrelationId("agent-locate");
@@ -390,13 +409,17 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian
return;
Variant::Map attrs(iter->second.asMap());
- iter = attrs.find("epoch");
+ iter = attrs.find(protocol::AGENT_ATTR_EPOCH);
if (iter != attrs.end())
epoch = iter->second.asUint32();
if (cid == "broker-locate") {
qpid::sys::Mutex::ScopedLock l(lock);
- agent = Agent(new AgentImpl(agentName, epoch, *this));
+ auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this));
+ for (iter = attrs.begin(); iter != attrs.end(); iter++)
+ if (iter->first != protocol::AGENT_ATTR_EPOCH)
+ impl->setAttribute(iter->first, iter->second);
+ agent = Agent(impl.release());
connectedBrokerAgent = agent;
if (!agentQuery || agentQuery.matchesPredicate(attrs)) {
connectedBrokerInAgentList = true;
@@ -430,7 +453,7 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian
//
auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this));
for (iter = attrs.begin(); iter != attrs.end(); iter++)
- if (iter->first != "epoch")
+ if (iter->first != protocol::AGENT_ATTR_EPOCH)
impl->setAttribute(iter->first, iter->second);
agent = Agent(impl.release());
agents[agentName] = agent;
@@ -459,16 +482,17 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian
enqueueEventLH(ConsoleEvent(eventImpl.release()));
}
- iter = attrs.find("schemaUpdated");
+ iter = attrs.find(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP);
if (iter != attrs.end()) {
uint64_t ts(iter->second.asUint64());
- if (ts > impl.getAttribute("schemaUpdated").asUint64()) {
+ if (ts > impl.getAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP).asUint64()) {
//
// The agent has added new schema entries since we last heard from it.
// Enqueue a notification.
//
auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE));
eventImpl->setAgent(agent);
+ impl.setAttribute(iter->first, iter->second);
enqueueEventLH(ConsoleEvent(eventImpl.release()));
}
}