summaryrefslogtreecommitdiff
path: root/cpp/src/qmf/ConsoleEngine.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qmf/ConsoleEngine.cpp')
-rw-r--r--cpp/src/qmf/ConsoleEngine.cpp260
1 files changed, 234 insertions, 26 deletions
diff --git a/cpp/src/qmf/ConsoleEngine.cpp b/cpp/src/qmf/ConsoleEngine.cpp
index 28b2852d67..7620e875eb 100644
--- a/cpp/src/qmf/ConsoleEngine.cpp
+++ b/cpp/src/qmf/ConsoleEngine.cpp
@@ -25,6 +25,8 @@
#include "qmf/ObjectIdImpl.h"
#include "qmf/QueryImpl.h"
#include "qmf/ValueImpl.h"
+#include "qmf/Protocol.h"
+#include "qmf/SequenceManager.h"
#include <qpid/framing/Buffer.h>
#include <qpid/framing/Uuid.h>
#include <qpid/framing/FieldTable.h>
@@ -36,6 +38,7 @@
#include <string>
#include <deque>
#include <map>
+#include <vector>
#include <iostream>
#include <fstream>
#include <boost/shared_ptr.hpp>
@@ -92,18 +95,9 @@ namespace qmf {
BrokerEvent copy();
};
- struct BrokerProxyImpl {
+ class BrokerProxyImpl : public SequenceContext {
+ public:
typedef boost::shared_ptr<BrokerProxyImpl> Ptr;
- mutable Mutex lock;
- BrokerProxy* envelope;
- ConsoleEngineImpl* console;
- string queueName;
- deque<MessageImpl::Ptr> xmtQueue;
- deque<BrokerEventImpl::Ptr> eventQueue;
-
- static const char* QMF_EXCHANGE;
- static const char* DIR_EXCHANGE;
- static const char* BROKER_KEY;
BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console);
~BrokerProxyImpl() {}
@@ -112,6 +106,7 @@ namespace qmf {
void sessionClosed();
void startProtocol();
+ void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey);
void handleRcvMessage(Message& message);
bool getXmtMessage(Message& item) const;
void popXmt();
@@ -119,9 +114,41 @@ namespace qmf {
bool getEvent(BrokerEvent& event) const;
void popEvent();
+ // From SequenceContext
+ void complete();
+
+ void addBinding(const string& exchange, const string& key);
+
+ private:
+ mutable Mutex lock;
+ BrokerProxy* envelope;
+ ConsoleEngineImpl* console;
+ string queueName;
+ Uuid brokerId;
+ SequenceManager seqMgr;
+ uint32_t requestsOutstanding;
+ bool topicBound;
+ deque<MessageImpl::Ptr> xmtQueue;
+ deque<BrokerEventImpl::Ptr> eventQueue;
+
+# define MA_BUFFER_SIZE 65536
+ char outputBuffer[MA_BUFFER_SIZE];
+
BrokerEventImpl::Ptr eventDeclareQueue(const string& queueName);
BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
BrokerEventImpl::Ptr eventSetupComplete();
+
+ void handleBrokerResponse(Buffer& inBuffer, uint32_t seq);
+ void handlePackageIndication(Buffer& inBuffer, uint32_t seq);
+ void handleCommandComplete(Buffer& inBuffer, uint32_t seq);
+ void handleClassIndication(Buffer& inBuffer, uint32_t seq);
+ void handleMethodResponse(Buffer& inBuffer, uint32_t seq);
+ void handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq);
+ void handleEventIndication(Buffer& inBuffer, uint32_t seq);
+ void handleSchemaResponse(Buffer& inBuffer, uint32_t seq);
+ void handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
+ void incOutstanding();
+ void decOutstanding();
};
struct AgentProxyImpl {
@@ -139,10 +166,6 @@ namespace qmf {
ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& settings = ConsoleSettings());
~ConsoleEngineImpl();
- void handleRcvMessage(BrokerProxy& broker, Message& message);
- bool getXmtMessage(Message& item) const;
- void popXmt();
-
bool getEvent(ConsoleEvent& event) const;
void popEvent();
@@ -175,17 +198,21 @@ namespace qmf {
*/
private:
+ friend class BrokerProxyImpl;
ConsoleEngine* envelope;
const ConsoleSettings& settings;
mutable Mutex lock;
deque<ConsoleEventImpl::Ptr> eventQueue;
+ vector<BrokerProxyImpl::Ptr> brokerList;
+ vector<pair<string, string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE)
};
}
-const char* BrokerProxyImpl::QMF_EXCHANGE = "qpid.management";
-const char* BrokerProxyImpl::DIR_EXCHANGE = "amq.direct";
-const char* BrokerProxyImpl::BROKER_KEY = "broker";
-
+namespace {
+const char* QMF_EXCHANGE = "qpid.management";
+const char* DIR_EXCHANGE = "amq.direct";
+const char* BROKER_KEY = "broker";
+}
#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
@@ -250,12 +277,55 @@ void BrokerProxyImpl::sessionClosed()
void BrokerProxyImpl::startProtocol()
{
- cout << "BrokerProxyImpl::startProtocol" << endl;
+ Mutex::ScopedLock _lock(lock);
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+
+ requestsOutstanding = 1;
+ topicBound = false;
+ Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST);
+ sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT BrokerRequest");
+}
+
+void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
+{
+ uint32_t length = buf.getPosition();
+ MessageImpl::Ptr message(new MessageImpl);
+
+ buf.reset();
+ buf.getRawData(message->body, length);
+ message->destination = destination;
+ message->routingKey = routingKey;
+ message->replyExchange = DIR_EXCHANGE;
+ message->replyKey = queueName;
+
+ xmtQueue.push_back(message);
}
-void BrokerProxyImpl::handleRcvMessage(Message& /*message*/)
+void BrokerProxyImpl::handleRcvMessage(Message& message)
{
- // TODO: Dispatch the messages types
+ Buffer inBuffer(message.body, message.length);
+ uint8_t opcode;
+ uint32_t sequence;
+
+ while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) {
+ if (opcode == Protocol::OP_BROKER_RESPONSE) handleBrokerResponse(inBuffer, sequence);
+ else if (opcode == Protocol::OP_PACKAGE_INDICATION) handlePackageIndication(inBuffer, sequence);
+ else if (opcode == Protocol::OP_COMMAND_COMPLETE) handleCommandComplete(inBuffer, sequence);
+ else if (opcode == Protocol::OP_CLASS_INDICATION) handleClassIndication(inBuffer, sequence);
+ else if (opcode == Protocol::OP_METHOD_RESPONSE) handleMethodResponse(inBuffer, sequence);
+ else if (opcode == Protocol::OP_HEARTBEAT_INDICATION) handleHeartbeatIndication(inBuffer, sequence);
+ else if (opcode == Protocol::OP_EVENT_INDICATION) handleEventIndication(inBuffer, sequence);
+ else if (opcode == Protocol::OP_SCHEMA_RESPONSE) handleSchemaResponse(inBuffer, sequence);
+ else if (opcode == Protocol::OP_PROPERTY_INDICATION) handleObjectIndication(inBuffer, sequence, true, false);
+ else if (opcode == Protocol::OP_STATISTIC_INDICATION) handleObjectIndication(inBuffer, sequence, false, true);
+ else if (opcode == Protocol::OP_OBJECT_INDICATION) handleObjectIndication(inBuffer, sequence, true, true);
+ else {
+ QPID_LOG(trace, "BrokerProxyImpl::handleRcvMessage invalid opcode: " << opcode);
+ break;
+ }
+ }
}
bool BrokerProxyImpl::getXmtMessage(Message& item) const
@@ -290,6 +360,16 @@ void BrokerProxyImpl::popEvent()
eventQueue.pop_front();
}
+void BrokerProxyImpl::complete()
+{
+ decOutstanding();
+}
+
+void BrokerProxyImpl::addBinding(const string& exchange, const string& key)
+{
+ eventQueue.push_back(eventBind(exchange, queueName, key));
+}
+
BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName)
{
BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE));
@@ -313,6 +393,89 @@ BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete()
return event;
}
+void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
+{
+ // Note that this function doesn't touch requestsOutstanding. This is because
+ // it accounts for one request completed (the BrokerRequest) and one request
+ // started (the PackageRequest) which cancel each other out.
+
+ brokerId.decode(inBuffer);
+ QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId);
+ Mutex::ScopedLock _lock(lock);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve(this));
+ Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT PackageRequest seq=" << sequence);
+}
+
+void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
+{
+ string package;
+
+ inBuffer.getShortString(package);
+ QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package);
+}
+
+void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
+{
+ string text;
+ uint32_t code = inBuffer.getLong();
+ inBuffer.getShortString(text);
+ QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text);
+ seqMgr.release(seq);
+}
+
+void BrokerProxyImpl::handleClassIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+ // TODO
+}
+
+void BrokerProxyImpl::handleMethodResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+ // TODO
+}
+
+void BrokerProxyImpl::handleHeartbeatIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+ // TODO
+}
+
+void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+ // TODO
+}
+
+void BrokerProxyImpl::handleSchemaResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+ // TODO
+}
+
+void BrokerProxyImpl::handleObjectIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/, bool /*prop*/, bool /*stat*/)
+{
+ // TODO
+}
+
+void BrokerProxyImpl::incOutstanding()
+{
+ Mutex::ScopedLock _lock(lock);
+ requestsOutstanding++;
+}
+
+void BrokerProxyImpl::decOutstanding()
+{
+ Mutex::ScopedLock _lock(lock);
+ requestsOutstanding--;
+ if (requestsOutstanding == 0 && !topicBound) {
+ for (vector<pair<string, string> >::const_iterator iter = console->bindingList.begin();
+ iter != console->bindingList.end(); iter++) {
+ string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first);
+ string key(iter->second);
+ eventQueue.push_back(eventBind(exchange, queueName, key));
+ }
+ }
+}
+
MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodResponse(this))
{
string text;
@@ -329,6 +492,19 @@ MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodRespons
ConsoleEngineImpl::ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& s) :
envelope(e), settings(s)
{
+ bindingList.push_back(pair<string, string>(string(), "schema.#"));
+ if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) {
+ bindingList.push_back(pair<string, string>(string(), "console.#"));
+ } else {
+ if (settings.rcvObjects && !settings.userBindings)
+ bindingList.push_back(pair<string, string>(string(), "console.obj.#"));
+ else
+ bindingList.push_back(pair<string, string>(string(), "console.obj.*.*.org.apache.qpid.broker.agent"));
+ if (settings.rcvEvents)
+ bindingList.push_back(pair<string, string>(string(), "console.event.#"));
+ if (settings.rcvHeartbeats)
+ bindingList.push_back(pair<string, string>(string(), "console.heartbeat.#"));
+ }
}
ConsoleEngineImpl::~ConsoleEngineImpl()
@@ -354,72 +530,105 @@ void ConsoleEngineImpl::popEvent()
void ConsoleEngineImpl::addConnection(BrokerProxy& /*broker*/, void* /*context*/)
{
+ // TODO
}
void ConsoleEngineImpl::delConnection(BrokerProxy& /*broker*/)
{
+ // TODO
}
uint32_t ConsoleEngineImpl::packageCount() const
{
+ // TODO
return 0;
}
const string& ConsoleEngineImpl::getPackageName(uint32_t /*idx*/) const
{
+ // TODO
static string temp;
return temp;
}
uint32_t ConsoleEngineImpl::classCount(const char* /*packageName*/) const
{
+ // TODO
return 0;
}
const SchemaClassKey* ConsoleEngineImpl::getClass(const char* /*packageName*/, uint32_t /*idx*/) const
{
+ // TODO
return 0;
}
ClassKind ConsoleEngineImpl::getClassKind(const SchemaClassKey& /*key*/) const
{
+ // TODO
return CLASS_OBJECT;
}
const SchemaObjectClass* ConsoleEngineImpl::getObjectClass(const SchemaClassKey& /*key*/) const
{
+ // TODO
return 0;
}
const SchemaEventClass* ConsoleEngineImpl::getEventClass(const SchemaClassKey& /*key*/) const
{
+ // TODO
return 0;
}
-void ConsoleEngineImpl::bindPackage(const char* /*packageName*/)
+void ConsoleEngineImpl::bindPackage(const char* packageName)
{
+ stringstream key;
+ key << "console.obj.*.*." << packageName << ".#";
+ Mutex::ScopedLock _lock(lock);
+ bindingList.push_back(pair<string, string>(string(), key.str()));
+ for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ (*iter)->addBinding(QMF_EXCHANGE, key.str());
}
-void ConsoleEngineImpl::bindClass(const SchemaClassKey& /*key*/)
+void ConsoleEngineImpl::bindClass(const SchemaClassKey& classKey)
{
+ stringstream key;
+ key << "console.obj.*.*." << classKey.getPackageName() << "." << classKey.getClassName() << ".#";
+ Mutex::ScopedLock _lock(lock);
+ bindingList.push_back(pair<string, string>(string(), key.str()));
+ for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ (*iter)->addBinding(QMF_EXCHANGE, key.str());
}
-void ConsoleEngineImpl::bindClass(const char* /*packageName*/, const char* /*className*/)
+void ConsoleEngineImpl::bindClass(const char* packageName, const char* className)
{
+ stringstream key;
+ key << "console.obj.*.*." << packageName << "." << className << ".#";
+ Mutex::ScopedLock _lock(lock);
+ bindingList.push_back(pair<string, string>(string(), key.str()));
+ for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ (*iter)->addBinding(QMF_EXCHANGE, key.str());
}
uint32_t ConsoleEngineImpl::agentCount() const
{
+ // TODO
return 0;
}
const AgentProxy* ConsoleEngineImpl::getAgent(uint32_t /*idx*/) const
{
+ // TODO
return 0;
}
void ConsoleEngineImpl::sendQuery(const Query& /*query*/, void* /*context*/)
{
+ // TODO
}
/*
@@ -437,7 +646,6 @@ void ConsoleEngineImpl::endSync(SyncQuery& sync)
*/
-
//==================================================================
// Wrappers
//==================================================================