diff options
author | Ted Ross <tross@apache.org> | 2011-01-10 14:06:16 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2011-01-10 14:06:16 +0000 |
commit | 120ea440ef9d048d3bb31e6118027f5c9e890fca (patch) | |
tree | 34e14880765c6b79c77c4fb24e834b8d67260149 /cpp/src/qmf/ConsoleSession.cpp | |
parent | 598e5eacac716a9a3a812e9cf72b14bde57ed45a (diff) | |
download | qpid-python-120ea440ef9d048d3bb31e6118027f5c9e890fca.tar.gz |
Updates to the C++ implementation of QMFv2:
1) Consolidated string constants for the protocol into a definition file.
2) Added hooks for subscription handling.
3) Added checks to validate properties and arguments against the schema (if there is a schema).
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1057199 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/ConsoleSession.cpp')
-rw-r--r-- | cpp/src/qmf/ConsoleSession.cpp | 66 |
1 files changed, 45 insertions, 21 deletions
diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp index dc2bbe34ee..f327170c5e 100644 --- a/cpp/src/qmf/ConsoleSession.cpp +++ b/cpp/src/qmf/ConsoleSession.cpp @@ -25,14 +25,19 @@ #include "qmf/SchemaId.h" #include "qmf/SchemaImpl.h" #include "qmf/ConsoleEventImpl.h" +#include "qmf/constants.h" #include "qpid/log/Statement.h" #include "qpid/messaging/AddressParser.h" #include "qpid/messaging/Sender.h" #include "qpid/messaging/Receiver.h" using namespace std; -using namespace qpid::messaging; using namespace qmf; +using qpid::messaging::Address; +using qpid::messaging::Connection; +using qpid::messaging::Receiver; +using qpid::messaging::Duration; +using qpid::messaging::Message; using qpid::types::Variant; typedef qmf::PrivateImplRef<ConsoleSession> PI; @@ -51,6 +56,8 @@ bool ConsoleSession::nextEvent(ConsoleEvent& e, Duration t) { return impl->nextE uint32_t ConsoleSession::getAgentCount() const { return impl->getAgentCount(); } Agent ConsoleSession::getAgent(uint32_t i) const { return impl->getAgent(i); } Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnectedBrokerAgent(); } +Subscription ConsoleSession::subscribe(const Query& q, const string& f, const string& o) { return impl->subscribe(q, f, o); } +Subscription ConsoleSession::subscribe(const string& q, const string& f, const string& o) { return impl->subscribe(q, f, o); } //======================================================================================== // Impl Method Bodies @@ -227,6 +234,18 @@ Agent ConsoleSessionImpl::getAgent(uint32_t i) const } +Subscription ConsoleSessionImpl::subscribe(const Query&, const string&, const string&) +{ + return Subscription(); +} + + +Subscription ConsoleSessionImpl::subscribe(const string&, const string&, const string&) +{ + return Subscription(); +} + + void ConsoleSessionImpl::enqueueEvent(const ConsoleEvent& event) { qpid::sys::Mutex::ScopedLock l(lock); @@ -249,17 +268,17 @@ void ConsoleSessionImpl::dispatch(Message msg) Variant::Map::const_iterator iter; Variant::Map::const_iterator oiter; - oiter = properties.find("qmf.opcode"); - iter = properties.find("x-amqp-0-10.app-id"); + oiter = properties.find(protocol::HEADER_KEY_OPCODE); + iter = properties.find(protocol::HEADER_KEY_APP_ID); if (iter == properties.end()) iter = properties.find("app_id"); - if (iter != properties.end() && iter->second.asString() == "qmf2" && oiter != properties.end()) { + if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF && oiter != properties.end()) { // // Dispatch a QMFv2 formatted message // const string& opcode = oiter->second.asString(); - iter = properties.find("qmf.agent"); + iter = properties.find(protocol::HEADER_KEY_AGENT); if (iter == properties.end()) { QPID_LOG(trace, "Message received with no 'qmf.agent' header"); return; @@ -277,7 +296,7 @@ void ConsoleSessionImpl::dispatch(Message msg) } if (msg.getContentType() == "amqp/map" && - (opcode == "_agent_heartbeat_indication" || opcode == "_agent_locate_response")) { + (opcode == protocol::HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION || opcode == protocol::HEADER_OPCODE_AGENT_LOCATE_RESPONSE)) { // // This is the one case where it's ok (necessary actually) to receive a QMFv2 // message from an unknown agent (how else are they going to get known?) @@ -297,8 +316,8 @@ void ConsoleSessionImpl::dispatch(Message msg) Variant::Map content; decode(msg, content); - if (opcode == "_exception") agentImpl.handleException(content, msg); - else if (opcode == "_method_response") agentImpl.handleMethodResponse(content, msg); + if (opcode == protocol::HEADER_OPCODE_EXCEPTION) agentImpl.handleException(content, msg); + else if (opcode == protocol::HEADER_OPCODE_METHOD_RESPONSE) agentImpl.handleMethodResponse(content, msg); else QPID_LOG(error, "Received a map-formatted QMFv2 message with opcode=" << opcode); @@ -309,8 +328,8 @@ void ConsoleSessionImpl::dispatch(Message msg) Variant::List content; decode(msg, content); - if (opcode == "_query_response") agentImpl.handleQueryResponse(content, msg); - else if (opcode == "_data_indication") agentImpl.handleDataIndication(content, msg); + if (opcode == protocol::HEADER_OPCODE_QUERY_RESPONSE) agentImpl.handleQueryResponse(content, msg); + else if (opcode == protocol::HEADER_OPCODE_DATA_INDICATION) agentImpl.handleDataIndication(content, msg); else QPID_LOG(error, "Received a list-formatted QMFv2 message with opcode=" << opcode); @@ -344,9 +363,9 @@ void ConsoleSessionImpl::sendBrokerLocate() Message msg; Variant::Map& headers(msg.getProperties()); - headers["method"] = "request"; - headers["qmf.opcode"] = "_agent_locate_request"; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; msg.setReplyTo(replyAddress); msg.setCorrelationId("broker-locate"); @@ -363,9 +382,9 @@ void ConsoleSessionImpl::sendAgentLocate() Message msg; Variant::Map& headers(msg.getProperties()); - headers["method"] = "request"; - headers["qmf.opcode"] = "_agent_locate_request"; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; msg.setReplyTo(replyAddress); msg.setCorrelationId("agent-locate"); @@ -390,13 +409,17 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian return; Variant::Map attrs(iter->second.asMap()); - iter = attrs.find("epoch"); + iter = attrs.find(protocol::AGENT_ATTR_EPOCH); if (iter != attrs.end()) epoch = iter->second.asUint32(); if (cid == "broker-locate") { qpid::sys::Mutex::ScopedLock l(lock); - agent = Agent(new AgentImpl(agentName, epoch, *this)); + auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this)); + for (iter = attrs.begin(); iter != attrs.end(); iter++) + if (iter->first != protocol::AGENT_ATTR_EPOCH) + impl->setAttribute(iter->first, iter->second); + agent = Agent(impl.release()); connectedBrokerAgent = agent; if (!agentQuery || agentQuery.matchesPredicate(attrs)) { connectedBrokerInAgentList = true; @@ -430,7 +453,7 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian // auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this)); for (iter = attrs.begin(); iter != attrs.end(); iter++) - if (iter->first != "epoch") + if (iter->first != protocol::AGENT_ATTR_EPOCH) impl->setAttribute(iter->first, iter->second); agent = Agent(impl.release()); agents[agentName] = agent; @@ -459,16 +482,17 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian enqueueEventLH(ConsoleEvent(eventImpl.release())); } - iter = attrs.find("schemaUpdated"); + iter = attrs.find(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP); if (iter != attrs.end()) { uint64_t ts(iter->second.asUint64()); - if (ts > impl.getAttribute("schemaUpdated").asUint64()) { + if (ts > impl.getAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP).asUint64()) { // // The agent has added new schema entries since we last heard from it. // Enqueue a notification. // auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE)); eventImpl->setAgent(agent); + impl.setAttribute(iter->first, iter->second); enqueueEventLH(ConsoleEvent(eventImpl.release())); } } |