diff options
Diffstat (limited to 'cpp/src/qmf/ConsoleEngine.cpp')
-rw-r--r-- | cpp/src/qmf/ConsoleEngine.cpp | 260 |
1 files changed, 234 insertions, 26 deletions
diff --git a/cpp/src/qmf/ConsoleEngine.cpp b/cpp/src/qmf/ConsoleEngine.cpp index 28b2852d67..7620e875eb 100644 --- a/cpp/src/qmf/ConsoleEngine.cpp +++ b/cpp/src/qmf/ConsoleEngine.cpp @@ -25,6 +25,8 @@ #include "qmf/ObjectIdImpl.h" #include "qmf/QueryImpl.h" #include "qmf/ValueImpl.h" +#include "qmf/Protocol.h" +#include "qmf/SequenceManager.h" #include <qpid/framing/Buffer.h> #include <qpid/framing/Uuid.h> #include <qpid/framing/FieldTable.h> @@ -36,6 +38,7 @@ #include <string> #include <deque> #include <map> +#include <vector> #include <iostream> #include <fstream> #include <boost/shared_ptr.hpp> @@ -92,18 +95,9 @@ namespace qmf { BrokerEvent copy(); }; - struct BrokerProxyImpl { + class BrokerProxyImpl : public SequenceContext { + public: typedef boost::shared_ptr<BrokerProxyImpl> Ptr; - mutable Mutex lock; - BrokerProxy* envelope; - ConsoleEngineImpl* console; - string queueName; - deque<MessageImpl::Ptr> xmtQueue; - deque<BrokerEventImpl::Ptr> eventQueue; - - static const char* QMF_EXCHANGE; - static const char* DIR_EXCHANGE; - static const char* BROKER_KEY; BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console); ~BrokerProxyImpl() {} @@ -112,6 +106,7 @@ namespace qmf { void sessionClosed(); void startProtocol(); + void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey); void handleRcvMessage(Message& message); bool getXmtMessage(Message& item) const; void popXmt(); @@ -119,9 +114,41 @@ namespace qmf { bool getEvent(BrokerEvent& event) const; void popEvent(); + // From SequenceContext + void complete(); + + void addBinding(const string& exchange, const string& key); + + private: + mutable Mutex lock; + BrokerProxy* envelope; + ConsoleEngineImpl* console; + string queueName; + Uuid brokerId; + SequenceManager seqMgr; + uint32_t requestsOutstanding; + bool topicBound; + deque<MessageImpl::Ptr> xmtQueue; + deque<BrokerEventImpl::Ptr> eventQueue; + +# define MA_BUFFER_SIZE 65536 + char outputBuffer[MA_BUFFER_SIZE]; + BrokerEventImpl::Ptr eventDeclareQueue(const string& queueName); BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key); BrokerEventImpl::Ptr eventSetupComplete(); + + void handleBrokerResponse(Buffer& inBuffer, uint32_t seq); + void handlePackageIndication(Buffer& inBuffer, uint32_t seq); + void handleCommandComplete(Buffer& inBuffer, uint32_t seq); + void handleClassIndication(Buffer& inBuffer, uint32_t seq); + void handleMethodResponse(Buffer& inBuffer, uint32_t seq); + void handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq); + void handleEventIndication(Buffer& inBuffer, uint32_t seq); + void handleSchemaResponse(Buffer& inBuffer, uint32_t seq); + void handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat); + void incOutstanding(); + void decOutstanding(); }; struct AgentProxyImpl { @@ -139,10 +166,6 @@ namespace qmf { ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& settings = ConsoleSettings()); ~ConsoleEngineImpl(); - void handleRcvMessage(BrokerProxy& broker, Message& message); - bool getXmtMessage(Message& item) const; - void popXmt(); - bool getEvent(ConsoleEvent& event) const; void popEvent(); @@ -175,17 +198,21 @@ namespace qmf { */ private: + friend class BrokerProxyImpl; ConsoleEngine* envelope; const ConsoleSettings& settings; mutable Mutex lock; deque<ConsoleEventImpl::Ptr> eventQueue; + vector<BrokerProxyImpl::Ptr> brokerList; + vector<pair<string, string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE) }; } -const char* BrokerProxyImpl::QMF_EXCHANGE = "qpid.management"; -const char* BrokerProxyImpl::DIR_EXCHANGE = "amq.direct"; -const char* BrokerProxyImpl::BROKER_KEY = "broker"; - +namespace { +const char* QMF_EXCHANGE = "qpid.management"; +const char* DIR_EXCHANGE = "amq.direct"; +const char* BROKER_KEY = "broker"; +} #define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} @@ -250,12 +277,55 @@ void BrokerProxyImpl::sessionClosed() void BrokerProxyImpl::startProtocol() { - cout << "BrokerProxyImpl::startProtocol" << endl; + Mutex::ScopedLock _lock(lock); + char rawbuffer[512]; + Buffer buffer(rawbuffer, 512); + + requestsOutstanding = 1; + topicBound = false; + Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST); + sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); + QPID_LOG(trace, "SENT BrokerRequest"); +} + +void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) +{ + uint32_t length = buf.getPosition(); + MessageImpl::Ptr message(new MessageImpl); + + buf.reset(); + buf.getRawData(message->body, length); + message->destination = destination; + message->routingKey = routingKey; + message->replyExchange = DIR_EXCHANGE; + message->replyKey = queueName; + + xmtQueue.push_back(message); } -void BrokerProxyImpl::handleRcvMessage(Message& /*message*/) +void BrokerProxyImpl::handleRcvMessage(Message& message) { - // TODO: Dispatch the messages types + Buffer inBuffer(message.body, message.length); + uint8_t opcode; + uint32_t sequence; + + while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) { + if (opcode == Protocol::OP_BROKER_RESPONSE) handleBrokerResponse(inBuffer, sequence); + else if (opcode == Protocol::OP_PACKAGE_INDICATION) handlePackageIndication(inBuffer, sequence); + else if (opcode == Protocol::OP_COMMAND_COMPLETE) handleCommandComplete(inBuffer, sequence); + else if (opcode == Protocol::OP_CLASS_INDICATION) handleClassIndication(inBuffer, sequence); + else if (opcode == Protocol::OP_METHOD_RESPONSE) handleMethodResponse(inBuffer, sequence); + else if (opcode == Protocol::OP_HEARTBEAT_INDICATION) handleHeartbeatIndication(inBuffer, sequence); + else if (opcode == Protocol::OP_EVENT_INDICATION) handleEventIndication(inBuffer, sequence); + else if (opcode == Protocol::OP_SCHEMA_RESPONSE) handleSchemaResponse(inBuffer, sequence); + else if (opcode == Protocol::OP_PROPERTY_INDICATION) handleObjectIndication(inBuffer, sequence, true, false); + else if (opcode == Protocol::OP_STATISTIC_INDICATION) handleObjectIndication(inBuffer, sequence, false, true); + else if (opcode == Protocol::OP_OBJECT_INDICATION) handleObjectIndication(inBuffer, sequence, true, true); + else { + QPID_LOG(trace, "BrokerProxyImpl::handleRcvMessage invalid opcode: " << opcode); + break; + } + } } bool BrokerProxyImpl::getXmtMessage(Message& item) const @@ -290,6 +360,16 @@ void BrokerProxyImpl::popEvent() eventQueue.pop_front(); } +void BrokerProxyImpl::complete() +{ + decOutstanding(); +} + +void BrokerProxyImpl::addBinding(const string& exchange, const string& key) +{ + eventQueue.push_back(eventBind(exchange, queueName, key)); +} + BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName) { BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE)); @@ -313,6 +393,89 @@ BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete() return event; } +void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq) +{ + // Note that this function doesn't touch requestsOutstanding. This is because + // it accounts for one request completed (the BrokerRequest) and one request + // started (the PackageRequest) which cancel each other out. + + brokerId.decode(inBuffer); + QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId); + Mutex::ScopedLock _lock(lock); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t sequence(seqMgr.reserve(this)); + Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence); + sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); + QPID_LOG(trace, "SENT PackageRequest seq=" << sequence); +} + +void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq) +{ + string package; + + inBuffer.getShortString(package); + QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package); +} + +void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq) +{ + string text; + uint32_t code = inBuffer.getLong(); + inBuffer.getShortString(text); + QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text); + seqMgr.release(seq); +} + +void BrokerProxyImpl::handleClassIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/) +{ + // TODO +} + +void BrokerProxyImpl::handleMethodResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/) +{ + // TODO +} + +void BrokerProxyImpl::handleHeartbeatIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/) +{ + // TODO +} + +void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/) +{ + // TODO +} + +void BrokerProxyImpl::handleSchemaResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/) +{ + // TODO +} + +void BrokerProxyImpl::handleObjectIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/, bool /*prop*/, bool /*stat*/) +{ + // TODO +} + +void BrokerProxyImpl::incOutstanding() +{ + Mutex::ScopedLock _lock(lock); + requestsOutstanding++; +} + +void BrokerProxyImpl::decOutstanding() +{ + Mutex::ScopedLock _lock(lock); + requestsOutstanding--; + if (requestsOutstanding == 0 && !topicBound) { + for (vector<pair<string, string> >::const_iterator iter = console->bindingList.begin(); + iter != console->bindingList.end(); iter++) { + string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first); + string key(iter->second); + eventQueue.push_back(eventBind(exchange, queueName, key)); + } + } +} + MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodResponse(this)) { string text; @@ -329,6 +492,19 @@ MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodRespons ConsoleEngineImpl::ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& s) : envelope(e), settings(s) { + bindingList.push_back(pair<string, string>(string(), "schema.#")); + if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) { + bindingList.push_back(pair<string, string>(string(), "console.#")); + } else { + if (settings.rcvObjects && !settings.userBindings) + bindingList.push_back(pair<string, string>(string(), "console.obj.#")); + else + bindingList.push_back(pair<string, string>(string(), "console.obj.*.*.org.apache.qpid.broker.agent")); + if (settings.rcvEvents) + bindingList.push_back(pair<string, string>(string(), "console.event.#")); + if (settings.rcvHeartbeats) + bindingList.push_back(pair<string, string>(string(), "console.heartbeat.#")); + } } ConsoleEngineImpl::~ConsoleEngineImpl() @@ -354,72 +530,105 @@ void ConsoleEngineImpl::popEvent() void ConsoleEngineImpl::addConnection(BrokerProxy& /*broker*/, void* /*context*/) { + // TODO } void ConsoleEngineImpl::delConnection(BrokerProxy& /*broker*/) { + // TODO } uint32_t ConsoleEngineImpl::packageCount() const { + // TODO return 0; } const string& ConsoleEngineImpl::getPackageName(uint32_t /*idx*/) const { + // TODO static string temp; return temp; } uint32_t ConsoleEngineImpl::classCount(const char* /*packageName*/) const { + // TODO return 0; } const SchemaClassKey* ConsoleEngineImpl::getClass(const char* /*packageName*/, uint32_t /*idx*/) const { + // TODO return 0; } ClassKind ConsoleEngineImpl::getClassKind(const SchemaClassKey& /*key*/) const { + // TODO return CLASS_OBJECT; } const SchemaObjectClass* ConsoleEngineImpl::getObjectClass(const SchemaClassKey& /*key*/) const { + // TODO return 0; } const SchemaEventClass* ConsoleEngineImpl::getEventClass(const SchemaClassKey& /*key*/) const { + // TODO return 0; } -void ConsoleEngineImpl::bindPackage(const char* /*packageName*/) +void ConsoleEngineImpl::bindPackage(const char* packageName) { + stringstream key; + key << "console.obj.*.*." << packageName << ".#"; + Mutex::ScopedLock _lock(lock); + bindingList.push_back(pair<string, string>(string(), key.str())); + for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin(); + iter != brokerList.end(); iter++) + (*iter)->addBinding(QMF_EXCHANGE, key.str()); } -void ConsoleEngineImpl::bindClass(const SchemaClassKey& /*key*/) +void ConsoleEngineImpl::bindClass(const SchemaClassKey& classKey) { + stringstream key; + key << "console.obj.*.*." << classKey.getPackageName() << "." << classKey.getClassName() << ".#"; + Mutex::ScopedLock _lock(lock); + bindingList.push_back(pair<string, string>(string(), key.str())); + for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin(); + iter != brokerList.end(); iter++) + (*iter)->addBinding(QMF_EXCHANGE, key.str()); } -void ConsoleEngineImpl::bindClass(const char* /*packageName*/, const char* /*className*/) +void ConsoleEngineImpl::bindClass(const char* packageName, const char* className) { + stringstream key; + key << "console.obj.*.*." << packageName << "." << className << ".#"; + Mutex::ScopedLock _lock(lock); + bindingList.push_back(pair<string, string>(string(), key.str())); + for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin(); + iter != brokerList.end(); iter++) + (*iter)->addBinding(QMF_EXCHANGE, key.str()); } uint32_t ConsoleEngineImpl::agentCount() const { + // TODO return 0; } const AgentProxy* ConsoleEngineImpl::getAgent(uint32_t /*idx*/) const { + // TODO return 0; } void ConsoleEngineImpl::sendQuery(const Query& /*query*/, void* /*context*/) { + // TODO } /* @@ -437,7 +646,6 @@ void ConsoleEngineImpl::endSync(SyncQuery& sync) */ - //================================================================== // Wrappers //================================================================== |