summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qmf.mk5
-rw-r--r--cpp/src/qmf/BrokerProxyImpl.cpp663
-rw-r--r--cpp/src/qmf/BrokerProxyImpl.h222
-rw-r--r--cpp/src/qmf/ConsoleEngine.cpp1091
-rw-r--r--cpp/src/qmf/ConsoleEngine.h20
-rw-r--r--cpp/src/qmf/ConsoleEngineImpl.cpp361
-rw-r--r--cpp/src/qmf/ConsoleEngineImpl.h133
-rw-r--r--cpp/src/qmf/Object.h1
-rw-r--r--cpp/src/qmf/ObjectImpl.cpp14
-rw-r--r--cpp/src/qmf/ObjectImpl.h7
-rw-r--r--cpp/src/qmf/Value.h1
-rw-r--r--cpp/src/qmf/ValueImpl.cpp354
-rw-r--r--cpp/src/qmf/ValueImpl.h3
13 files changed, 1478 insertions, 1397 deletions
diff --git a/cpp/src/qmf.mk b/cpp/src/qmf.mk
index 54110ebaf7..65caaedd5c 100644
--- a/cpp/src/qmf.mk
+++ b/cpp/src/qmf.mk
@@ -35,9 +35,12 @@ nobase_include_HEADERS += \
../include/qmf/AgentObject.h
libqmfcommon_la_SOURCES = \
+ qmf/BrokerProxyImpl.cpp \
+ qmf/BrokerProxyImpl.h \
qmf/ConnectionSettingsImpl.cpp \
qmf/ConnectionSettingsImpl.h \
- qmf/ConsoleEngine.cpp \
+ qmf/ConsoleEngineImpl.cpp \
+ qmf/ConsoleEngineImpl.h \
qmf/ConsoleEngine.h \
qmf/Event.h \
qmf/Message.h \
diff --git a/cpp/src/qmf/BrokerProxyImpl.cpp b/cpp/src/qmf/BrokerProxyImpl.cpp
new file mode 100644
index 0000000000..47dca309d7
--- /dev/null
+++ b/cpp/src/qmf/BrokerProxyImpl.cpp
@@ -0,0 +1,663 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/BrokerProxyImpl.h"
+#include "qmf/ConsoleEngineImpl.h"
+#include "qmf/Protocol.h"
+#include "qpid/Address.h"
+#include "qpid/sys/SystemInfo.h"
+#include <qpid/log/Statement.h>
+#include <string.h>
+#include <iostream>
+#include <fstream>
+
+using namespace std;
+using namespace qmf;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+namespace {
+ const char* QMF_EXCHANGE = "qpid.management";
+ const char* DIR_EXCHANGE = "amq.direct";
+ const char* BROKER_KEY = "broker";
+ const char* BROKER_PACKAGE = "org.apache.qpid.broker";
+ const char* AGENT_CLASS = "agent";
+ const char* BROKER_AGENT_KEY = "agent.1.0";
+}
+
+const Object* QueryResponseImpl::getObject(uint32_t idx) const
+{
+ vector<ObjectImpl::Ptr>::const_iterator iter = results.begin();
+
+ while (idx > 0) {
+ if (iter == results.end())
+ return 0;
+ iter++;
+ idx--;
+ }
+
+ return (*iter)->envelope;
+}
+
+#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
+
+BrokerEvent BrokerEventImpl::copy()
+{
+ BrokerEvent item;
+
+ ::memset(&item, 0, sizeof(BrokerEvent));
+ item.kind = kind;
+
+ STRING_REF(name);
+ STRING_REF(exchange);
+ STRING_REF(bindingKey);
+ item.context = context;
+ item.queryResponse = queryResponse.get() ? queryResponse->envelope : 0;
+ item.methodResponse = methodResponse.get() ? methodResponse->envelope : 0;
+
+ return item;
+}
+
+BrokerProxyImpl::BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console) :
+ envelope(e), console(_console.impl)
+{
+ stringstream qn;
+ qpid::TcpAddress addr;
+
+ SystemInfo::getLocalHostname(addr);
+ qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId();
+ queueName = qn.str();
+
+ seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this)));
+}
+
+void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
+{
+ Mutex::ScopedLock _lock(lock);
+ agentList.clear();
+ eventQueue.clear();
+ xmtQueue.clear();
+ eventQueue.push_back(eventDeclareQueue(queueName));
+ eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName));
+ eventQueue.push_back(eventSetupComplete());
+
+ // TODO: Store session handle
+}
+
+void BrokerProxyImpl::sessionClosed()
+{
+ Mutex::ScopedLock _lock(lock);
+ agentList.clear();
+ eventQueue.clear();
+ xmtQueue.clear();
+}
+
+void BrokerProxyImpl::startProtocol()
+{
+ Mutex::ScopedLock _lock(lock);
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+
+ agentList.push_back(AgentProxyImpl::Ptr(new AgentProxyImpl(console, this, 0, "Agent embedded in broker")));
+
+ requestsOutstanding = 1;
+ topicBound = false;
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
+ sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
+}
+
+void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
+{
+ uint32_t length = buf.getPosition();
+ MessageImpl::Ptr message(new MessageImpl);
+
+ buf.reset();
+ buf.getRawData(message->body, length);
+ message->destination = destination;
+ message->routingKey = routingKey;
+ message->replyExchange = DIR_EXCHANGE;
+ message->replyKey = queueName;
+
+ xmtQueue.push_back(message);
+}
+
+void BrokerProxyImpl::handleRcvMessage(Message& message)
+{
+ Buffer inBuffer(message.body, message.length);
+ uint8_t opcode;
+ uint32_t sequence;
+
+ while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
+ seqMgr.dispatch(opcode, sequence, inBuffer);
+}
+
+bool BrokerProxyImpl::getXmtMessage(Message& item) const
+{
+ Mutex::ScopedLock _lock(lock);
+ if (xmtQueue.empty())
+ return false;
+ item = xmtQueue.front()->copy();
+ return true;
+}
+
+void BrokerProxyImpl::popXmt()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!xmtQueue.empty())
+ xmtQueue.pop_front();
+}
+
+bool BrokerProxyImpl::getEvent(BrokerEvent& event) const
+{
+ Mutex::ScopedLock _lock(lock);
+ if (eventQueue.empty())
+ return false;
+ event = eventQueue.front()->copy();
+ return true;
+}
+
+void BrokerProxyImpl::popEvent()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!eventQueue.empty())
+ eventQueue.pop_front();
+}
+
+uint32_t BrokerProxyImpl::agentCount() const
+{
+ Mutex::ScopedLock _lock(lock);
+ return agentList.size();
+}
+
+const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const
+{
+ Mutex::ScopedLock _lock(lock);
+ for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin();
+ iter != agentList.end(); iter++)
+ if (idx-- == 0)
+ return (*iter)->envelope;
+ return 0;
+}
+
+void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent)
+{
+ SequenceContext::Ptr queryContext(new QueryContext(*this, context));
+ Mutex::ScopedLock _lock(lock);
+ if (agent != 0) {
+ sendGetRequestLH(queryContext, query, agent->impl);
+ } else {
+ // TODO (optimization) only send queries to agents that have the requested class+package
+ for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin();
+ iter != agentList.end(); iter++) {
+ sendGetRequestLH(queryContext, query, (*iter).get());
+ }
+ }
+}
+
+void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent)
+{
+ stringstream key;
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve(queryContext));
+
+ Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
+ query.impl->encode(outBuffer);
+ key << "agent.1." << agent->agentBank;
+ sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
+ QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str());
+}
+
+string BrokerProxyImpl::encodeMethodArguments(const SchemaMethod* schema, const Value* argmap, Buffer& buffer)
+{
+ int argCount = schema->getArgumentCount();
+
+ if (argmap == 0 || !argmap->isMap())
+ return string("Arguments must be in a map value");
+
+ for (int aIdx = 0; aIdx < argCount; aIdx++) {
+ const SchemaArgument* arg(schema->getArgument(aIdx));
+ if (arg->getDirection() == DIR_IN || arg->getDirection() == DIR_IN_OUT) {
+ if (argmap->keyInMap(arg->getName())) {
+ const Value* argVal(argmap->byKey(arg->getName()));
+ if (argVal->getType() != arg->getType())
+ return string("Argument is the wrong type: ") + arg->getName();
+ argVal->impl->encode(buffer);
+ } else {
+ Value defaultValue(arg->getType());
+ defaultValue.impl->encode(buffer);
+ }
+ }
+ }
+
+ return string();
+}
+
+void BrokerProxyImpl::sendMethodRequest(ObjectIdImpl* oid, const SchemaObjectClass* cls,
+ const string& methodName, const Value* args, void* userContext)
+{
+ int methodCount = cls->getMethodCount();
+ int idx;
+ for (idx = 0; idx < methodCount; idx++) {
+ const SchemaMethod* method = cls->getMethod(idx);
+ if (string(method->getName()) == methodName) {
+ Mutex::ScopedLock _lock(lock);
+ SequenceContext::Ptr methodContext(new MethodContext(*this, userContext, method->impl));
+ stringstream key;
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve(methodContext));
+
+ Protocol::encodeHeader(outBuffer, Protocol::OP_METHOD_REQUEST, sequence);
+ oid->encode(outBuffer);
+ cls->getClassKey()->impl->encode(outBuffer);
+ outBuffer.putShortString(methodName);
+
+ string argErrorString = encodeMethodArguments(method, args, outBuffer);
+ if (argErrorString.empty()) {
+ key << "agent.1." << oid->getAgentBank();
+ sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
+ QPID_LOG(trace, "SENT MethodRequest seq=" << sequence << " method=" << methodName << " key=" << key.str());
+ } else {
+ MethodResponseImpl::Ptr argError(new MethodResponseImpl(1, argErrorString));
+ eventQueue.push_back(eventMethodResponse(userContext, argError));
+ }
+ return;
+ }
+ }
+
+ MethodResponseImpl::Ptr error(new MethodResponseImpl(1, string("Unknown method: ") + methodName));
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(eventMethodResponse(userContext, error));
+}
+
+void BrokerProxyImpl::addBinding(const string& exchange, const string& key)
+{
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(eventBind(exchange, queueName, key));
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName)
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE));
+ event->name = queueName;
+ return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventBind(const string& exchange, const string& queue, const string& key)
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::BIND));
+ event->name = queue;
+ event->exchange = exchange;
+ event->bindingKey = key;
+
+ return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete()
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::SETUP_COMPLETE));
+ return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventStable()
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::STABLE));
+ return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponseImpl::Ptr response)
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE));
+ event->context = context;
+ event->queryResponse = response;
+ return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventMethodResponse(void* context, MethodResponseImpl::Ptr response)
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::METHOD_RESPONSE));
+ event->context = context;
+ event->methodResponse = response;
+ return event;
+}
+
+void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
+{
+ brokerId.decode(inBuffer);
+ QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId);
+ Mutex::ScopedLock _lock(lock);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve());
+ incOutstandingLH();
+ Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT PackageRequest seq=" << sequence);
+}
+
+void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
+{
+ string package;
+
+ inBuffer.getShortString(package);
+ QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package);
+ console->learnPackage(package);
+
+ Mutex::ScopedLock _lock(lock);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve());
+ incOutstandingLH();
+ Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence);
+ outBuffer.putShortString(package);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT ClassQuery seq=" << sequence << " package=" << package);
+}
+
+void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
+{
+ string text;
+ uint32_t code = inBuffer.getLong();
+ inBuffer.getShortString(text);
+ QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text);
+}
+
+void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
+{
+ uint8_t kind = inBuffer.getOctet();
+ SchemaClassKeyImpl classKey(inBuffer);
+
+ QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey.str());
+
+ if (!console->haveClass(classKey)) {
+ Mutex::ScopedLock _lock(lock);
+ incOutstandingLH();
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence);
+ classKey.encode(outBuffer);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey.str());
+ }
+}
+
+MethodResponseImpl::Ptr BrokerProxyImpl::handleMethodResponse(Buffer& inBuffer, uint32_t seq, SchemaMethodImpl* schema)
+{
+ MethodResponseImpl::Ptr response(new MethodResponseImpl(inBuffer, schema));
+
+ QPID_LOG(trace, "RCVD MethodResponse seq=" << seq << " status=" << response->getStatus() << " text=" <<
+ response->getException()->asString());
+
+ return response;
+}
+
+void BrokerProxyImpl::handleHeartbeatIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+ // TODO
+}
+
+void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+ // TODO
+}
+
+void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
+{
+ SchemaObjectClassImpl::Ptr oClassPtr;
+ SchemaEventClassImpl::Ptr eClassPtr;
+ uint8_t kind = inBuffer.getOctet();
+ const SchemaClassKeyImpl* key;
+ if (kind == CLASS_OBJECT) {
+ oClassPtr.reset(new SchemaObjectClassImpl(inBuffer));
+ console->learnClass(oClassPtr);
+ key = oClassPtr->getClassKey()->impl;
+ QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->str());
+
+ //
+ // If we have just learned about the org.apache.qpid.broker:agent class, send a get
+ // request for the current list of agents so we can have it on-hand before we declare
+ // this session "stable".
+ //
+ if (key->getClassName() == AGENT_CLASS && key->getPackageName() == BROKER_PACKAGE) {
+ Mutex::ScopedLock _lock(lock);
+ incOutstandingLH();
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
+ FieldTable ft;
+ ft.setString("_class", AGENT_CLASS);
+ ft.setString("_package", BROKER_PACKAGE);
+ ft.encode(outBuffer);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY);
+ QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY);
+ }
+ } else if (kind == CLASS_EVENT) {
+ eClassPtr.reset(new SchemaEventClassImpl(inBuffer));
+ console->learnClass(eClassPtr);
+ key = eClassPtr->getClassKey()->impl;
+ QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->str());
+ }
+ else {
+ QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
+ }
+}
+
+ObjectImpl::Ptr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
+{
+ SchemaClassKeyImpl classKey(inBuffer);
+ QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey.str());
+
+ SchemaObjectClassImpl::Ptr schema = console->getSchema(classKey);
+ if (schema.get() == 0) {
+ QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey.str());
+ return ObjectImpl::Ptr();
+ }
+
+ return ObjectImpl::Ptr(new ObjectImpl(schema->envelope, this, inBuffer, prop, stat, true));
+}
+
+void BrokerProxyImpl::incOutstandingLH()
+{
+ requestsOutstanding++;
+}
+
+void BrokerProxyImpl::decOutstanding()
+{
+ Mutex::ScopedLock _lock(lock);
+ requestsOutstanding--;
+ if (requestsOutstanding == 0 && !topicBound) {
+ topicBound = true;
+ for (vector<pair<string, string> >::const_iterator iter = console->bindingList.begin();
+ iter != console->bindingList.end(); iter++) {
+ string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first);
+ string key(iter->second);
+ eventQueue.push_back(eventBind(exchange, queueName, key));
+ }
+ eventQueue.push_back(eventStable());
+ }
+}
+
+MethodResponseImpl::MethodResponseImpl(const MethodResponseImpl& from) :
+ envelope(from.envelope), // !!!! TODO !!!! this is not right
+ status(from.status), schema(from.schema),
+ exception(from.exception.get() ? new Value(*from.exception) : 0),
+ arguments(from.arguments.get() ? new Value(*from.arguments) : 0)
+{
+}
+
+MethodResponseImpl::MethodResponseImpl(Buffer& buf, SchemaMethodImpl* s) :
+ envelope(new MethodResponse(this)), schema(s)
+{
+ string text;
+
+ status = buf.getLong();
+ buf.getMediumString(text);
+ exception.reset(new Value(TYPE_LSTR));
+ exception->setString(text.c_str());
+
+ if (status != 0)
+ return;
+
+ arguments.reset(new Value(TYPE_MAP));
+ int argCount(schema->getArgumentCount());
+ for (int idx = 0; idx < argCount; idx++) {
+ const SchemaArgument* arg = schema->getArgument(idx);
+ if (arg->getDirection() == DIR_OUT || arg->getDirection() == DIR_IN_OUT) {
+ ValueImpl* value(new ValueImpl(arg->getType(), buf));
+ arguments->insert(arg->getName(), value->envelope);
+ }
+ }
+}
+
+MethodResponseImpl::MethodResponseImpl(uint32_t s, const string& text) : envelope(new MethodResponse(this)), schema(0)
+{
+ status = s;
+ exception.reset(new Value(TYPE_LSTR));
+ exception->setString(text.c_str());
+}
+
+bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+{
+ bool completeContext = false;
+ if (opcode == Protocol::OP_BROKER_RESPONSE) {
+ broker.handleBrokerResponse(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_COMMAND_COMPLETE) {
+ broker.handleCommandComplete(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_SCHEMA_RESPONSE) {
+ broker.handleSchemaResponse(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_PACKAGE_INDICATION)
+ broker.handlePackageIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_CLASS_INDICATION)
+ broker.handleClassIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_HEARTBEAT_INDICATION)
+ broker.handleHeartbeatIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_EVENT_INDICATION)
+ broker.handleEventIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_PROPERTY_INDICATION)
+ broker.handleObjectIndication(buffer, sequence, true, false);
+ else if (opcode == Protocol::OP_STATISTIC_INDICATION)
+ broker.handleObjectIndication(buffer, sequence, false, true);
+ else if (opcode == Protocol::OP_OBJECT_INDICATION)
+ broker.handleObjectIndication(buffer, sequence, true, true);
+ else {
+ QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode);
+ completeContext = true;
+ }
+
+ return completeContext;
+}
+
+void QueryContext::reserve()
+{
+ Mutex::ScopedLock _lock(lock);
+ requestsOutstanding++;
+}
+
+void QueryContext::release()
+{
+ {
+ Mutex::ScopedLock _lock(lock);
+ if (--requestsOutstanding > 0)
+ return;
+ }
+
+ Mutex::ScopedLock _block(broker.lock);
+ broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse));
+}
+
+bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+{
+ bool completeContext = false;
+ ObjectImpl::Ptr object;
+
+ if (opcode == Protocol::OP_COMMAND_COMPLETE) {
+ broker.handleCommandComplete(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_OBJECT_INDICATION) {
+ object = broker.handleObjectIndication(buffer, sequence, true, true);
+ if (object.get() != 0)
+ queryResponse->results.push_back(object);
+ }
+ else {
+ QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
+ completeContext = true;
+ }
+
+ return completeContext;
+}
+
+void MethodContext::release()
+{
+ Mutex::ScopedLock _block(broker.lock);
+ broker.eventQueue.push_back(broker.eventMethodResponse(userContext, methodResponse));
+}
+
+bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+{
+ if (opcode == Protocol::OP_METHOD_RESPONSE)
+ methodResponse = broker.handleMethodResponse(buffer, sequence, schema);
+ else
+ QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
+
+ return true;
+}
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {}
+AgentProxy::~AgentProxy() { delete impl; }
+const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); }
+
+BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(this, console)) {}
+BrokerProxy::~BrokerProxy() { delete impl; }
+void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); }
+void BrokerProxy::sessionClosed() { impl->sessionClosed(); }
+void BrokerProxy::startProtocol() { impl->startProtocol(); }
+void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
+bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
+void BrokerProxy::popXmt() { impl->popXmt(); }
+bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); }
+void BrokerProxy::popEvent() { impl->popEvent(); }
+uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); }
+const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); }
+void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); }
+
+MethodResponse::MethodResponse(const MethodResponse& from) : impl(new MethodResponseImpl(*(from.impl))) {}
+MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {}
+MethodResponse::~MethodResponse() {}
+uint32_t MethodResponse::getStatus() const { return impl->getStatus(); }
+const Value* MethodResponse::getException() const { return impl->getException(); }
+const Value* MethodResponse::getArgs() const { return impl->getArgs(); }
+
+QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {}
+QueryResponse::~QueryResponse() {}
+uint32_t QueryResponse::getStatus() const { return impl->getStatus(); }
+const Value* QueryResponse::getException() const { return impl->getException(); }
+uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); }
+const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); }
+
diff --git a/cpp/src/qmf/BrokerProxyImpl.h b/cpp/src/qmf/BrokerProxyImpl.h
new file mode 100644
index 0000000000..930d8c8995
--- /dev/null
+++ b/cpp/src/qmf/BrokerProxyImpl.h
@@ -0,0 +1,222 @@
+#ifndef _QmfBrokerProxyImpl_
+#define _QmfBrokerProxyImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/ConsoleEngine.h"
+#include "qmf/ObjectImpl.h"
+#include "qmf/SchemaImpl.h"
+#include "qmf/ValueImpl.h"
+#include "qmf/QueryImpl.h"
+#include "qmf/SequenceManager.h"
+#include "qmf/MessageImpl.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/sys/Mutex.h"
+#include "boost/shared_ptr.hpp"
+#include <memory>
+#include <string>
+#include <deque>
+#include <map>
+#include <vector>
+
+namespace qmf {
+
+ struct MethodResponseImpl {
+ typedef boost::shared_ptr<MethodResponseImpl> Ptr;
+ MethodResponse* envelope;
+ uint32_t status;
+ SchemaMethodImpl* schema;
+ boost::shared_ptr<Value> exception;
+ boost::shared_ptr<Value> arguments;
+
+ MethodResponseImpl(const MethodResponseImpl& from);
+ MethodResponseImpl(qpid::framing::Buffer& buf, SchemaMethodImpl* schema);
+ MethodResponseImpl(uint32_t status, const std::string& text);
+ ~MethodResponseImpl() { delete envelope; }
+ uint32_t getStatus() const { return status; }
+ const Value* getException() const { return exception.get(); }
+ const Value* getArgs() const { return arguments.get(); }
+ };
+
+ struct QueryResponseImpl {
+ typedef boost::shared_ptr<QueryResponseImpl> Ptr;
+ QueryResponse *envelope;
+ uint32_t status;
+ std::auto_ptr<Value> exception;
+ std::vector<ObjectImpl::Ptr> results;
+
+ QueryResponseImpl() : envelope(new QueryResponse(this)), status(0) {}
+ ~QueryResponseImpl() { delete envelope; }
+ uint32_t getStatus() const { return status; }
+ const Value* getException() const { return exception.get(); }
+ uint32_t getObjectCount() const { return results.size(); }
+ const Object* getObject(uint32_t idx) const;
+ };
+
+ struct BrokerEventImpl {
+ typedef boost::shared_ptr<BrokerEventImpl> Ptr;
+ BrokerEvent::EventKind kind;
+ std::string name;
+ std::string exchange;
+ std::string bindingKey;
+ void* context;
+ QueryResponseImpl::Ptr queryResponse;
+ MethodResponseImpl::Ptr methodResponse;
+
+ BrokerEventImpl(BrokerEvent::EventKind k) : kind(k), context(0) {}
+ ~BrokerEventImpl() {}
+ BrokerEvent copy();
+ };
+
+ struct AgentProxyImpl {
+ typedef boost::shared_ptr<AgentProxyImpl> Ptr;
+ AgentProxy* envelope;
+ ConsoleEngineImpl* console;
+ BrokerProxyImpl* broker;
+ uint32_t agentBank;
+ std::string label;
+
+ AgentProxyImpl(ConsoleEngineImpl* c, BrokerProxyImpl* b, uint32_t ab, const std::string& l) :
+ envelope(new AgentProxy(this)), console(c), broker(b), agentBank(ab), label(l) {}
+ ~AgentProxyImpl() {}
+ const std::string& getLabel() const { return label; }
+ };
+
+ class BrokerProxyImpl {
+ public:
+ typedef boost::shared_ptr<BrokerProxyImpl> Ptr;
+
+ BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console);
+ ~BrokerProxyImpl() {}
+
+ void sessionOpened(SessionHandle& sh);
+ void sessionClosed();
+ void startProtocol();
+
+ void sendBufferLH(qpid::framing::Buffer& buf, const std::string& destination, const std::string& routingKey);
+ void handleRcvMessage(Message& message);
+ bool getXmtMessage(Message& item) const;
+ void popXmt();
+
+ bool getEvent(BrokerEvent& event) const;
+ void popEvent();
+
+ uint32_t agentCount() const;
+ const AgentProxy* getAgent(uint32_t idx) const;
+ void sendQuery(const Query& query, void* context, const AgentProxy* agent);
+ void sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent);
+ std::string encodeMethodArguments(const SchemaMethod* schema, const Value* args, qpid::framing::Buffer& buffer);
+ void sendMethodRequest(ObjectIdImpl* oid, const SchemaObjectClass* cls, const std::string& method, const Value* args, void* context);
+
+ void addBinding(const std::string& exchange, const std::string& key);
+ void staticRelease() { decOutstanding(); }
+
+ private:
+ friend class StaticContext;
+ friend class QueryContext;
+ friend class MethodContext;
+ mutable qpid::sys::Mutex lock;
+ BrokerProxy* envelope;
+ ConsoleEngineImpl* console;
+ std::string queueName;
+ qpid::framing::Uuid brokerId;
+ SequenceManager seqMgr;
+ uint32_t requestsOutstanding;
+ bool topicBound;
+ std::vector<AgentProxyImpl::Ptr> agentList;
+ std::deque<MessageImpl::Ptr> xmtQueue;
+ std::deque<BrokerEventImpl::Ptr> eventQueue;
+
+# define MA_BUFFER_SIZE 65536
+ char outputBuffer[MA_BUFFER_SIZE];
+
+ BrokerEventImpl::Ptr eventDeclareQueue(const std::string& queueName);
+ BrokerEventImpl::Ptr eventBind(const std::string& exchange, const std::string& queue, const std::string& key);
+ BrokerEventImpl::Ptr eventSetupComplete();
+ BrokerEventImpl::Ptr eventStable();
+ BrokerEventImpl::Ptr eventQueryComplete(void* context, QueryResponseImpl::Ptr response);
+ BrokerEventImpl::Ptr eventMethodResponse(void* context, MethodResponseImpl::Ptr response);
+
+ void handleBrokerResponse(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ void handlePackageIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ void handleCommandComplete(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ void handleClassIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ MethodResponseImpl::Ptr handleMethodResponse(qpid::framing::Buffer& inBuffer, uint32_t seq, SchemaMethodImpl* schema);
+ void handleHeartbeatIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ void handleEventIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ void handleSchemaResponse(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ ObjectImpl::Ptr handleObjectIndication(qpid::framing::Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
+ void incOutstandingLH();
+ void decOutstanding();
+ };
+
+ //
+ // StaticContext is used to handle:
+ //
+ // 1) Responses to console-level requests (for schema info, etc.)
+ // 2) Unsolicited messages from agents (events, published updates, etc.)
+ //
+ struct StaticContext : public SequenceContext {
+ StaticContext(BrokerProxyImpl& b) : broker(b) {}
+ virtual ~StaticContext() {}
+ void reserve() {}
+ void release() { broker.staticRelease(); }
+ bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer);
+ BrokerProxyImpl& broker;
+ };
+
+ //
+ // QueryContext is used to track and handle responses associated with a single Get Query
+ //
+ struct QueryContext : public SequenceContext {
+ QueryContext(BrokerProxyImpl& b, void* u) :
+ broker(b), userContext(u), requestsOutstanding(0), queryResponse(new QueryResponseImpl()) {}
+ virtual ~QueryContext() {}
+ void reserve();
+ void release();
+ bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer);
+
+ mutable qpid::sys::Mutex lock;
+ BrokerProxyImpl& broker;
+ void* userContext;
+ uint32_t requestsOutstanding;
+ QueryResponseImpl::Ptr queryResponse;
+ };
+
+ struct MethodContext : public SequenceContext {
+ MethodContext(BrokerProxyImpl& b, void* u, SchemaMethodImpl* s) : broker(b), userContext(u), schema(s) {}
+ virtual ~MethodContext() {}
+ void reserve() {}
+ void release();
+ bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer);
+
+ BrokerProxyImpl& broker;
+ void* userContext;
+ SchemaMethodImpl* schema;
+ MethodResponseImpl::Ptr methodResponse;
+ };
+
+
+
+}
+
+#endif
+
diff --git a/cpp/src/qmf/ConsoleEngine.cpp b/cpp/src/qmf/ConsoleEngine.cpp
deleted file mode 100644
index e7991328ee..0000000000
--- a/cpp/src/qmf/ConsoleEngine.cpp
+++ /dev/null
@@ -1,1091 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/ConsoleEngine.h"
-#include "qmf/MessageImpl.h"
-#include "qmf/SchemaImpl.h"
-#include "qmf/Typecode.h"
-#include "qmf/ObjectImpl.h"
-#include "qmf/ObjectIdImpl.h"
-#include "qmf/QueryImpl.h"
-#include "qmf/ValueImpl.h"
-#include "qmf/Protocol.h"
-#include "qmf/SequenceManager.h"
-#include <qpid/framing/Buffer.h>
-#include <qpid/framing/Uuid.h>
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/FieldValue.h>
-#include <qpid/sys/Mutex.h>
-#include <qpid/log/Statement.h>
-#include <qpid/sys/Time.h>
-#include <qpid/sys/SystemInfo.h>
-#include <string.h>
-#include <string>
-#include <deque>
-#include <map>
-#include <vector>
-#include <iostream>
-#include <fstream>
-#include <boost/shared_ptr.hpp>
-
-using namespace std;
-using namespace qmf;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-namespace qmf {
-
- struct MethodResponseImpl {
- typedef boost::shared_ptr<MethodResponseImpl> Ptr;
- MethodResponse* envelope;
- uint32_t status;
- auto_ptr<Value> exception;
- auto_ptr<Value> arguments;
-
- MethodResponseImpl(Buffer& buf);
- ~MethodResponseImpl() { delete envelope; }
- uint32_t getStatus() const { return status; }
- const Value* getException() const { return exception.get(); }
- const Value* getArgs() const { return arguments.get(); }
- };
-
- struct QueryResponseImpl {
- typedef boost::shared_ptr<QueryResponseImpl> Ptr;
- QueryResponse *envelope;
- uint32_t status;
- auto_ptr<Value> exception;
- vector<ObjectImpl::Ptr> results;
-
- QueryResponseImpl() : envelope(new QueryResponse(this)), status(0) {}
- ~QueryResponseImpl() { delete envelope; }
- uint32_t getStatus() const { return status; }
- const Value* getException() const { return exception.get(); }
- uint32_t getObjectCount() const { return results.size(); }
- const Object* getObject(uint32_t idx) const;
- };
-
- struct ConsoleEventImpl {
- typedef boost::shared_ptr<ConsoleEventImpl> Ptr;
- ConsoleEvent::EventKind kind;
- boost::shared_ptr<AgentProxyImpl> agent;
- string name;
- boost::shared_ptr<SchemaClassKey> classKey;
- Object* object;
- void* context;
- Event* event;
- uint64_t timestamp;
- uint32_t methodHandle;
- MethodResponseImpl::Ptr methodResponse;
-
- ConsoleEventImpl(ConsoleEvent::EventKind k) :
- kind(k), object(0), context(0), event(0), timestamp(0), methodHandle(0) {}
- ~ConsoleEventImpl() {}
- ConsoleEvent copy();
- };
-
- struct BrokerEventImpl {
- typedef boost::shared_ptr<BrokerEventImpl> Ptr;
- BrokerEvent::EventKind kind;
- string name;
- string exchange;
- string bindingKey;
- void* context;
- QueryResponseImpl::Ptr queryResponse;
-
- BrokerEventImpl(BrokerEvent::EventKind k) : kind(k) {}
- ~BrokerEventImpl() {}
- BrokerEvent copy();
- };
-
- struct AgentProxyImpl {
- typedef boost::shared_ptr<AgentProxyImpl> Ptr;
- AgentProxy* envelope;
- ConsoleEngineImpl* console;
- BrokerProxyImpl* broker;
- uint32_t agentBank;
- string label;
-
- AgentProxyImpl(ConsoleEngineImpl* c, BrokerProxyImpl* b, uint32_t ab, const string& l) :
- envelope(new AgentProxy(this)), console(c), broker(b), agentBank(ab), label(l) {}
- ~AgentProxyImpl() {}
- const string& getLabel() const { return label; }
- };
-
- class BrokerProxyImpl {
- public:
- typedef boost::shared_ptr<BrokerProxyImpl> Ptr;
-
- BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console);
- ~BrokerProxyImpl() {}
-
- void sessionOpened(SessionHandle& sh);
- void sessionClosed();
- void startProtocol();
-
- void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey);
- void handleRcvMessage(Message& message);
- bool getXmtMessage(Message& item) const;
- void popXmt();
-
- bool getEvent(BrokerEvent& event) const;
- void popEvent();
-
- uint32_t agentCount() const;
- const AgentProxy* getAgent(uint32_t idx) const;
- void sendQuery(const Query& query, void* context, const AgentProxy* agent);
- void sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent);
-
- void addBinding(const string& exchange, const string& key);
- void staticRelease() { decOutstanding(); }
-
- private:
- friend class StaticContext;
- friend class QueryContext;
- mutable Mutex lock;
- BrokerProxy* envelope;
- ConsoleEngineImpl* console;
- string queueName;
- Uuid brokerId;
- SequenceManager seqMgr;
- uint32_t requestsOutstanding;
- bool topicBound;
- vector<AgentProxyImpl::Ptr> agentList;
- deque<MessageImpl::Ptr> xmtQueue;
- deque<BrokerEventImpl::Ptr> eventQueue;
-
-# define MA_BUFFER_SIZE 65536
- char outputBuffer[MA_BUFFER_SIZE];
-
- BrokerEventImpl::Ptr eventDeclareQueue(const string& queueName);
- BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
- BrokerEventImpl::Ptr eventSetupComplete();
- BrokerEventImpl::Ptr eventStable();
- BrokerEventImpl::Ptr eventQueryComplete(void* context, QueryResponseImpl::Ptr response);
-
- void handleBrokerResponse(Buffer& inBuffer, uint32_t seq);
- void handlePackageIndication(Buffer& inBuffer, uint32_t seq);
- void handleCommandComplete(Buffer& inBuffer, uint32_t seq);
- void handleClassIndication(Buffer& inBuffer, uint32_t seq);
- void handleMethodResponse(Buffer& inBuffer, uint32_t seq);
- void handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq);
- void handleEventIndication(Buffer& inBuffer, uint32_t seq);
- void handleSchemaResponse(Buffer& inBuffer, uint32_t seq);
- ObjectImpl::Ptr handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
- void incOutstandingLH();
- void decOutstanding();
- };
-
- struct StaticContext : public SequenceContext {
- StaticContext(BrokerProxyImpl& b) : broker(b) {}
- ~StaticContext() {}
- void reserve() {}
- void release() { broker.staticRelease(); }
- bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer);
- BrokerProxyImpl& broker;
- };
-
- struct QueryContext : public SequenceContext {
- QueryContext(BrokerProxyImpl& b, void* u) :
- broker(b), userContext(u), requestsOutstanding(0), queryResponse(new QueryResponseImpl()) {}
- ~QueryContext() {}
- void reserve();
- void release();
- bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer);
-
- mutable Mutex lock;
- BrokerProxyImpl& broker;
- void* userContext;
- uint32_t requestsOutstanding;
- QueryResponseImpl::Ptr queryResponse;
- };
-
- class ConsoleEngineImpl {
- public:
- ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& settings = ConsoleSettings());
- ~ConsoleEngineImpl();
-
- bool getEvent(ConsoleEvent& event) const;
- void popEvent();
-
- void addConnection(BrokerProxy& broker, void* context);
- void delConnection(BrokerProxy& broker);
-
- uint32_t packageCount() const;
- const string& getPackageName(uint32_t idx) const;
-
- uint32_t classCount(const char* packageName) const;
- const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const;
-
- ClassKind getClassKind(const SchemaClassKey* key) const;
- const SchemaObjectClass* getObjectClass(const SchemaClassKey* key) const;
- const SchemaEventClass* getEventClass(const SchemaClassKey* key) const;
-
- void bindPackage(const char* packageName);
- void bindClass(const SchemaClassKey* key);
- void bindClass(const char* packageName, const char* className);
-
- /*
- void startSync(const Query& query, void* context, SyncQuery& sync);
- void touchSync(SyncQuery& sync);
- void endSync(SyncQuery& sync);
- */
-
- private:
- friend class BrokerProxyImpl;
- ConsoleEngine* envelope;
- const ConsoleSettings& settings;
- mutable Mutex lock;
- deque<ConsoleEventImpl::Ptr> eventQueue;
- vector<BrokerProxyImpl*> brokerList;
- vector<pair<string, string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE)
-
- // Declare a compare class for the class maps that compares the dereferenced
- // class key pointers. The default behavior would be to compare the pointer
- // addresses themselves.
- struct KeyCompare {
- bool operator()(const SchemaClassKeyImpl* left, const SchemaClassKeyImpl* right) const {
- return *left < *right;
- }
- };
-
- typedef map<const SchemaClassKeyImpl*, SchemaObjectClassImpl::Ptr, KeyCompare> ObjectClassList;
- typedef map<const SchemaClassKeyImpl*, SchemaEventClassImpl::Ptr, KeyCompare> EventClassList;
- typedef map<string, pair<ObjectClassList, EventClassList> > PackageList;
-
- PackageList packages;
-
- void learnPackage(const string& packageName);
- void learnClass(SchemaObjectClassImpl::Ptr cls);
- void learnClass(SchemaEventClassImpl::Ptr cls);
- bool haveClass(const SchemaClassKeyImpl& key) const;
- SchemaObjectClassImpl::Ptr getSchema(const SchemaClassKeyImpl& key) const;
- };
-}
-
-namespace {
- const char* QMF_EXCHANGE = "qpid.management";
- const char* DIR_EXCHANGE = "amq.direct";
- const char* BROKER_KEY = "broker";
- const char* BROKER_PACKAGE = "org.apache.qpid.broker";
- const char* AGENT_CLASS = "agent";
- const char* BROKER_AGENT_KEY = "agent.1.0";
-}
-
-const Object* QueryResponseImpl::getObject(uint32_t idx) const
-{
- vector<ObjectImpl::Ptr>::const_iterator iter = results.begin();
-
- while (idx > 0) {
- if (iter == results.end())
- return 0;
- iter++;
- idx--;
- }
-
- return (*iter)->envelope;
-}
-
-#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
-
-ConsoleEvent ConsoleEventImpl::copy()
-{
- ConsoleEvent item;
-
- ::memset(&item, 0, sizeof(ConsoleEvent));
- item.kind = kind;
- item.agent = agent.get() ? agent->envelope : 0;
- item.classKey = classKey.get();
- item.object = object;
- item.context = context;
- item.event = event;
- item.timestamp = timestamp;
- item.methodHandle = methodHandle;
- item.methodResponse = methodResponse.get() ? methodResponse->envelope : 0;
-
- STRING_REF(name);
-
- return item;
-}
-
-BrokerEvent BrokerEventImpl::copy()
-{
- BrokerEvent item;
-
- ::memset(&item, 0, sizeof(BrokerEvent));
- item.kind = kind;
-
- STRING_REF(name);
- STRING_REF(exchange);
- STRING_REF(bindingKey);
- item.context = context;
- item.queryResponse = queryResponse.get() ? queryResponse->envelope : 0;
-
- return item;
-}
-
-BrokerProxyImpl::BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console) :
- envelope(e), console(_console.impl)
-{
- stringstream qn;
- qpid::TcpAddress addr;
-
- SystemInfo::getLocalHostname(addr);
- qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId();
- queueName = qn.str();
-
- seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this)));
-}
-
-void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
-{
- Mutex::ScopedLock _lock(lock);
- agentList.clear();
- eventQueue.clear();
- xmtQueue.clear();
- eventQueue.push_back(eventDeclareQueue(queueName));
- eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName));
- eventQueue.push_back(eventSetupComplete());
-
- // TODO: Store session handle
-}
-
-void BrokerProxyImpl::sessionClosed()
-{
- Mutex::ScopedLock _lock(lock);
- agentList.clear();
- eventQueue.clear();
- xmtQueue.clear();
-}
-
-void BrokerProxyImpl::startProtocol()
-{
- Mutex::ScopedLock _lock(lock);
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
-
- agentList.push_back(AgentProxyImpl::Ptr(new AgentProxyImpl(console, this, 0, "Agent embedded in broker")));
-
- requestsOutstanding = 1;
- topicBound = false;
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
-}
-
-void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
-{
- uint32_t length = buf.getPosition();
- MessageImpl::Ptr message(new MessageImpl);
-
- buf.reset();
- buf.getRawData(message->body, length);
- message->destination = destination;
- message->routingKey = routingKey;
- message->replyExchange = DIR_EXCHANGE;
- message->replyKey = queueName;
-
- xmtQueue.push_back(message);
-}
-
-void BrokerProxyImpl::handleRcvMessage(Message& message)
-{
- Buffer inBuffer(message.body, message.length);
- uint8_t opcode;
- uint32_t sequence;
-
- while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
- seqMgr.dispatch(opcode, sequence, inBuffer);
-}
-
-bool BrokerProxyImpl::getXmtMessage(Message& item) const
-{
- Mutex::ScopedLock _lock(lock);
- if (xmtQueue.empty())
- return false;
- item = xmtQueue.front()->copy();
- return true;
-}
-
-void BrokerProxyImpl::popXmt()
-{
- Mutex::ScopedLock _lock(lock);
- if (!xmtQueue.empty())
- xmtQueue.pop_front();
-}
-
-bool BrokerProxyImpl::getEvent(BrokerEvent& event) const
-{
- Mutex::ScopedLock _lock(lock);
- if (eventQueue.empty())
- return false;
- event = eventQueue.front()->copy();
- return true;
-}
-
-void BrokerProxyImpl::popEvent()
-{
- Mutex::ScopedLock _lock(lock);
- if (!eventQueue.empty())
- eventQueue.pop_front();
-}
-
-uint32_t BrokerProxyImpl::agentCount() const
-{
- Mutex::ScopedLock _lock(lock);
- return agentList.size();
-}
-
-const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const
-{
- Mutex::ScopedLock _lock(lock);
- for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin();
- iter != agentList.end(); iter++)
- if (idx-- == 0)
- return (*iter)->envelope;
- return 0;
-}
-
-void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent)
-{
- SequenceContext::Ptr queryContext(new QueryContext(*this, context));
- Mutex::ScopedLock _lock(lock);
- if (agent != 0) {
- sendGetRequestLH(queryContext, query, agent->impl);
- } else {
- // TODO (optimization) only send queries to agents that have the requested class+package
- for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin();
- iter != agentList.end(); iter++) {
- sendGetRequestLH(queryContext, query, (*iter).get());
- }
- }
-}
-
-void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent)
-{
- stringstream key;
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve(queryContext));
-
- Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
- query.impl->encode(outBuffer);
- key << "agent.1." << agent->agentBank;
- sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
- QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str());
-}
-
-void BrokerProxyImpl::addBinding(const string& exchange, const string& key)
-{
- eventQueue.push_back(eventBind(exchange, queueName, key));
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName)
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE));
- event->name = queueName;
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventBind(const string& exchange, const string& queue, const string& key)
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::BIND));
- event->name = queue;
- event->exchange = exchange;
- event->bindingKey = key;
-
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete()
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::SETUP_COMPLETE));
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventStable()
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::STABLE));
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponseImpl::Ptr response)
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE));
- event->context = context;
- event->queryResponse = response;
- return event;
-}
-
-void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
-{
- brokerId.decode(inBuffer);
- QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId);
- Mutex::ScopedLock _lock(lock);
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- incOutstandingLH();
- Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT PackageRequest seq=" << sequence);
-}
-
-void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
-{
- string package;
-
- inBuffer.getShortString(package);
- QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package);
- console->learnPackage(package);
-
- Mutex::ScopedLock _lock(lock);
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- incOutstandingLH();
- Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence);
- outBuffer.putShortString(package);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT ClassQuery seq=" << sequence << " package=" << package);
-}
-
-void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
-{
- string text;
- uint32_t code = inBuffer.getLong();
- inBuffer.getShortString(text);
- QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text);
-}
-
-void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
-{
- uint8_t kind = inBuffer.getOctet();
- SchemaClassKeyImpl classKey(inBuffer);
-
- QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey.str());
-
- if (!console->haveClass(classKey)) {
- Mutex::ScopedLock _lock(lock);
- incOutstandingLH();
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence);
- classKey.encode(outBuffer);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey.str());
- }
-}
-
-void BrokerProxyImpl::handleMethodResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/)
-{
- // TODO
-}
-
-void BrokerProxyImpl::handleHeartbeatIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
-{
- // TODO
-}
-
-void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
-{
- // TODO
-}
-
-void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
-{
- SchemaObjectClassImpl::Ptr oClassPtr;
- SchemaEventClassImpl::Ptr eClassPtr;
- uint8_t kind = inBuffer.getOctet();
- const SchemaClassKeyImpl* key;
- if (kind == CLASS_OBJECT) {
- oClassPtr.reset(new SchemaObjectClassImpl(inBuffer));
- console->learnClass(oClassPtr);
- key = oClassPtr->getClassKey()->impl;
- QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->str());
-
- //
- // If we have just learned about the org.apache.qpid.broker:agent class, send a get
- // request for the current list of agents so we can have it on-hand before we declare
- // this session "stable".
- //
- if (key->getClassName() == AGENT_CLASS && key->getPackageName() == BROKER_PACKAGE) {
- Mutex::ScopedLock _lock(lock);
- incOutstandingLH();
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
- FieldTable ft;
- ft.setString("_class", AGENT_CLASS);
- ft.setString("_package", BROKER_PACKAGE);
- ft.encode(outBuffer);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY);
- QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY);
- }
- } else if (kind == CLASS_EVENT) {
- eClassPtr.reset(new SchemaEventClassImpl(inBuffer));
- console->learnClass(eClassPtr);
- key = eClassPtr->getClassKey()->impl;
- QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->str());
- }
- else {
- QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
- }
-}
-
-ObjectImpl::Ptr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
-{
- SchemaClassKeyImpl classKey(inBuffer);
- QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey.str());
-
- SchemaObjectClassImpl::Ptr schema = console->getSchema(classKey);
- if (schema.get() == 0) {
- QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey.str());
- return ObjectImpl::Ptr();
- }
-
- return ObjectImpl::Ptr(new ObjectImpl(schema->envelope, inBuffer, prop, stat, true));
-}
-
-void BrokerProxyImpl::incOutstandingLH()
-{
- requestsOutstanding++;
-}
-
-void BrokerProxyImpl::decOutstanding()
-{
- Mutex::ScopedLock _lock(lock);
- requestsOutstanding--;
- if (requestsOutstanding == 0 && !topicBound) {
- topicBound = true;
- for (vector<pair<string, string> >::const_iterator iter = console->bindingList.begin();
- iter != console->bindingList.end(); iter++) {
- string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first);
- string key(iter->second);
- eventQueue.push_back(eventBind(exchange, queueName, key));
- }
- eventQueue.push_back(eventStable());
- }
-}
-
-MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodResponse(this))
-{
- string text;
-
- status = buf.getLong();
- buf.getMediumString(text);
- exception.reset(new Value(TYPE_LSTR));
- exception->setString(text.c_str());
-
- // TODO: Parse schema-specific output arguments.
- arguments.reset(new Value(TYPE_MAP));
-}
-
-bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
-{
- bool completeContext = false;
- if (opcode == Protocol::OP_BROKER_RESPONSE) {
- broker.handleBrokerResponse(buffer, sequence);
- completeContext = true;
- }
- else if (opcode == Protocol::OP_COMMAND_COMPLETE) {
- broker.handleCommandComplete(buffer, sequence);
- completeContext = true;
- }
- else if (opcode == Protocol::OP_SCHEMA_RESPONSE) {
- broker.handleSchemaResponse(buffer, sequence);
- completeContext = true;
- }
- else if (opcode == Protocol::OP_PACKAGE_INDICATION)
- broker.handlePackageIndication(buffer, sequence);
- else if (opcode == Protocol::OP_CLASS_INDICATION)
- broker.handleClassIndication(buffer, sequence);
- else if (opcode == Protocol::OP_HEARTBEAT_INDICATION)
- broker.handleHeartbeatIndication(buffer, sequence);
- else if (opcode == Protocol::OP_EVENT_INDICATION)
- broker.handleEventIndication(buffer, sequence);
- else if (opcode == Protocol::OP_PROPERTY_INDICATION)
- broker.handleObjectIndication(buffer, sequence, true, false);
- else if (opcode == Protocol::OP_STATISTIC_INDICATION)
- broker.handleObjectIndication(buffer, sequence, false, true);
- else if (opcode == Protocol::OP_OBJECT_INDICATION)
- broker.handleObjectIndication(buffer, sequence, true, true);
- else {
- QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode);
- completeContext = true;
- }
-
- return completeContext;
-}
-
-void QueryContext::reserve()
-{
- Mutex::ScopedLock _lock(lock);
- requestsOutstanding++;
-}
-
-void QueryContext::release()
-{
- Mutex::ScopedLock _lock(lock);
- if (--requestsOutstanding == 0) {
- broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse));
- }
-}
-
-bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
-{
- bool completeContext = false;
- ObjectImpl::Ptr object;
-
- if (opcode == Protocol::OP_COMMAND_COMPLETE) {
- broker.handleCommandComplete(buffer, sequence);
- completeContext = true;
- }
- else if (opcode == Protocol::OP_OBJECT_INDICATION) {
- object = broker.handleObjectIndication(buffer, sequence, true, true);
- if (object.get() != 0)
- queryResponse->results.push_back(object);
- }
- else {
- QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
- completeContext = true;
- }
-
- return completeContext;
-}
-
-ConsoleEngineImpl::ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& s) :
- envelope(e), settings(s)
-{
- bindingList.push_back(pair<string, string>(string(), "schema.#"));
- if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) {
- bindingList.push_back(pair<string, string>(string(), "console.#"));
- } else {
- if (settings.rcvObjects && !settings.userBindings)
- bindingList.push_back(pair<string, string>(string(), "console.obj.#"));
- else
- bindingList.push_back(pair<string, string>(string(), "console.obj.*.*.org.apache.qpid.broker.agent"));
- if (settings.rcvEvents)
- bindingList.push_back(pair<string, string>(string(), "console.event.#"));
- if (settings.rcvHeartbeats)
- bindingList.push_back(pair<string, string>(string(), "console.heartbeat.#"));
- }
-}
-
-ConsoleEngineImpl::~ConsoleEngineImpl()
-{
- // This function intentionally left blank.
-}
-
-bool ConsoleEngineImpl::getEvent(ConsoleEvent& event) const
-{
- Mutex::ScopedLock _lock(lock);
- if (eventQueue.empty())
- return false;
- event = eventQueue.front()->copy();
- return true;
-}
-
-void ConsoleEngineImpl::popEvent()
-{
- Mutex::ScopedLock _lock(lock);
- if (!eventQueue.empty())
- eventQueue.pop_front();
-}
-
-void ConsoleEngineImpl::addConnection(BrokerProxy& broker, void* /*context*/)
-{
- Mutex::ScopedLock _lock(lock);
- brokerList.push_back(broker.impl);
-}
-
-void ConsoleEngineImpl::delConnection(BrokerProxy& broker)
-{
- Mutex::ScopedLock _lock(lock);
- for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
- iter != brokerList.end(); iter++)
- if (*iter == broker.impl) {
- brokerList.erase(iter);
- break;
- }
-}
-
-uint32_t ConsoleEngineImpl::packageCount() const
-{
- Mutex::ScopedLock _lock(lock);
- return packages.size();
-}
-
-const string& ConsoleEngineImpl::getPackageName(uint32_t idx) const
-{
- const static string empty;
-
- Mutex::ScopedLock _lock(lock);
- if (idx >= packages.size())
- return empty;
-
- PackageList::const_iterator iter = packages.begin();
- for (uint32_t i = 0; i < idx; i++) iter++;
- return iter->first;
-}
-
-uint32_t ConsoleEngineImpl::classCount(const char* packageName) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(packageName);
- if (pIter == packages.end())
- return 0;
-
- const ObjectClassList& oList = pIter->second.first;
- const EventClassList& eList = pIter->second.second;
-
- return oList.size() + eList.size();
-}
-
-const SchemaClassKey* ConsoleEngineImpl::getClass(const char* packageName, uint32_t idx) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(packageName);
- if (pIter == packages.end())
- return 0;
-
- const ObjectClassList& oList = pIter->second.first;
- const EventClassList& eList = pIter->second.second;
- uint32_t count = 0;
-
- for (ObjectClassList::const_iterator oIter = oList.begin();
- oIter != oList.end(); oIter++) {
- if (count == idx)
- return oIter->second->getClassKey();
- count++;
- }
-
- for (EventClassList::const_iterator eIter = eList.begin();
- eIter != eList.end(); eIter++) {
- if (count == idx)
- return eIter->second->getClassKey();
- count++;
- }
-
- return 0;
-}
-
-ClassKind ConsoleEngineImpl::getClassKind(const SchemaClassKey* key) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(key->getPackageName());
- if (pIter == packages.end())
- return CLASS_OBJECT;
-
- const EventClassList& eList = pIter->second.second;
- if (eList.find(key->impl) != eList.end())
- return CLASS_EVENT;
- return CLASS_OBJECT;
-}
-
-const SchemaObjectClass* ConsoleEngineImpl::getObjectClass(const SchemaClassKey* key) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(key->getPackageName());
- if (pIter == packages.end())
- return 0;
-
- const ObjectClassList& oList = pIter->second.first;
- ObjectClassList::const_iterator iter = oList.find(key->impl);
- if (iter == oList.end())
- return 0;
- return iter->second->envelope;
-}
-
-const SchemaEventClass* ConsoleEngineImpl::getEventClass(const SchemaClassKey* key) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(key->getPackageName());
- if (pIter == packages.end())
- return 0;
-
- const EventClassList& eList = pIter->second.second;
- EventClassList::const_iterator iter = eList.find(key->impl);
- if (iter == eList.end())
- return 0;
- return iter->second->envelope;
-}
-
-void ConsoleEngineImpl::bindPackage(const char* packageName)
-{
- stringstream key;
- key << "console.obj.*.*." << packageName << ".#";
- Mutex::ScopedLock _lock(lock);
- bindingList.push_back(pair<string, string>(string(), key.str()));
- for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
- iter != brokerList.end(); iter++)
- (*iter)->addBinding(QMF_EXCHANGE, key.str());
-}
-
-void ConsoleEngineImpl::bindClass(const SchemaClassKey* classKey)
-{
- stringstream key;
- key << "console.obj.*.*." << classKey->getPackageName() << "." << classKey->getClassName() << ".#";
- Mutex::ScopedLock _lock(lock);
- bindingList.push_back(pair<string, string>(string(), key.str()));
- for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
- iter != brokerList.end(); iter++)
- (*iter)->addBinding(QMF_EXCHANGE, key.str());
-}
-
-void ConsoleEngineImpl::bindClass(const char* packageName, const char* className)
-{
- stringstream key;
- key << "console.obj.*.*." << packageName << "." << className << ".#";
- Mutex::ScopedLock _lock(lock);
- bindingList.push_back(pair<string, string>(string(), key.str()));
- for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
- iter != brokerList.end(); iter++)
- (*iter)->addBinding(QMF_EXCHANGE, key.str());
-}
-
-/*
-void ConsoleEngineImpl::startSync(const Query& query, void* context, SyncQuery& sync)
-{
-}
-
-void ConsoleEngineImpl::touchSync(SyncQuery& sync)
-{
-}
-
-void ConsoleEngineImpl::endSync(SyncQuery& sync)
-{
-}
-*/
-
-void ConsoleEngineImpl::learnPackage(const string& packageName)
-{
- Mutex::ScopedLock _lock(lock);
- if (packages.find(packageName) == packages.end())
- packages.insert(pair<string, pair<ObjectClassList, EventClassList> >
- (packageName, pair<ObjectClassList, EventClassList>(ObjectClassList(), EventClassList())));
-}
-
-void ConsoleEngineImpl::learnClass(SchemaObjectClassImpl::Ptr cls)
-{
- Mutex::ScopedLock _lock(lock);
- const SchemaClassKey* key = cls->getClassKey();
- PackageList::iterator pIter = packages.find(key->getPackageName());
- if (pIter == packages.end())
- return;
-
- ObjectClassList& list = pIter->second.first;
- if (list.find(key->impl) == list.end())
- list[key->impl] = cls;
-}
-
-void ConsoleEngineImpl::learnClass(SchemaEventClassImpl::Ptr cls)
-{
- Mutex::ScopedLock _lock(lock);
- const SchemaClassKey* key = cls->getClassKey();
- PackageList::iterator pIter = packages.find(key->getPackageName());
- if (pIter == packages.end())
- return;
-
- EventClassList& list = pIter->second.second;
- if (list.find(key->impl) == list.end())
- list[key->impl] = cls;
-}
-
-bool ConsoleEngineImpl::haveClass(const SchemaClassKeyImpl& key) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(key.getPackageName());
- if (pIter == packages.end())
- return false;
-
- const ObjectClassList& oList = pIter->second.first;
- const EventClassList& eList = pIter->second.second;
-
- return oList.find(&key) != oList.end() || eList.find(&key) != eList.end();
-}
-
-SchemaObjectClassImpl::Ptr ConsoleEngineImpl::getSchema(const SchemaClassKeyImpl& key) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(key.getPackageName());
- if (pIter == packages.end())
- return SchemaObjectClassImpl::Ptr();
-
- const ObjectClassList& oList = pIter->second.first;
- ObjectClassList::const_iterator iter = oList.find(&key);
- if (iter == oList.end())
- return SchemaObjectClassImpl::Ptr();
-
- return iter->second;
-}
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {}
-AgentProxy::~AgentProxy() { delete impl; }
-const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); }
-
-BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(this, console)) {}
-BrokerProxy::~BrokerProxy() { delete impl; }
-void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); }
-void BrokerProxy::sessionClosed() { impl->sessionClosed(); }
-void BrokerProxy::startProtocol() { impl->startProtocol(); }
-void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
-bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
-void BrokerProxy::popXmt() { impl->popXmt(); }
-bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); }
-void BrokerProxy::popEvent() { impl->popEvent(); }
-uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); }
-const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); }
-void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); }
-
-MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {}
-MethodResponse::~MethodResponse() {}
-uint32_t MethodResponse::getStatus() const { return impl->getStatus(); }
-const Value* MethodResponse::getException() const { return impl->getException(); }
-const Value* MethodResponse::getArgs() const { return impl->getArgs(); }
-
-QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {}
-QueryResponse::~QueryResponse() {}
-uint32_t QueryResponse::getStatus() const { return impl->getStatus(); }
-const Value* QueryResponse::getException() const { return impl->getException(); }
-uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); }
-const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); }
-
-ConsoleEngine::ConsoleEngine(const ConsoleSettings& settings) : impl(new ConsoleEngineImpl(this, settings)) {}
-ConsoleEngine::~ConsoleEngine() { delete impl; }
-bool ConsoleEngine::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); }
-void ConsoleEngine::popEvent() { impl->popEvent(); }
-void ConsoleEngine::addConnection(BrokerProxy& broker, void* context) { impl->addConnection(broker, context); }
-void ConsoleEngine::delConnection(BrokerProxy& broker) { impl->delConnection(broker); }
-uint32_t ConsoleEngine::packageCount() const { return impl->packageCount(); }
-const char* ConsoleEngine::getPackageName(uint32_t idx) const { return impl->getPackageName(idx).c_str(); }
-uint32_t ConsoleEngine::classCount(const char* packageName) const { return impl->classCount(packageName); }
-const SchemaClassKey* ConsoleEngine::getClass(const char* packageName, uint32_t idx) const { return impl->getClass(packageName, idx); }
-ClassKind ConsoleEngine::getClassKind(const SchemaClassKey* key) const { return impl->getClassKind(key); }
-const SchemaObjectClass* ConsoleEngine::getObjectClass(const SchemaClassKey* key) const { return impl->getObjectClass(key); }
-const SchemaEventClass* ConsoleEngine::getEventClass(const SchemaClassKey* key) const { return impl->getEventClass(key); }
-void ConsoleEngine::bindPackage(const char* packageName) { impl->bindPackage(packageName); }
-void ConsoleEngine::bindClass(const SchemaClassKey* key) { impl->bindClass(key); }
-void ConsoleEngine::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); }
-//void ConsoleEngine::startSync(const Query& query, void* context, SyncQuery& sync) { impl->startSync(query, context, sync); }
-//void ConsoleEngine::touchSync(SyncQuery& sync) { impl->touchSync(sync); }
-//void ConsoleEngine::endSync(SyncQuery& sync) { impl->endSync(sync); }
-
-
diff --git a/cpp/src/qmf/ConsoleEngine.h b/cpp/src/qmf/ConsoleEngine.h
index 457e83ad58..f04bbcea47 100644
--- a/cpp/src/qmf/ConsoleEngine.h
+++ b/cpp/src/qmf/ConsoleEngine.h
@@ -46,6 +46,7 @@ namespace qmf {
class MethodResponse {
public:
MethodResponse(MethodResponseImpl* impl);
+ MethodResponse(const MethodResponse& from);
~MethodResponse();
uint32_t getStatus() const;
const Value* getException() const;
@@ -84,8 +85,7 @@ namespace qmf {
NEW_CLASS = 4,
OBJECT_UPDATE = 5,
EVENT_RECEIVED = 7,
- AGENT_HEARTBEAT = 8,
- METHOD_RESPONSE = 9
+ AGENT_HEARTBEAT = 8
};
EventKind kind;
@@ -96,8 +96,6 @@ namespace qmf {
void* context; // (OBJECT_UPDATE)
Event* event; // (EVENT_RECEIVED)
uint64_t timestamp; // (AGENT_HEARTBEAT)
- uint32_t methodHandle; // (METHOD_RESPONSE)
- MethodResponse* methodResponse; // (METHOD_RESPONSE)
QueryResponse* queryResponse; // (QUERY_COMPLETE)
};
@@ -113,15 +111,17 @@ namespace qmf {
UNBIND = 14,
SETUP_COMPLETE = 15,
STABLE = 16,
- QUERY_COMPLETE = 17
+ QUERY_COMPLETE = 17,
+ METHOD_RESPONSE = 18
};
EventKind kind;
- char* name; // ([DECLARE|DELETE]_QUEUE, [UN]BIND)
- char* exchange; // ([UN]BIND)
- char* bindingKey; // ([UN]BIND)
- void* context; // (QUERY_COMPLETE)
- QueryResponse* queryResponse; // (QUERY_COMPLETE)
+ char* name; // ([DECLARE|DELETE]_QUEUE, [UN]BIND)
+ char* exchange; // ([UN]BIND)
+ char* bindingKey; // ([UN]BIND)
+ void* context; // (QUERY_COMPLETE, METHOD_RESPONSE)
+ QueryResponse* queryResponse; // (QUERY_COMPLETE)
+ MethodResponse* methodResponse; // (METHOD_RESPONSE)
};
/**
diff --git a/cpp/src/qmf/ConsoleEngineImpl.cpp b/cpp/src/qmf/ConsoleEngineImpl.cpp
new file mode 100644
index 0000000000..d71bf93105
--- /dev/null
+++ b/cpp/src/qmf/ConsoleEngineImpl.cpp
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/ConsoleEngineImpl.h"
+#include "qmf/MessageImpl.h"
+#include "qmf/SchemaImpl.h"
+#include "qmf/Typecode.h"
+#include "qmf/ObjectImpl.h"
+#include "qmf/ObjectIdImpl.h"
+#include "qmf/QueryImpl.h"
+#include "qmf/ValueImpl.h"
+#include "qmf/Protocol.h"
+#include "qmf/SequenceManager.h"
+#include "qmf/BrokerProxyImpl.h"
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/Uuid.h>
+#include <qpid/framing/FieldTable.h>
+#include <qpid/framing/FieldValue.h>
+#include <qpid/log/Statement.h>
+#include <qpid/sys/Time.h>
+#include <qpid/sys/SystemInfo.h>
+#include <string.h>
+#include <iostream>
+#include <fstream>
+
+using namespace std;
+using namespace qmf;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+namespace {
+ const char* QMF_EXCHANGE = "qpid.management";
+}
+
+#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
+
+ConsoleEvent ConsoleEventImpl::copy()
+{
+ ConsoleEvent item;
+
+ ::memset(&item, 0, sizeof(ConsoleEvent));
+ item.kind = kind;
+ item.agent = agent.get() ? agent->envelope : 0;
+ item.classKey = classKey.get();
+ item.object = object;
+ item.context = context;
+ item.event = event;
+ item.timestamp = timestamp;
+
+ STRING_REF(name);
+
+ return item;
+}
+
+ConsoleEngineImpl::ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& s) :
+ envelope(e), settings(s)
+{
+ bindingList.push_back(pair<string, string>(string(), "schema.#"));
+ if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) {
+ bindingList.push_back(pair<string, string>(string(), "console.#"));
+ } else {
+ if (settings.rcvObjects && !settings.userBindings)
+ bindingList.push_back(pair<string, string>(string(), "console.obj.#"));
+ else
+ bindingList.push_back(pair<string, string>(string(), "console.obj.*.*.org.apache.qpid.broker.agent"));
+ if (settings.rcvEvents)
+ bindingList.push_back(pair<string, string>(string(), "console.event.#"));
+ if (settings.rcvHeartbeats)
+ bindingList.push_back(pair<string, string>(string(), "console.heartbeat.#"));
+ }
+}
+
+ConsoleEngineImpl::~ConsoleEngineImpl()
+{
+ // This function intentionally left blank.
+}
+
+bool ConsoleEngineImpl::getEvent(ConsoleEvent& event) const
+{
+ Mutex::ScopedLock _lock(lock);
+ if (eventQueue.empty())
+ return false;
+ event = eventQueue.front()->copy();
+ return true;
+}
+
+void ConsoleEngineImpl::popEvent()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!eventQueue.empty())
+ eventQueue.pop_front();
+}
+
+void ConsoleEngineImpl::addConnection(BrokerProxy& broker, void* /*context*/)
+{
+ Mutex::ScopedLock _lock(lock);
+ brokerList.push_back(broker.impl);
+}
+
+void ConsoleEngineImpl::delConnection(BrokerProxy& broker)
+{
+ Mutex::ScopedLock _lock(lock);
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ if (*iter == broker.impl) {
+ brokerList.erase(iter);
+ break;
+ }
+}
+
+uint32_t ConsoleEngineImpl::packageCount() const
+{
+ Mutex::ScopedLock _lock(lock);
+ return packages.size();
+}
+
+const string& ConsoleEngineImpl::getPackageName(uint32_t idx) const
+{
+ const static string empty;
+
+ Mutex::ScopedLock _lock(lock);
+ if (idx >= packages.size())
+ return empty;
+
+ PackageList::const_iterator iter = packages.begin();
+ for (uint32_t i = 0; i < idx; i++) iter++;
+ return iter->first;
+}
+
+uint32_t ConsoleEngineImpl::classCount(const char* packageName) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(packageName);
+ if (pIter == packages.end())
+ return 0;
+
+ const ObjectClassList& oList = pIter->second.first;
+ const EventClassList& eList = pIter->second.second;
+
+ return oList.size() + eList.size();
+}
+
+const SchemaClassKey* ConsoleEngineImpl::getClass(const char* packageName, uint32_t idx) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(packageName);
+ if (pIter == packages.end())
+ return 0;
+
+ const ObjectClassList& oList = pIter->second.first;
+ const EventClassList& eList = pIter->second.second;
+ uint32_t count = 0;
+
+ for (ObjectClassList::const_iterator oIter = oList.begin();
+ oIter != oList.end(); oIter++) {
+ if (count == idx)
+ return oIter->second->getClassKey();
+ count++;
+ }
+
+ for (EventClassList::const_iterator eIter = eList.begin();
+ eIter != eList.end(); eIter++) {
+ if (count == idx)
+ return eIter->second->getClassKey();
+ count++;
+ }
+
+ return 0;
+}
+
+ClassKind ConsoleEngineImpl::getClassKind(const SchemaClassKey* key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return CLASS_OBJECT;
+
+ const EventClassList& eList = pIter->second.second;
+ if (eList.find(key->impl) != eList.end())
+ return CLASS_EVENT;
+ return CLASS_OBJECT;
+}
+
+const SchemaObjectClass* ConsoleEngineImpl::getObjectClass(const SchemaClassKey* key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return 0;
+
+ const ObjectClassList& oList = pIter->second.first;
+ ObjectClassList::const_iterator iter = oList.find(key->impl);
+ if (iter == oList.end())
+ return 0;
+ return iter->second->envelope;
+}
+
+const SchemaEventClass* ConsoleEngineImpl::getEventClass(const SchemaClassKey* key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return 0;
+
+ const EventClassList& eList = pIter->second.second;
+ EventClassList::const_iterator iter = eList.find(key->impl);
+ if (iter == eList.end())
+ return 0;
+ return iter->second->envelope;
+}
+
+void ConsoleEngineImpl::bindPackage(const char* packageName)
+{
+ stringstream key;
+ key << "console.obj.*.*." << packageName << ".#";
+ Mutex::ScopedLock _lock(lock);
+ bindingList.push_back(pair<string, string>(string(), key.str()));
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ (*iter)->addBinding(QMF_EXCHANGE, key.str());
+}
+
+void ConsoleEngineImpl::bindClass(const SchemaClassKey* classKey)
+{
+ stringstream key;
+ key << "console.obj.*.*." << classKey->getPackageName() << "." << classKey->getClassName() << ".#";
+ Mutex::ScopedLock _lock(lock);
+ bindingList.push_back(pair<string, string>(string(), key.str()));
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ (*iter)->addBinding(QMF_EXCHANGE, key.str());
+}
+
+void ConsoleEngineImpl::bindClass(const char* packageName, const char* className)
+{
+ stringstream key;
+ key << "console.obj.*.*." << packageName << "." << className << ".#";
+ Mutex::ScopedLock _lock(lock);
+ bindingList.push_back(pair<string, string>(string(), key.str()));
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ (*iter)->addBinding(QMF_EXCHANGE, key.str());
+}
+
+/*
+void ConsoleEngineImpl::startSync(const Query& query, void* context, SyncQuery& sync)
+{
+}
+
+void ConsoleEngineImpl::touchSync(SyncQuery& sync)
+{
+}
+
+void ConsoleEngineImpl::endSync(SyncQuery& sync)
+{
+}
+*/
+
+void ConsoleEngineImpl::learnPackage(const string& packageName)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (packages.find(packageName) == packages.end())
+ packages.insert(pair<string, pair<ObjectClassList, EventClassList> >
+ (packageName, pair<ObjectClassList, EventClassList>(ObjectClassList(), EventClassList())));
+}
+
+void ConsoleEngineImpl::learnClass(SchemaObjectClassImpl::Ptr cls)
+{
+ Mutex::ScopedLock _lock(lock);
+ const SchemaClassKey* key = cls->getClassKey();
+ PackageList::iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return;
+
+ ObjectClassList& list = pIter->second.first;
+ if (list.find(key->impl) == list.end())
+ list[key->impl] = cls;
+}
+
+void ConsoleEngineImpl::learnClass(SchemaEventClassImpl::Ptr cls)
+{
+ Mutex::ScopedLock _lock(lock);
+ const SchemaClassKey* key = cls->getClassKey();
+ PackageList::iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return;
+
+ EventClassList& list = pIter->second.second;
+ if (list.find(key->impl) == list.end())
+ list[key->impl] = cls;
+}
+
+bool ConsoleEngineImpl::haveClass(const SchemaClassKeyImpl& key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key.getPackageName());
+ if (pIter == packages.end())
+ return false;
+
+ const ObjectClassList& oList = pIter->second.first;
+ const EventClassList& eList = pIter->second.second;
+
+ return oList.find(&key) != oList.end() || eList.find(&key) != eList.end();
+}
+
+SchemaObjectClassImpl::Ptr ConsoleEngineImpl::getSchema(const SchemaClassKeyImpl& key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key.getPackageName());
+ if (pIter == packages.end())
+ return SchemaObjectClassImpl::Ptr();
+
+ const ObjectClassList& oList = pIter->second.first;
+ ObjectClassList::const_iterator iter = oList.find(&key);
+ if (iter == oList.end())
+ return SchemaObjectClassImpl::Ptr();
+
+ return iter->second;
+}
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+ConsoleEngine::ConsoleEngine(const ConsoleSettings& settings) : impl(new ConsoleEngineImpl(this, settings)) {}
+ConsoleEngine::~ConsoleEngine() { delete impl; }
+bool ConsoleEngine::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); }
+void ConsoleEngine::popEvent() { impl->popEvent(); }
+void ConsoleEngine::addConnection(BrokerProxy& broker, void* context) { impl->addConnection(broker, context); }
+void ConsoleEngine::delConnection(BrokerProxy& broker) { impl->delConnection(broker); }
+uint32_t ConsoleEngine::packageCount() const { return impl->packageCount(); }
+const char* ConsoleEngine::getPackageName(uint32_t idx) const { return impl->getPackageName(idx).c_str(); }
+uint32_t ConsoleEngine::classCount(const char* packageName) const { return impl->classCount(packageName); }
+const SchemaClassKey* ConsoleEngine::getClass(const char* packageName, uint32_t idx) const { return impl->getClass(packageName, idx); }
+ClassKind ConsoleEngine::getClassKind(const SchemaClassKey* key) const { return impl->getClassKind(key); }
+const SchemaObjectClass* ConsoleEngine::getObjectClass(const SchemaClassKey* key) const { return impl->getObjectClass(key); }
+const SchemaEventClass* ConsoleEngine::getEventClass(const SchemaClassKey* key) const { return impl->getEventClass(key); }
+void ConsoleEngine::bindPackage(const char* packageName) { impl->bindPackage(packageName); }
+void ConsoleEngine::bindClass(const SchemaClassKey* key) { impl->bindClass(key); }
+void ConsoleEngine::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); }
+//void ConsoleEngine::startSync(const Query& query, void* context, SyncQuery& sync) { impl->startSync(query, context, sync); }
+//void ConsoleEngine::touchSync(SyncQuery& sync) { impl->touchSync(sync); }
+//void ConsoleEngine::endSync(SyncQuery& sync) { impl->endSync(sync); }
+
+
diff --git a/cpp/src/qmf/ConsoleEngineImpl.h b/cpp/src/qmf/ConsoleEngineImpl.h
new file mode 100644
index 0000000000..b95cb7523a
--- /dev/null
+++ b/cpp/src/qmf/ConsoleEngineImpl.h
@@ -0,0 +1,133 @@
+#ifndef _QmfConsoleEngineImpl_
+#define _QmfConsoleEngineImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/ConsoleEngine.h"
+#include "qmf/MessageImpl.h"
+#include "qmf/SchemaImpl.h"
+#include "qmf/Typecode.h"
+#include "qmf/ObjectImpl.h"
+#include "qmf/ObjectIdImpl.h"
+#include "qmf/QueryImpl.h"
+#include "qmf/ValueImpl.h"
+#include "qmf/Protocol.h"
+#include "qmf/SequenceManager.h"
+#include "qmf/BrokerProxyImpl.h"
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/Uuid.h>
+#include <qpid/framing/FieldTable.h>
+#include <qpid/framing/FieldValue.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/sys/Time.h>
+#include <qpid/sys/SystemInfo.h>
+#include <string.h>
+#include <string>
+#include <deque>
+#include <map>
+#include <vector>
+#include <iostream>
+#include <fstream>
+#include <boost/shared_ptr.hpp>
+
+namespace qmf {
+
+ struct ConsoleEventImpl {
+ typedef boost::shared_ptr<ConsoleEventImpl> Ptr;
+ ConsoleEvent::EventKind kind;
+ boost::shared_ptr<AgentProxyImpl> agent;
+ std::string name;
+ boost::shared_ptr<SchemaClassKey> classKey;
+ Object* object;
+ void* context;
+ Event* event;
+ uint64_t timestamp;
+
+ ConsoleEventImpl(ConsoleEvent::EventKind k) :
+ kind(k), object(0), context(0), event(0), timestamp(0) {}
+ ~ConsoleEventImpl() {}
+ ConsoleEvent copy();
+ };
+
+ class ConsoleEngineImpl {
+ public:
+ ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& settings = ConsoleSettings());
+ ~ConsoleEngineImpl();
+
+ bool getEvent(ConsoleEvent& event) const;
+ void popEvent();
+
+ void addConnection(BrokerProxy& broker, void* context);
+ void delConnection(BrokerProxy& broker);
+
+ uint32_t packageCount() const;
+ const std::string& getPackageName(uint32_t idx) const;
+
+ uint32_t classCount(const char* packageName) const;
+ const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const;
+
+ ClassKind getClassKind(const SchemaClassKey* key) const;
+ const SchemaObjectClass* getObjectClass(const SchemaClassKey* key) const;
+ const SchemaEventClass* getEventClass(const SchemaClassKey* key) const;
+
+ void bindPackage(const char* packageName);
+ void bindClass(const SchemaClassKey* key);
+ void bindClass(const char* packageName, const char* className);
+
+ /*
+ void startSync(const Query& query, void* context, SyncQuery& sync);
+ void touchSync(SyncQuery& sync);
+ void endSync(SyncQuery& sync);
+ */
+
+ private:
+ friend class BrokerProxyImpl;
+ ConsoleEngine* envelope;
+ const ConsoleSettings& settings;
+ mutable qpid::sys::Mutex lock;
+ std::deque<ConsoleEventImpl::Ptr> eventQueue;
+ std::vector<BrokerProxyImpl*> brokerList;
+ std::vector<std::pair<std::string, std::string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE)
+
+ // Declare a compare class for the class maps that compares the dereferenced
+ // class key pointers. The default behavior would be to compare the pointer
+ // addresses themselves.
+ struct KeyCompare {
+ bool operator()(const SchemaClassKeyImpl* left, const SchemaClassKeyImpl* right) const {
+ return *left < *right;
+ }
+ };
+
+ typedef std::map<const SchemaClassKeyImpl*, SchemaObjectClassImpl::Ptr, KeyCompare> ObjectClassList;
+ typedef std::map<const SchemaClassKeyImpl*, SchemaEventClassImpl::Ptr, KeyCompare> EventClassList;
+ typedef std::map<std::string, std::pair<ObjectClassList, EventClassList> > PackageList;
+
+ PackageList packages;
+
+ void learnPackage(const std::string& packageName);
+ void learnClass(SchemaObjectClassImpl::Ptr cls);
+ void learnClass(SchemaEventClassImpl::Ptr cls);
+ bool haveClass(const SchemaClassKeyImpl& key) const;
+ SchemaObjectClassImpl::Ptr getSchema(const SchemaClassKeyImpl& key) const;
+ };
+}
+
+#endif
+
diff --git a/cpp/src/qmf/Object.h b/cpp/src/qmf/Object.h
index 9cb3224d9b..db89dbb700 100644
--- a/cpp/src/qmf/Object.h
+++ b/cpp/src/qmf/Object.h
@@ -39,6 +39,7 @@ namespace qmf {
void setObjectId(ObjectId* oid);
const SchemaObjectClass* getClass() const;
Value* getValue(char* key) const;
+ void invokeMethod(const char* methodName, const Value* inArgs, void* context) const;
ObjectImpl* impl;
};
diff --git a/cpp/src/qmf/ObjectImpl.cpp b/cpp/src/qmf/ObjectImpl.cpp
index 1ea2d54527..216a9d883e 100644
--- a/cpp/src/qmf/ObjectImpl.cpp
+++ b/cpp/src/qmf/ObjectImpl.cpp
@@ -19,6 +19,7 @@
#include "qmf/ObjectImpl.h"
#include "qmf/ValueImpl.h"
+#include "qmf/BrokerProxyImpl.h"
#include <qpid/sys/Time.h>
using namespace std;
@@ -27,7 +28,7 @@ using namespace qpid::sys;
using qpid::framing::Buffer;
ObjectImpl::ObjectImpl(Object* e, const SchemaObjectClass* type) :
- envelope(e), objectClass(type), createTime(uint64_t(Duration(now()))),
+ envelope(e), objectClass(type), broker(0), createTime(uint64_t(Duration(now()))),
destroyTime(0), lastUpdatedTime(createTime)
{
int propCount = objectClass->getPropertyCount();
@@ -45,8 +46,8 @@ ObjectImpl::ObjectImpl(Object* e, const SchemaObjectClass* type) :
}
}
-ObjectImpl::ObjectImpl(const SchemaObjectClass* type, Buffer& buffer, bool prop, bool stat, bool managed) :
- envelope(new Object(this)), objectClass(type), createTime(0), destroyTime(0), lastUpdatedTime(0)
+ObjectImpl::ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, Buffer& buffer, bool prop, bool stat, bool managed) :
+ envelope(new Object(this)), objectClass(type), broker(b), createTime(0), destroyTime(0), lastUpdatedTime(0)
{
int idx;
@@ -107,6 +108,12 @@ Value* ObjectImpl::getValue(const string& key) const
return 0;
}
+void ObjectImpl::invokeMethod(const string& methodName, const Value* inArgs, void* context) const
+{
+ if (broker != 0 && objectId.get() != 0)
+ broker->sendMethodRequest(objectId.get(), objectClass, methodName, inArgs, context);
+}
+
void ObjectImpl::parsePresenceMasks(Buffer& buffer, set<string>& excludeList)
{
int propCount = objectClass->getPropertyCount();
@@ -205,4 +212,5 @@ const ObjectId* Object::getObjectId() const { return impl->getObjectId(); }
void Object::setObjectId(ObjectId* oid) { impl->setObjectId(oid); }
const SchemaObjectClass* Object::getClass() const { return impl->getClass(); }
Value* Object::getValue(char* key) const { return impl->getValue(key); }
+void Object::invokeMethod(const char* m, const Value* a, void* c) const { impl->invokeMethod(m, a, c); }
diff --git a/cpp/src/qmf/ObjectImpl.h b/cpp/src/qmf/ObjectImpl.h
index d69979e0da..4ac5a31b54 100644
--- a/cpp/src/qmf/ObjectImpl.h
+++ b/cpp/src/qmf/ObjectImpl.h
@@ -31,11 +31,14 @@
namespace qmf {
+ class BrokerProxyImpl;
+
struct ObjectImpl {
typedef boost::shared_ptr<ObjectImpl> Ptr;
typedef boost::shared_ptr<Value> ValuePtr;
Object* envelope;
const SchemaObjectClass* objectClass;
+ BrokerProxyImpl* broker;
boost::shared_ptr<ObjectIdImpl> objectId;
uint64_t createTime;
uint64_t destroyTime;
@@ -44,7 +47,8 @@ namespace qmf {
mutable std::map<std::string, ValuePtr> statistics;
ObjectImpl(Object* e, const SchemaObjectClass* type);
- ObjectImpl(const SchemaObjectClass* type, qpid::framing::Buffer& buffer, bool prop, bool stat, bool managed);
+ ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, qpid::framing::Buffer& buffer,
+ bool prop, bool stat, bool managed);
~ObjectImpl();
void destroy();
@@ -52,6 +56,7 @@ namespace qmf {
void setObjectId(ObjectId* oid) { objectId.reset(oid->impl); }
const SchemaObjectClass* getClass() const { return objectClass; }
Value* getValue(const std::string& key) const;
+ void invokeMethod(const std::string& methodName, const Value* inArgs, void* context) const;
void parsePresenceMasks(qpid::framing::Buffer& buffer, std::set<std::string>& excludeList);
void encodeSchemaKey(qpid::framing::Buffer& buffer) const;
diff --git a/cpp/src/qmf/Value.h b/cpp/src/qmf/Value.h
index bb946d31d3..2ec2149110 100644
--- a/cpp/src/qmf/Value.h
+++ b/cpp/src/qmf/Value.h
@@ -31,6 +31,7 @@ namespace qmf {
class Value {
public:
// Value();
+ Value(const Value& from);
Value(Typecode t, Typecode arrayType = TYPE_UINT8);
Value(ValueImpl* impl);
~Value();
diff --git a/cpp/src/qmf/ValueImpl.cpp b/cpp/src/qmf/ValueImpl.cpp
index f42c85eb33..b03d222835 100644
--- a/cpp/src/qmf/ValueImpl.cpp
+++ b/cpp/src/qmf/ValueImpl.cpp
@@ -67,6 +67,11 @@ ValueImpl::ValueImpl(Typecode t, Buffer& buf) : envelope(new Value(this)), typec
}
}
+ValueImpl::ValueImpl(Value* e, Typecode t, Typecode at) :
+ envelope(e), typecode(t), valid(false), arrayTypecode(at)
+{
+}
+
ValueImpl::ValueImpl(Typecode t) : envelope(new Value(this)), typecode(t)
{
::memset(&value, 0, sizeof(value));
@@ -186,293 +191,64 @@ void ValueImpl::deleteArrayItem(uint32_t)
// Wrappers
//==================================================================
-Value::Value(Typecode t, Typecode at)
-{
- impl = new ValueImpl(this, t, at);
-}
-
-Value::Value(ValueImpl* i)
-{
- impl = i;
-}
-
-Value::~Value()
-{
- delete impl;
-}
-
-Typecode Value::getType() const
-{
- return impl->getType();
-}
-
-bool Value::isNull() const
-{
- return impl->isNull();
-}
-
-void Value::setNull()
-{
- impl->setNull();
-}
-
-bool Value::isObjectId() const
-{
- return impl->isObjectId();
-}
-
-const ObjectId& Value::asObjectId() const
-{
- return impl->asObjectId();
-}
-
-void Value::setObjectId(const ObjectId& oid)
-{
- impl->setObjectId(oid);
-}
-
-bool Value::isUint() const
-{
- return impl->isUint();
-}
-
-uint32_t Value::asUint() const
-{
- return impl->asUint();
-}
-
-void Value::setUint(uint32_t val)
-{
- impl->setUint(val);
-}
-
-bool Value::isInt() const
-{
- return impl->isInt();
-}
-
-int32_t Value::asInt() const
-{
- return impl->asInt();
-}
-
-void Value::setInt(int32_t val)
-{
- impl->setInt(val);
-}
-
-bool Value::isUint64() const
-{
- return impl->isUint64();
-}
-
-uint64_t Value::asUint64() const
-{
- return impl->asUint64();
-}
-
-void Value::setUint64(uint64_t val)
-{
- impl->setUint64(val);
-}
-
-bool Value::isInt64() const
-{
- return impl->isInt64();
-}
-
-int64_t Value::asInt64() const
-{
- return impl->asInt64();
-}
-
-void Value::setInt64(int64_t val)
-{
- impl->setInt64(val);
-}
-
-bool Value::isString() const
-{
- return impl->isString();
-}
-
-const char* Value::asString() const
-{
- return impl->asString();
-}
-
-void Value::setString(const char* val)
-{
- impl->setString(val);
-}
-
-bool Value::isBool() const
-{
- return impl->isBool();
-}
-
-bool Value::asBool() const
-{
- return impl->asBool();
-}
-
-void Value::setBool(bool val)
-{
- impl->setBool(val);
-}
-
-bool Value::isFloat() const
-{
- return impl->isFloat();
-}
-
-float Value::asFloat() const
-{
- return impl->asFloat();
-}
-
-void Value::setFloat(float val)
-{
- impl->setFloat(val);
-}
-
-bool Value::isDouble() const
-{
- return impl->isDouble();
-}
-
-double Value::asDouble() const
-{
- return impl->asDouble();
-}
-
-void Value::setDouble(double val)
-{
- impl->setDouble(val);
-}
-
-bool Value::isUuid() const
-{
- return impl->isUuid();
-}
-
-const uint8_t* Value::asUuid() const
-{
- return impl->asUuid();
-}
-
-void Value::setUuid(const uint8_t* val)
-{
- impl->setUuid(val);
-}
-
-bool Value::isObject() const
-{
- return impl->isObject();
-}
-
-Object* Value::asObject() const
-{
- return impl->asObject();
-}
-
-void Value::setObject(Object* val)
-{
- impl->setObject(val);
-}
-
-bool Value::isMap() const
-{
- return impl->isMap();
-}
-
-bool Value::keyInMap(const char* key) const
-{
- return impl->keyInMap(key);
-}
-
-Value* Value::byKey(const char* key)
-{
- return impl->byKey(key);
-}
-
-const Value* Value::byKey(const char* key) const
-{
- return impl->byKey(key);
-}
-
-void Value::deleteKey(const char* key)
-{
- impl->deleteKey(key);
-}
-
-void Value::insert(const char* key, Value* val)
-{
- impl->insert(key, val);
-}
-
-uint32_t Value::keyCount() const
-{
- return impl->keyCount();
-}
-
-const char* Value::key(uint32_t idx) const
-{
- return impl->key(idx);
-}
-
-bool Value::isList() const
-{
- return impl->isList();
-}
-
-uint32_t Value::listItemCount() const
-{
- return impl->listItemCount();
-}
-
-Value* Value::listItem(uint32_t idx)
-{
- return impl->listItem(idx);
-}
-
-void Value::appendToList(Value* val)
-{
- impl->appendToList(val);
-}
-
-void Value::deleteListItem(uint32_t idx)
-{
- impl->deleteListItem(idx);
-}
-
-bool Value::isArray() const
-{
- return impl->isArray();
-}
-
-Typecode Value::arrayType() const
-{
- return impl->arrayType();
-}
-
-uint32_t Value::arrayItemCount() const
-{
- return impl->arrayItemCount();
-}
-
-Value* Value::arrayItem(uint32_t idx)
-{
- return impl->arrayItem(idx);
-}
-
-void Value::appendToArray(Value* val)
-{
- impl->appendToArray(val);
-}
-
-void Value::deleteArrayItem(uint32_t idx)
-{
- impl->deleteArrayItem(idx);
-}
+Value::Value(const Value& from) : impl(new ValueImpl(*(from.impl))) {}
+Value::Value(Typecode t, Typecode at) : impl(new ValueImpl(this, t, at)) {}
+Value::Value(ValueImpl* i) : impl(i) {}
+Value::~Value() { delete impl; }
+
+Typecode Value::getType() const { return impl->getType(); }
+bool Value::isNull() const { return impl->isNull(); }
+void Value::setNull() { impl->setNull(); }
+bool Value::isObjectId() const { return impl->isObjectId(); }
+const ObjectId& Value::asObjectId() const { return impl->asObjectId(); }
+void Value::setObjectId(const ObjectId& oid) { impl->setObjectId(oid); }
+bool Value::isUint() const { return impl->isUint(); }
+uint32_t Value::asUint() const { return impl->asUint(); }
+void Value::setUint(uint32_t val) { impl->setUint(val); }
+bool Value::isInt() const { return impl->isInt(); }
+int32_t Value::asInt() const { return impl->asInt(); }
+void Value::setInt(int32_t val) { impl->setInt(val); }
+bool Value::isUint64() const { return impl->isUint64(); }
+uint64_t Value::asUint64() const { return impl->asUint64(); }
+void Value::setUint64(uint64_t val) { impl->setUint64(val); }
+bool Value::isInt64() const { return impl->isInt64(); }
+int64_t Value::asInt64() const { return impl->asInt64(); }
+void Value::setInt64(int64_t val) { impl->setInt64(val); }
+bool Value::isString() const { return impl->isString(); }
+const char* Value::asString() const { return impl->asString(); }
+void Value::setString(const char* val) { impl->setString(val); }
+bool Value::isBool() const { return impl->isBool(); }
+bool Value::asBool() const { return impl->asBool(); }
+void Value::setBool(bool val) { impl->setBool(val); }
+bool Value::isFloat() const { return impl->isFloat(); }
+float Value::asFloat() const { return impl->asFloat(); }
+void Value::setFloat(float val) { impl->setFloat(val); }
+bool Value::isDouble() const { return impl->isDouble(); }
+double Value::asDouble() const { return impl->asDouble(); }
+void Value::setDouble(double val) { impl->setDouble(val); }
+bool Value::isUuid() const { return impl->isUuid(); }
+const uint8_t* Value::asUuid() const { return impl->asUuid(); }
+void Value::setUuid(const uint8_t* val) { impl->setUuid(val); }
+bool Value::isObject() const { return impl->isObject(); }
+Object* Value::asObject() const { return impl->asObject(); }
+void Value::setObject(Object* val) { impl->setObject(val); }
+bool Value::isMap() const { return impl->isMap(); }
+bool Value::keyInMap(const char* key) const { return impl->keyInMap(key); }
+Value* Value::byKey(const char* key) { return impl->byKey(key); }
+const Value* Value::byKey(const char* key) const { return impl->byKey(key); }
+void Value::deleteKey(const char* key) { impl->deleteKey(key); }
+void Value::insert(const char* key, Value* val) { impl->insert(key, val); }
+uint32_t Value::keyCount() const { return impl->keyCount(); }
+const char* Value::key(uint32_t idx) const { return impl->key(idx); }
+bool Value::isList() const { return impl->isList(); }
+uint32_t Value::listItemCount() const { return impl->listItemCount(); }
+Value* Value::listItem(uint32_t idx) { return impl->listItem(idx); }
+void Value::appendToList(Value* val) { impl->appendToList(val); }
+void Value::deleteListItem(uint32_t idx) { impl->deleteListItem(idx); }
+bool Value::isArray() const { return impl->isArray(); }
+Typecode Value::arrayType() const { return impl->arrayType(); }
+uint32_t Value::arrayItemCount() const { return impl->arrayItemCount(); }
+Value* Value::arrayItem(uint32_t idx) { return impl->arrayItem(idx); }
+void Value::appendToArray(Value* val) { impl->appendToArray(val); }
+void Value::deleteArrayItem(uint32_t idx) { impl->deleteArrayItem(idx); }
diff --git a/cpp/src/qmf/ValueImpl.h b/cpp/src/qmf/ValueImpl.h
index cf33035bf7..5be4cbc205 100644
--- a/cpp/src/qmf/ValueImpl.h
+++ b/cpp/src/qmf/ValueImpl.h
@@ -60,8 +60,7 @@ namespace qmf {
uint8_t uuidVal[16];
} value;
- ValueImpl(Value* e, Typecode t, Typecode at) :
- envelope(e), typecode(t), valid(false), arrayTypecode(at) {}
+ ValueImpl(Value* e, Typecode t, Typecode at);
ValueImpl(Typecode t, qpid::framing::Buffer& b);
ValueImpl(Typecode t);
~ValueImpl();