summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qmf')
-rw-r--r--qpid/cpp/src/qmf/AgentEngine.cpp19
-rw-r--r--qpid/cpp/src/qmf/ConsoleEngine.cpp379
-rw-r--r--qpid/cpp/src/qmf/ConsoleEngine.h70
-rw-r--r--qpid/cpp/src/qmf/Object.h3
-rw-r--r--qpid/cpp/src/qmf/ObjectId.h1
-rw-r--r--qpid/cpp/src/qmf/ObjectIdImpl.cpp21
-rw-r--r--qpid/cpp/src/qmf/ObjectIdImpl.h3
-rw-r--r--qpid/cpp/src/qmf/ObjectImpl.cpp88
-rw-r--r--qpid/cpp/src/qmf/ObjectImpl.h13
-rw-r--r--qpid/cpp/src/qmf/Query.h70
-rw-r--r--qpid/cpp/src/qmf/QueryImpl.cpp85
-rw-r--r--qpid/cpp/src/qmf/QueryImpl.h78
-rw-r--r--qpid/cpp/src/qmf/SchemaImpl.cpp13
-rw-r--r--qpid/cpp/src/qmf/SchemaImpl.h7
-rw-r--r--qpid/cpp/src/qmf/SequenceManager.cpp52
-rw-r--r--qpid/cpp/src/qmf/SequenceManager.h20
16 files changed, 676 insertions, 246 deletions
diff --git a/qpid/cpp/src/qmf/AgentEngine.cpp b/qpid/cpp/src/qmf/AgentEngine.cpp
index d3204042d5..9ea3be5907 100644
--- a/qpid/cpp/src/qmf/AgentEngine.cpp
+++ b/qpid/cpp/src/qmf/AgentEngine.cpp
@@ -57,7 +57,7 @@ namespace qmf {
string name;
Object* object;
boost::shared_ptr<ObjectId> objectId;
- Query query;
+ boost::shared_ptr<Query> query;
boost::shared_ptr<Value> arguments;
string exchange;
string bindingKey;
@@ -214,7 +214,7 @@ AgentEvent AgentEventImpl::copy()
item.sequence = sequence;
item.object = object;
item.objectId = objectId.get();
- item.query = &query;
+ item.query = query.get();
item.arguments = arguments.get();
item.objectClass = objectClass;
@@ -381,7 +381,7 @@ void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* t
}
}
sendBufferLH(buffer, context->exchange, context->key);
- QPID_LOG(trace, "SENT MethodResponse");
+ QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text);
}
void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat)
@@ -403,7 +403,7 @@ void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop
object.impl->encodeStatistics(buffer);
sendBufferLH(buffer, context->exchange, context->key);
- QPID_LOG(trace, "SENT ContentIndication");
+ QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence);
}
void AgentEngineImpl::queryComplete(uint32_t sequence)
@@ -511,9 +511,10 @@ AgentEventImpl::Ptr AgentEngineImpl::eventQuery(uint32_t num, const string& user
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
event->sequence = num;
event->authUserId = userId;
- event->query.impl->packageName = package;
- event->query.impl->className = cls;
- event->query.impl->oid = oid;
+ if (oid.get())
+ event->query.reset(new Query(oid.get()));
+ else
+ event->query.reset(new Query(cls.c_str(), package.c_str()));
return event;
}
@@ -723,7 +724,7 @@ void AgentEngineImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const
ft.decode(inBuffer);
- QPID_LOG(trace, "RCVD GetQuery: map=" << ft);
+ QPID_LOG(trace, "RCVD GetQuery: seq=" << sequence << " map=" << ft);
value = ft.get("_package");
if (value.get() && value->convertsTo<string>()) {
@@ -773,6 +774,8 @@ void AgentEngineImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, con
AgentClassKey classKey(buffer);
buffer.getShortString(method);
+ QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=" << method);
+
map<string, ClassMaps>::const_iterator pIter = packages.find(pname);
if (pIter == packages.end()) {
sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_PACKAGE, pname);
diff --git a/qpid/cpp/src/qmf/ConsoleEngine.cpp b/qpid/cpp/src/qmf/ConsoleEngine.cpp
index 3d1b378b68..e7991328ee 100644
--- a/qpid/cpp/src/qmf/ConsoleEngine.cpp
+++ b/qpid/cpp/src/qmf/ConsoleEngine.cpp
@@ -34,6 +34,7 @@
#include <qpid/sys/Mutex.h>
#include <qpid/log/Statement.h>
#include <qpid/sys/Time.h>
+#include <qpid/sys/SystemInfo.h>
#include <string.h>
#include <string>
#include <deque>
@@ -58,12 +59,27 @@ namespace qmf {
auto_ptr<Value> arguments;
MethodResponseImpl(Buffer& buf);
- ~MethodResponseImpl() {}
+ ~MethodResponseImpl() { delete envelope; }
uint32_t getStatus() const { return status; }
const Value* getException() const { return exception.get(); }
const Value* getArgs() const { return arguments.get(); }
};
+ struct QueryResponseImpl {
+ typedef boost::shared_ptr<QueryResponseImpl> Ptr;
+ QueryResponse *envelope;
+ uint32_t status;
+ auto_ptr<Value> exception;
+ vector<ObjectImpl::Ptr> results;
+
+ QueryResponseImpl() : envelope(new QueryResponse(this)), status(0) {}
+ ~QueryResponseImpl() { delete envelope; }
+ uint32_t getStatus() const { return status; }
+ const Value* getException() const { return exception.get(); }
+ uint32_t getObjectCount() const { return results.size(); }
+ const Object* getObject(uint32_t idx) const;
+ };
+
struct ConsoleEventImpl {
typedef boost::shared_ptr<ConsoleEventImpl> Ptr;
ConsoleEvent::EventKind kind;
@@ -89,13 +105,29 @@ namespace qmf {
string name;
string exchange;
string bindingKey;
+ void* context;
+ QueryResponseImpl::Ptr queryResponse;
BrokerEventImpl(BrokerEvent::EventKind k) : kind(k) {}
~BrokerEventImpl() {}
BrokerEvent copy();
};
- class BrokerProxyImpl : public SequenceContext {
+ struct AgentProxyImpl {
+ typedef boost::shared_ptr<AgentProxyImpl> Ptr;
+ AgentProxy* envelope;
+ ConsoleEngineImpl* console;
+ BrokerProxyImpl* broker;
+ uint32_t agentBank;
+ string label;
+
+ AgentProxyImpl(ConsoleEngineImpl* c, BrokerProxyImpl* b, uint32_t ab, const string& l) :
+ envelope(new AgentProxy(this)), console(c), broker(b), agentBank(ab), label(l) {}
+ ~AgentProxyImpl() {}
+ const string& getLabel() const { return label; }
+ };
+
+ class BrokerProxyImpl {
public:
typedef boost::shared_ptr<BrokerProxyImpl> Ptr;
@@ -114,12 +146,17 @@ namespace qmf {
bool getEvent(BrokerEvent& event) const;
void popEvent();
- // From SequenceContext
- void complete();
+ uint32_t agentCount() const;
+ const AgentProxy* getAgent(uint32_t idx) const;
+ void sendQuery(const Query& query, void* context, const AgentProxy* agent);
+ void sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent);
void addBinding(const string& exchange, const string& key);
+ void staticRelease() { decOutstanding(); }
private:
+ friend class StaticContext;
+ friend class QueryContext;
mutable Mutex lock;
BrokerProxy* envelope;
ConsoleEngineImpl* console;
@@ -128,6 +165,7 @@ namespace qmf {
SequenceManager seqMgr;
uint32_t requestsOutstanding;
bool topicBound;
+ vector<AgentProxyImpl::Ptr> agentList;
deque<MessageImpl::Ptr> xmtQueue;
deque<BrokerEventImpl::Ptr> eventQueue;
@@ -138,6 +176,7 @@ namespace qmf {
BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
BrokerEventImpl::Ptr eventSetupComplete();
BrokerEventImpl::Ptr eventStable();
+ BrokerEventImpl::Ptr eventQueryComplete(void* context, QueryResponseImpl::Ptr response);
void handleBrokerResponse(Buffer& inBuffer, uint32_t seq);
void handlePackageIndication(Buffer& inBuffer, uint32_t seq);
@@ -147,19 +186,33 @@ namespace qmf {
void handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq);
void handleEventIndication(Buffer& inBuffer, uint32_t seq);
void handleSchemaResponse(Buffer& inBuffer, uint32_t seq);
- void handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
+ ObjectImpl::Ptr handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
void incOutstandingLH();
void decOutstanding();
};
- struct AgentProxyImpl {
- typedef boost::shared_ptr<AgentProxyImpl> Ptr;
- AgentProxy* envelope;
- ConsoleEngineImpl* console;
+ struct StaticContext : public SequenceContext {
+ StaticContext(BrokerProxyImpl& b) : broker(b) {}
+ ~StaticContext() {}
+ void reserve() {}
+ void release() { broker.staticRelease(); }
+ bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer);
+ BrokerProxyImpl& broker;
+ };
- AgentProxyImpl(AgentProxy* e, ConsoleEngine& _console) :
- envelope(e), console(_console.impl) {}
- ~AgentProxyImpl() {}
+ struct QueryContext : public SequenceContext {
+ QueryContext(BrokerProxyImpl& b, void* u) :
+ broker(b), userContext(u), requestsOutstanding(0), queryResponse(new QueryResponseImpl()) {}
+ ~QueryContext() {}
+ void reserve();
+ void release();
+ bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer);
+
+ mutable Mutex lock;
+ BrokerProxyImpl& broker;
+ void* userContext;
+ uint32_t requestsOutstanding;
+ QueryResponseImpl::Ptr queryResponse;
};
class ConsoleEngineImpl {
@@ -187,11 +240,6 @@ namespace qmf {
void bindClass(const SchemaClassKey* key);
void bindClass(const char* packageName, const char* className);
- uint32_t agentCount() const;
- const AgentProxy* getAgent(uint32_t idx) const;
-
- void sendQuery(const Query& query, void* context);
-
/*
void startSync(const Query& query, void* context, SyncQuery& sync);
void touchSync(SyncQuery& sync);
@@ -226,13 +274,31 @@ namespace qmf {
void learnClass(SchemaObjectClassImpl::Ptr cls);
void learnClass(SchemaEventClassImpl::Ptr cls);
bool haveClass(const SchemaClassKeyImpl& key) const;
+ SchemaObjectClassImpl::Ptr getSchema(const SchemaClassKeyImpl& key) const;
};
}
namespace {
-const char* QMF_EXCHANGE = "qpid.management";
-const char* DIR_EXCHANGE = "amq.direct";
-const char* BROKER_KEY = "broker";
+ const char* QMF_EXCHANGE = "qpid.management";
+ const char* DIR_EXCHANGE = "amq.direct";
+ const char* BROKER_KEY = "broker";
+ const char* BROKER_PACKAGE = "org.apache.qpid.broker";
+ const char* AGENT_CLASS = "agent";
+ const char* BROKER_AGENT_KEY = "agent.1.0";
+}
+
+const Object* QueryResponseImpl::getObject(uint32_t idx) const
+{
+ vector<ObjectImpl::Ptr>::const_iterator iter = results.begin();
+
+ while (idx > 0) {
+ if (iter == results.end())
+ return 0;
+ iter++;
+ idx--;
+ }
+
+ return (*iter)->envelope;
}
#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
@@ -267,19 +333,29 @@ BrokerEvent BrokerEventImpl::copy()
STRING_REF(name);
STRING_REF(exchange);
STRING_REF(bindingKey);
+ item.context = context;
+ item.queryResponse = queryResponse.get() ? queryResponse->envelope : 0;
return item;
}
BrokerProxyImpl::BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console) :
- envelope(e), console(_console.impl), queueName("qmfc-")
+ envelope(e), console(_console.impl)
{
- // TODO: Give the queue name a unique suffix
+ stringstream qn;
+ qpid::TcpAddress addr;
+
+ SystemInfo::getLocalHostname(addr);
+ qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId();
+ queueName = qn.str();
+
+ seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this)));
}
void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
{
Mutex::ScopedLock _lock(lock);
+ agentList.clear();
eventQueue.clear();
xmtQueue.clear();
eventQueue.push_back(eventDeclareQueue(queueName));
@@ -292,6 +368,7 @@ void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
void BrokerProxyImpl::sessionClosed()
{
Mutex::ScopedLock _lock(lock);
+ agentList.clear();
eventQueue.clear();
xmtQueue.clear();
}
@@ -302,11 +379,14 @@ void BrokerProxyImpl::startProtocol()
char rawbuffer[512];
Buffer buffer(rawbuffer, 512);
+ agentList.push_back(AgentProxyImpl::Ptr(new AgentProxyImpl(console, this, 0, "Agent embedded in broker")));
+
requestsOutstanding = 1;
topicBound = false;
- Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST);
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT BrokerRequest");
+ QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
}
void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
@@ -330,23 +410,8 @@ void BrokerProxyImpl::handleRcvMessage(Message& message)
uint8_t opcode;
uint32_t sequence;
- while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) {
- if (opcode == Protocol::OP_BROKER_RESPONSE) handleBrokerResponse(inBuffer, sequence);
- else if (opcode == Protocol::OP_PACKAGE_INDICATION) handlePackageIndication(inBuffer, sequence);
- else if (opcode == Protocol::OP_COMMAND_COMPLETE) handleCommandComplete(inBuffer, sequence);
- else if (opcode == Protocol::OP_CLASS_INDICATION) handleClassIndication(inBuffer, sequence);
- else if (opcode == Protocol::OP_METHOD_RESPONSE) handleMethodResponse(inBuffer, sequence);
- else if (opcode == Protocol::OP_HEARTBEAT_INDICATION) handleHeartbeatIndication(inBuffer, sequence);
- else if (opcode == Protocol::OP_EVENT_INDICATION) handleEventIndication(inBuffer, sequence);
- else if (opcode == Protocol::OP_SCHEMA_RESPONSE) handleSchemaResponse(inBuffer, sequence);
- else if (opcode == Protocol::OP_PROPERTY_INDICATION) handleObjectIndication(inBuffer, sequence, true, false);
- else if (opcode == Protocol::OP_STATISTIC_INDICATION) handleObjectIndication(inBuffer, sequence, false, true);
- else if (opcode == Protocol::OP_OBJECT_INDICATION) handleObjectIndication(inBuffer, sequence, true, true);
- else {
- QPID_LOG(trace, "BrokerProxyImpl::handleRcvMessage invalid opcode: " << opcode);
- break;
- }
- }
+ while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
+ seqMgr.dispatch(opcode, sequence, inBuffer);
}
bool BrokerProxyImpl::getXmtMessage(Message& item) const
@@ -381,9 +446,48 @@ void BrokerProxyImpl::popEvent()
eventQueue.pop_front();
}
-void BrokerProxyImpl::complete()
+uint32_t BrokerProxyImpl::agentCount() const
{
- decOutstanding();
+ Mutex::ScopedLock _lock(lock);
+ return agentList.size();
+}
+
+const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const
+{
+ Mutex::ScopedLock _lock(lock);
+ for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin();
+ iter != agentList.end(); iter++)
+ if (idx-- == 0)
+ return (*iter)->envelope;
+ return 0;
+}
+
+void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent)
+{
+ SequenceContext::Ptr queryContext(new QueryContext(*this, context));
+ Mutex::ScopedLock _lock(lock);
+ if (agent != 0) {
+ sendGetRequestLH(queryContext, query, agent->impl);
+ } else {
+ // TODO (optimization) only send queries to agents that have the requested class+package
+ for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin();
+ iter != agentList.end(); iter++) {
+ sendGetRequestLH(queryContext, query, (*iter).get());
+ }
+ }
+}
+
+void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent)
+{
+ stringstream key;
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve(queryContext));
+
+ Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
+ query.impl->encode(outBuffer);
+ key << "agent.1." << agent->agentBank;
+ sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
+ QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str());
}
void BrokerProxyImpl::addBinding(const string& exchange, const string& key)
@@ -420,17 +524,22 @@ BrokerEventImpl::Ptr BrokerProxyImpl::eventStable()
return event;
}
-void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
+BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponseImpl::Ptr response)
{
- // Note that this function doesn't touch requestsOutstanding. This is because
- // it accounts for one request completed (the BrokerRequest) and one request
- // started (the PackageRequest) which cancel each other out.
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE));
+ event->context = context;
+ event->queryResponse = response;
+ return event;
+}
+void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
+{
brokerId.decode(inBuffer);
QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId);
Mutex::ScopedLock _lock(lock);
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve(this));
+ uint32_t sequence(seqMgr.reserve());
+ incOutstandingLH();
Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence);
sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
QPID_LOG(trace, "SENT PackageRequest seq=" << sequence);
@@ -446,7 +555,7 @@ void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
Mutex::ScopedLock _lock(lock);
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve(this));
+ uint32_t sequence(seqMgr.reserve());
incOutstandingLH();
Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence);
outBuffer.putShortString(package);
@@ -460,20 +569,12 @@ void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
uint32_t code = inBuffer.getLong();
inBuffer.getShortString(text);
QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text);
- seqMgr.release(seq);
}
void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
{
- string package;
- string clsName;
- SchemaHash hash;
uint8_t kind = inBuffer.getOctet();
- inBuffer.getShortString(package);
- inBuffer.getShortString(clsName);
- hash.decode(inBuffer);
- Uuid printableHash(hash.get());
- SchemaClassKeyImpl classKey(package, clsName, hash);
+ SchemaClassKeyImpl classKey(inBuffer);
QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey.str());
@@ -481,7 +582,7 @@ void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
Mutex::ScopedLock _lock(lock);
incOutstandingLH();
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve(this));
+ uint32_t sequence(seqMgr.reserve());
Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence);
classKey.encode(outBuffer);
sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
@@ -515,6 +616,25 @@ void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
console->learnClass(oClassPtr);
key = oClassPtr->getClassKey()->impl;
QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->str());
+
+ //
+ // If we have just learned about the org.apache.qpid.broker:agent class, send a get
+ // request for the current list of agents so we can have it on-hand before we declare
+ // this session "stable".
+ //
+ if (key->getClassName() == AGENT_CLASS && key->getPackageName() == BROKER_PACKAGE) {
+ Mutex::ScopedLock _lock(lock);
+ incOutstandingLH();
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
+ FieldTable ft;
+ ft.setString("_class", AGENT_CLASS);
+ ft.setString("_package", BROKER_PACKAGE);
+ ft.encode(outBuffer);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY);
+ QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY);
+ }
} else if (kind == CLASS_EVENT) {
eClassPtr.reset(new SchemaEventClassImpl(inBuffer));
console->learnClass(eClassPtr);
@@ -524,13 +644,20 @@ void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
else {
QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
}
-
- decOutstanding();
}
-void BrokerProxyImpl::handleObjectIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/, bool /*prop*/, bool /*stat*/)
+ObjectImpl::Ptr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
{
- // TODO
+ SchemaClassKeyImpl classKey(inBuffer);
+ QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey.str());
+
+ SchemaObjectClassImpl::Ptr schema = console->getSchema(classKey);
+ if (schema.get() == 0) {
+ QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey.str());
+ return ObjectImpl::Ptr();
+ }
+
+ return ObjectImpl::Ptr(new ObjectImpl(schema->envelope, inBuffer, prop, stat, true));
}
void BrokerProxyImpl::incOutstandingLH()
@@ -567,6 +694,79 @@ MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodRespons
arguments.reset(new Value(TYPE_MAP));
}
+bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+{
+ bool completeContext = false;
+ if (opcode == Protocol::OP_BROKER_RESPONSE) {
+ broker.handleBrokerResponse(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_COMMAND_COMPLETE) {
+ broker.handleCommandComplete(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_SCHEMA_RESPONSE) {
+ broker.handleSchemaResponse(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_PACKAGE_INDICATION)
+ broker.handlePackageIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_CLASS_INDICATION)
+ broker.handleClassIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_HEARTBEAT_INDICATION)
+ broker.handleHeartbeatIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_EVENT_INDICATION)
+ broker.handleEventIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_PROPERTY_INDICATION)
+ broker.handleObjectIndication(buffer, sequence, true, false);
+ else if (opcode == Protocol::OP_STATISTIC_INDICATION)
+ broker.handleObjectIndication(buffer, sequence, false, true);
+ else if (opcode == Protocol::OP_OBJECT_INDICATION)
+ broker.handleObjectIndication(buffer, sequence, true, true);
+ else {
+ QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode);
+ completeContext = true;
+ }
+
+ return completeContext;
+}
+
+void QueryContext::reserve()
+{
+ Mutex::ScopedLock _lock(lock);
+ requestsOutstanding++;
+}
+
+void QueryContext::release()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (--requestsOutstanding == 0) {
+ broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse));
+ }
+}
+
+bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+{
+ bool completeContext = false;
+ ObjectImpl::Ptr object;
+
+ if (opcode == Protocol::OP_COMMAND_COMPLETE) {
+ broker.handleCommandComplete(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_OBJECT_INDICATION) {
+ object = broker.handleObjectIndication(buffer, sequence, true, true);
+ if (object.get() != 0)
+ queryResponse->results.push_back(object);
+ }
+ else {
+ QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
+ completeContext = true;
+ }
+
+ return completeContext;
+}
+
ConsoleEngineImpl::ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& s) :
envelope(e), settings(s)
{
@@ -757,23 +957,6 @@ void ConsoleEngineImpl::bindClass(const char* packageName, const char* className
(*iter)->addBinding(QMF_EXCHANGE, key.str());
}
-uint32_t ConsoleEngineImpl::agentCount() const
-{
- // TODO
- return 0;
-}
-
-const AgentProxy* ConsoleEngineImpl::getAgent(uint32_t /*idx*/) const
-{
- // TODO
- return 0;
-}
-
-void ConsoleEngineImpl::sendQuery(const Query& /*query*/, void* /*context*/)
-{
- // TODO
-}
-
/*
void ConsoleEngineImpl::startSync(const Query& query, void* context, SyncQuery& sync)
{
@@ -835,11 +1018,29 @@ bool ConsoleEngineImpl::haveClass(const SchemaClassKeyImpl& key) const
return oList.find(&key) != oList.end() || eList.find(&key) != eList.end();
}
+SchemaObjectClassImpl::Ptr ConsoleEngineImpl::getSchema(const SchemaClassKeyImpl& key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key.getPackageName());
+ if (pIter == packages.end())
+ return SchemaObjectClassImpl::Ptr();
+
+ const ObjectClassList& oList = pIter->second.first;
+ ObjectClassList::const_iterator iter = oList.find(&key);
+ if (iter == oList.end())
+ return SchemaObjectClassImpl::Ptr();
+
+ return iter->second;
+}
//==================================================================
// Wrappers
//==================================================================
+AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {}
+AgentProxy::~AgentProxy() { delete impl; }
+const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); }
+
BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(this, console)) {}
BrokerProxy::~BrokerProxy() { delete impl; }
void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); }
@@ -850,16 +1051,23 @@ bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessag
void BrokerProxy::popXmt() { impl->popXmt(); }
bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); }
void BrokerProxy::popEvent() { impl->popEvent(); }
-
-AgentProxy::AgentProxy(ConsoleEngine& console) : impl(new AgentProxyImpl(this, console)) {}
-AgentProxy::~AgentProxy() { delete impl; }
+uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); }
+const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); }
+void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); }
MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {}
-MethodResponse::~MethodResponse() { delete impl; } // TODO: correct to delete here?
+MethodResponse::~MethodResponse() {}
uint32_t MethodResponse::getStatus() const { return impl->getStatus(); }
const Value* MethodResponse::getException() const { return impl->getException(); }
const Value* MethodResponse::getArgs() const { return impl->getArgs(); }
+QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {}
+QueryResponse::~QueryResponse() {}
+uint32_t QueryResponse::getStatus() const { return impl->getStatus(); }
+const Value* QueryResponse::getException() const { return impl->getException(); }
+uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); }
+const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); }
+
ConsoleEngine::ConsoleEngine(const ConsoleSettings& settings) : impl(new ConsoleEngineImpl(this, settings)) {}
ConsoleEngine::~ConsoleEngine() { delete impl; }
bool ConsoleEngine::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); }
@@ -876,9 +1084,6 @@ const SchemaEventClass* ConsoleEngine::getEventClass(const SchemaClassKey* key)
void ConsoleEngine::bindPackage(const char* packageName) { impl->bindPackage(packageName); }
void ConsoleEngine::bindClass(const SchemaClassKey* key) { impl->bindClass(key); }
void ConsoleEngine::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); }
-uint32_t ConsoleEngine::agentCount() const { return impl->agentCount(); }
-const AgentProxy* ConsoleEngine::getAgent(uint32_t idx) const { return impl->getAgent(idx); }
-void ConsoleEngine::sendQuery(const Query& query, void* context) { impl->sendQuery(query, context); }
//void ConsoleEngine::startSync(const Query& query, void* context, SyncQuery& sync) { impl->startSync(query, context, sync); }
//void ConsoleEngine::touchSync(SyncQuery& sync) { impl->touchSync(sync); }
//void ConsoleEngine::endSync(SyncQuery& sync) { impl->endSync(sync); }
diff --git a/qpid/cpp/src/qmf/ConsoleEngine.h b/qpid/cpp/src/qmf/ConsoleEngine.h
index 84ac78cd69..457e83ad58 100644
--- a/qpid/cpp/src/qmf/ConsoleEngine.h
+++ b/qpid/cpp/src/qmf/ConsoleEngine.h
@@ -37,6 +37,8 @@ namespace qmf {
class AgentProxy;
class AgentProxyImpl;
class MethodResponseImpl;
+ class QueryResponseImpl;
+ class QueryContext;
/**
*
@@ -57,6 +59,23 @@ namespace qmf {
/**
*
*/
+ class QueryResponse {
+ public:
+ QueryResponse(QueryResponseImpl* impl);
+ ~QueryResponse();
+ uint32_t getStatus() const;
+ const Value* getException() const;
+ uint32_t getObjectCount() const;
+ const Object* getObject(uint32_t idx) const;
+
+ private:
+ friend class QueryContext;
+ QueryResponseImpl *impl;
+ };
+
+ /**
+ *
+ */
struct ConsoleEvent {
enum EventKind {
AGENT_ADDED = 1,
@@ -64,7 +83,6 @@ namespace qmf {
NEW_PACKAGE = 3,
NEW_CLASS = 4,
OBJECT_UPDATE = 5,
- QUERY_COMPLETE = 6,
EVENT_RECEIVED = 7,
AGENT_HEARTBEAT = 8,
METHOD_RESPONSE = 9
@@ -75,11 +93,12 @@ namespace qmf {
char* name; // (NEW_PACKAGE)
SchemaClassKey* classKey; // (NEW_CLASS)
Object* object; // (OBJECT_UPDATE)
- void* context; // (OBJECT_UPDATE, QUERY_COMPLETE)
+ void* context; // (OBJECT_UPDATE)
Event* event; // (EVENT_RECEIVED)
uint64_t timestamp; // (AGENT_HEARTBEAT)
uint32_t methodHandle; // (METHOD_RESPONSE)
MethodResponse* methodResponse; // (METHOD_RESPONSE)
+ QueryResponse* queryResponse; // (QUERY_COMPLETE)
};
/**
@@ -93,13 +112,30 @@ namespace qmf {
BIND = 13,
UNBIND = 14,
SETUP_COMPLETE = 15,
- STABLE = 16
+ STABLE = 16,
+ QUERY_COMPLETE = 17
};
EventKind kind;
- char* name; // ([DECLARE|DELETE]_QUEUE, [UN]BIND)
- char* exchange; // ([UN]BIND)
- char* bindingKey; // ([UN]BIND)
+ char* name; // ([DECLARE|DELETE]_QUEUE, [UN]BIND)
+ char* exchange; // ([UN]BIND)
+ char* bindingKey; // ([UN]BIND)
+ void* context; // (QUERY_COMPLETE)
+ QueryResponse* queryResponse; // (QUERY_COMPLETE)
+ };
+
+ /**
+ *
+ */
+ class AgentProxy {
+ public:
+ AgentProxy(AgentProxyImpl* impl);
+ ~AgentProxy();
+ const char* getLabel() const;
+
+ private:
+ friend class BrokerProxyImpl;
+ AgentProxyImpl* impl;
};
/**
@@ -121,22 +157,13 @@ namespace qmf {
bool getEvent(BrokerEvent& event) const;
void popEvent();
- private:
- friend class ConsoleEngineImpl;
- BrokerProxyImpl* impl;
- };
-
- /**
- *
- */
- class AgentProxy {
- public:
- AgentProxy(ConsoleEngine& console);
- ~AgentProxy();
+ uint32_t agentCount() const;
+ const AgentProxy* getAgent(uint32_t idx) const;
+ void sendQuery(const Query& query, void* context, const AgentProxy* agent = 0);
private:
friend class ConsoleEngineImpl;
- AgentProxyImpl* impl;
+ BrokerProxyImpl* impl;
};
// TODO - move this to a public header
@@ -178,11 +205,6 @@ namespace qmf {
void bindClass(const SchemaClassKey* key);
void bindClass(const char* packageName, const char* className);
- uint32_t agentCount() const;
- const AgentProxy* getAgent(uint32_t idx) const;
-
- void sendQuery(const Query& query, void* context);
-
/*
void startSync(const Query& query, void* context, SyncQuery& sync);
void touchSync(SyncQuery& sync);
diff --git a/qpid/cpp/src/qmf/Object.h b/qpid/cpp/src/qmf/Object.h
index eb92cbbe45..9cb3224d9b 100644
--- a/qpid/cpp/src/qmf/Object.h
+++ b/qpid/cpp/src/qmf/Object.h
@@ -31,13 +31,14 @@ namespace qmf {
public:
Object(const SchemaObjectClass* type);
Object(ObjectImpl* impl);
+ Object(const Object& from);
virtual ~Object();
void destroy();
const ObjectId* getObjectId() const;
void setObjectId(ObjectId* oid);
const SchemaObjectClass* getClass() const;
- Value* getValue(char* key);
+ Value* getValue(char* key) const;
ObjectImpl* impl;
};
diff --git a/qpid/cpp/src/qmf/ObjectId.h b/qpid/cpp/src/qmf/ObjectId.h
index ffd1b6978b..e894e0b39c 100644
--- a/qpid/cpp/src/qmf/ObjectId.h
+++ b/qpid/cpp/src/qmf/ObjectId.h
@@ -30,6 +30,7 @@ namespace qmf {
class ObjectId {
public:
ObjectId();
+ ObjectId(const ObjectId& from);
ObjectId(ObjectIdImpl* impl);
~ObjectId();
diff --git a/qpid/cpp/src/qmf/ObjectIdImpl.cpp b/qpid/cpp/src/qmf/ObjectIdImpl.cpp
index 75661fdb47..c0618ccc49 100644
--- a/qpid/cpp/src/qmf/ObjectIdImpl.cpp
+++ b/qpid/cpp/src/qmf/ObjectIdImpl.cpp
@@ -100,6 +100,15 @@ void ObjectIdImpl::fromString(const std::string& repr)
agent = 0;
}
+std::string ObjectIdImpl::asString() const
+{
+ stringstream val;
+
+ val << getFlags() << "-" << getSequence() << "-" << getBrokerBank() << "-" <<
+ getAgentBank() << "-" << getObjectNum();
+ return val.str();
+}
+
bool ObjectIdImpl::operator==(const ObjectIdImpl& other) const
{
uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
@@ -126,15 +135,11 @@ bool ObjectIdImpl::operator>(const ObjectIdImpl& other) const
// Wrappers
//==================================================================
-ObjectId::ObjectId()
-{
- impl = new ObjectIdImpl(this);
-}
+ObjectId::ObjectId() : impl(new ObjectIdImpl(this)) {}
-ObjectId::ObjectId(ObjectIdImpl* i)
-{
- impl = i;
-}
+ObjectId::ObjectId(const ObjectId& from) : impl(new ObjectIdImpl(*(from.impl))) {}
+
+ObjectId::ObjectId(ObjectIdImpl* i) : impl(i) {}
ObjectId::~ObjectId()
{
diff --git a/qpid/cpp/src/qmf/ObjectIdImpl.h b/qpid/cpp/src/qmf/ObjectIdImpl.h
index 5d8ee59aee..38d231237f 100644
--- a/qpid/cpp/src/qmf/ObjectIdImpl.h
+++ b/qpid/cpp/src/qmf/ObjectIdImpl.h
@@ -39,13 +39,14 @@ namespace qmf {
uint64_t first;
uint64_t second;
- ObjectIdImpl(ObjectId* e) : envelope(e), agent(0) {}
+ ObjectIdImpl(ObjectId* e) : envelope(e), agent(0), first(0), second(0) {}
ObjectIdImpl(qpid::framing::Buffer& buffer);
ObjectIdImpl(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object);
void decode(qpid::framing::Buffer& buffer);
void encode(qpid::framing::Buffer& buffer) const;
void fromString(const std::string& repr);
+ std::string asString() const;
uint8_t getFlags() const { return (first & 0xF000000000000000LL) >> 60; }
uint16_t getSequence() const { return (first & 0x0FFF000000000000LL) >> 48; }
uint32_t getBrokerBank() const { return (first & 0x0000FFFFF0000000LL) >> 28; }
diff --git a/qpid/cpp/src/qmf/ObjectImpl.cpp b/qpid/cpp/src/qmf/ObjectImpl.cpp
index 645ccd5c81..1ea2d54527 100644
--- a/qpid/cpp/src/qmf/ObjectImpl.cpp
+++ b/qpid/cpp/src/qmf/ObjectImpl.cpp
@@ -45,30 +45,40 @@ ObjectImpl::ObjectImpl(Object* e, const SchemaObjectClass* type) :
}
}
-ObjectImpl::ObjectImpl(const SchemaObjectClass* type, Buffer& buffer) :
- envelope(new Object(this)), objectClass(type), createTime(uint64_t(Duration(now()))),
- destroyTime(0), lastUpdatedTime(createTime)
+ObjectImpl::ObjectImpl(const SchemaObjectClass* type, Buffer& buffer, bool prop, bool stat, bool managed) :
+ envelope(new Object(this)), objectClass(type), createTime(0), destroyTime(0), lastUpdatedTime(0)
{
- int propCount = objectClass->getPropertyCount();
- int statCount = objectClass->getStatisticCount();
int idx;
- set<string> excludes;
- parsePresenceMasks(buffer, excludes);
- for (idx = 0; idx < propCount; idx++) {
- const SchemaProperty* prop = objectClass->getProperty(idx);
- if (excludes.count(prop->getName()) != 0) {
- properties[prop->getName()] = ValuePtr(new Value(prop->getType()));
- } else {
- ValueImpl* pval = new ValueImpl(prop->getType(), buffer);
- properties[prop->getName()] = ValuePtr(pval->envelope);
+ if (managed) {
+ lastUpdatedTime = buffer.getLongLong();
+ createTime = buffer.getLongLong();
+ destroyTime = buffer.getLongLong();
+ objectId.reset(new ObjectIdImpl(buffer));
+ }
+
+ if (prop) {
+ int propCount = objectClass->getPropertyCount();
+ set<string> excludes;
+ parsePresenceMasks(buffer, excludes);
+ for (idx = 0; idx < propCount; idx++) {
+ const SchemaProperty* prop = objectClass->getProperty(idx);
+ if (excludes.count(prop->getName()) != 0) {
+ properties[prop->getName()] = ValuePtr(new Value(prop->getType()));
+ } else {
+ ValueImpl* pval = new ValueImpl(prop->getType(), buffer);
+ properties[prop->getName()] = ValuePtr(pval->envelope);
+ }
}
}
- for (idx = 0; idx < statCount; idx++) {
- const SchemaStatistic* stat = objectClass->getStatistic(idx);
- ValueImpl* sval = new ValueImpl(stat->getType(), buffer);
- statistics[stat->getName()] = ValuePtr(sval->envelope);
+ if (stat) {
+ int statCount = objectClass->getStatisticCount();
+ for (idx = 0; idx < statCount; idx++) {
+ const SchemaStatistic* stat = objectClass->getStatistic(idx);
+ ValueImpl* sval = new ValueImpl(stat->getType(), buffer);
+ statistics[stat->getName()] = ValuePtr(sval->envelope);
+ }
}
}
@@ -82,7 +92,7 @@ void ObjectImpl::destroy()
// TODO - flag deletion
}
-Value* ObjectImpl::getValue(const string& key)
+Value* ObjectImpl::getValue(const string& key) const
{
map<string, ValuePtr>::const_iterator iter;
@@ -133,7 +143,7 @@ void ObjectImpl::encodeManagedObjectData(qpid::framing::Buffer& buffer) const
buffer.putLongLong(lastUpdatedTime);
buffer.putLongLong(createTime);
buffer.putLongLong(destroyTime);
- objectId->impl->encode(buffer);
+ objectId->encode(buffer);
}
void ObjectImpl::encodeProperties(qpid::framing::Buffer& buffer) const
@@ -187,36 +197,12 @@ void ObjectImpl::encodeStatistics(qpid::framing::Buffer& buffer) const
//==================================================================
Object::Object(const SchemaObjectClass* type) : impl(new ObjectImpl(this, type)) {}
-
Object::Object(ObjectImpl* i) : impl(i) {}
-
-Object::~Object()
-{
- delete impl;
-}
-
-void Object::destroy()
-{
- impl->destroy();
-}
-
-const ObjectId* Object::getObjectId() const
-{
- return impl->getObjectId();
-}
-
-void Object::setObjectId(ObjectId* oid)
-{
- impl->setObjectId(oid);
-}
-
-const SchemaObjectClass* Object::getClass() const
-{
- return impl->getClass();
-}
-
-Value* Object::getValue(char* key)
-{
- return impl->getValue(key);
-}
+Object::Object(const Object& from) : impl(new ObjectImpl(*(from.impl))) {}
+Object::~Object() { delete impl; }
+void Object::destroy() { impl->destroy(); }
+const ObjectId* Object::getObjectId() const { return impl->getObjectId(); }
+void Object::setObjectId(ObjectId* oid) { impl->setObjectId(oid); }
+const SchemaObjectClass* Object::getClass() const { return impl->getClass(); }
+Value* Object::getValue(char* key) const { return impl->getValue(key); }
diff --git a/qpid/cpp/src/qmf/ObjectImpl.h b/qpid/cpp/src/qmf/ObjectImpl.h
index 4dc2170bfc..d69979e0da 100644
--- a/qpid/cpp/src/qmf/ObjectImpl.h
+++ b/qpid/cpp/src/qmf/ObjectImpl.h
@@ -21,19 +21,22 @@
*/
#include <qmf/Object.h>
+#include <qmf/ObjectIdImpl.h>
#include <map>
#include <set>
#include <string>
#include <qpid/framing/Buffer.h>
#include <boost/shared_ptr.hpp>
+#include <qpid/sys/Mutex.h>
namespace qmf {
struct ObjectImpl {
+ typedef boost::shared_ptr<ObjectImpl> Ptr;
typedef boost::shared_ptr<Value> ValuePtr;
Object* envelope;
const SchemaObjectClass* objectClass;
- boost::shared_ptr<ObjectId> objectId;
+ boost::shared_ptr<ObjectIdImpl> objectId;
uint64_t createTime;
uint64_t destroyTime;
uint64_t lastUpdatedTime;
@@ -41,14 +44,14 @@ namespace qmf {
mutable std::map<std::string, ValuePtr> statistics;
ObjectImpl(Object* e, const SchemaObjectClass* type);
- ObjectImpl(const SchemaObjectClass* type, qpid::framing::Buffer& buffer);
+ ObjectImpl(const SchemaObjectClass* type, qpid::framing::Buffer& buffer, bool prop, bool stat, bool managed);
~ObjectImpl();
void destroy();
- const ObjectId* getObjectId() const { return objectId.get(); }
- void setObjectId(ObjectId* oid) { objectId.reset(oid); }
+ const ObjectId* getObjectId() const { return objectId.get() ? objectId->envelope : 0; }
+ void setObjectId(ObjectId* oid) { objectId.reset(oid->impl); }
const SchemaObjectClass* getClass() const { return objectClass; }
- Value* getValue(const std::string& key);
+ Value* getValue(const std::string& key) const;
void parsePresenceMasks(qpid::framing::Buffer& buffer, std::set<std::string>& excludeList);
void encodeSchemaKey(qpid::framing::Buffer& buffer) const;
diff --git a/qpid/cpp/src/qmf/Query.h b/qpid/cpp/src/qmf/Query.h
index 78bc6f4ae2..875749862e 100644
--- a/qpid/cpp/src/qmf/Query.h
+++ b/qpid/cpp/src/qmf/Query.h
@@ -25,26 +25,76 @@
namespace qmf {
+ struct Object;
+ struct QueryElementImpl;
struct QueryImpl;
+ struct QueryExpressionImpl;
+ struct SchemaClassKey;
+
+ enum ValueOper {
+ O_EQ = 1,
+ O_NE = 2,
+ O_LT = 3,
+ O_LE = 4,
+ O_GT = 5,
+ O_GE = 6,
+ O_RE_MATCH = 7,
+ O_RE_NOMATCH = 8
+ };
+
+ struct QueryOperand {
+ virtual ~QueryOperand() {}
+ virtual bool evaluate(const Object* object) const = 0;
+ };
+
+ struct QueryElement : public QueryOperand {
+ QueryElement(const char* attrName, const Value* value, ValueOper oper);
+ QueryElement(QueryElementImpl* impl);
+ virtual ~QueryElement();
+ bool evaluate(const Object* object) const;
+
+ QueryElementImpl* impl;
+ };
+
+ enum ExprOper {
+ E_NOT = 1,
+ E_AND = 2,
+ E_OR = 3,
+ E_XOR = 4
+ };
+
+ struct QueryExpression : public QueryOperand {
+ QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2);
+ QueryExpression(QueryExpressionImpl* impl);
+ virtual ~QueryExpression();
+ bool evaluate(const Object* object) const;
+
+ QueryExpressionImpl* impl;
+ };
+
class Query {
public:
- Query();
+ Query(const char* className, const char* packageName);
+ Query(const SchemaClassKey* key);
+ Query(const ObjectId* oid);
Query(QueryImpl* impl);
~Query();
+ void setSelect(const QueryOperand* criterion);
+ void setLimit(uint32_t maxResults);
+ void setOrderBy(const char* attrName, bool decreasing);
+
const char* getPackage() const;
const char* getClass() const;
const ObjectId* getObjectId() const;
- enum Oper {
- OPER_AND = 1,
- OPER_OR = 2
- };
-
- int whereCount() const;
- Oper whereOper() const;
- const char* whereKey() const;
- const Value* whereValue() const;
+ bool haveSelect() const;
+ bool haveLimit() const;
+ bool haveOrderBy() const;
+ const QueryOperand* getSelect() const;
+ uint32_t getLimit() const;
+ const char* getOrderBy() const;
+ bool getDecreasing() const;
QueryImpl* impl;
};
diff --git a/qpid/cpp/src/qmf/QueryImpl.cpp b/qpid/cpp/src/qmf/QueryImpl.cpp
index 7e827796bb..f75a9aa5d5 100644
--- a/qpid/cpp/src/qmf/QueryImpl.cpp
+++ b/qpid/cpp/src/qmf/QueryImpl.cpp
@@ -18,54 +18,77 @@
*/
#include "qmf/QueryImpl.h"
+#include "qmf/ObjectIdImpl.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/FieldTable.h"
using namespace std;
using namespace qmf;
+using namespace qpid::framing;
-//==================================================================
-// Wrappers
-//==================================================================
-
-Query::Query() : impl(new QueryImpl(this)) {}
-Query::Query(QueryImpl* i) : impl(i) {}
-
-Query::~Query()
+bool QueryElementImpl::evaluate(const Object* /*object*/) const
{
- delete impl;
+ // TODO: Implement this
+ return false;
}
-const char* Query::getPackage() const
+bool QueryExpressionImpl::evaluate(const Object* /*object*/) const
{
- return impl->getPackage();
+ // TODO: Implement this
+ return false;
}
-const char* Query::getClass() const
+QueryImpl::QueryImpl(Buffer& buffer)
{
- return impl->getClass();
+ FieldTable ft;
+ ft.decode(buffer);
+ // TODO
}
-const ObjectId* Query::getObjectId() const
+void QueryImpl::encode(Buffer& buffer) const
{
- return impl->getObjectId();
-}
+ FieldTable ft;
-int Query::whereCount() const
-{
- return impl->whereCount();
-}
+ if (oid.get() != 0) {
+ ft.setString("_objectid", oid->impl->asString());
+ } else {
+ if (!packageName.empty())
+ ft.setString("_package", packageName);
+ ft.setString("_class", className);
+ }
-Query::Oper Query::whereOper() const
-{
- return impl->whereOper();
+ ft.encode(buffer);
}
-const char* Query::whereKey() const
-{
- return impl->whereKey();
-}
-const Value* Query::whereValue() const
-{
- return impl->whereValue();
-}
+//==================================================================
+// Wrappers
+//==================================================================
+
+QueryElement::QueryElement(const char* attrName, const Value* value, ValueOper oper) : impl(new QueryElementImpl(attrName, value, oper)) {}
+QueryElement::QueryElement(QueryElementImpl* i) : impl(i) {}
+QueryElement::~QueryElement() { delete impl; }
+bool QueryElement::evaluate(const Object* object) const { return impl->evaluate(object); }
+QueryExpression::QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2) : impl(new QueryExpressionImpl(oper, operand1, operand2)) {}
+QueryExpression::QueryExpression(QueryExpressionImpl* i) : impl(i) {}
+QueryExpression::~QueryExpression() { delete impl; }
+bool QueryExpression::evaluate(const Object* object) const { return impl->evaluate(object); }
+Query::Query(const char* className, const char* packageName) : impl(new QueryImpl(className, packageName)) {}
+Query::Query(const SchemaClassKey* key) : impl(new QueryImpl(key)) {}
+Query::Query(const ObjectId* oid) : impl(new QueryImpl(oid)) {}
+Query::Query(QueryImpl* i) : impl(i) {}
+Query::~Query() { delete impl; }
+void Query::setSelect(const QueryOperand* criterion) { impl->setSelect(criterion); }
+void Query::setLimit(uint32_t maxResults) { impl->setLimit(maxResults); }
+void Query::setOrderBy(const char* attrName, bool decreasing) { impl->setOrderBy(attrName, decreasing); }
+const char* Query::getPackage() const { return impl->getPackage().c_str(); }
+const char* Query::getClass() const { return impl->getClass().c_str(); }
+const ObjectId* Query::getObjectId() const { return impl->getObjectId(); }
+bool Query::haveSelect() const { return impl->haveSelect(); }
+bool Query::haveLimit() const { return impl->haveLimit(); }
+bool Query::haveOrderBy() const { return impl->haveOrderBy(); }
+const QueryOperand* Query::getSelect() const { return impl->getSelect(); }
+uint32_t Query::getLimit() const { return impl->getLimit(); }
+const char* Query::getOrderBy() const { return impl->getOrderBy().c_str(); }
+bool Query::getDecreasing() const { return impl->getDecreasing(); }
diff --git a/qpid/cpp/src/qmf/QueryImpl.h b/qpid/cpp/src/qmf/QueryImpl.h
index 1cb9bfe554..4a56a457c0 100644
--- a/qpid/cpp/src/qmf/QueryImpl.h
+++ b/qpid/cpp/src/qmf/QueryImpl.h
@@ -20,28 +20,82 @@
* under the License.
*/
-#include <qmf/Query.h>
+#include "qmf/Query.h"
+#include "qmf/Schema.h"
#include <string>
#include <boost/shared_ptr.hpp>
+namespace qpid {
+ namespace framing {
+ class Buffer;
+ }
+}
+
namespace qmf {
+ struct QueryElementImpl {
+ QueryElementImpl(const std::string& a, const Value* v, ValueOper o) :
+ envelope(new QueryElement(this)), attrName(a), value(v), oper(o) {}
+ ~QueryElementImpl() {}
+ bool evaluate(const Object* object) const;
+
+ QueryElement* envelope;
+ std::string attrName;
+ const Value* value;
+ ValueOper oper;
+ };
+
+ struct QueryExpressionImpl {
+ QueryExpressionImpl(ExprOper o, const QueryOperand* operand1, const QueryOperand* operand2) :
+ envelope(new QueryExpression(this)), oper(o), left(operand1), right(operand2) {}
+ ~QueryExpressionImpl() {}
+ bool evaluate(const Object* object) const;
+
+ QueryExpression* envelope;
+ ExprOper oper;
+ const QueryOperand* left;
+ const QueryOperand* right;
+ };
+
struct QueryImpl {
- Query* envelope;
- std::string packageName;
- std::string className;
- boost::shared_ptr<ObjectId> oid;
+ QueryImpl(Query* e) : envelope(e), select(0) {}
+ QueryImpl(const std::string& c, const std::string& p) :
+ envelope(new Query(this)), packageName(p), className(c) {}
+ QueryImpl(const SchemaClassKey* key) :
+ envelope(new Query(this)), packageName(key->getPackageName()), className(key->getClassName()) {}
+ QueryImpl(const ObjectId* oid) :
+ envelope(new Query(this)), oid(new ObjectId(*oid)) {}
+ QueryImpl(qpid::framing::Buffer& buffer);
+ ~QueryImpl() {};
- QueryImpl(Query* e) : envelope(e) {}
+ void setSelect(const QueryOperand* criterion) { select = criterion; }
+ void setLimit(uint32_t maxResults) { resultLimit = maxResults; }
+ void setOrderBy(const std::string& attrName, bool decreasing) {
+ orderBy = attrName; orderDecreasing = decreasing;
+ }
- const char* getPackage() const { return packageName.empty() ? 0 : packageName.c_str(); }
- const char* getClass() const { return className.empty() ? 0 : className.c_str(); }
+ const std::string& getPackage() const { return packageName; }
+ const std::string& getClass() const { return className; }
const ObjectId* getObjectId() const { return oid.get(); }
- int whereCount() const { return 0;}
- Query::Oper whereOper() const { return Query::OPER_AND; }
- const char* whereKey() const { return 0; }
- const Value* whereValue() const { return 0; }
+ bool haveSelect() const { return select != 0; }
+ bool haveLimit() const { return resultLimit > 0; }
+ bool haveOrderBy() const { return !orderBy.empty(); }
+ const QueryOperand* getSelect() const { return select; }
+ uint32_t getLimit() const { return resultLimit; }
+ const std::string& getOrderBy() const { return orderBy; }
+ bool getDecreasing() const { return orderDecreasing; }
+
+ void encode(qpid::framing::Buffer& buffer) const;
+
+ Query* envelope;
+ std::string packageName;
+ std::string className;
+ boost::shared_ptr<ObjectId> oid;
+ const QueryOperand* select;
+ uint32_t resultLimit;
+ std::string orderBy;
+ bool orderDecreasing;
};
}
diff --git a/qpid/cpp/src/qmf/SchemaImpl.cpp b/qpid/cpp/src/qmf/SchemaImpl.cpp
index ae7d6ca689..3eb14c3952 100644
--- a/qpid/cpp/src/qmf/SchemaImpl.cpp
+++ b/qpid/cpp/src/qmf/SchemaImpl.cpp
@@ -261,7 +261,15 @@ void SchemaStatisticImpl::updateHash(SchemaHash& hash) const
SchemaClassKeyImpl::SchemaClassKeyImpl(const string& p, const string& n, const SchemaHash& h) :
envelope(new SchemaClassKey(this)), package(p), name(n), hash(h) {}
-void SchemaClassKeyImpl::encode(qpid::framing::Buffer& buffer) const
+SchemaClassKeyImpl::SchemaClassKeyImpl(Buffer& buffer) :
+ envelope(new SchemaClassKey(this)), package(packageContainer), name(nameContainer), hash(hashContainer)
+{
+ buffer.getShortString(packageContainer);
+ buffer.getShortString(nameContainer);
+ hashContainer.decode(buffer);
+}
+
+void SchemaClassKeyImpl::encode(Buffer& buffer) const
{
buffer.putShortString(package);
buffer.putShortString(name);
@@ -413,8 +421,9 @@ SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) :
buffer.getShortString(package);
buffer.getShortString(name);
hash.decode(buffer);
+ buffer.putOctet(0); // No parent class
- uint16_t argCount = buffer.getShort();
+ uint16_t argCount = buffer.getShort();
for (uint16_t idx = 0; idx < argCount; idx++) {
SchemaArgumentImpl* argument = new SchemaArgumentImpl(buffer);
diff --git a/qpid/cpp/src/qmf/SchemaImpl.h b/qpid/cpp/src/qmf/SchemaImpl.h
index 3e9677d1fa..035d99aecd 100644
--- a/qpid/cpp/src/qmf/SchemaImpl.h
+++ b/qpid/cpp/src/qmf/SchemaImpl.h
@@ -148,7 +148,14 @@ namespace qmf {
const std::string& name;
const SchemaHash& hash;
+ // The *Container elements are only used if there isn't an external place to
+ // store these values.
+ std::string packageContainer;
+ std::string nameContainer;
+ SchemaHash hashContainer;
+
SchemaClassKeyImpl(const std::string& package, const std::string& name, const SchemaHash& hash);
+ SchemaClassKeyImpl(qpid::framing::Buffer& buffer);
const std::string& getPackageName() const { return package; }
const std::string& getClassName() const { return name; }
diff --git a/qpid/cpp/src/qmf/SequenceManager.cpp b/qpid/cpp/src/qmf/SequenceManager.cpp
index f51ce9d8b8..3171e66fac 100644
--- a/qpid/cpp/src/qmf/SequenceManager.cpp
+++ b/qpid/cpp/src/qmf/SequenceManager.cpp
@@ -25,26 +25,72 @@ using namespace qpid::sys;
SequenceManager::SequenceManager() : nextSequence(1) {}
-uint32_t SequenceManager::reserve(SequenceContext* ctx)
+void SequenceManager::setUnsolicitedContext(SequenceContext::Ptr ctx)
+{
+ unsolicitedContext = ctx;
+}
+
+uint32_t SequenceManager::reserve(SequenceContext::Ptr ctx)
{
Mutex::ScopedLock _lock(lock);
+ if (ctx.get() == 0)
+ ctx = unsolicitedContext;
uint32_t seq = nextSequence;
while (contextMap.find(seq) != contextMap.end())
seq = seq < 0xFFFFFFFF ? seq + 1 : 1;
nextSequence = seq < 0xFFFFFFFF ? seq + 1 : 1;
contextMap[seq] = ctx;
+ ctx->reserve();
return seq;
}
void SequenceManager::release(uint32_t sequence)
{
Mutex::ScopedLock _lock(lock);
- map<uint32_t, SequenceContext*>::iterator iter = contextMap.find(sequence);
+
+ if (sequence == 0) {
+ if (unsolicitedContext.get() != 0)
+ unsolicitedContext->release();
+ return;
+ }
+
+ map<uint32_t, SequenceContext::Ptr>::iterator iter = contextMap.find(sequence);
if (iter != contextMap.end()) {
if (iter->second != 0)
- iter->second->complete();
+ iter->second->release();
contextMap.erase(iter);
}
}
+void SequenceManager::releaseAll()
+{
+ Mutex::ScopedLock _lock(lock);
+ contextMap.clear();
+}
+
+void SequenceManager::dispatch(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer)
+{
+ Mutex::ScopedLock _lock(lock);
+ bool done;
+
+ if (sequence == 0) {
+ if (unsolicitedContext.get() != 0) {
+ done = unsolicitedContext->handleMessage(opcode, sequence, buffer);
+ if (done)
+ unsolicitedContext->release();
+ }
+ return;
+ }
+
+ map<uint32_t, SequenceContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter != contextMap.end()) {
+ if (iter->second != 0) {
+ done = iter->second->handleMessage(opcode, sequence, buffer);
+ if (done) {
+ iter->second->release();
+ contextMap.erase(iter);
+ }
+ }
+ }
+}
diff --git a/qpid/cpp/src/qmf/SequenceManager.h b/qpid/cpp/src/qmf/SequenceManager.h
index c027872313..bbfd0728a7 100644
--- a/qpid/cpp/src/qmf/SequenceManager.h
+++ b/qpid/cpp/src/qmf/SequenceManager.h
@@ -21,29 +21,43 @@
*/
#include "qpid/sys/Mutex.h"
+#include <boost/shared_ptr.hpp>
#include <map>
+namespace qpid {
+ namespace framing {
+ class Buffer;
+ }
+}
+
namespace qmf {
class SequenceContext {
public:
+ typedef boost::shared_ptr<SequenceContext> Ptr;
SequenceContext() {}
virtual ~SequenceContext() {}
- virtual void complete() = 0;
+ virtual void reserve() = 0;
+ virtual bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer) = 0;
+ virtual void release() = 0;
};
class SequenceManager {
public:
SequenceManager();
- uint32_t reserve(SequenceContext* ctx);
+ void setUnsolicitedContext(SequenceContext::Ptr ctx);
+ uint32_t reserve(SequenceContext::Ptr ctx = SequenceContext::Ptr());
void release(uint32_t sequence);
+ void releaseAll();
+ void dispatch(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer);
private:
mutable qpid::sys::Mutex lock;
uint32_t nextSequence;
- std::map<uint32_t, SequenceContext*> contextMap;
+ SequenceContext::Ptr unsolicitedContext;
+ std::map<uint32_t, SequenceContext::Ptr> contextMap;
};
}