diff options
Diffstat (limited to 'qpid/cpp/src/qmf')
-rw-r--r-- | qpid/cpp/src/qmf/AgentEngine.cpp | 19 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/ConsoleEngine.cpp | 379 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/ConsoleEngine.h | 70 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/Object.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/ObjectId.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/ObjectIdImpl.cpp | 21 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/ObjectIdImpl.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/ObjectImpl.cpp | 88 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/ObjectImpl.h | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/Query.h | 70 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/QueryImpl.cpp | 85 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/QueryImpl.h | 78 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/SchemaImpl.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/SchemaImpl.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/SequenceManager.cpp | 52 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/SequenceManager.h | 20 |
16 files changed, 676 insertions, 246 deletions
diff --git a/qpid/cpp/src/qmf/AgentEngine.cpp b/qpid/cpp/src/qmf/AgentEngine.cpp index d3204042d5..9ea3be5907 100644 --- a/qpid/cpp/src/qmf/AgentEngine.cpp +++ b/qpid/cpp/src/qmf/AgentEngine.cpp @@ -57,7 +57,7 @@ namespace qmf { string name; Object* object; boost::shared_ptr<ObjectId> objectId; - Query query; + boost::shared_ptr<Query> query; boost::shared_ptr<Value> arguments; string exchange; string bindingKey; @@ -214,7 +214,7 @@ AgentEvent AgentEventImpl::copy() item.sequence = sequence; item.object = object; item.objectId = objectId.get(); - item.query = &query; + item.query = query.get(); item.arguments = arguments.get(); item.objectClass = objectClass; @@ -381,7 +381,7 @@ void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* t } } sendBufferLH(buffer, context->exchange, context->key); - QPID_LOG(trace, "SENT MethodResponse"); + QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text); } void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) @@ -403,7 +403,7 @@ void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop object.impl->encodeStatistics(buffer); sendBufferLH(buffer, context->exchange, context->key); - QPID_LOG(trace, "SENT ContentIndication"); + QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence); } void AgentEngineImpl::queryComplete(uint32_t sequence) @@ -511,9 +511,10 @@ AgentEventImpl::Ptr AgentEngineImpl::eventQuery(uint32_t num, const string& user AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY)); event->sequence = num; event->authUserId = userId; - event->query.impl->packageName = package; - event->query.impl->className = cls; - event->query.impl->oid = oid; + if (oid.get()) + event->query.reset(new Query(oid.get())); + else + event->query.reset(new Query(cls.c_str(), package.c_str())); return event; } @@ -723,7 +724,7 @@ void AgentEngineImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const ft.decode(inBuffer); - QPID_LOG(trace, "RCVD GetQuery: map=" << ft); + QPID_LOG(trace, "RCVD GetQuery: seq=" << sequence << " map=" << ft); value = ft.get("_package"); if (value.get() && value->convertsTo<string>()) { @@ -773,6 +774,8 @@ void AgentEngineImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, con AgentClassKey classKey(buffer); buffer.getShortString(method); + QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=" << method); + map<string, ClassMaps>::const_iterator pIter = packages.find(pname); if (pIter == packages.end()) { sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_PACKAGE, pname); diff --git a/qpid/cpp/src/qmf/ConsoleEngine.cpp b/qpid/cpp/src/qmf/ConsoleEngine.cpp index 3d1b378b68..e7991328ee 100644 --- a/qpid/cpp/src/qmf/ConsoleEngine.cpp +++ b/qpid/cpp/src/qmf/ConsoleEngine.cpp @@ -34,6 +34,7 @@ #include <qpid/sys/Mutex.h> #include <qpid/log/Statement.h> #include <qpid/sys/Time.h> +#include <qpid/sys/SystemInfo.h> #include <string.h> #include <string> #include <deque> @@ -58,12 +59,27 @@ namespace qmf { auto_ptr<Value> arguments; MethodResponseImpl(Buffer& buf); - ~MethodResponseImpl() {} + ~MethodResponseImpl() { delete envelope; } uint32_t getStatus() const { return status; } const Value* getException() const { return exception.get(); } const Value* getArgs() const { return arguments.get(); } }; + struct QueryResponseImpl { + typedef boost::shared_ptr<QueryResponseImpl> Ptr; + QueryResponse *envelope; + uint32_t status; + auto_ptr<Value> exception; + vector<ObjectImpl::Ptr> results; + + QueryResponseImpl() : envelope(new QueryResponse(this)), status(0) {} + ~QueryResponseImpl() { delete envelope; } + uint32_t getStatus() const { return status; } + const Value* getException() const { return exception.get(); } + uint32_t getObjectCount() const { return results.size(); } + const Object* getObject(uint32_t idx) const; + }; + struct ConsoleEventImpl { typedef boost::shared_ptr<ConsoleEventImpl> Ptr; ConsoleEvent::EventKind kind; @@ -89,13 +105,29 @@ namespace qmf { string name; string exchange; string bindingKey; + void* context; + QueryResponseImpl::Ptr queryResponse; BrokerEventImpl(BrokerEvent::EventKind k) : kind(k) {} ~BrokerEventImpl() {} BrokerEvent copy(); }; - class BrokerProxyImpl : public SequenceContext { + struct AgentProxyImpl { + typedef boost::shared_ptr<AgentProxyImpl> Ptr; + AgentProxy* envelope; + ConsoleEngineImpl* console; + BrokerProxyImpl* broker; + uint32_t agentBank; + string label; + + AgentProxyImpl(ConsoleEngineImpl* c, BrokerProxyImpl* b, uint32_t ab, const string& l) : + envelope(new AgentProxy(this)), console(c), broker(b), agentBank(ab), label(l) {} + ~AgentProxyImpl() {} + const string& getLabel() const { return label; } + }; + + class BrokerProxyImpl { public: typedef boost::shared_ptr<BrokerProxyImpl> Ptr; @@ -114,12 +146,17 @@ namespace qmf { bool getEvent(BrokerEvent& event) const; void popEvent(); - // From SequenceContext - void complete(); + uint32_t agentCount() const; + const AgentProxy* getAgent(uint32_t idx) const; + void sendQuery(const Query& query, void* context, const AgentProxy* agent); + void sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent); void addBinding(const string& exchange, const string& key); + void staticRelease() { decOutstanding(); } private: + friend class StaticContext; + friend class QueryContext; mutable Mutex lock; BrokerProxy* envelope; ConsoleEngineImpl* console; @@ -128,6 +165,7 @@ namespace qmf { SequenceManager seqMgr; uint32_t requestsOutstanding; bool topicBound; + vector<AgentProxyImpl::Ptr> agentList; deque<MessageImpl::Ptr> xmtQueue; deque<BrokerEventImpl::Ptr> eventQueue; @@ -138,6 +176,7 @@ namespace qmf { BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key); BrokerEventImpl::Ptr eventSetupComplete(); BrokerEventImpl::Ptr eventStable(); + BrokerEventImpl::Ptr eventQueryComplete(void* context, QueryResponseImpl::Ptr response); void handleBrokerResponse(Buffer& inBuffer, uint32_t seq); void handlePackageIndication(Buffer& inBuffer, uint32_t seq); @@ -147,19 +186,33 @@ namespace qmf { void handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq); void handleEventIndication(Buffer& inBuffer, uint32_t seq); void handleSchemaResponse(Buffer& inBuffer, uint32_t seq); - void handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat); + ObjectImpl::Ptr handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat); void incOutstandingLH(); void decOutstanding(); }; - struct AgentProxyImpl { - typedef boost::shared_ptr<AgentProxyImpl> Ptr; - AgentProxy* envelope; - ConsoleEngineImpl* console; + struct StaticContext : public SequenceContext { + StaticContext(BrokerProxyImpl& b) : broker(b) {} + ~StaticContext() {} + void reserve() {} + void release() { broker.staticRelease(); } + bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer); + BrokerProxyImpl& broker; + }; - AgentProxyImpl(AgentProxy* e, ConsoleEngine& _console) : - envelope(e), console(_console.impl) {} - ~AgentProxyImpl() {} + struct QueryContext : public SequenceContext { + QueryContext(BrokerProxyImpl& b, void* u) : + broker(b), userContext(u), requestsOutstanding(0), queryResponse(new QueryResponseImpl()) {} + ~QueryContext() {} + void reserve(); + void release(); + bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer); + + mutable Mutex lock; + BrokerProxyImpl& broker; + void* userContext; + uint32_t requestsOutstanding; + QueryResponseImpl::Ptr queryResponse; }; class ConsoleEngineImpl { @@ -187,11 +240,6 @@ namespace qmf { void bindClass(const SchemaClassKey* key); void bindClass(const char* packageName, const char* className); - uint32_t agentCount() const; - const AgentProxy* getAgent(uint32_t idx) const; - - void sendQuery(const Query& query, void* context); - /* void startSync(const Query& query, void* context, SyncQuery& sync); void touchSync(SyncQuery& sync); @@ -226,13 +274,31 @@ namespace qmf { void learnClass(SchemaObjectClassImpl::Ptr cls); void learnClass(SchemaEventClassImpl::Ptr cls); bool haveClass(const SchemaClassKeyImpl& key) const; + SchemaObjectClassImpl::Ptr getSchema(const SchemaClassKeyImpl& key) const; }; } namespace { -const char* QMF_EXCHANGE = "qpid.management"; -const char* DIR_EXCHANGE = "amq.direct"; -const char* BROKER_KEY = "broker"; + const char* QMF_EXCHANGE = "qpid.management"; + const char* DIR_EXCHANGE = "amq.direct"; + const char* BROKER_KEY = "broker"; + const char* BROKER_PACKAGE = "org.apache.qpid.broker"; + const char* AGENT_CLASS = "agent"; + const char* BROKER_AGENT_KEY = "agent.1.0"; +} + +const Object* QueryResponseImpl::getObject(uint32_t idx) const +{ + vector<ObjectImpl::Ptr>::const_iterator iter = results.begin(); + + while (idx > 0) { + if (iter == results.end()) + return 0; + iter++; + idx--; + } + + return (*iter)->envelope; } #define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} @@ -267,19 +333,29 @@ BrokerEvent BrokerEventImpl::copy() STRING_REF(name); STRING_REF(exchange); STRING_REF(bindingKey); + item.context = context; + item.queryResponse = queryResponse.get() ? queryResponse->envelope : 0; return item; } BrokerProxyImpl::BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console) : - envelope(e), console(_console.impl), queueName("qmfc-") + envelope(e), console(_console.impl) { - // TODO: Give the queue name a unique suffix + stringstream qn; + qpid::TcpAddress addr; + + SystemInfo::getLocalHostname(addr); + qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId(); + queueName = qn.str(); + + seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this))); } void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/) { Mutex::ScopedLock _lock(lock); + agentList.clear(); eventQueue.clear(); xmtQueue.clear(); eventQueue.push_back(eventDeclareQueue(queueName)); @@ -292,6 +368,7 @@ void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/) void BrokerProxyImpl::sessionClosed() { Mutex::ScopedLock _lock(lock); + agentList.clear(); eventQueue.clear(); xmtQueue.clear(); } @@ -302,11 +379,14 @@ void BrokerProxyImpl::startProtocol() char rawbuffer[512]; Buffer buffer(rawbuffer, 512); + agentList.push_back(AgentProxyImpl::Ptr(new AgentProxyImpl(console, this, 0, "Agent embedded in broker"))); + requestsOutstanding = 1; topicBound = false; - Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST); + uint32_t sequence(seqMgr.reserve()); + Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence); sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT BrokerRequest"); + QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence); } void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) @@ -330,23 +410,8 @@ void BrokerProxyImpl::handleRcvMessage(Message& message) uint8_t opcode; uint32_t sequence; - while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) { - if (opcode == Protocol::OP_BROKER_RESPONSE) handleBrokerResponse(inBuffer, sequence); - else if (opcode == Protocol::OP_PACKAGE_INDICATION) handlePackageIndication(inBuffer, sequence); - else if (opcode == Protocol::OP_COMMAND_COMPLETE) handleCommandComplete(inBuffer, sequence); - else if (opcode == Protocol::OP_CLASS_INDICATION) handleClassIndication(inBuffer, sequence); - else if (opcode == Protocol::OP_METHOD_RESPONSE) handleMethodResponse(inBuffer, sequence); - else if (opcode == Protocol::OP_HEARTBEAT_INDICATION) handleHeartbeatIndication(inBuffer, sequence); - else if (opcode == Protocol::OP_EVENT_INDICATION) handleEventIndication(inBuffer, sequence); - else if (opcode == Protocol::OP_SCHEMA_RESPONSE) handleSchemaResponse(inBuffer, sequence); - else if (opcode == Protocol::OP_PROPERTY_INDICATION) handleObjectIndication(inBuffer, sequence, true, false); - else if (opcode == Protocol::OP_STATISTIC_INDICATION) handleObjectIndication(inBuffer, sequence, false, true); - else if (opcode == Protocol::OP_OBJECT_INDICATION) handleObjectIndication(inBuffer, sequence, true, true); - else { - QPID_LOG(trace, "BrokerProxyImpl::handleRcvMessage invalid opcode: " << opcode); - break; - } - } + while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) + seqMgr.dispatch(opcode, sequence, inBuffer); } bool BrokerProxyImpl::getXmtMessage(Message& item) const @@ -381,9 +446,48 @@ void BrokerProxyImpl::popEvent() eventQueue.pop_front(); } -void BrokerProxyImpl::complete() +uint32_t BrokerProxyImpl::agentCount() const { - decOutstanding(); + Mutex::ScopedLock _lock(lock); + return agentList.size(); +} + +const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const +{ + Mutex::ScopedLock _lock(lock); + for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin(); + iter != agentList.end(); iter++) + if (idx-- == 0) + return (*iter)->envelope; + return 0; +} + +void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent) +{ + SequenceContext::Ptr queryContext(new QueryContext(*this, context)); + Mutex::ScopedLock _lock(lock); + if (agent != 0) { + sendGetRequestLH(queryContext, query, agent->impl); + } else { + // TODO (optimization) only send queries to agents that have the requested class+package + for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin(); + iter != agentList.end(); iter++) { + sendGetRequestLH(queryContext, query, (*iter).get()); + } + } +} + +void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent) +{ + stringstream key; + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t sequence(seqMgr.reserve(queryContext)); + + Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence); + query.impl->encode(outBuffer); + key << "agent.1." << agent->agentBank; + sendBufferLH(outBuffer, QMF_EXCHANGE, key.str()); + QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str()); } void BrokerProxyImpl::addBinding(const string& exchange, const string& key) @@ -420,17 +524,22 @@ BrokerEventImpl::Ptr BrokerProxyImpl::eventStable() return event; } -void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq) +BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponseImpl::Ptr response) { - // Note that this function doesn't touch requestsOutstanding. This is because - // it accounts for one request completed (the BrokerRequest) and one request - // started (the PackageRequest) which cancel each other out. + BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE)); + event->context = context; + event->queryResponse = response; + return event; +} +void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq) +{ brokerId.decode(inBuffer); QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId); Mutex::ScopedLock _lock(lock); Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve(this)); + uint32_t sequence(seqMgr.reserve()); + incOutstandingLH(); Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence); sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); QPID_LOG(trace, "SENT PackageRequest seq=" << sequence); @@ -446,7 +555,7 @@ void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq) Mutex::ScopedLock _lock(lock); Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve(this)); + uint32_t sequence(seqMgr.reserve()); incOutstandingLH(); Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence); outBuffer.putShortString(package); @@ -460,20 +569,12 @@ void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq) uint32_t code = inBuffer.getLong(); inBuffer.getShortString(text); QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text); - seqMgr.release(seq); } void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq) { - string package; - string clsName; - SchemaHash hash; uint8_t kind = inBuffer.getOctet(); - inBuffer.getShortString(package); - inBuffer.getShortString(clsName); - hash.decode(inBuffer); - Uuid printableHash(hash.get()); - SchemaClassKeyImpl classKey(package, clsName, hash); + SchemaClassKeyImpl classKey(inBuffer); QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey.str()); @@ -481,7 +582,7 @@ void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq) Mutex::ScopedLock _lock(lock); incOutstandingLH(); Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve(this)); + uint32_t sequence(seqMgr.reserve()); Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence); classKey.encode(outBuffer); sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); @@ -515,6 +616,25 @@ void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq) console->learnClass(oClassPtr); key = oClassPtr->getClassKey()->impl; QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->str()); + + // + // If we have just learned about the org.apache.qpid.broker:agent class, send a get + // request for the current list of agents so we can have it on-hand before we declare + // this session "stable". + // + if (key->getClassName() == AGENT_CLASS && key->getPackageName() == BROKER_PACKAGE) { + Mutex::ScopedLock _lock(lock); + incOutstandingLH(); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t sequence(seqMgr.reserve()); + Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence); + FieldTable ft; + ft.setString("_class", AGENT_CLASS); + ft.setString("_package", BROKER_PACKAGE); + ft.encode(outBuffer); + sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY); + QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY); + } } else if (kind == CLASS_EVENT) { eClassPtr.reset(new SchemaEventClassImpl(inBuffer)); console->learnClass(eClassPtr); @@ -524,13 +644,20 @@ void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq) else { QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind); } - - decOutstanding(); } -void BrokerProxyImpl::handleObjectIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/, bool /*prop*/, bool /*stat*/) +ObjectImpl::Ptr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat) { - // TODO + SchemaClassKeyImpl classKey(inBuffer); + QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey.str()); + + SchemaObjectClassImpl::Ptr schema = console->getSchema(classKey); + if (schema.get() == 0) { + QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey.str()); + return ObjectImpl::Ptr(); + } + + return ObjectImpl::Ptr(new ObjectImpl(schema->envelope, inBuffer, prop, stat, true)); } void BrokerProxyImpl::incOutstandingLH() @@ -567,6 +694,79 @@ MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodRespons arguments.reset(new Value(TYPE_MAP)); } +bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer) +{ + bool completeContext = false; + if (opcode == Protocol::OP_BROKER_RESPONSE) { + broker.handleBrokerResponse(buffer, sequence); + completeContext = true; + } + else if (opcode == Protocol::OP_COMMAND_COMPLETE) { + broker.handleCommandComplete(buffer, sequence); + completeContext = true; + } + else if (opcode == Protocol::OP_SCHEMA_RESPONSE) { + broker.handleSchemaResponse(buffer, sequence); + completeContext = true; + } + else if (opcode == Protocol::OP_PACKAGE_INDICATION) + broker.handlePackageIndication(buffer, sequence); + else if (opcode == Protocol::OP_CLASS_INDICATION) + broker.handleClassIndication(buffer, sequence); + else if (opcode == Protocol::OP_HEARTBEAT_INDICATION) + broker.handleHeartbeatIndication(buffer, sequence); + else if (opcode == Protocol::OP_EVENT_INDICATION) + broker.handleEventIndication(buffer, sequence); + else if (opcode == Protocol::OP_PROPERTY_INDICATION) + broker.handleObjectIndication(buffer, sequence, true, false); + else if (opcode == Protocol::OP_STATISTIC_INDICATION) + broker.handleObjectIndication(buffer, sequence, false, true); + else if (opcode == Protocol::OP_OBJECT_INDICATION) + broker.handleObjectIndication(buffer, sequence, true, true); + else { + QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode); + completeContext = true; + } + + return completeContext; +} + +void QueryContext::reserve() +{ + Mutex::ScopedLock _lock(lock); + requestsOutstanding++; +} + +void QueryContext::release() +{ + Mutex::ScopedLock _lock(lock); + if (--requestsOutstanding == 0) { + broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse)); + } +} + +bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer) +{ + bool completeContext = false; + ObjectImpl::Ptr object; + + if (opcode == Protocol::OP_COMMAND_COMPLETE) { + broker.handleCommandComplete(buffer, sequence); + completeContext = true; + } + else if (opcode == Protocol::OP_OBJECT_INDICATION) { + object = broker.handleObjectIndication(buffer, sequence, true, true); + if (object.get() != 0) + queryResponse->results.push_back(object); + } + else { + QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode); + completeContext = true; + } + + return completeContext; +} + ConsoleEngineImpl::ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& s) : envelope(e), settings(s) { @@ -757,23 +957,6 @@ void ConsoleEngineImpl::bindClass(const char* packageName, const char* className (*iter)->addBinding(QMF_EXCHANGE, key.str()); } -uint32_t ConsoleEngineImpl::agentCount() const -{ - // TODO - return 0; -} - -const AgentProxy* ConsoleEngineImpl::getAgent(uint32_t /*idx*/) const -{ - // TODO - return 0; -} - -void ConsoleEngineImpl::sendQuery(const Query& /*query*/, void* /*context*/) -{ - // TODO -} - /* void ConsoleEngineImpl::startSync(const Query& query, void* context, SyncQuery& sync) { @@ -835,11 +1018,29 @@ bool ConsoleEngineImpl::haveClass(const SchemaClassKeyImpl& key) const return oList.find(&key) != oList.end() || eList.find(&key) != eList.end(); } +SchemaObjectClassImpl::Ptr ConsoleEngineImpl::getSchema(const SchemaClassKeyImpl& key) const +{ + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(key.getPackageName()); + if (pIter == packages.end()) + return SchemaObjectClassImpl::Ptr(); + + const ObjectClassList& oList = pIter->second.first; + ObjectClassList::const_iterator iter = oList.find(&key); + if (iter == oList.end()) + return SchemaObjectClassImpl::Ptr(); + + return iter->second; +} //================================================================== // Wrappers //================================================================== +AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {} +AgentProxy::~AgentProxy() { delete impl; } +const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); } + BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(this, console)) {} BrokerProxy::~BrokerProxy() { delete impl; } void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); } @@ -850,16 +1051,23 @@ bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessag void BrokerProxy::popXmt() { impl->popXmt(); } bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); } void BrokerProxy::popEvent() { impl->popEvent(); } - -AgentProxy::AgentProxy(ConsoleEngine& console) : impl(new AgentProxyImpl(this, console)) {} -AgentProxy::~AgentProxy() { delete impl; } +uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); } +const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); } +void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); } MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {} -MethodResponse::~MethodResponse() { delete impl; } // TODO: correct to delete here? +MethodResponse::~MethodResponse() {} uint32_t MethodResponse::getStatus() const { return impl->getStatus(); } const Value* MethodResponse::getException() const { return impl->getException(); } const Value* MethodResponse::getArgs() const { return impl->getArgs(); } +QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {} +QueryResponse::~QueryResponse() {} +uint32_t QueryResponse::getStatus() const { return impl->getStatus(); } +const Value* QueryResponse::getException() const { return impl->getException(); } +uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); } +const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); } + ConsoleEngine::ConsoleEngine(const ConsoleSettings& settings) : impl(new ConsoleEngineImpl(this, settings)) {} ConsoleEngine::~ConsoleEngine() { delete impl; } bool ConsoleEngine::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); } @@ -876,9 +1084,6 @@ const SchemaEventClass* ConsoleEngine::getEventClass(const SchemaClassKey* key) void ConsoleEngine::bindPackage(const char* packageName) { impl->bindPackage(packageName); } void ConsoleEngine::bindClass(const SchemaClassKey* key) { impl->bindClass(key); } void ConsoleEngine::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); } -uint32_t ConsoleEngine::agentCount() const { return impl->agentCount(); } -const AgentProxy* ConsoleEngine::getAgent(uint32_t idx) const { return impl->getAgent(idx); } -void ConsoleEngine::sendQuery(const Query& query, void* context) { impl->sendQuery(query, context); } //void ConsoleEngine::startSync(const Query& query, void* context, SyncQuery& sync) { impl->startSync(query, context, sync); } //void ConsoleEngine::touchSync(SyncQuery& sync) { impl->touchSync(sync); } //void ConsoleEngine::endSync(SyncQuery& sync) { impl->endSync(sync); } diff --git a/qpid/cpp/src/qmf/ConsoleEngine.h b/qpid/cpp/src/qmf/ConsoleEngine.h index 84ac78cd69..457e83ad58 100644 --- a/qpid/cpp/src/qmf/ConsoleEngine.h +++ b/qpid/cpp/src/qmf/ConsoleEngine.h @@ -37,6 +37,8 @@ namespace qmf { class AgentProxy; class AgentProxyImpl; class MethodResponseImpl; + class QueryResponseImpl; + class QueryContext; /** * @@ -57,6 +59,23 @@ namespace qmf { /** * */ + class QueryResponse { + public: + QueryResponse(QueryResponseImpl* impl); + ~QueryResponse(); + uint32_t getStatus() const; + const Value* getException() const; + uint32_t getObjectCount() const; + const Object* getObject(uint32_t idx) const; + + private: + friend class QueryContext; + QueryResponseImpl *impl; + }; + + /** + * + */ struct ConsoleEvent { enum EventKind { AGENT_ADDED = 1, @@ -64,7 +83,6 @@ namespace qmf { NEW_PACKAGE = 3, NEW_CLASS = 4, OBJECT_UPDATE = 5, - QUERY_COMPLETE = 6, EVENT_RECEIVED = 7, AGENT_HEARTBEAT = 8, METHOD_RESPONSE = 9 @@ -75,11 +93,12 @@ namespace qmf { char* name; // (NEW_PACKAGE) SchemaClassKey* classKey; // (NEW_CLASS) Object* object; // (OBJECT_UPDATE) - void* context; // (OBJECT_UPDATE, QUERY_COMPLETE) + void* context; // (OBJECT_UPDATE) Event* event; // (EVENT_RECEIVED) uint64_t timestamp; // (AGENT_HEARTBEAT) uint32_t methodHandle; // (METHOD_RESPONSE) MethodResponse* methodResponse; // (METHOD_RESPONSE) + QueryResponse* queryResponse; // (QUERY_COMPLETE) }; /** @@ -93,13 +112,30 @@ namespace qmf { BIND = 13, UNBIND = 14, SETUP_COMPLETE = 15, - STABLE = 16 + STABLE = 16, + QUERY_COMPLETE = 17 }; EventKind kind; - char* name; // ([DECLARE|DELETE]_QUEUE, [UN]BIND) - char* exchange; // ([UN]BIND) - char* bindingKey; // ([UN]BIND) + char* name; // ([DECLARE|DELETE]_QUEUE, [UN]BIND) + char* exchange; // ([UN]BIND) + char* bindingKey; // ([UN]BIND) + void* context; // (QUERY_COMPLETE) + QueryResponse* queryResponse; // (QUERY_COMPLETE) + }; + + /** + * + */ + class AgentProxy { + public: + AgentProxy(AgentProxyImpl* impl); + ~AgentProxy(); + const char* getLabel() const; + + private: + friend class BrokerProxyImpl; + AgentProxyImpl* impl; }; /** @@ -121,22 +157,13 @@ namespace qmf { bool getEvent(BrokerEvent& event) const; void popEvent(); - private: - friend class ConsoleEngineImpl; - BrokerProxyImpl* impl; - }; - - /** - * - */ - class AgentProxy { - public: - AgentProxy(ConsoleEngine& console); - ~AgentProxy(); + uint32_t agentCount() const; + const AgentProxy* getAgent(uint32_t idx) const; + void sendQuery(const Query& query, void* context, const AgentProxy* agent = 0); private: friend class ConsoleEngineImpl; - AgentProxyImpl* impl; + BrokerProxyImpl* impl; }; // TODO - move this to a public header @@ -178,11 +205,6 @@ namespace qmf { void bindClass(const SchemaClassKey* key); void bindClass(const char* packageName, const char* className); - uint32_t agentCount() const; - const AgentProxy* getAgent(uint32_t idx) const; - - void sendQuery(const Query& query, void* context); - /* void startSync(const Query& query, void* context, SyncQuery& sync); void touchSync(SyncQuery& sync); diff --git a/qpid/cpp/src/qmf/Object.h b/qpid/cpp/src/qmf/Object.h index eb92cbbe45..9cb3224d9b 100644 --- a/qpid/cpp/src/qmf/Object.h +++ b/qpid/cpp/src/qmf/Object.h @@ -31,13 +31,14 @@ namespace qmf { public: Object(const SchemaObjectClass* type); Object(ObjectImpl* impl); + Object(const Object& from); virtual ~Object(); void destroy(); const ObjectId* getObjectId() const; void setObjectId(ObjectId* oid); const SchemaObjectClass* getClass() const; - Value* getValue(char* key); + Value* getValue(char* key) const; ObjectImpl* impl; }; diff --git a/qpid/cpp/src/qmf/ObjectId.h b/qpid/cpp/src/qmf/ObjectId.h index ffd1b6978b..e894e0b39c 100644 --- a/qpid/cpp/src/qmf/ObjectId.h +++ b/qpid/cpp/src/qmf/ObjectId.h @@ -30,6 +30,7 @@ namespace qmf { class ObjectId { public: ObjectId(); + ObjectId(const ObjectId& from); ObjectId(ObjectIdImpl* impl); ~ObjectId(); diff --git a/qpid/cpp/src/qmf/ObjectIdImpl.cpp b/qpid/cpp/src/qmf/ObjectIdImpl.cpp index 75661fdb47..c0618ccc49 100644 --- a/qpid/cpp/src/qmf/ObjectIdImpl.cpp +++ b/qpid/cpp/src/qmf/ObjectIdImpl.cpp @@ -100,6 +100,15 @@ void ObjectIdImpl::fromString(const std::string& repr) agent = 0; } +std::string ObjectIdImpl::asString() const +{ + stringstream val; + + val << getFlags() << "-" << getSequence() << "-" << getBrokerBank() << "-" << + getAgentBank() << "-" << getObjectNum(); + return val.str(); +} + bool ObjectIdImpl::operator==(const ObjectIdImpl& other) const { uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL; @@ -126,15 +135,11 @@ bool ObjectIdImpl::operator>(const ObjectIdImpl& other) const // Wrappers //================================================================== -ObjectId::ObjectId() -{ - impl = new ObjectIdImpl(this); -} +ObjectId::ObjectId() : impl(new ObjectIdImpl(this)) {} -ObjectId::ObjectId(ObjectIdImpl* i) -{ - impl = i; -} +ObjectId::ObjectId(const ObjectId& from) : impl(new ObjectIdImpl(*(from.impl))) {} + +ObjectId::ObjectId(ObjectIdImpl* i) : impl(i) {} ObjectId::~ObjectId() { diff --git a/qpid/cpp/src/qmf/ObjectIdImpl.h b/qpid/cpp/src/qmf/ObjectIdImpl.h index 5d8ee59aee..38d231237f 100644 --- a/qpid/cpp/src/qmf/ObjectIdImpl.h +++ b/qpid/cpp/src/qmf/ObjectIdImpl.h @@ -39,13 +39,14 @@ namespace qmf { uint64_t first; uint64_t second; - ObjectIdImpl(ObjectId* e) : envelope(e), agent(0) {} + ObjectIdImpl(ObjectId* e) : envelope(e), agent(0), first(0), second(0) {} ObjectIdImpl(qpid::framing::Buffer& buffer); ObjectIdImpl(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object); void decode(qpid::framing::Buffer& buffer); void encode(qpid::framing::Buffer& buffer) const; void fromString(const std::string& repr); + std::string asString() const; uint8_t getFlags() const { return (first & 0xF000000000000000LL) >> 60; } uint16_t getSequence() const { return (first & 0x0FFF000000000000LL) >> 48; } uint32_t getBrokerBank() const { return (first & 0x0000FFFFF0000000LL) >> 28; } diff --git a/qpid/cpp/src/qmf/ObjectImpl.cpp b/qpid/cpp/src/qmf/ObjectImpl.cpp index 645ccd5c81..1ea2d54527 100644 --- a/qpid/cpp/src/qmf/ObjectImpl.cpp +++ b/qpid/cpp/src/qmf/ObjectImpl.cpp @@ -45,30 +45,40 @@ ObjectImpl::ObjectImpl(Object* e, const SchemaObjectClass* type) : } } -ObjectImpl::ObjectImpl(const SchemaObjectClass* type, Buffer& buffer) : - envelope(new Object(this)), objectClass(type), createTime(uint64_t(Duration(now()))), - destroyTime(0), lastUpdatedTime(createTime) +ObjectImpl::ObjectImpl(const SchemaObjectClass* type, Buffer& buffer, bool prop, bool stat, bool managed) : + envelope(new Object(this)), objectClass(type), createTime(0), destroyTime(0), lastUpdatedTime(0) { - int propCount = objectClass->getPropertyCount(); - int statCount = objectClass->getStatisticCount(); int idx; - set<string> excludes; - parsePresenceMasks(buffer, excludes); - for (idx = 0; idx < propCount; idx++) { - const SchemaProperty* prop = objectClass->getProperty(idx); - if (excludes.count(prop->getName()) != 0) { - properties[prop->getName()] = ValuePtr(new Value(prop->getType())); - } else { - ValueImpl* pval = new ValueImpl(prop->getType(), buffer); - properties[prop->getName()] = ValuePtr(pval->envelope); + if (managed) { + lastUpdatedTime = buffer.getLongLong(); + createTime = buffer.getLongLong(); + destroyTime = buffer.getLongLong(); + objectId.reset(new ObjectIdImpl(buffer)); + } + + if (prop) { + int propCount = objectClass->getPropertyCount(); + set<string> excludes; + parsePresenceMasks(buffer, excludes); + for (idx = 0; idx < propCount; idx++) { + const SchemaProperty* prop = objectClass->getProperty(idx); + if (excludes.count(prop->getName()) != 0) { + properties[prop->getName()] = ValuePtr(new Value(prop->getType())); + } else { + ValueImpl* pval = new ValueImpl(prop->getType(), buffer); + properties[prop->getName()] = ValuePtr(pval->envelope); + } } } - for (idx = 0; idx < statCount; idx++) { - const SchemaStatistic* stat = objectClass->getStatistic(idx); - ValueImpl* sval = new ValueImpl(stat->getType(), buffer); - statistics[stat->getName()] = ValuePtr(sval->envelope); + if (stat) { + int statCount = objectClass->getStatisticCount(); + for (idx = 0; idx < statCount; idx++) { + const SchemaStatistic* stat = objectClass->getStatistic(idx); + ValueImpl* sval = new ValueImpl(stat->getType(), buffer); + statistics[stat->getName()] = ValuePtr(sval->envelope); + } } } @@ -82,7 +92,7 @@ void ObjectImpl::destroy() // TODO - flag deletion } -Value* ObjectImpl::getValue(const string& key) +Value* ObjectImpl::getValue(const string& key) const { map<string, ValuePtr>::const_iterator iter; @@ -133,7 +143,7 @@ void ObjectImpl::encodeManagedObjectData(qpid::framing::Buffer& buffer) const buffer.putLongLong(lastUpdatedTime); buffer.putLongLong(createTime); buffer.putLongLong(destroyTime); - objectId->impl->encode(buffer); + objectId->encode(buffer); } void ObjectImpl::encodeProperties(qpid::framing::Buffer& buffer) const @@ -187,36 +197,12 @@ void ObjectImpl::encodeStatistics(qpid::framing::Buffer& buffer) const //================================================================== Object::Object(const SchemaObjectClass* type) : impl(new ObjectImpl(this, type)) {} - Object::Object(ObjectImpl* i) : impl(i) {} - -Object::~Object() -{ - delete impl; -} - -void Object::destroy() -{ - impl->destroy(); -} - -const ObjectId* Object::getObjectId() const -{ - return impl->getObjectId(); -} - -void Object::setObjectId(ObjectId* oid) -{ - impl->setObjectId(oid); -} - -const SchemaObjectClass* Object::getClass() const -{ - return impl->getClass(); -} - -Value* Object::getValue(char* key) -{ - return impl->getValue(key); -} +Object::Object(const Object& from) : impl(new ObjectImpl(*(from.impl))) {} +Object::~Object() { delete impl; } +void Object::destroy() { impl->destroy(); } +const ObjectId* Object::getObjectId() const { return impl->getObjectId(); } +void Object::setObjectId(ObjectId* oid) { impl->setObjectId(oid); } +const SchemaObjectClass* Object::getClass() const { return impl->getClass(); } +Value* Object::getValue(char* key) const { return impl->getValue(key); } diff --git a/qpid/cpp/src/qmf/ObjectImpl.h b/qpid/cpp/src/qmf/ObjectImpl.h index 4dc2170bfc..d69979e0da 100644 --- a/qpid/cpp/src/qmf/ObjectImpl.h +++ b/qpid/cpp/src/qmf/ObjectImpl.h @@ -21,19 +21,22 @@ */ #include <qmf/Object.h> +#include <qmf/ObjectIdImpl.h> #include <map> #include <set> #include <string> #include <qpid/framing/Buffer.h> #include <boost/shared_ptr.hpp> +#include <qpid/sys/Mutex.h> namespace qmf { struct ObjectImpl { + typedef boost::shared_ptr<ObjectImpl> Ptr; typedef boost::shared_ptr<Value> ValuePtr; Object* envelope; const SchemaObjectClass* objectClass; - boost::shared_ptr<ObjectId> objectId; + boost::shared_ptr<ObjectIdImpl> objectId; uint64_t createTime; uint64_t destroyTime; uint64_t lastUpdatedTime; @@ -41,14 +44,14 @@ namespace qmf { mutable std::map<std::string, ValuePtr> statistics; ObjectImpl(Object* e, const SchemaObjectClass* type); - ObjectImpl(const SchemaObjectClass* type, qpid::framing::Buffer& buffer); + ObjectImpl(const SchemaObjectClass* type, qpid::framing::Buffer& buffer, bool prop, bool stat, bool managed); ~ObjectImpl(); void destroy(); - const ObjectId* getObjectId() const { return objectId.get(); } - void setObjectId(ObjectId* oid) { objectId.reset(oid); } + const ObjectId* getObjectId() const { return objectId.get() ? objectId->envelope : 0; } + void setObjectId(ObjectId* oid) { objectId.reset(oid->impl); } const SchemaObjectClass* getClass() const { return objectClass; } - Value* getValue(const std::string& key); + Value* getValue(const std::string& key) const; void parsePresenceMasks(qpid::framing::Buffer& buffer, std::set<std::string>& excludeList); void encodeSchemaKey(qpid::framing::Buffer& buffer) const; diff --git a/qpid/cpp/src/qmf/Query.h b/qpid/cpp/src/qmf/Query.h index 78bc6f4ae2..875749862e 100644 --- a/qpid/cpp/src/qmf/Query.h +++ b/qpid/cpp/src/qmf/Query.h @@ -25,26 +25,76 @@ namespace qmf { + struct Object; + struct QueryElementImpl; struct QueryImpl; + struct QueryExpressionImpl; + struct SchemaClassKey; + + enum ValueOper { + O_EQ = 1, + O_NE = 2, + O_LT = 3, + O_LE = 4, + O_GT = 5, + O_GE = 6, + O_RE_MATCH = 7, + O_RE_NOMATCH = 8 + }; + + struct QueryOperand { + virtual ~QueryOperand() {} + virtual bool evaluate(const Object* object) const = 0; + }; + + struct QueryElement : public QueryOperand { + QueryElement(const char* attrName, const Value* value, ValueOper oper); + QueryElement(QueryElementImpl* impl); + virtual ~QueryElement(); + bool evaluate(const Object* object) const; + + QueryElementImpl* impl; + }; + + enum ExprOper { + E_NOT = 1, + E_AND = 2, + E_OR = 3, + E_XOR = 4 + }; + + struct QueryExpression : public QueryOperand { + QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2); + QueryExpression(QueryExpressionImpl* impl); + virtual ~QueryExpression(); + bool evaluate(const Object* object) const; + + QueryExpressionImpl* impl; + }; + class Query { public: - Query(); + Query(const char* className, const char* packageName); + Query(const SchemaClassKey* key); + Query(const ObjectId* oid); Query(QueryImpl* impl); ~Query(); + void setSelect(const QueryOperand* criterion); + void setLimit(uint32_t maxResults); + void setOrderBy(const char* attrName, bool decreasing); + const char* getPackage() const; const char* getClass() const; const ObjectId* getObjectId() const; - enum Oper { - OPER_AND = 1, - OPER_OR = 2 - }; - - int whereCount() const; - Oper whereOper() const; - const char* whereKey() const; - const Value* whereValue() const; + bool haveSelect() const; + bool haveLimit() const; + bool haveOrderBy() const; + const QueryOperand* getSelect() const; + uint32_t getLimit() const; + const char* getOrderBy() const; + bool getDecreasing() const; QueryImpl* impl; }; diff --git a/qpid/cpp/src/qmf/QueryImpl.cpp b/qpid/cpp/src/qmf/QueryImpl.cpp index 7e827796bb..f75a9aa5d5 100644 --- a/qpid/cpp/src/qmf/QueryImpl.cpp +++ b/qpid/cpp/src/qmf/QueryImpl.cpp @@ -18,54 +18,77 @@ */ #include "qmf/QueryImpl.h" +#include "qmf/ObjectIdImpl.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/FieldTable.h" using namespace std; using namespace qmf; +using namespace qpid::framing; -//================================================================== -// Wrappers -//================================================================== - -Query::Query() : impl(new QueryImpl(this)) {} -Query::Query(QueryImpl* i) : impl(i) {} - -Query::~Query() +bool QueryElementImpl::evaluate(const Object* /*object*/) const { - delete impl; + // TODO: Implement this + return false; } -const char* Query::getPackage() const +bool QueryExpressionImpl::evaluate(const Object* /*object*/) const { - return impl->getPackage(); + // TODO: Implement this + return false; } -const char* Query::getClass() const +QueryImpl::QueryImpl(Buffer& buffer) { - return impl->getClass(); + FieldTable ft; + ft.decode(buffer); + // TODO } -const ObjectId* Query::getObjectId() const +void QueryImpl::encode(Buffer& buffer) const { - return impl->getObjectId(); -} + FieldTable ft; -int Query::whereCount() const -{ - return impl->whereCount(); -} + if (oid.get() != 0) { + ft.setString("_objectid", oid->impl->asString()); + } else { + if (!packageName.empty()) + ft.setString("_package", packageName); + ft.setString("_class", className); + } -Query::Oper Query::whereOper() const -{ - return impl->whereOper(); + ft.encode(buffer); } -const char* Query::whereKey() const -{ - return impl->whereKey(); -} -const Value* Query::whereValue() const -{ - return impl->whereValue(); -} +//================================================================== +// Wrappers +//================================================================== + +QueryElement::QueryElement(const char* attrName, const Value* value, ValueOper oper) : impl(new QueryElementImpl(attrName, value, oper)) {} +QueryElement::QueryElement(QueryElementImpl* i) : impl(i) {} +QueryElement::~QueryElement() { delete impl; } +bool QueryElement::evaluate(const Object* object) const { return impl->evaluate(object); } +QueryExpression::QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2) : impl(new QueryExpressionImpl(oper, operand1, operand2)) {} +QueryExpression::QueryExpression(QueryExpressionImpl* i) : impl(i) {} +QueryExpression::~QueryExpression() { delete impl; } +bool QueryExpression::evaluate(const Object* object) const { return impl->evaluate(object); } +Query::Query(const char* className, const char* packageName) : impl(new QueryImpl(className, packageName)) {} +Query::Query(const SchemaClassKey* key) : impl(new QueryImpl(key)) {} +Query::Query(const ObjectId* oid) : impl(new QueryImpl(oid)) {} +Query::Query(QueryImpl* i) : impl(i) {} +Query::~Query() { delete impl; } +void Query::setSelect(const QueryOperand* criterion) { impl->setSelect(criterion); } +void Query::setLimit(uint32_t maxResults) { impl->setLimit(maxResults); } +void Query::setOrderBy(const char* attrName, bool decreasing) { impl->setOrderBy(attrName, decreasing); } +const char* Query::getPackage() const { return impl->getPackage().c_str(); } +const char* Query::getClass() const { return impl->getClass().c_str(); } +const ObjectId* Query::getObjectId() const { return impl->getObjectId(); } +bool Query::haveSelect() const { return impl->haveSelect(); } +bool Query::haveLimit() const { return impl->haveLimit(); } +bool Query::haveOrderBy() const { return impl->haveOrderBy(); } +const QueryOperand* Query::getSelect() const { return impl->getSelect(); } +uint32_t Query::getLimit() const { return impl->getLimit(); } +const char* Query::getOrderBy() const { return impl->getOrderBy().c_str(); } +bool Query::getDecreasing() const { return impl->getDecreasing(); } diff --git a/qpid/cpp/src/qmf/QueryImpl.h b/qpid/cpp/src/qmf/QueryImpl.h index 1cb9bfe554..4a56a457c0 100644 --- a/qpid/cpp/src/qmf/QueryImpl.h +++ b/qpid/cpp/src/qmf/QueryImpl.h @@ -20,28 +20,82 @@ * under the License. */ -#include <qmf/Query.h> +#include "qmf/Query.h" +#include "qmf/Schema.h" #include <string> #include <boost/shared_ptr.hpp> +namespace qpid { + namespace framing { + class Buffer; + } +} + namespace qmf { + struct QueryElementImpl { + QueryElementImpl(const std::string& a, const Value* v, ValueOper o) : + envelope(new QueryElement(this)), attrName(a), value(v), oper(o) {} + ~QueryElementImpl() {} + bool evaluate(const Object* object) const; + + QueryElement* envelope; + std::string attrName; + const Value* value; + ValueOper oper; + }; + + struct QueryExpressionImpl { + QueryExpressionImpl(ExprOper o, const QueryOperand* operand1, const QueryOperand* operand2) : + envelope(new QueryExpression(this)), oper(o), left(operand1), right(operand2) {} + ~QueryExpressionImpl() {} + bool evaluate(const Object* object) const; + + QueryExpression* envelope; + ExprOper oper; + const QueryOperand* left; + const QueryOperand* right; + }; + struct QueryImpl { - Query* envelope; - std::string packageName; - std::string className; - boost::shared_ptr<ObjectId> oid; + QueryImpl(Query* e) : envelope(e), select(0) {} + QueryImpl(const std::string& c, const std::string& p) : + envelope(new Query(this)), packageName(p), className(c) {} + QueryImpl(const SchemaClassKey* key) : + envelope(new Query(this)), packageName(key->getPackageName()), className(key->getClassName()) {} + QueryImpl(const ObjectId* oid) : + envelope(new Query(this)), oid(new ObjectId(*oid)) {} + QueryImpl(qpid::framing::Buffer& buffer); + ~QueryImpl() {}; - QueryImpl(Query* e) : envelope(e) {} + void setSelect(const QueryOperand* criterion) { select = criterion; } + void setLimit(uint32_t maxResults) { resultLimit = maxResults; } + void setOrderBy(const std::string& attrName, bool decreasing) { + orderBy = attrName; orderDecreasing = decreasing; + } - const char* getPackage() const { return packageName.empty() ? 0 : packageName.c_str(); } - const char* getClass() const { return className.empty() ? 0 : className.c_str(); } + const std::string& getPackage() const { return packageName; } + const std::string& getClass() const { return className; } const ObjectId* getObjectId() const { return oid.get(); } - int whereCount() const { return 0;} - Query::Oper whereOper() const { return Query::OPER_AND; } - const char* whereKey() const { return 0; } - const Value* whereValue() const { return 0; } + bool haveSelect() const { return select != 0; } + bool haveLimit() const { return resultLimit > 0; } + bool haveOrderBy() const { return !orderBy.empty(); } + const QueryOperand* getSelect() const { return select; } + uint32_t getLimit() const { return resultLimit; } + const std::string& getOrderBy() const { return orderBy; } + bool getDecreasing() const { return orderDecreasing; } + + void encode(qpid::framing::Buffer& buffer) const; + + Query* envelope; + std::string packageName; + std::string className; + boost::shared_ptr<ObjectId> oid; + const QueryOperand* select; + uint32_t resultLimit; + std::string orderBy; + bool orderDecreasing; }; } diff --git a/qpid/cpp/src/qmf/SchemaImpl.cpp b/qpid/cpp/src/qmf/SchemaImpl.cpp index ae7d6ca689..3eb14c3952 100644 --- a/qpid/cpp/src/qmf/SchemaImpl.cpp +++ b/qpid/cpp/src/qmf/SchemaImpl.cpp @@ -261,7 +261,15 @@ void SchemaStatisticImpl::updateHash(SchemaHash& hash) const SchemaClassKeyImpl::SchemaClassKeyImpl(const string& p, const string& n, const SchemaHash& h) : envelope(new SchemaClassKey(this)), package(p), name(n), hash(h) {} -void SchemaClassKeyImpl::encode(qpid::framing::Buffer& buffer) const +SchemaClassKeyImpl::SchemaClassKeyImpl(Buffer& buffer) : + envelope(new SchemaClassKey(this)), package(packageContainer), name(nameContainer), hash(hashContainer) +{ + buffer.getShortString(packageContainer); + buffer.getShortString(nameContainer); + hashContainer.decode(buffer); +} + +void SchemaClassKeyImpl::encode(Buffer& buffer) const { buffer.putShortString(package); buffer.putShortString(name); @@ -413,8 +421,9 @@ SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) : buffer.getShortString(package); buffer.getShortString(name); hash.decode(buffer); + buffer.putOctet(0); // No parent class - uint16_t argCount = buffer.getShort(); + uint16_t argCount = buffer.getShort(); for (uint16_t idx = 0; idx < argCount; idx++) { SchemaArgumentImpl* argument = new SchemaArgumentImpl(buffer); diff --git a/qpid/cpp/src/qmf/SchemaImpl.h b/qpid/cpp/src/qmf/SchemaImpl.h index 3e9677d1fa..035d99aecd 100644 --- a/qpid/cpp/src/qmf/SchemaImpl.h +++ b/qpid/cpp/src/qmf/SchemaImpl.h @@ -148,7 +148,14 @@ namespace qmf { const std::string& name; const SchemaHash& hash; + // The *Container elements are only used if there isn't an external place to + // store these values. + std::string packageContainer; + std::string nameContainer; + SchemaHash hashContainer; + SchemaClassKeyImpl(const std::string& package, const std::string& name, const SchemaHash& hash); + SchemaClassKeyImpl(qpid::framing::Buffer& buffer); const std::string& getPackageName() const { return package; } const std::string& getClassName() const { return name; } diff --git a/qpid/cpp/src/qmf/SequenceManager.cpp b/qpid/cpp/src/qmf/SequenceManager.cpp index f51ce9d8b8..3171e66fac 100644 --- a/qpid/cpp/src/qmf/SequenceManager.cpp +++ b/qpid/cpp/src/qmf/SequenceManager.cpp @@ -25,26 +25,72 @@ using namespace qpid::sys; SequenceManager::SequenceManager() : nextSequence(1) {} -uint32_t SequenceManager::reserve(SequenceContext* ctx) +void SequenceManager::setUnsolicitedContext(SequenceContext::Ptr ctx) +{ + unsolicitedContext = ctx; +} + +uint32_t SequenceManager::reserve(SequenceContext::Ptr ctx) { Mutex::ScopedLock _lock(lock); + if (ctx.get() == 0) + ctx = unsolicitedContext; uint32_t seq = nextSequence; while (contextMap.find(seq) != contextMap.end()) seq = seq < 0xFFFFFFFF ? seq + 1 : 1; nextSequence = seq < 0xFFFFFFFF ? seq + 1 : 1; contextMap[seq] = ctx; + ctx->reserve(); return seq; } void SequenceManager::release(uint32_t sequence) { Mutex::ScopedLock _lock(lock); - map<uint32_t, SequenceContext*>::iterator iter = contextMap.find(sequence); + + if (sequence == 0) { + if (unsolicitedContext.get() != 0) + unsolicitedContext->release(); + return; + } + + map<uint32_t, SequenceContext::Ptr>::iterator iter = contextMap.find(sequence); if (iter != contextMap.end()) { if (iter->second != 0) - iter->second->complete(); + iter->second->release(); contextMap.erase(iter); } } +void SequenceManager::releaseAll() +{ + Mutex::ScopedLock _lock(lock); + contextMap.clear(); +} + +void SequenceManager::dispatch(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer) +{ + Mutex::ScopedLock _lock(lock); + bool done; + + if (sequence == 0) { + if (unsolicitedContext.get() != 0) { + done = unsolicitedContext->handleMessage(opcode, sequence, buffer); + if (done) + unsolicitedContext->release(); + } + return; + } + + map<uint32_t, SequenceContext::Ptr>::iterator iter = contextMap.find(sequence); + if (iter != contextMap.end()) { + if (iter->second != 0) { + done = iter->second->handleMessage(opcode, sequence, buffer); + if (done) { + iter->second->release(); + contextMap.erase(iter); + } + } + } +} diff --git a/qpid/cpp/src/qmf/SequenceManager.h b/qpid/cpp/src/qmf/SequenceManager.h index c027872313..bbfd0728a7 100644 --- a/qpid/cpp/src/qmf/SequenceManager.h +++ b/qpid/cpp/src/qmf/SequenceManager.h @@ -21,29 +21,43 @@ */ #include "qpid/sys/Mutex.h" +#include <boost/shared_ptr.hpp> #include <map> +namespace qpid { + namespace framing { + class Buffer; + } +} + namespace qmf { class SequenceContext { public: + typedef boost::shared_ptr<SequenceContext> Ptr; SequenceContext() {} virtual ~SequenceContext() {} - virtual void complete() = 0; + virtual void reserve() = 0; + virtual bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer) = 0; + virtual void release() = 0; }; class SequenceManager { public: SequenceManager(); - uint32_t reserve(SequenceContext* ctx); + void setUnsolicitedContext(SequenceContext::Ptr ctx); + uint32_t reserve(SequenceContext::Ptr ctx = SequenceContext::Ptr()); void release(uint32_t sequence); + void releaseAll(); + void dispatch(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer); private: mutable qpid::sys::Mutex lock; uint32_t nextSequence; - std::map<uint32_t, SequenceContext*> contextMap; + SequenceContext::Ptr unsolicitedContext; + std::map<uint32_t, SequenceContext::Ptr> contextMap; }; } |