summaryrefslogtreecommitdiff
path: root/cpp/src/qmf/ConsoleEngine.cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-09-15 17:45:51 +0000
committerTed Ross <tross@apache.org>2009-09-15 17:45:51 +0000
commit3f0838479df2a5678a6093f34276b9e336af3ded (patch)
treeecceca23bb8b0d37701bb7678cb1d232a8fb4bfc /cpp/src/qmf/ConsoleEngine.cpp
parent3cf100216bc1e9c7207a3c963d984665d7a5b9a1 (diff)
downloadqpid-python-3f0838479df2a5678a6093f34276b9e336af3ded.tar.gz
QMF Console updated to the point where query (get_object) is supported.
The Ruby binding continues to track the c++ engine progress. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@815416 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/ConsoleEngine.cpp')
-rw-r--r--cpp/src/qmf/ConsoleEngine.cpp379
1 files changed, 292 insertions, 87 deletions
diff --git a/cpp/src/qmf/ConsoleEngine.cpp b/cpp/src/qmf/ConsoleEngine.cpp
index 3d1b378b68..e7991328ee 100644
--- a/cpp/src/qmf/ConsoleEngine.cpp
+++ b/cpp/src/qmf/ConsoleEngine.cpp
@@ -34,6 +34,7 @@
#include <qpid/sys/Mutex.h>
#include <qpid/log/Statement.h>
#include <qpid/sys/Time.h>
+#include <qpid/sys/SystemInfo.h>
#include <string.h>
#include <string>
#include <deque>
@@ -58,12 +59,27 @@ namespace qmf {
auto_ptr<Value> arguments;
MethodResponseImpl(Buffer& buf);
- ~MethodResponseImpl() {}
+ ~MethodResponseImpl() { delete envelope; }
uint32_t getStatus() const { return status; }
const Value* getException() const { return exception.get(); }
const Value* getArgs() const { return arguments.get(); }
};
+ struct QueryResponseImpl {
+ typedef boost::shared_ptr<QueryResponseImpl> Ptr;
+ QueryResponse *envelope;
+ uint32_t status;
+ auto_ptr<Value> exception;
+ vector<ObjectImpl::Ptr> results;
+
+ QueryResponseImpl() : envelope(new QueryResponse(this)), status(0) {}
+ ~QueryResponseImpl() { delete envelope; }
+ uint32_t getStatus() const { return status; }
+ const Value* getException() const { return exception.get(); }
+ uint32_t getObjectCount() const { return results.size(); }
+ const Object* getObject(uint32_t idx) const;
+ };
+
struct ConsoleEventImpl {
typedef boost::shared_ptr<ConsoleEventImpl> Ptr;
ConsoleEvent::EventKind kind;
@@ -89,13 +105,29 @@ namespace qmf {
string name;
string exchange;
string bindingKey;
+ void* context;
+ QueryResponseImpl::Ptr queryResponse;
BrokerEventImpl(BrokerEvent::EventKind k) : kind(k) {}
~BrokerEventImpl() {}
BrokerEvent copy();
};
- class BrokerProxyImpl : public SequenceContext {
+ struct AgentProxyImpl {
+ typedef boost::shared_ptr<AgentProxyImpl> Ptr;
+ AgentProxy* envelope;
+ ConsoleEngineImpl* console;
+ BrokerProxyImpl* broker;
+ uint32_t agentBank;
+ string label;
+
+ AgentProxyImpl(ConsoleEngineImpl* c, BrokerProxyImpl* b, uint32_t ab, const string& l) :
+ envelope(new AgentProxy(this)), console(c), broker(b), agentBank(ab), label(l) {}
+ ~AgentProxyImpl() {}
+ const string& getLabel() const { return label; }
+ };
+
+ class BrokerProxyImpl {
public:
typedef boost::shared_ptr<BrokerProxyImpl> Ptr;
@@ -114,12 +146,17 @@ namespace qmf {
bool getEvent(BrokerEvent& event) const;
void popEvent();
- // From SequenceContext
- void complete();
+ uint32_t agentCount() const;
+ const AgentProxy* getAgent(uint32_t idx) const;
+ void sendQuery(const Query& query, void* context, const AgentProxy* agent);
+ void sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent);
void addBinding(const string& exchange, const string& key);
+ void staticRelease() { decOutstanding(); }
private:
+ friend class StaticContext;
+ friend class QueryContext;
mutable Mutex lock;
BrokerProxy* envelope;
ConsoleEngineImpl* console;
@@ -128,6 +165,7 @@ namespace qmf {
SequenceManager seqMgr;
uint32_t requestsOutstanding;
bool topicBound;
+ vector<AgentProxyImpl::Ptr> agentList;
deque<MessageImpl::Ptr> xmtQueue;
deque<BrokerEventImpl::Ptr> eventQueue;
@@ -138,6 +176,7 @@ namespace qmf {
BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
BrokerEventImpl::Ptr eventSetupComplete();
BrokerEventImpl::Ptr eventStable();
+ BrokerEventImpl::Ptr eventQueryComplete(void* context, QueryResponseImpl::Ptr response);
void handleBrokerResponse(Buffer& inBuffer, uint32_t seq);
void handlePackageIndication(Buffer& inBuffer, uint32_t seq);
@@ -147,19 +186,33 @@ namespace qmf {
void handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq);
void handleEventIndication(Buffer& inBuffer, uint32_t seq);
void handleSchemaResponse(Buffer& inBuffer, uint32_t seq);
- void handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
+ ObjectImpl::Ptr handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
void incOutstandingLH();
void decOutstanding();
};
- struct AgentProxyImpl {
- typedef boost::shared_ptr<AgentProxyImpl> Ptr;
- AgentProxy* envelope;
- ConsoleEngineImpl* console;
+ struct StaticContext : public SequenceContext {
+ StaticContext(BrokerProxyImpl& b) : broker(b) {}
+ ~StaticContext() {}
+ void reserve() {}
+ void release() { broker.staticRelease(); }
+ bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer);
+ BrokerProxyImpl& broker;
+ };
- AgentProxyImpl(AgentProxy* e, ConsoleEngine& _console) :
- envelope(e), console(_console.impl) {}
- ~AgentProxyImpl() {}
+ struct QueryContext : public SequenceContext {
+ QueryContext(BrokerProxyImpl& b, void* u) :
+ broker(b), userContext(u), requestsOutstanding(0), queryResponse(new QueryResponseImpl()) {}
+ ~QueryContext() {}
+ void reserve();
+ void release();
+ bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer);
+
+ mutable Mutex lock;
+ BrokerProxyImpl& broker;
+ void* userContext;
+ uint32_t requestsOutstanding;
+ QueryResponseImpl::Ptr queryResponse;
};
class ConsoleEngineImpl {
@@ -187,11 +240,6 @@ namespace qmf {
void bindClass(const SchemaClassKey* key);
void bindClass(const char* packageName, const char* className);
- uint32_t agentCount() const;
- const AgentProxy* getAgent(uint32_t idx) const;
-
- void sendQuery(const Query& query, void* context);
-
/*
void startSync(const Query& query, void* context, SyncQuery& sync);
void touchSync(SyncQuery& sync);
@@ -226,13 +274,31 @@ namespace qmf {
void learnClass(SchemaObjectClassImpl::Ptr cls);
void learnClass(SchemaEventClassImpl::Ptr cls);
bool haveClass(const SchemaClassKeyImpl& key) const;
+ SchemaObjectClassImpl::Ptr getSchema(const SchemaClassKeyImpl& key) const;
};
}
namespace {
-const char* QMF_EXCHANGE = "qpid.management";
-const char* DIR_EXCHANGE = "amq.direct";
-const char* BROKER_KEY = "broker";
+ const char* QMF_EXCHANGE = "qpid.management";
+ const char* DIR_EXCHANGE = "amq.direct";
+ const char* BROKER_KEY = "broker";
+ const char* BROKER_PACKAGE = "org.apache.qpid.broker";
+ const char* AGENT_CLASS = "agent";
+ const char* BROKER_AGENT_KEY = "agent.1.0";
+}
+
+const Object* QueryResponseImpl::getObject(uint32_t idx) const
+{
+ vector<ObjectImpl::Ptr>::const_iterator iter = results.begin();
+
+ while (idx > 0) {
+ if (iter == results.end())
+ return 0;
+ iter++;
+ idx--;
+ }
+
+ return (*iter)->envelope;
}
#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
@@ -267,19 +333,29 @@ BrokerEvent BrokerEventImpl::copy()
STRING_REF(name);
STRING_REF(exchange);
STRING_REF(bindingKey);
+ item.context = context;
+ item.queryResponse = queryResponse.get() ? queryResponse->envelope : 0;
return item;
}
BrokerProxyImpl::BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console) :
- envelope(e), console(_console.impl), queueName("qmfc-")
+ envelope(e), console(_console.impl)
{
- // TODO: Give the queue name a unique suffix
+ stringstream qn;
+ qpid::TcpAddress addr;
+
+ SystemInfo::getLocalHostname(addr);
+ qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId();
+ queueName = qn.str();
+
+ seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this)));
}
void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
{
Mutex::ScopedLock _lock(lock);
+ agentList.clear();
eventQueue.clear();
xmtQueue.clear();
eventQueue.push_back(eventDeclareQueue(queueName));
@@ -292,6 +368,7 @@ void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
void BrokerProxyImpl::sessionClosed()
{
Mutex::ScopedLock _lock(lock);
+ agentList.clear();
eventQueue.clear();
xmtQueue.clear();
}
@@ -302,11 +379,14 @@ void BrokerProxyImpl::startProtocol()
char rawbuffer[512];
Buffer buffer(rawbuffer, 512);
+ agentList.push_back(AgentProxyImpl::Ptr(new AgentProxyImpl(console, this, 0, "Agent embedded in broker")));
+
requestsOutstanding = 1;
topicBound = false;
- Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST);
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT BrokerRequest");
+ QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
}
void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
@@ -330,23 +410,8 @@ void BrokerProxyImpl::handleRcvMessage(Message& message)
uint8_t opcode;
uint32_t sequence;
- while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) {
- if (opcode == Protocol::OP_BROKER_RESPONSE) handleBrokerResponse(inBuffer, sequence);
- else if (opcode == Protocol::OP_PACKAGE_INDICATION) handlePackageIndication(inBuffer, sequence);
- else if (opcode == Protocol::OP_COMMAND_COMPLETE) handleCommandComplete(inBuffer, sequence);
- else if (opcode == Protocol::OP_CLASS_INDICATION) handleClassIndication(inBuffer, sequence);
- else if (opcode == Protocol::OP_METHOD_RESPONSE) handleMethodResponse(inBuffer, sequence);
- else if (opcode == Protocol::OP_HEARTBEAT_INDICATION) handleHeartbeatIndication(inBuffer, sequence);
- else if (opcode == Protocol::OP_EVENT_INDICATION) handleEventIndication(inBuffer, sequence);
- else if (opcode == Protocol::OP_SCHEMA_RESPONSE) handleSchemaResponse(inBuffer, sequence);
- else if (opcode == Protocol::OP_PROPERTY_INDICATION) handleObjectIndication(inBuffer, sequence, true, false);
- else if (opcode == Protocol::OP_STATISTIC_INDICATION) handleObjectIndication(inBuffer, sequence, false, true);
- else if (opcode == Protocol::OP_OBJECT_INDICATION) handleObjectIndication(inBuffer, sequence, true, true);
- else {
- QPID_LOG(trace, "BrokerProxyImpl::handleRcvMessage invalid opcode: " << opcode);
- break;
- }
- }
+ while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
+ seqMgr.dispatch(opcode, sequence, inBuffer);
}
bool BrokerProxyImpl::getXmtMessage(Message& item) const
@@ -381,9 +446,48 @@ void BrokerProxyImpl::popEvent()
eventQueue.pop_front();
}
-void BrokerProxyImpl::complete()
+uint32_t BrokerProxyImpl::agentCount() const
{
- decOutstanding();
+ Mutex::ScopedLock _lock(lock);
+ return agentList.size();
+}
+
+const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const
+{
+ Mutex::ScopedLock _lock(lock);
+ for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin();
+ iter != agentList.end(); iter++)
+ if (idx-- == 0)
+ return (*iter)->envelope;
+ return 0;
+}
+
+void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent)
+{
+ SequenceContext::Ptr queryContext(new QueryContext(*this, context));
+ Mutex::ScopedLock _lock(lock);
+ if (agent != 0) {
+ sendGetRequestLH(queryContext, query, agent->impl);
+ } else {
+ // TODO (optimization) only send queries to agents that have the requested class+package
+ for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin();
+ iter != agentList.end(); iter++) {
+ sendGetRequestLH(queryContext, query, (*iter).get());
+ }
+ }
+}
+
+void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent)
+{
+ stringstream key;
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve(queryContext));
+
+ Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
+ query.impl->encode(outBuffer);
+ key << "agent.1." << agent->agentBank;
+ sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
+ QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str());
}
void BrokerProxyImpl::addBinding(const string& exchange, const string& key)
@@ -420,17 +524,22 @@ BrokerEventImpl::Ptr BrokerProxyImpl::eventStable()
return event;
}
-void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
+BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponseImpl::Ptr response)
{
- // Note that this function doesn't touch requestsOutstanding. This is because
- // it accounts for one request completed (the BrokerRequest) and one request
- // started (the PackageRequest) which cancel each other out.
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE));
+ event->context = context;
+ event->queryResponse = response;
+ return event;
+}
+void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
+{
brokerId.decode(inBuffer);
QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId);
Mutex::ScopedLock _lock(lock);
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve(this));
+ uint32_t sequence(seqMgr.reserve());
+ incOutstandingLH();
Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence);
sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
QPID_LOG(trace, "SENT PackageRequest seq=" << sequence);
@@ -446,7 +555,7 @@ void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
Mutex::ScopedLock _lock(lock);
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve(this));
+ uint32_t sequence(seqMgr.reserve());
incOutstandingLH();
Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence);
outBuffer.putShortString(package);
@@ -460,20 +569,12 @@ void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
uint32_t code = inBuffer.getLong();
inBuffer.getShortString(text);
QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text);
- seqMgr.release(seq);
}
void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
{
- string package;
- string clsName;
- SchemaHash hash;
uint8_t kind = inBuffer.getOctet();
- inBuffer.getShortString(package);
- inBuffer.getShortString(clsName);
- hash.decode(inBuffer);
- Uuid printableHash(hash.get());
- SchemaClassKeyImpl classKey(package, clsName, hash);
+ SchemaClassKeyImpl classKey(inBuffer);
QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey.str());
@@ -481,7 +582,7 @@ void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
Mutex::ScopedLock _lock(lock);
incOutstandingLH();
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve(this));
+ uint32_t sequence(seqMgr.reserve());
Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence);
classKey.encode(outBuffer);
sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
@@ -515,6 +616,25 @@ void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
console->learnClass(oClassPtr);
key = oClassPtr->getClassKey()->impl;
QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->str());
+
+ //
+ // If we have just learned about the org.apache.qpid.broker:agent class, send a get
+ // request for the current list of agents so we can have it on-hand before we declare
+ // this session "stable".
+ //
+ if (key->getClassName() == AGENT_CLASS && key->getPackageName() == BROKER_PACKAGE) {
+ Mutex::ScopedLock _lock(lock);
+ incOutstandingLH();
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
+ FieldTable ft;
+ ft.setString("_class", AGENT_CLASS);
+ ft.setString("_package", BROKER_PACKAGE);
+ ft.encode(outBuffer);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY);
+ QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY);
+ }
} else if (kind == CLASS_EVENT) {
eClassPtr.reset(new SchemaEventClassImpl(inBuffer));
console->learnClass(eClassPtr);
@@ -524,13 +644,20 @@ void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
else {
QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
}
-
- decOutstanding();
}
-void BrokerProxyImpl::handleObjectIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/, bool /*prop*/, bool /*stat*/)
+ObjectImpl::Ptr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
{
- // TODO
+ SchemaClassKeyImpl classKey(inBuffer);
+ QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey.str());
+
+ SchemaObjectClassImpl::Ptr schema = console->getSchema(classKey);
+ if (schema.get() == 0) {
+ QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey.str());
+ return ObjectImpl::Ptr();
+ }
+
+ return ObjectImpl::Ptr(new ObjectImpl(schema->envelope, inBuffer, prop, stat, true));
}
void BrokerProxyImpl::incOutstandingLH()
@@ -567,6 +694,79 @@ MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodRespons
arguments.reset(new Value(TYPE_MAP));
}
+bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+{
+ bool completeContext = false;
+ if (opcode == Protocol::OP_BROKER_RESPONSE) {
+ broker.handleBrokerResponse(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_COMMAND_COMPLETE) {
+ broker.handleCommandComplete(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_SCHEMA_RESPONSE) {
+ broker.handleSchemaResponse(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_PACKAGE_INDICATION)
+ broker.handlePackageIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_CLASS_INDICATION)
+ broker.handleClassIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_HEARTBEAT_INDICATION)
+ broker.handleHeartbeatIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_EVENT_INDICATION)
+ broker.handleEventIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_PROPERTY_INDICATION)
+ broker.handleObjectIndication(buffer, sequence, true, false);
+ else if (opcode == Protocol::OP_STATISTIC_INDICATION)
+ broker.handleObjectIndication(buffer, sequence, false, true);
+ else if (opcode == Protocol::OP_OBJECT_INDICATION)
+ broker.handleObjectIndication(buffer, sequence, true, true);
+ else {
+ QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode);
+ completeContext = true;
+ }
+
+ return completeContext;
+}
+
+void QueryContext::reserve()
+{
+ Mutex::ScopedLock _lock(lock);
+ requestsOutstanding++;
+}
+
+void QueryContext::release()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (--requestsOutstanding == 0) {
+ broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse));
+ }
+}
+
+bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+{
+ bool completeContext = false;
+ ObjectImpl::Ptr object;
+
+ if (opcode == Protocol::OP_COMMAND_COMPLETE) {
+ broker.handleCommandComplete(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_OBJECT_INDICATION) {
+ object = broker.handleObjectIndication(buffer, sequence, true, true);
+ if (object.get() != 0)
+ queryResponse->results.push_back(object);
+ }
+ else {
+ QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
+ completeContext = true;
+ }
+
+ return completeContext;
+}
+
ConsoleEngineImpl::ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& s) :
envelope(e), settings(s)
{
@@ -757,23 +957,6 @@ void ConsoleEngineImpl::bindClass(const char* packageName, const char* className
(*iter)->addBinding(QMF_EXCHANGE, key.str());
}
-uint32_t ConsoleEngineImpl::agentCount() const
-{
- // TODO
- return 0;
-}
-
-const AgentProxy* ConsoleEngineImpl::getAgent(uint32_t /*idx*/) const
-{
- // TODO
- return 0;
-}
-
-void ConsoleEngineImpl::sendQuery(const Query& /*query*/, void* /*context*/)
-{
- // TODO
-}
-
/*
void ConsoleEngineImpl::startSync(const Query& query, void* context, SyncQuery& sync)
{
@@ -835,11 +1018,29 @@ bool ConsoleEngineImpl::haveClass(const SchemaClassKeyImpl& key) const
return oList.find(&key) != oList.end() || eList.find(&key) != eList.end();
}
+SchemaObjectClassImpl::Ptr ConsoleEngineImpl::getSchema(const SchemaClassKeyImpl& key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key.getPackageName());
+ if (pIter == packages.end())
+ return SchemaObjectClassImpl::Ptr();
+
+ const ObjectClassList& oList = pIter->second.first;
+ ObjectClassList::const_iterator iter = oList.find(&key);
+ if (iter == oList.end())
+ return SchemaObjectClassImpl::Ptr();
+
+ return iter->second;
+}
//==================================================================
// Wrappers
//==================================================================
+AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {}
+AgentProxy::~AgentProxy() { delete impl; }
+const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); }
+
BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(this, console)) {}
BrokerProxy::~BrokerProxy() { delete impl; }
void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); }
@@ -850,16 +1051,23 @@ bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessag
void BrokerProxy::popXmt() { impl->popXmt(); }
bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); }
void BrokerProxy::popEvent() { impl->popEvent(); }
-
-AgentProxy::AgentProxy(ConsoleEngine& console) : impl(new AgentProxyImpl(this, console)) {}
-AgentProxy::~AgentProxy() { delete impl; }
+uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); }
+const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); }
+void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); }
MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {}
-MethodResponse::~MethodResponse() { delete impl; } // TODO: correct to delete here?
+MethodResponse::~MethodResponse() {}
uint32_t MethodResponse::getStatus() const { return impl->getStatus(); }
const Value* MethodResponse::getException() const { return impl->getException(); }
const Value* MethodResponse::getArgs() const { return impl->getArgs(); }
+QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {}
+QueryResponse::~QueryResponse() {}
+uint32_t QueryResponse::getStatus() const { return impl->getStatus(); }
+const Value* QueryResponse::getException() const { return impl->getException(); }
+uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); }
+const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); }
+
ConsoleEngine::ConsoleEngine(const ConsoleSettings& settings) : impl(new ConsoleEngineImpl(this, settings)) {}
ConsoleEngine::~ConsoleEngine() { delete impl; }
bool ConsoleEngine::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); }
@@ -876,9 +1084,6 @@ const SchemaEventClass* ConsoleEngine::getEventClass(const SchemaClassKey* key)
void ConsoleEngine::bindPackage(const char* packageName) { impl->bindPackage(packageName); }
void ConsoleEngine::bindClass(const SchemaClassKey* key) { impl->bindClass(key); }
void ConsoleEngine::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); }
-uint32_t ConsoleEngine::agentCount() const { return impl->agentCount(); }
-const AgentProxy* ConsoleEngine::getAgent(uint32_t idx) const { return impl->getAgent(idx); }
-void ConsoleEngine::sendQuery(const Query& query, void* context) { impl->sendQuery(query, context); }
//void ConsoleEngine::startSync(const Query& query, void* context, SyncQuery& sync) { impl->startSync(query, context, sync); }
//void ConsoleEngine::touchSync(SyncQuery& sync) { impl->touchSync(sync); }
//void ConsoleEngine::endSync(SyncQuery& sync) { impl->endSync(sync); }