diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qmf.mk | 4 | ||||
-rw-r--r-- | cpp/src/qmf/AgentEngine.cpp | 190 | ||||
-rw-r--r-- | cpp/src/qmf/ConsoleEngine.cpp | 260 | ||||
-rw-r--r-- | cpp/src/qmf/Protocol.cpp | 52 | ||||
-rw-r--r-- | cpp/src/qmf/Protocol.h | 67 | ||||
-rw-r--r-- | cpp/src/qmf/ResilientConnection.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qmf/SequenceManager.cpp | 50 | ||||
-rw-r--r-- | cpp/src/qmf/SequenceManager.h | 52 |
8 files changed, 509 insertions, 174 deletions
diff --git a/cpp/src/qmf.mk b/cpp/src/qmf.mk index a9c2f24922..54110ebaf7 100644 --- a/cpp/src/qmf.mk +++ b/cpp/src/qmf.mk @@ -49,11 +49,15 @@ libqmfcommon_la_SOURCES = \ qmf/ObjectIdImpl.h \ qmf/ObjectImpl.cpp \ qmf/ObjectImpl.h \ + qmf/Protocol.cpp \ + qmf/Protocol.h \ qmf/Query.h \ qmf/QueryImpl.cpp \ qmf/QueryImpl.h \ qmf/ResilientConnection.cpp \ qmf/ResilientConnection.h \ + qmf/SequenceManager.cpp \ + qmf/SequenceManager.h \ qmf/Schema.h \ qmf/SchemaImpl.cpp \ qmf/SchemaImpl.h \ diff --git a/cpp/src/qmf/AgentEngine.cpp b/cpp/src/qmf/AgentEngine.cpp index ec5b117337..d3204042d5 100644 --- a/cpp/src/qmf/AgentEngine.cpp +++ b/cpp/src/qmf/AgentEngine.cpp @@ -25,6 +25,7 @@ #include "qmf/ObjectIdImpl.h" #include "qmf/QueryImpl.h" #include "qmf/ValueImpl.h" +#include "qmf/Protocol.h" #include <qpid/framing/Buffer.h> #include <qpid/framing/Uuid.h> #include <qpid/framing/FieldTable.h> @@ -172,8 +173,6 @@ namespace qmf { map<string, ClassMaps> packages; - bool checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq); - void encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq = 0); AgentEventImpl::Ptr eventDeclareQueue(const string& queueName); AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key); AgentEventImpl::Ptr eventSetupComplete(); @@ -268,12 +267,16 @@ void AgentEngineImpl::handleRcvMessage(Message& message) string replyToKey(message.replyKey ? message.replyKey : ""); string userId(message.userId ? message.userId : ""); - if (checkHeader(inBuffer, &opcode, &sequence)) { - if (opcode == 'a') handleAttachResponse(inBuffer); - else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey); - else if (opcode == 'x') handleConsoleAddedIndication(); - else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey, userId); - else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey, userId); + while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) { + if (opcode == Protocol::OP_ATTACH_RESPONSE) handleAttachResponse(inBuffer); + else if (opcode == Protocol::OP_SCHEMA_REQUEST) handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey); + else if (opcode == Protocol::OP_CONSOLE_ADDED_INDICATION) handleConsoleAddedIndication(); + else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId); + else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId); + else { + QPID_LOG(error, "AgentEngineImpl::handleRcvMessage invalid opcode=" << opcode); + break; + } } } @@ -325,7 +328,7 @@ void AgentEngineImpl::startProtocol() char rawbuffer[512]; Buffer buffer(rawbuffer, 512); - encodeHeader(buffer, 'A'); + Protocol::encodeHeader(buffer, Protocol::OP_ATTACH_REQUEST); buffer.putShortString("qmfa"); systemId.encode(buffer); buffer.putLong(requestedBrokerBank); @@ -340,7 +343,7 @@ void AgentEngineImpl::heartbeat() Mutex::ScopedLock _lock(lock); Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'h'); + Protocol::encodeHeader(buffer, Protocol::OP_HEARTBEAT_INDICATION); buffer.putLongLong(uint64_t(Duration(now()))); stringstream key; key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank; @@ -349,7 +352,7 @@ void AgentEngineImpl::heartbeat() } void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, - const Value& argMap) + const Value& argMap) { Mutex::ScopedLock _lock(lock); map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence); @@ -359,7 +362,7 @@ void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* t contextMap.erase(iter); Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'm', context->sequence); + Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, context->sequence); buffer.putLong(status); buffer.putMediumString(text); if (status == 0) { @@ -390,7 +393,7 @@ void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop AgentQueryContext::Ptr context = iter->second; Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'g', context->sequence); + Protocol::encodeHeader(buffer, Protocol::OP_OBJECT_INDICATION, context->sequence); object.impl->encodeSchemaKey(buffer); object.impl->encodeManagedObjectData(buffer); @@ -477,30 +480,6 @@ void AgentEngineImpl::raiseEvent(Event&) Mutex::ScopedLock _lock(lock); } -void AgentEngineImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) -{ - buf.putOctet('A'); - buf.putOctet('M'); - buf.putOctet('3'); - buf.putOctet(opcode); - buf.putLong (seq); -} - -bool AgentEngineImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) -{ - if (buf.getSize() < 8) - return false; - - uint8_t h1 = buf.getOctet(); - uint8_t h2 = buf.getOctet(); - uint8_t h3 = buf.getOctet(); - - *opcode = buf.getOctet(); - *seq = buf.getLong(); - - return h1 == 'A' && h2 == 'M' && h3 == '3'; -} - AgentEventImpl::Ptr AgentEngineImpl::eventDeclareQueue(const string& name) { AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE)); @@ -570,7 +549,7 @@ void AgentEngineImpl::sendBufferLH(Buffer& buf, const string& destination, const void AgentEngineImpl::sendPackageIndicationLH(const string& packageName) { Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'p'); + Protocol::encodeHeader(buffer, Protocol::OP_PACKAGE_INDICATION); buffer.putShortString(packageName); sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName); @@ -579,7 +558,7 @@ void AgentEngineImpl::sendPackageIndicationLH(const string& packageName) void AgentEngineImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key) { Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'q'); + Protocol::encodeHeader(buffer, Protocol::OP_CLASS_INDICATION); buffer.putOctet((int) kind); buffer.putShortString(packageName); buffer.putShortString(key.name); @@ -592,7 +571,7 @@ void AgentEngineImpl::sendCommandCompleteLH(const string& exchange, const string uint32_t sequence, uint32_t code, const string& text) { Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'z', sequence); + Protocol::encodeHeader(buffer, Protocol::OP_COMMAND_COMPLETE, sequence); buffer.putLong(code); buffer.putShortString(text); sendBufferLH(buffer, exchange, replyToKey); @@ -602,7 +581,7 @@ void AgentEngineImpl::sendCommandCompleteLH(const string& exchange, const string void AgentEngineImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text) { Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'm', sequence); + Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, sequence); buffer.putLong(code); string fulltext; @@ -710,7 +689,7 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, if (ocIter != cMap.objectClasses.end()) { SchemaObjectClassImpl* oImpl = ocIter->second; Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 's', sequence); + Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence); oImpl->encode(buffer); sendBufferLH(buffer, rExchange, rKey); QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name); @@ -721,7 +700,7 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, if (ecIter != cMap.eventClasses.end()) { SchemaEventClassImpl* eImpl = ecIter->second; Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 's', sequence); + Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence); eImpl->encode(buffer); sendBufferLH(buffer, rExchange, rKey); QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name); @@ -851,108 +830,25 @@ void AgentEngineImpl::handleConsoleAddedIndication() // Wrappers //================================================================== -AgentEngine::AgentEngine(char* label, bool internalStore) -{ - impl = new AgentEngineImpl(label, internalStore); -} - -AgentEngine::~AgentEngine() -{ - delete impl; -} - -void AgentEngine::setStoreDir(const char* path) -{ - impl->setStoreDir(path); -} - -void AgentEngine::setTransferDir(const char* path) -{ - impl->setTransferDir(path); -} - -void AgentEngine::handleRcvMessage(Message& message) -{ - impl->handleRcvMessage(message); -} - -bool AgentEngine::getXmtMessage(Message& item) const -{ - return impl->getXmtMessage(item); -} - -void AgentEngine::popXmt() -{ - impl->popXmt(); -} - -bool AgentEngine::getEvent(AgentEvent& event) const -{ - return impl->getEvent(event); -} - -void AgentEngine::popEvent() -{ - impl->popEvent(); -} - -void AgentEngine::newSession() -{ - impl->newSession(); -} - -void AgentEngine::startProtocol() -{ - impl->startProtocol(); -} - -void AgentEngine::heartbeat() -{ - impl->heartbeat(); -} - -void AgentEngine::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) -{ - impl->methodResponse(sequence, status, text, arguments); -} - -void AgentEngine::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) -{ - impl->queryResponse(sequence, object, prop, stat); -} - -void AgentEngine::queryComplete(uint32_t sequence) -{ - impl->queryComplete(sequence); -} - -void AgentEngine::registerClass(SchemaObjectClass* cls) -{ - impl->registerClass(cls); -} - -void AgentEngine::registerClass(SchemaEventClass* cls) -{ - impl->registerClass(cls); -} - -const ObjectId* AgentEngine::addObject(Object& obj, uint64_t persistId) -{ - return impl->addObject(obj, persistId); -} - -const ObjectId* AgentEngine::allocObjectId(uint64_t persistId) -{ - return impl->allocObjectId(persistId); -} - -const ObjectId* AgentEngine::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) -{ - return impl->allocObjectId(persistIdLo, persistIdHi); -} - -void AgentEngine::raiseEvent(Event& event) -{ - impl->raiseEvent(event); -} +AgentEngine::AgentEngine(char* label, bool internalStore) { impl = new AgentEngineImpl(label, internalStore); } +AgentEngine::~AgentEngine() { delete impl; } +void AgentEngine::setStoreDir(const char* path) { impl->setStoreDir(path); } +void AgentEngine::setTransferDir(const char* path) { impl->setTransferDir(path); } +void AgentEngine::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); } +bool AgentEngine::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); } +void AgentEngine::popXmt() { impl->popXmt(); } +bool AgentEngine::getEvent(AgentEvent& event) const { return impl->getEvent(event); } +void AgentEngine::popEvent() { impl->popEvent(); } +void AgentEngine::newSession() { impl->newSession(); } +void AgentEngine::startProtocol() { impl->startProtocol(); } +void AgentEngine::heartbeat() { impl->heartbeat(); } +void AgentEngine::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); } +void AgentEngine::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); } +void AgentEngine::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); } +void AgentEngine::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); } +void AgentEngine::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); } +const ObjectId* AgentEngine::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); } +const ObjectId* AgentEngine::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); } +const ObjectId* AgentEngine::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); } +void AgentEngine::raiseEvent(Event& event) { impl->raiseEvent(event); } diff --git a/cpp/src/qmf/ConsoleEngine.cpp b/cpp/src/qmf/ConsoleEngine.cpp index 28b2852d67..7620e875eb 100644 --- a/cpp/src/qmf/ConsoleEngine.cpp +++ b/cpp/src/qmf/ConsoleEngine.cpp @@ -25,6 +25,8 @@ #include "qmf/ObjectIdImpl.h" #include "qmf/QueryImpl.h" #include "qmf/ValueImpl.h" +#include "qmf/Protocol.h" +#include "qmf/SequenceManager.h" #include <qpid/framing/Buffer.h> #include <qpid/framing/Uuid.h> #include <qpid/framing/FieldTable.h> @@ -36,6 +38,7 @@ #include <string> #include <deque> #include <map> +#include <vector> #include <iostream> #include <fstream> #include <boost/shared_ptr.hpp> @@ -92,18 +95,9 @@ namespace qmf { BrokerEvent copy(); }; - struct BrokerProxyImpl { + class BrokerProxyImpl : public SequenceContext { + public: typedef boost::shared_ptr<BrokerProxyImpl> Ptr; - mutable Mutex lock; - BrokerProxy* envelope; - ConsoleEngineImpl* console; - string queueName; - deque<MessageImpl::Ptr> xmtQueue; - deque<BrokerEventImpl::Ptr> eventQueue; - - static const char* QMF_EXCHANGE; - static const char* DIR_EXCHANGE; - static const char* BROKER_KEY; BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console); ~BrokerProxyImpl() {} @@ -112,6 +106,7 @@ namespace qmf { void sessionClosed(); void startProtocol(); + void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey); void handleRcvMessage(Message& message); bool getXmtMessage(Message& item) const; void popXmt(); @@ -119,9 +114,41 @@ namespace qmf { bool getEvent(BrokerEvent& event) const; void popEvent(); + // From SequenceContext + void complete(); + + void addBinding(const string& exchange, const string& key); + + private: + mutable Mutex lock; + BrokerProxy* envelope; + ConsoleEngineImpl* console; + string queueName; + Uuid brokerId; + SequenceManager seqMgr; + uint32_t requestsOutstanding; + bool topicBound; + deque<MessageImpl::Ptr> xmtQueue; + deque<BrokerEventImpl::Ptr> eventQueue; + +# define MA_BUFFER_SIZE 65536 + char outputBuffer[MA_BUFFER_SIZE]; + BrokerEventImpl::Ptr eventDeclareQueue(const string& queueName); BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key); BrokerEventImpl::Ptr eventSetupComplete(); + + void handleBrokerResponse(Buffer& inBuffer, uint32_t seq); + void handlePackageIndication(Buffer& inBuffer, uint32_t seq); + void handleCommandComplete(Buffer& inBuffer, uint32_t seq); + void handleClassIndication(Buffer& inBuffer, uint32_t seq); + void handleMethodResponse(Buffer& inBuffer, uint32_t seq); + void handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq); + void handleEventIndication(Buffer& inBuffer, uint32_t seq); + void handleSchemaResponse(Buffer& inBuffer, uint32_t seq); + void handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat); + void incOutstanding(); + void decOutstanding(); }; struct AgentProxyImpl { @@ -139,10 +166,6 @@ namespace qmf { ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& settings = ConsoleSettings()); ~ConsoleEngineImpl(); - void handleRcvMessage(BrokerProxy& broker, Message& message); - bool getXmtMessage(Message& item) const; - void popXmt(); - bool getEvent(ConsoleEvent& event) const; void popEvent(); @@ -175,17 +198,21 @@ namespace qmf { */ private: + friend class BrokerProxyImpl; ConsoleEngine* envelope; const ConsoleSettings& settings; mutable Mutex lock; deque<ConsoleEventImpl::Ptr> eventQueue; + vector<BrokerProxyImpl::Ptr> brokerList; + vector<pair<string, string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE) }; } -const char* BrokerProxyImpl::QMF_EXCHANGE = "qpid.management"; -const char* BrokerProxyImpl::DIR_EXCHANGE = "amq.direct"; -const char* BrokerProxyImpl::BROKER_KEY = "broker"; - +namespace { +const char* QMF_EXCHANGE = "qpid.management"; +const char* DIR_EXCHANGE = "amq.direct"; +const char* BROKER_KEY = "broker"; +} #define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} @@ -250,12 +277,55 @@ void BrokerProxyImpl::sessionClosed() void BrokerProxyImpl::startProtocol() { - cout << "BrokerProxyImpl::startProtocol" << endl; + Mutex::ScopedLock _lock(lock); + char rawbuffer[512]; + Buffer buffer(rawbuffer, 512); + + requestsOutstanding = 1; + topicBound = false; + Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST); + sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); + QPID_LOG(trace, "SENT BrokerRequest"); +} + +void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) +{ + uint32_t length = buf.getPosition(); + MessageImpl::Ptr message(new MessageImpl); + + buf.reset(); + buf.getRawData(message->body, length); + message->destination = destination; + message->routingKey = routingKey; + message->replyExchange = DIR_EXCHANGE; + message->replyKey = queueName; + + xmtQueue.push_back(message); } -void BrokerProxyImpl::handleRcvMessage(Message& /*message*/) +void BrokerProxyImpl::handleRcvMessage(Message& message) { - // TODO: Dispatch the messages types + Buffer inBuffer(message.body, message.length); + uint8_t opcode; + uint32_t sequence; + + while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) { + if (opcode == Protocol::OP_BROKER_RESPONSE) handleBrokerResponse(inBuffer, sequence); + else if (opcode == Protocol::OP_PACKAGE_INDICATION) handlePackageIndication(inBuffer, sequence); + else if (opcode == Protocol::OP_COMMAND_COMPLETE) handleCommandComplete(inBuffer, sequence); + else if (opcode == Protocol::OP_CLASS_INDICATION) handleClassIndication(inBuffer, sequence); + else if (opcode == Protocol::OP_METHOD_RESPONSE) handleMethodResponse(inBuffer, sequence); + else if (opcode == Protocol::OP_HEARTBEAT_INDICATION) handleHeartbeatIndication(inBuffer, sequence); + else if (opcode == Protocol::OP_EVENT_INDICATION) handleEventIndication(inBuffer, sequence); + else if (opcode == Protocol::OP_SCHEMA_RESPONSE) handleSchemaResponse(inBuffer, sequence); + else if (opcode == Protocol::OP_PROPERTY_INDICATION) handleObjectIndication(inBuffer, sequence, true, false); + else if (opcode == Protocol::OP_STATISTIC_INDICATION) handleObjectIndication(inBuffer, sequence, false, true); + else if (opcode == Protocol::OP_OBJECT_INDICATION) handleObjectIndication(inBuffer, sequence, true, true); + else { + QPID_LOG(trace, "BrokerProxyImpl::handleRcvMessage invalid opcode: " << opcode); + break; + } + } } bool BrokerProxyImpl::getXmtMessage(Message& item) const @@ -290,6 +360,16 @@ void BrokerProxyImpl::popEvent() eventQueue.pop_front(); } +void BrokerProxyImpl::complete() +{ + decOutstanding(); +} + +void BrokerProxyImpl::addBinding(const string& exchange, const string& key) +{ + eventQueue.push_back(eventBind(exchange, queueName, key)); +} + BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName) { BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE)); @@ -313,6 +393,89 @@ BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete() return event; } +void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq) +{ + // Note that this function doesn't touch requestsOutstanding. This is because + // it accounts for one request completed (the BrokerRequest) and one request + // started (the PackageRequest) which cancel each other out. + + brokerId.decode(inBuffer); + QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId); + Mutex::ScopedLock _lock(lock); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t sequence(seqMgr.reserve(this)); + Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence); + sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); + QPID_LOG(trace, "SENT PackageRequest seq=" << sequence); +} + +void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq) +{ + string package; + + inBuffer.getShortString(package); + QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package); +} + +void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq) +{ + string text; + uint32_t code = inBuffer.getLong(); + inBuffer.getShortString(text); + QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text); + seqMgr.release(seq); +} + +void BrokerProxyImpl::handleClassIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/) +{ + // TODO +} + +void BrokerProxyImpl::handleMethodResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/) +{ + // TODO +} + +void BrokerProxyImpl::handleHeartbeatIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/) +{ + // TODO +} + +void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/) +{ + // TODO +} + +void BrokerProxyImpl::handleSchemaResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/) +{ + // TODO +} + +void BrokerProxyImpl::handleObjectIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/, bool /*prop*/, bool /*stat*/) +{ + // TODO +} + +void BrokerProxyImpl::incOutstanding() +{ + Mutex::ScopedLock _lock(lock); + requestsOutstanding++; +} + +void BrokerProxyImpl::decOutstanding() +{ + Mutex::ScopedLock _lock(lock); + requestsOutstanding--; + if (requestsOutstanding == 0 && !topicBound) { + for (vector<pair<string, string> >::const_iterator iter = console->bindingList.begin(); + iter != console->bindingList.end(); iter++) { + string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first); + string key(iter->second); + eventQueue.push_back(eventBind(exchange, queueName, key)); + } + } +} + MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodResponse(this)) { string text; @@ -329,6 +492,19 @@ MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodRespons ConsoleEngineImpl::ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& s) : envelope(e), settings(s) { + bindingList.push_back(pair<string, string>(string(), "schema.#")); + if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) { + bindingList.push_back(pair<string, string>(string(), "console.#")); + } else { + if (settings.rcvObjects && !settings.userBindings) + bindingList.push_back(pair<string, string>(string(), "console.obj.#")); + else + bindingList.push_back(pair<string, string>(string(), "console.obj.*.*.org.apache.qpid.broker.agent")); + if (settings.rcvEvents) + bindingList.push_back(pair<string, string>(string(), "console.event.#")); + if (settings.rcvHeartbeats) + bindingList.push_back(pair<string, string>(string(), "console.heartbeat.#")); + } } ConsoleEngineImpl::~ConsoleEngineImpl() @@ -354,72 +530,105 @@ void ConsoleEngineImpl::popEvent() void ConsoleEngineImpl::addConnection(BrokerProxy& /*broker*/, void* /*context*/) { + // TODO } void ConsoleEngineImpl::delConnection(BrokerProxy& /*broker*/) { + // TODO } uint32_t ConsoleEngineImpl::packageCount() const { + // TODO return 0; } const string& ConsoleEngineImpl::getPackageName(uint32_t /*idx*/) const { + // TODO static string temp; return temp; } uint32_t ConsoleEngineImpl::classCount(const char* /*packageName*/) const { + // TODO return 0; } const SchemaClassKey* ConsoleEngineImpl::getClass(const char* /*packageName*/, uint32_t /*idx*/) const { + // TODO return 0; } ClassKind ConsoleEngineImpl::getClassKind(const SchemaClassKey& /*key*/) const { + // TODO return CLASS_OBJECT; } const SchemaObjectClass* ConsoleEngineImpl::getObjectClass(const SchemaClassKey& /*key*/) const { + // TODO return 0; } const SchemaEventClass* ConsoleEngineImpl::getEventClass(const SchemaClassKey& /*key*/) const { + // TODO return 0; } -void ConsoleEngineImpl::bindPackage(const char* /*packageName*/) +void ConsoleEngineImpl::bindPackage(const char* packageName) { + stringstream key; + key << "console.obj.*.*." << packageName << ".#"; + Mutex::ScopedLock _lock(lock); + bindingList.push_back(pair<string, string>(string(), key.str())); + for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin(); + iter != brokerList.end(); iter++) + (*iter)->addBinding(QMF_EXCHANGE, key.str()); } -void ConsoleEngineImpl::bindClass(const SchemaClassKey& /*key*/) +void ConsoleEngineImpl::bindClass(const SchemaClassKey& classKey) { + stringstream key; + key << "console.obj.*.*." << classKey.getPackageName() << "." << classKey.getClassName() << ".#"; + Mutex::ScopedLock _lock(lock); + bindingList.push_back(pair<string, string>(string(), key.str())); + for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin(); + iter != brokerList.end(); iter++) + (*iter)->addBinding(QMF_EXCHANGE, key.str()); } -void ConsoleEngineImpl::bindClass(const char* /*packageName*/, const char* /*className*/) +void ConsoleEngineImpl::bindClass(const char* packageName, const char* className) { + stringstream key; + key << "console.obj.*.*." << packageName << "." << className << ".#"; + Mutex::ScopedLock _lock(lock); + bindingList.push_back(pair<string, string>(string(), key.str())); + for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin(); + iter != brokerList.end(); iter++) + (*iter)->addBinding(QMF_EXCHANGE, key.str()); } uint32_t ConsoleEngineImpl::agentCount() const { + // TODO return 0; } const AgentProxy* ConsoleEngineImpl::getAgent(uint32_t /*idx*/) const { + // TODO return 0; } void ConsoleEngineImpl::sendQuery(const Query& /*query*/, void* /*context*/) { + // TODO } /* @@ -437,7 +646,6 @@ void ConsoleEngineImpl::endSync(SyncQuery& sync) */ - //================================================================== // Wrappers //================================================================== diff --git a/cpp/src/qmf/Protocol.cpp b/cpp/src/qmf/Protocol.cpp new file mode 100644 index 0000000000..0a3beeb276 --- /dev/null +++ b/cpp/src/qmf/Protocol.cpp @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/Protocol.h" +#include "qpid/framing/Buffer.h" + +using namespace std; +using namespace qmf; +using namespace qpid::framing; + + +bool Protocol::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) +{ + if (buf.available() < 8) + return false; + + uint8_t h1 = buf.getOctet(); + uint8_t h2 = buf.getOctet(); + uint8_t h3 = buf.getOctet(); + + *opcode = buf.getOctet(); + *seq = buf.getLong(); + + return h1 == 'A' && h2 == 'M' && h3 == '3'; +} + +void Protocol::encodeHeader(qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq) +{ + buf.putOctet('A'); + buf.putOctet('M'); + buf.putOctet('3'); + buf.putOctet(opcode); + buf.putLong (seq); +} + + diff --git a/cpp/src/qmf/Protocol.h b/cpp/src/qmf/Protocol.h new file mode 100644 index 0000000000..d5da08c1db --- /dev/null +++ b/cpp/src/qmf/Protocol.h @@ -0,0 +1,67 @@ +#ifndef _QmfProtocol_ +#define _QmfProtocol_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/sys/IntegerTypes.h> + +namespace qpid { + namespace framing { + class Buffer; + } +} + +namespace qmf { + + class Protocol { + public: + static bool checkHeader(qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); + static void encodeHeader(qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + + const static uint8_t OP_ATTACH_REQUEST = 'A'; + const static uint8_t OP_ATTACH_RESPONSE = 'a'; + + const static uint8_t OP_BROKER_REQUEST = 'B'; + const static uint8_t OP_BROKER_RESPONSE = 'b'; + + const static uint8_t OP_CONSOLE_ADDED_INDICATION = 'x'; + const static uint8_t OP_COMMAND_COMPLETE = 'z'; + const static uint8_t OP_HEARTBEAT_INDICATION = 'h'; + + const static uint8_t OP_PACKAGE_REQUEST = 'P'; + const static uint8_t OP_PACKAGE_INDICATION = 'p'; + const static uint8_t OP_CLASS_QUERY = 'Q'; + const static uint8_t OP_CLASS_INDICATION = 'q'; + const static uint8_t OP_SCHEMA_REQUEST = 'S'; + const static uint8_t OP_SCHEMA_RESPONSE = 's'; + + const static uint8_t OP_METHOD_REQUEST = 'M'; + const static uint8_t OP_METHOD_RESPONSE = 'm'; + const static uint8_t OP_GET_QUERY = 'G'; + const static uint8_t OP_OBJECT_INDICATION = 'g'; + const static uint8_t OP_PROPERTY_INDICATION = 'c'; + const static uint8_t OP_STATISTIC_INDICATION = 'i'; + const static uint8_t OP_EVENT_INDICATION = 'e'; + }; + +} + +#endif + diff --git a/cpp/src/qmf/ResilientConnection.cpp b/cpp/src/qmf/ResilientConnection.cpp index 88b9169c75..a6f9eddcde 100644 --- a/cpp/src/qmf/ResilientConnection.cpp +++ b/cpp/src/qmf/ResilientConnection.cpp @@ -318,6 +318,7 @@ void ResilientConnectionImpl::run() while (true) { try { + QPID_LOG(trace, "Trying to open connection..."); connection.open(settings.impl->getClientSettings()); { Mutex::ScopedLock _lock(lock); @@ -326,6 +327,7 @@ void ResilientConnectionImpl::run() while (connected) cond.wait(lock); + delay = delayMin; while (!sessions.empty()) { set<RCSession::Ptr>::iterator iter = sessions.begin(); @@ -334,6 +336,11 @@ void ResilientConnectionImpl::run() EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, sess->userContext); Mutex::ScopedUnlock _u(lock); sess->stop(); + + // Nullify the intrusive pointer within the scoped unlock, otherwise, + // the reference is held until overwritted above (under lock) which causes + // the session destructor to be called with the lock held. + sess = 0; } EnqueueEvent(ResilientConnectionEvent::DISCONNECTED); @@ -341,7 +348,6 @@ void ResilientConnectionImpl::run() if (shutdown) return; } - delay = delayMin; connection.close(); } catch (exception &e) { QPID_LOG(debug, "connection.open exception: " << e.what()); diff --git a/cpp/src/qmf/SequenceManager.cpp b/cpp/src/qmf/SequenceManager.cpp new file mode 100644 index 0000000000..f51ce9d8b8 --- /dev/null +++ b/cpp/src/qmf/SequenceManager.cpp @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/SequenceManager.h" + +using namespace std; +using namespace qmf; +using namespace qpid::sys; + +SequenceManager::SequenceManager() : nextSequence(1) {} + +uint32_t SequenceManager::reserve(SequenceContext* ctx) +{ + Mutex::ScopedLock _lock(lock); + uint32_t seq = nextSequence; + while (contextMap.find(seq) != contextMap.end()) + seq = seq < 0xFFFFFFFF ? seq + 1 : 1; + nextSequence = seq < 0xFFFFFFFF ? seq + 1 : 1; + contextMap[seq] = ctx; + return seq; +} + +void SequenceManager::release(uint32_t sequence) +{ + Mutex::ScopedLock _lock(lock); + map<uint32_t, SequenceContext*>::iterator iter = contextMap.find(sequence); + if (iter != contextMap.end()) { + if (iter->second != 0) + iter->second->complete(); + contextMap.erase(iter); + } +} + + diff --git a/cpp/src/qmf/SequenceManager.h b/cpp/src/qmf/SequenceManager.h new file mode 100644 index 0000000000..c027872313 --- /dev/null +++ b/cpp/src/qmf/SequenceManager.h @@ -0,0 +1,52 @@ +#ifndef _QmfSequenceManager_ +#define _QmfSequenceManager_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qpid/sys/Mutex.h" +#include <map> + +namespace qmf { + + class SequenceContext { + public: + SequenceContext() {} + virtual ~SequenceContext() {} + + virtual void complete() = 0; + }; + + class SequenceManager { + public: + SequenceManager(); + + uint32_t reserve(SequenceContext* ctx); + void release(uint32_t sequence); + + private: + mutable qpid::sys::Mutex lock; + uint32_t nextSequence; + std::map<uint32_t, SequenceContext*> contextMap; + }; + +} + +#endif + |