diff options
author | Ted Ross <tross@apache.org> | 2009-09-15 17:45:51 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-09-15 17:45:51 +0000 |
commit | 3f0838479df2a5678a6093f34276b9e336af3ded (patch) | |
tree | ecceca23bb8b0d37701bb7678cb1d232a8fb4bfc /cpp/src/qmf/ConsoleEngine.cpp | |
parent | 3cf100216bc1e9c7207a3c963d984665d7a5b9a1 (diff) | |
download | qpid-python-3f0838479df2a5678a6093f34276b9e336af3ded.tar.gz |
QMF Console updated to the point where query (get_object) is supported.
The Ruby binding continues to track the c++ engine progress.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@815416 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/ConsoleEngine.cpp')
-rw-r--r-- | cpp/src/qmf/ConsoleEngine.cpp | 379 |
1 files changed, 292 insertions, 87 deletions
diff --git a/cpp/src/qmf/ConsoleEngine.cpp b/cpp/src/qmf/ConsoleEngine.cpp index 3d1b378b68..e7991328ee 100644 --- a/cpp/src/qmf/ConsoleEngine.cpp +++ b/cpp/src/qmf/ConsoleEngine.cpp @@ -34,6 +34,7 @@ #include <qpid/sys/Mutex.h> #include <qpid/log/Statement.h> #include <qpid/sys/Time.h> +#include <qpid/sys/SystemInfo.h> #include <string.h> #include <string> #include <deque> @@ -58,12 +59,27 @@ namespace qmf { auto_ptr<Value> arguments; MethodResponseImpl(Buffer& buf); - ~MethodResponseImpl() {} + ~MethodResponseImpl() { delete envelope; } uint32_t getStatus() const { return status; } const Value* getException() const { return exception.get(); } const Value* getArgs() const { return arguments.get(); } }; + struct QueryResponseImpl { + typedef boost::shared_ptr<QueryResponseImpl> Ptr; + QueryResponse *envelope; + uint32_t status; + auto_ptr<Value> exception; + vector<ObjectImpl::Ptr> results; + + QueryResponseImpl() : envelope(new QueryResponse(this)), status(0) {} + ~QueryResponseImpl() { delete envelope; } + uint32_t getStatus() const { return status; } + const Value* getException() const { return exception.get(); } + uint32_t getObjectCount() const { return results.size(); } + const Object* getObject(uint32_t idx) const; + }; + struct ConsoleEventImpl { typedef boost::shared_ptr<ConsoleEventImpl> Ptr; ConsoleEvent::EventKind kind; @@ -89,13 +105,29 @@ namespace qmf { string name; string exchange; string bindingKey; + void* context; + QueryResponseImpl::Ptr queryResponse; BrokerEventImpl(BrokerEvent::EventKind k) : kind(k) {} ~BrokerEventImpl() {} BrokerEvent copy(); }; - class BrokerProxyImpl : public SequenceContext { + struct AgentProxyImpl { + typedef boost::shared_ptr<AgentProxyImpl> Ptr; + AgentProxy* envelope; + ConsoleEngineImpl* console; + BrokerProxyImpl* broker; + uint32_t agentBank; + string label; + + AgentProxyImpl(ConsoleEngineImpl* c, BrokerProxyImpl* b, uint32_t ab, const string& l) : + envelope(new AgentProxy(this)), console(c), broker(b), agentBank(ab), label(l) {} + ~AgentProxyImpl() {} + const string& getLabel() const { return label; } + }; + + class BrokerProxyImpl { public: typedef boost::shared_ptr<BrokerProxyImpl> Ptr; @@ -114,12 +146,17 @@ namespace qmf { bool getEvent(BrokerEvent& event) const; void popEvent(); - // From SequenceContext - void complete(); + uint32_t agentCount() const; + const AgentProxy* getAgent(uint32_t idx) const; + void sendQuery(const Query& query, void* context, const AgentProxy* agent); + void sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent); void addBinding(const string& exchange, const string& key); + void staticRelease() { decOutstanding(); } private: + friend class StaticContext; + friend class QueryContext; mutable Mutex lock; BrokerProxy* envelope; ConsoleEngineImpl* console; @@ -128,6 +165,7 @@ namespace qmf { SequenceManager seqMgr; uint32_t requestsOutstanding; bool topicBound; + vector<AgentProxyImpl::Ptr> agentList; deque<MessageImpl::Ptr> xmtQueue; deque<BrokerEventImpl::Ptr> eventQueue; @@ -138,6 +176,7 @@ namespace qmf { BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key); BrokerEventImpl::Ptr eventSetupComplete(); BrokerEventImpl::Ptr eventStable(); + BrokerEventImpl::Ptr eventQueryComplete(void* context, QueryResponseImpl::Ptr response); void handleBrokerResponse(Buffer& inBuffer, uint32_t seq); void handlePackageIndication(Buffer& inBuffer, uint32_t seq); @@ -147,19 +186,33 @@ namespace qmf { void handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq); void handleEventIndication(Buffer& inBuffer, uint32_t seq); void handleSchemaResponse(Buffer& inBuffer, uint32_t seq); - void handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat); + ObjectImpl::Ptr handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat); void incOutstandingLH(); void decOutstanding(); }; - struct AgentProxyImpl { - typedef boost::shared_ptr<AgentProxyImpl> Ptr; - AgentProxy* envelope; - ConsoleEngineImpl* console; + struct StaticContext : public SequenceContext { + StaticContext(BrokerProxyImpl& b) : broker(b) {} + ~StaticContext() {} + void reserve() {} + void release() { broker.staticRelease(); } + bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer); + BrokerProxyImpl& broker; + }; - AgentProxyImpl(AgentProxy* e, ConsoleEngine& _console) : - envelope(e), console(_console.impl) {} - ~AgentProxyImpl() {} + struct QueryContext : public SequenceContext { + QueryContext(BrokerProxyImpl& b, void* u) : + broker(b), userContext(u), requestsOutstanding(0), queryResponse(new QueryResponseImpl()) {} + ~QueryContext() {} + void reserve(); + void release(); + bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer); + + mutable Mutex lock; + BrokerProxyImpl& broker; + void* userContext; + uint32_t requestsOutstanding; + QueryResponseImpl::Ptr queryResponse; }; class ConsoleEngineImpl { @@ -187,11 +240,6 @@ namespace qmf { void bindClass(const SchemaClassKey* key); void bindClass(const char* packageName, const char* className); - uint32_t agentCount() const; - const AgentProxy* getAgent(uint32_t idx) const; - - void sendQuery(const Query& query, void* context); - /* void startSync(const Query& query, void* context, SyncQuery& sync); void touchSync(SyncQuery& sync); @@ -226,13 +274,31 @@ namespace qmf { void learnClass(SchemaObjectClassImpl::Ptr cls); void learnClass(SchemaEventClassImpl::Ptr cls); bool haveClass(const SchemaClassKeyImpl& key) const; + SchemaObjectClassImpl::Ptr getSchema(const SchemaClassKeyImpl& key) const; }; } namespace { -const char* QMF_EXCHANGE = "qpid.management"; -const char* DIR_EXCHANGE = "amq.direct"; -const char* BROKER_KEY = "broker"; + const char* QMF_EXCHANGE = "qpid.management"; + const char* DIR_EXCHANGE = "amq.direct"; + const char* BROKER_KEY = "broker"; + const char* BROKER_PACKAGE = "org.apache.qpid.broker"; + const char* AGENT_CLASS = "agent"; + const char* BROKER_AGENT_KEY = "agent.1.0"; +} + +const Object* QueryResponseImpl::getObject(uint32_t idx) const +{ + vector<ObjectImpl::Ptr>::const_iterator iter = results.begin(); + + while (idx > 0) { + if (iter == results.end()) + return 0; + iter++; + idx--; + } + + return (*iter)->envelope; } #define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} @@ -267,19 +333,29 @@ BrokerEvent BrokerEventImpl::copy() STRING_REF(name); STRING_REF(exchange); STRING_REF(bindingKey); + item.context = context; + item.queryResponse = queryResponse.get() ? queryResponse->envelope : 0; return item; } BrokerProxyImpl::BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console) : - envelope(e), console(_console.impl), queueName("qmfc-") + envelope(e), console(_console.impl) { - // TODO: Give the queue name a unique suffix + stringstream qn; + qpid::TcpAddress addr; + + SystemInfo::getLocalHostname(addr); + qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId(); + queueName = qn.str(); + + seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this))); } void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/) { Mutex::ScopedLock _lock(lock); + agentList.clear(); eventQueue.clear(); xmtQueue.clear(); eventQueue.push_back(eventDeclareQueue(queueName)); @@ -292,6 +368,7 @@ void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/) void BrokerProxyImpl::sessionClosed() { Mutex::ScopedLock _lock(lock); + agentList.clear(); eventQueue.clear(); xmtQueue.clear(); } @@ -302,11 +379,14 @@ void BrokerProxyImpl::startProtocol() char rawbuffer[512]; Buffer buffer(rawbuffer, 512); + agentList.push_back(AgentProxyImpl::Ptr(new AgentProxyImpl(console, this, 0, "Agent embedded in broker"))); + requestsOutstanding = 1; topicBound = false; - Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST); + uint32_t sequence(seqMgr.reserve()); + Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence); sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT BrokerRequest"); + QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence); } void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) @@ -330,23 +410,8 @@ void BrokerProxyImpl::handleRcvMessage(Message& message) uint8_t opcode; uint32_t sequence; - while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) { - if (opcode == Protocol::OP_BROKER_RESPONSE) handleBrokerResponse(inBuffer, sequence); - else if (opcode == Protocol::OP_PACKAGE_INDICATION) handlePackageIndication(inBuffer, sequence); - else if (opcode == Protocol::OP_COMMAND_COMPLETE) handleCommandComplete(inBuffer, sequence); - else if (opcode == Protocol::OP_CLASS_INDICATION) handleClassIndication(inBuffer, sequence); - else if (opcode == Protocol::OP_METHOD_RESPONSE) handleMethodResponse(inBuffer, sequence); - else if (opcode == Protocol::OP_HEARTBEAT_INDICATION) handleHeartbeatIndication(inBuffer, sequence); - else if (opcode == Protocol::OP_EVENT_INDICATION) handleEventIndication(inBuffer, sequence); - else if (opcode == Protocol::OP_SCHEMA_RESPONSE) handleSchemaResponse(inBuffer, sequence); - else if (opcode == Protocol::OP_PROPERTY_INDICATION) handleObjectIndication(inBuffer, sequence, true, false); - else if (opcode == Protocol::OP_STATISTIC_INDICATION) handleObjectIndication(inBuffer, sequence, false, true); - else if (opcode == Protocol::OP_OBJECT_INDICATION) handleObjectIndication(inBuffer, sequence, true, true); - else { - QPID_LOG(trace, "BrokerProxyImpl::handleRcvMessage invalid opcode: " << opcode); - break; - } - } + while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) + seqMgr.dispatch(opcode, sequence, inBuffer); } bool BrokerProxyImpl::getXmtMessage(Message& item) const @@ -381,9 +446,48 @@ void BrokerProxyImpl::popEvent() eventQueue.pop_front(); } -void BrokerProxyImpl::complete() +uint32_t BrokerProxyImpl::agentCount() const { - decOutstanding(); + Mutex::ScopedLock _lock(lock); + return agentList.size(); +} + +const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const +{ + Mutex::ScopedLock _lock(lock); + for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin(); + iter != agentList.end(); iter++) + if (idx-- == 0) + return (*iter)->envelope; + return 0; +} + +void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent) +{ + SequenceContext::Ptr queryContext(new QueryContext(*this, context)); + Mutex::ScopedLock _lock(lock); + if (agent != 0) { + sendGetRequestLH(queryContext, query, agent->impl); + } else { + // TODO (optimization) only send queries to agents that have the requested class+package + for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin(); + iter != agentList.end(); iter++) { + sendGetRequestLH(queryContext, query, (*iter).get()); + } + } +} + +void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent) +{ + stringstream key; + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t sequence(seqMgr.reserve(queryContext)); + + Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence); + query.impl->encode(outBuffer); + key << "agent.1." << agent->agentBank; + sendBufferLH(outBuffer, QMF_EXCHANGE, key.str()); + QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str()); } void BrokerProxyImpl::addBinding(const string& exchange, const string& key) @@ -420,17 +524,22 @@ BrokerEventImpl::Ptr BrokerProxyImpl::eventStable() return event; } -void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq) +BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponseImpl::Ptr response) { - // Note that this function doesn't touch requestsOutstanding. This is because - // it accounts for one request completed (the BrokerRequest) and one request - // started (the PackageRequest) which cancel each other out. + BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE)); + event->context = context; + event->queryResponse = response; + return event; +} +void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq) +{ brokerId.decode(inBuffer); QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId); Mutex::ScopedLock _lock(lock); Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve(this)); + uint32_t sequence(seqMgr.reserve()); + incOutstandingLH(); Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence); sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); QPID_LOG(trace, "SENT PackageRequest seq=" << sequence); @@ -446,7 +555,7 @@ void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq) Mutex::ScopedLock _lock(lock); Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve(this)); + uint32_t sequence(seqMgr.reserve()); incOutstandingLH(); Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence); outBuffer.putShortString(package); @@ -460,20 +569,12 @@ void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq) uint32_t code = inBuffer.getLong(); inBuffer.getShortString(text); QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text); - seqMgr.release(seq); } void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq) { - string package; - string clsName; - SchemaHash hash; uint8_t kind = inBuffer.getOctet(); - inBuffer.getShortString(package); - inBuffer.getShortString(clsName); - hash.decode(inBuffer); - Uuid printableHash(hash.get()); - SchemaClassKeyImpl classKey(package, clsName, hash); + SchemaClassKeyImpl classKey(inBuffer); QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey.str()); @@ -481,7 +582,7 @@ void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq) Mutex::ScopedLock _lock(lock); incOutstandingLH(); Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve(this)); + uint32_t sequence(seqMgr.reserve()); Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence); classKey.encode(outBuffer); sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); @@ -515,6 +616,25 @@ void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq) console->learnClass(oClassPtr); key = oClassPtr->getClassKey()->impl; QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->str()); + + // + // If we have just learned about the org.apache.qpid.broker:agent class, send a get + // request for the current list of agents so we can have it on-hand before we declare + // this session "stable". + // + if (key->getClassName() == AGENT_CLASS && key->getPackageName() == BROKER_PACKAGE) { + Mutex::ScopedLock _lock(lock); + incOutstandingLH(); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t sequence(seqMgr.reserve()); + Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence); + FieldTable ft; + ft.setString("_class", AGENT_CLASS); + ft.setString("_package", BROKER_PACKAGE); + ft.encode(outBuffer); + sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY); + QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY); + } } else if (kind == CLASS_EVENT) { eClassPtr.reset(new SchemaEventClassImpl(inBuffer)); console->learnClass(eClassPtr); @@ -524,13 +644,20 @@ void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq) else { QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind); } - - decOutstanding(); } -void BrokerProxyImpl::handleObjectIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/, bool /*prop*/, bool /*stat*/) +ObjectImpl::Ptr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat) { - // TODO + SchemaClassKeyImpl classKey(inBuffer); + QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey.str()); + + SchemaObjectClassImpl::Ptr schema = console->getSchema(classKey); + if (schema.get() == 0) { + QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey.str()); + return ObjectImpl::Ptr(); + } + + return ObjectImpl::Ptr(new ObjectImpl(schema->envelope, inBuffer, prop, stat, true)); } void BrokerProxyImpl::incOutstandingLH() @@ -567,6 +694,79 @@ MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodRespons arguments.reset(new Value(TYPE_MAP)); } +bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer) +{ + bool completeContext = false; + if (opcode == Protocol::OP_BROKER_RESPONSE) { + broker.handleBrokerResponse(buffer, sequence); + completeContext = true; + } + else if (opcode == Protocol::OP_COMMAND_COMPLETE) { + broker.handleCommandComplete(buffer, sequence); + completeContext = true; + } + else if (opcode == Protocol::OP_SCHEMA_RESPONSE) { + broker.handleSchemaResponse(buffer, sequence); + completeContext = true; + } + else if (opcode == Protocol::OP_PACKAGE_INDICATION) + broker.handlePackageIndication(buffer, sequence); + else if (opcode == Protocol::OP_CLASS_INDICATION) + broker.handleClassIndication(buffer, sequence); + else if (opcode == Protocol::OP_HEARTBEAT_INDICATION) + broker.handleHeartbeatIndication(buffer, sequence); + else if (opcode == Protocol::OP_EVENT_INDICATION) + broker.handleEventIndication(buffer, sequence); + else if (opcode == Protocol::OP_PROPERTY_INDICATION) + broker.handleObjectIndication(buffer, sequence, true, false); + else if (opcode == Protocol::OP_STATISTIC_INDICATION) + broker.handleObjectIndication(buffer, sequence, false, true); + else if (opcode == Protocol::OP_OBJECT_INDICATION) + broker.handleObjectIndication(buffer, sequence, true, true); + else { + QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode); + completeContext = true; + } + + return completeContext; +} + +void QueryContext::reserve() +{ + Mutex::ScopedLock _lock(lock); + requestsOutstanding++; +} + +void QueryContext::release() +{ + Mutex::ScopedLock _lock(lock); + if (--requestsOutstanding == 0) { + broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse)); + } +} + +bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer) +{ + bool completeContext = false; + ObjectImpl::Ptr object; + + if (opcode == Protocol::OP_COMMAND_COMPLETE) { + broker.handleCommandComplete(buffer, sequence); + completeContext = true; + } + else if (opcode == Protocol::OP_OBJECT_INDICATION) { + object = broker.handleObjectIndication(buffer, sequence, true, true); + if (object.get() != 0) + queryResponse->results.push_back(object); + } + else { + QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode); + completeContext = true; + } + + return completeContext; +} + ConsoleEngineImpl::ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& s) : envelope(e), settings(s) { @@ -757,23 +957,6 @@ void ConsoleEngineImpl::bindClass(const char* packageName, const char* className (*iter)->addBinding(QMF_EXCHANGE, key.str()); } -uint32_t ConsoleEngineImpl::agentCount() const -{ - // TODO - return 0; -} - -const AgentProxy* ConsoleEngineImpl::getAgent(uint32_t /*idx*/) const -{ - // TODO - return 0; -} - -void ConsoleEngineImpl::sendQuery(const Query& /*query*/, void* /*context*/) -{ - // TODO -} - /* void ConsoleEngineImpl::startSync(const Query& query, void* context, SyncQuery& sync) { @@ -835,11 +1018,29 @@ bool ConsoleEngineImpl::haveClass(const SchemaClassKeyImpl& key) const return oList.find(&key) != oList.end() || eList.find(&key) != eList.end(); } +SchemaObjectClassImpl::Ptr ConsoleEngineImpl::getSchema(const SchemaClassKeyImpl& key) const +{ + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(key.getPackageName()); + if (pIter == packages.end()) + return SchemaObjectClassImpl::Ptr(); + + const ObjectClassList& oList = pIter->second.first; + ObjectClassList::const_iterator iter = oList.find(&key); + if (iter == oList.end()) + return SchemaObjectClassImpl::Ptr(); + + return iter->second; +} //================================================================== // Wrappers //================================================================== +AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {} +AgentProxy::~AgentProxy() { delete impl; } +const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); } + BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(this, console)) {} BrokerProxy::~BrokerProxy() { delete impl; } void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); } @@ -850,16 +1051,23 @@ bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessag void BrokerProxy::popXmt() { impl->popXmt(); } bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); } void BrokerProxy::popEvent() { impl->popEvent(); } - -AgentProxy::AgentProxy(ConsoleEngine& console) : impl(new AgentProxyImpl(this, console)) {} -AgentProxy::~AgentProxy() { delete impl; } +uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); } +const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); } +void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); } MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {} -MethodResponse::~MethodResponse() { delete impl; } // TODO: correct to delete here? +MethodResponse::~MethodResponse() {} uint32_t MethodResponse::getStatus() const { return impl->getStatus(); } const Value* MethodResponse::getException() const { return impl->getException(); } const Value* MethodResponse::getArgs() const { return impl->getArgs(); } +QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {} +QueryResponse::~QueryResponse() {} +uint32_t QueryResponse::getStatus() const { return impl->getStatus(); } +const Value* QueryResponse::getException() const { return impl->getException(); } +uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); } +const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); } + ConsoleEngine::ConsoleEngine(const ConsoleSettings& settings) : impl(new ConsoleEngineImpl(this, settings)) {} ConsoleEngine::~ConsoleEngine() { delete impl; } bool ConsoleEngine::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); } @@ -876,9 +1084,6 @@ const SchemaEventClass* ConsoleEngine::getEventClass(const SchemaClassKey* key) void ConsoleEngine::bindPackage(const char* packageName) { impl->bindPackage(packageName); } void ConsoleEngine::bindClass(const SchemaClassKey* key) { impl->bindClass(key); } void ConsoleEngine::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); } -uint32_t ConsoleEngine::agentCount() const { return impl->agentCount(); } -const AgentProxy* ConsoleEngine::getAgent(uint32_t idx) const { return impl->getAgent(idx); } -void ConsoleEngine::sendQuery(const Query& query, void* context) { impl->sendQuery(query, context); } //void ConsoleEngine::startSync(const Query& query, void* context, SyncQuery& sync) { impl->startSync(query, context, sync); } //void ConsoleEngine::touchSync(SyncQuery& sync) { impl->touchSync(sync); } //void ConsoleEngine::endSync(SyncQuery& sync) { impl->endSync(sync); } |