summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-03-02 00:58:16 +0000
committerTed Ross <tross@apache.org>2010-03-02 00:58:16 +0000
commit10e6d0d251e8da42ab3e401dc75b0661bcee301e (patch)
tree79fd4e86bb55144d6ec9a8a6e3425b37dd5a3ed2
parentacf3a1931ec404d1b02a2e115ef18e531d3924e4 (diff)
downloadqpid-python-10e6d0d251e8da42ab3e401dc75b0661bcee301e.tar.gz
Further implementation of the QMFv2 agent engine.
- deprecated old ObjectId class - renamed Object to Data - added hooks for authorization of get, subscribe, and method call - added infrastructure for optional internal storage git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7@917825 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/include/qmf/Notifiable.h2
-rw-r--r--qpid/cpp/include/qmf/Protocol.h1
-rw-r--r--qpid/cpp/include/qmf/engine/Agent.h38
-rw-r--r--qpid/cpp/include/qmf/engine/Data.h (renamed from qpid/cpp/include/qmf/engine/Object.h)26
-rw-r--r--qpid/cpp/include/qmf/engine/Event.h49
-rw-r--r--qpid/cpp/include/qmf/engine/ObjectId.h68
-rw-r--r--qpid/cpp/include/qmf/engine/Query.h4
-rw-r--r--qpid/cpp/src/qmf.mk11
-rw-r--r--qpid/cpp/src/qmf/Protocol.cpp1
-rw-r--r--qpid/cpp/src/qmf/engine/Agent.cpp257
-rw-r--r--qpid/cpp/src/qmf/engine/DataImpl.cpp84
-rw-r--r--qpid/cpp/src/qmf/engine/DataImpl.h (renamed from qpid/cpp/src/qmf/engine/ObjectImpl.h)22
-rw-r--r--qpid/cpp/src/qmf/engine/EventImpl.cpp108
-rw-r--r--qpid/cpp/src/qmf/engine/EventImpl.h53
-rw-r--r--qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp210
-rw-r--r--qpid/cpp/src/qmf/engine/ObjectIdImpl.h72
-rw-r--r--qpid/cpp/src/qmf/engine/ObjectImpl.cpp67
-rw-r--r--qpid/cpp/src/qmf/engine/QueryImpl.cpp26
-rw-r--r--qpid/cpp/src/qmf/engine/QueryImpl.h9
-rw-r--r--qpid/cpp/src/qmf/engine/SchemaImpl.h2
20 files changed, 389 insertions, 721 deletions
diff --git a/qpid/cpp/include/qmf/Notifiable.h b/qpid/cpp/include/qmf/Notifiable.h
index 43f546d9cd..e3b075b49e 100644
--- a/qpid/cpp/include/qmf/Notifiable.h
+++ b/qpid/cpp/include/qmf/Notifiable.h
@@ -40,7 +40,7 @@ namespace qmf {
*/
class Notifiable {
public:
- QMF_EXTERN virtual ~Notifiable();
+ QMF_EXTERN virtual ~Notifiable() {}
virtual void notify() = 0;
};
}
diff --git a/qpid/cpp/include/qmf/Protocol.h b/qpid/cpp/include/qmf/Protocol.h
index 361a7bd283..e2ec287c00 100644
--- a/qpid/cpp/include/qmf/Protocol.h
+++ b/qpid/cpp/include/qmf/Protocol.h
@@ -71,6 +71,7 @@ namespace qmf {
/**
* QMF Op Codes
*/
+ const static std::string OP_EXCEPTION;
const static std::string OP_AGENT_LOCATE_REQUEST;
const static std::string OP_AGENT_LOCATE_RESPONSE;
const static std::string OP_AGENT_HEARTBEAT_INDICATION;
diff --git a/qpid/cpp/include/qmf/engine/Agent.h b/qpid/cpp/include/qmf/engine/Agent.h
index 4585ce51cc..d7fff97ad1 100644
--- a/qpid/cpp/include/qmf/engine/Agent.h
+++ b/qpid/cpp/include/qmf/engine/Agent.h
@@ -22,9 +22,7 @@
#include <qmf/Notifiable.h>
#include <qmf/engine/Schema.h>
-#include <qmf/engine/ObjectId.h>
-#include <qmf/engine/Object.h>
-#include <qmf/engine/Event.h>
+#include <qmf/engine/Data.h>
#include <qmf/engine/Query.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Variant.h>
@@ -40,10 +38,13 @@ namespace engine {
*/
struct AgentEvent {
enum EventKind {
- GET_QUERY = 1,
- START_SYNC = 2,
- END_SYNC = 3,
- METHOD_CALL = 4
+ GET_QUERY = 1,
+ START_SYNC = 2,
+ END_SYNC = 3,
+ METHOD_CALL = 4,
+ GET_AUTHORIZE = 5,
+ METHOD_AUTHORIZE = 6,
+ SYNC_AUTHORIZE = 7
};
EventKind kind;
@@ -52,7 +53,7 @@ namespace engine {
char* authToken; // Authentication token if issued (for all kinds)
char* name; // Name of the method/sync query
// (METHOD_CALL, START_SYNC, END_SYNC)
- Object* object; // Object involved in method call (METHOD_CALL)
+ Data* object; // Object involved in method call (METHOD_CALL)
char* objectKey; // Object key for method call (METHOD_CALL)
Query* query; // Query parameters (GET_QUERY, START_SYNC)
qpid::messaging::Variant::Map* arguments; // Method parameters (METHOD_CALL)
@@ -128,6 +129,21 @@ namespace engine {
void setConnection(qpid::messaging::Connection& conn);
/**
+ * Respond to an authorize request by allowing the requested action.
+ *@param sequence The sequence number from the authorization request event.
+ */
+ void authAllow(uint32_t sequence);
+
+ /**
+ * Respond to an authorize request by denying the requested action.
+ *@param sequence The sequence number from the authorization request event.
+ *@param exception Value (typically a string) describing the reason for the
+ * rejection of authorization.
+ */
+ void authDeny(uint32_t sequence, const Data& exception=Data());
+ void authDeny(uint32_t sequence, const char* error);
+
+ /**
* Respond to a method request.
*@param sequence The sequence number from the method request event.
*@param status The method's completion status.
@@ -143,7 +159,7 @@ namespace engine {
*@param sequence The sequence number of the GET request or the SYNC_START request.
*@param object The object (annotated with "changed" flags) for publication.
*/
- void queryResponse(uint32_t sequence, Object& object);
+ void queryResponse(uint32_t sequence, Data& object);
/**
* Indicate the completion of a query. This is not used for SYNC_START requests.
@@ -165,13 +181,13 @@ namespace engine {
* left null, the agent will create a unique name for the object.
*@return The key for the managed object.
*/
- const char* addObject(Object& obj, const char* key=0);
+ const char* addObject(Data& obj, const char* key=0);
/**
* Raise an event into the QMF network..
*@param event The event object for the event to be raised.
*/
- void raiseEvent(Event& event);
+ void raiseEvent(Data& event);
private:
AgentImpl* impl;
diff --git a/qpid/cpp/include/qmf/engine/Object.h b/qpid/cpp/include/qmf/engine/Data.h
index 61d4f3d75c..30f2093df7 100644
--- a/qpid/cpp/include/qmf/engine/Object.h
+++ b/qpid/cpp/include/qmf/engine/Data.h
@@ -1,5 +1,5 @@
-#ifndef _QmfEngineObject_
-#define _QmfEngineObject_
+#ifndef _QmfEngineData_
+#define _QmfEngineData_
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -21,23 +21,25 @@
*/
#include <qmf/engine/Schema.h>
-#include <qmf/engine/ObjectId.h>
#include <qpid/messaging/Variant.h>
namespace qmf {
namespace engine {
- struct ObjectImpl;
- class Object {
+ struct DataImpl;
+ class Data {
public:
- Object();
- Object(SchemaClass* type);
- Object(const Object& from);
- virtual ~Object();
+ Data();
+ Data(SchemaClass* type, const qpid::messaging::Variant::Map& values=qpid::messaging::Variant::Map());
+ Data(const Data& from);
+ virtual ~Data();
const qpid::messaging::Variant::Map& getValues() const;
qpid::messaging::Variant::Map& getValues();
+ const qpid::messaging::Variant::Map& getSubtypes() const;
+ qpid::messaging::Variant::Map& getSubtypes();
+
const SchemaClass* getSchema() const;
void setSchema(SchemaClass* schema);
@@ -47,10 +49,12 @@ namespace engine {
void touch();
void destroy();
+ qpid::messaging::Variant::Map asMap() const;
+
private:
- friend struct ObjectImpl;
+ friend struct DataImpl;
friend class AgentImpl;
- ObjectImpl* impl;
+ DataImpl* impl;
};
}
}
diff --git a/qpid/cpp/include/qmf/engine/Event.h b/qpid/cpp/include/qmf/engine/Event.h
deleted file mode 100644
index 5096e3e064..0000000000
--- a/qpid/cpp/include/qmf/engine/Event.h
+++ /dev/null
@@ -1,49 +0,0 @@
-#ifndef _QmfEngineEvent_
-#define _QmfEngineEvent_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-namespace qmf {
-namespace engine {
-
- class SchemaClass;
- class Value;
- struct EventImpl;
-
- class Event {
- public:
- Event(const SchemaClass* type);
- Event(const Event& from);
- ~Event();
-
- const SchemaClass* getClass() const;
- Value* getValue(const char* key) const;
-
- private:
- friend struct EventImpl;
- friend class AgentImpl;
- Event(EventImpl* impl);
- EventImpl* impl;
- };
-}
-}
-
-#endif
-
diff --git a/qpid/cpp/include/qmf/engine/ObjectId.h b/qpid/cpp/include/qmf/engine/ObjectId.h
deleted file mode 100644
index 51eb2bc9e7..0000000000
--- a/qpid/cpp/include/qmf/engine/ObjectId.h
+++ /dev/null
@@ -1,68 +0,0 @@
-#ifndef _QmfEngineObjectId_
-#define _QmfEngineObjectId_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qpid/sys/IntegerTypes.h>
-
-namespace qmf {
-namespace engine {
-
- // TODO: Add to/from string and << operator
-
- struct ObjectIdImpl;
- class ObjectId {
- public:
- ObjectId();
- ObjectId(const ObjectId& from);
- ~ObjectId();
-
- uint64_t getObjectNum() const;
- uint32_t getObjectNumHi() const;
- uint32_t getObjectNumLo() const;
- bool isDurable() const;
- const char* str() const;
- uint8_t getFlags() const;
- uint16_t getSequence() const;
- uint32_t getBrokerBank() const;
- uint32_t getAgentBank() const;
-
- bool operator==(const ObjectId& other) const;
- bool operator<(const ObjectId& other) const;
- bool operator>(const ObjectId& other) const;
- bool operator<=(const ObjectId& other) const;
- bool operator>=(const ObjectId& other) const;
- ObjectId& operator=(const ObjectId &other);
-
- private:
- friend struct ObjectIdImpl;
- friend struct ObjectImpl;
- friend class BrokerProxyImpl;
- friend struct QueryImpl;
- friend struct ValueImpl;
- friend class AgentImpl;
- ObjectId(ObjectIdImpl* impl);
- ObjectIdImpl* impl;
- };
-}
-}
-
-#endif
-
diff --git a/qpid/cpp/include/qmf/engine/Query.h b/qpid/cpp/include/qmf/engine/Query.h
index 8954a08285..7c22360b03 100644
--- a/qpid/cpp/include/qmf/engine/Query.h
+++ b/qpid/cpp/include/qmf/engine/Query.h
@@ -25,7 +25,6 @@
namespace qmf {
namespace engine {
- class Object;
class QueryImpl;
class Query {
@@ -49,11 +48,12 @@ namespace engine {
const char* getOrderBy() const;
bool getDecreasing() const;
- bool matches(const Object& object) const;
+ bool matches(const qpid::messaging::Variant::Map& data) const;
private:
friend struct QueryImpl;
friend struct BrokerProxyImpl;
+ Query(QueryImpl*);
QueryImpl* impl;
};
}
diff --git a/qpid/cpp/src/qmf.mk b/qpid/cpp/src/qmf.mk
index 80e4f5bc46..3106d0bcb9 100644
--- a/qpid/cpp/src/qmf.mk
+++ b/qpid/cpp/src/qmf.mk
@@ -40,16 +40,13 @@ QMF_API = \
QMF_ENGINE_API = \
../include/qmf/engine/Agent.h \
../include/qmf/engine/Console.h \
- ../include/qmf/engine/Event.h \
- ../include/qmf/engine/Object.h \
+ ../include/qmf/engine/Data.h \
../include/qmf/engine/QmfEngineImportExport.h \
../include/qmf/engine/Query.h \
../include/qmf/engine/Schema.h \
../include/qmf/Agent.h \
../include/qmf/Notifiable.h
-# ../include/qmf/engine/ObjectId.h
-
# Public header files
nobase_include_HEADERS += \
$(QMF_API) \
@@ -65,8 +62,8 @@ libqmf_la_SOURCES = \
libqmfengine_la_SOURCES = \
$(QMF_ENGINE_API) \
qmf/engine/Agent.cpp \
- qmf/engine/ObjectImpl.cpp \
- qmf/engine/ObjectImpl.h \
+ qmf/engine/DataImpl.cpp \
+ qmf/engine/DataImpl.h \
qmf/Protocol.cpp \
qmf/Protocol.h \
qmf/engine/QueryImpl.cpp \
@@ -78,8 +75,6 @@ libqmfengine_la_SOURCES = \
# qmf/engine/BrokerProxyImpl.h
# qmf/engine/ConsoleImpl.cpp
# qmf/engine/ConsoleImpl.h
-# qmf/engine/ObjectIdImpl.cpp
-# qmf/engine/ObjectIdImpl.h
# qmf/engine/SequenceManager.cpp
# qmf/engine/SequenceManager.h
diff --git a/qpid/cpp/src/qmf/Protocol.cpp b/qpid/cpp/src/qmf/Protocol.cpp
index 518d263080..5ab009d6bf 100644
--- a/qpid/cpp/src/qmf/Protocol.cpp
+++ b/qpid/cpp/src/qmf/Protocol.cpp
@@ -48,6 +48,7 @@ const string Protocol::AMQP_CONTENT_LIST("amqp/list");
const string Protocol::APP_OPCODE("qmf.opcode");
+const string Protocol::OP_EXCEPTION("_exception");
const string Protocol::OP_AGENT_LOCATE_REQUEST("_agent_locate_request");
const string Protocol::OP_AGENT_LOCATE_RESPONSE("_agent_locate_response");
const string Protocol::OP_AGENT_HEARTBEAT_INDICATION("_agent_heartbeat_indication");
diff --git a/qpid/cpp/src/qmf/engine/Agent.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp
index c0c8b69bc8..04308b954a 100644
--- a/qpid/cpp/src/qmf/engine/Agent.cpp
+++ b/qpid/cpp/src/qmf/engine/Agent.cpp
@@ -19,10 +19,11 @@
#include "qmf/engine/Agent.h"
#include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/ObjectImpl.h"
+#include "qmf/engine/Data.h"
#include "qmf/engine/QueryImpl.h"
#include "qmf/Protocol.h"
#include <qpid/sys/Mutex.h>
+#include <qpid/sys/Condition.h>
#include <qpid/log/Statement.h>
#include <qpid/sys/Time.h>
#include <qpid/sys/Thread.h>
@@ -30,8 +31,11 @@
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Address.h>
#include <qpid/messaging/Message.h>
+#include <qpid/messaging/MapContent.h>
#include <qpid/messaging/MapView.h>
+#include <qpid/messaging/ListView.h>
#include <string>
#include <deque>
#include <map>
@@ -48,6 +52,8 @@ using namespace qpid::messaging;
namespace qmf {
namespace engine {
+ class AgentImpl;
+
struct AgentEventImpl {
typedef boost::shared_ptr<AgentEventImpl> Ptr;
AgentEvent::EventKind kind;
@@ -55,7 +61,7 @@ namespace engine {
string authUserId;
string authToken;
string name;
- Object* object;
+ Data* object;
string objectKey;
boost::shared_ptr<Query> query;
boost::shared_ptr<Variant::Map> arguments;
@@ -68,15 +74,31 @@ namespace engine {
};
/**
- * AgentQueryContext is used to track asynchronous requests (Query, Sync, or Method)
+ * AsyncContext is used to track asynchronous requests (Query, Sync, or Method)
* sent up to the application.
*/
- struct AgentQueryContext {
- typedef boost::shared_ptr<AgentQueryContext> Ptr;
- uint32_t sequence;
- string consoleAddr;
+ struct AsyncContext {
+ typedef boost::shared_ptr<AsyncContext> Ptr;
+ string correlationId;
+ Address replyTo;
+ AgentEventImpl::Ptr authorizedEvent;
const SchemaMethod* schemaMethod;
- AgentQueryContext() : schemaMethod(0) {}
+ AsyncContext(const string& cid, const Address& rt) : correlationId(cid), replyTo(rt), schemaMethod(0) {}
+ };
+
+ class StoreThread : public boost::noncopyable, public qpid::sys::Runnable {
+ public:
+ StoreThread(AgentImpl& a) : agent(a), running(true), thread(*this) {}
+ ~StoreThread() {
+ stop();
+ }
+ void run();
+ void stop();
+
+ private:
+ AgentImpl& agent;
+ bool running;
+ qpid::sys::Thread thread;
};
class AgentImpl : public boost::noncopyable, public qpid::sys::Runnable {
@@ -92,24 +114,34 @@ namespace engine {
bool getEvent(AgentEvent& event) const;
void popEvent();
void setConnection(Connection& conn);
+ void authAllow(uint32_t sequence);
+ void authDeny(uint32_t sequence, const Data&);
+ void authDeny(uint32_t sequence, const string&);
void methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments);
- void queryResponse(uint32_t sequence, Object& object);
+ void queryResponse(uint32_t sequence, Data& object);
void queryComplete(uint32_t sequence);
void registerClass(SchemaClass* cls);
- const char* addObject(Object& obj, const char* key);
- void raiseEvent(Event& event);
+ const char* addObject(Data& obj, const char* key);
+ void raiseEvent(Data& event);
void run();
void stop();
+ // This blocking call is used by the internal store thread(s) to get work to do.
+ AgentEventImpl::Ptr nextInternalEvent();
+ void signalInternal() { cond.notify(); }
+
private:
mutable Mutex lock;
- Mutex addLock;
+ Condition cond;
const string vendor;
const string product;
const string name;
const string domain;
string directAddr;
+ string directAddrParams;
+ string topicAddr;
+ string topicAddrParams;
Variant::Map attrMap;
string storeDir;
string transferDir;
@@ -121,13 +153,15 @@ namespace engine {
uint32_t nextContextNum;
bool running;
deque<AgentEventImpl::Ptr> eventQueue;
- map<uint32_t, AgentQueryContext::Ptr> contextMap;
+ deque<AgentEventImpl::Ptr> internalEventQueue;
+ map<uint32_t, AsyncContext::Ptr> contextMap;
Connection connection;
Session session;
Receiver directReceiver;
Receiver topicReceiver;
Sender sender;
qpid::sys::Thread* thread;
+ StoreThread* storeThread;
struct AgentClassKey {
string name;
@@ -169,6 +203,8 @@ namespace engine {
void handleSubscribeCancel(const Message& message);
void handleSubscribeRefresh(const Message& message);
void handleMethodRequest(const Message& message);
+ void sendResponse(const Message& request, const string& opcode, const Data& data);
+ void sendResponse(const Address& address, const string& correlationId, const string& opcode, const Data& data);
void sendPackageIndicationLH(const string& packageName);
void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key);
@@ -207,18 +243,38 @@ AgentEvent AgentEventImpl::copy()
return item;
}
+void StoreThread::run()
+{
+ while (running) {
+ AgentEventImpl::Ptr ptr(agent.nextInternalEvent());
+ }
+}
+
+void StoreThread::stop()
+{
+ running = false;
+ agent.signalInternal();
+}
+
AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) :
vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i),
notifyHandler(0), notifiable(0),
bootSequence(1), nextContextNum(1), running(true), thread(0)
{
directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name;
- if (_d == 0) {
- directAddr += " { create:always }";
+ topicAddr = "qmf." + domain + ".topic/console.ind.#";
+ if (_d != 0) {
+ directAddrParams = " {create: always, type: topic, x-properties: {type: direct}}";
+ topicAddrParams = " {create: always, type: topic, x-properties: {type: topic}}";
+ }
+ attrMap["_vendor"] = vendor;
+ attrMap["_product"] = product;
+ attrMap["_instance"] = name;
+ attrMap["_name"] = vendor + ":" + product + ":" + name;
+
+ if (internalStore) {
+ storeThread = new StoreThread(*this);
}
- attrMap["vendor"] = vendor;
- attrMap["product"] = product;
- attrMap["name"] = name;
}
@@ -297,42 +353,93 @@ void AgentImpl::setConnection(Connection& conn)
thread = new qpid::sys::Thread(*this);
}
+void AgentImpl::authAllow(uint32_t sequence)
+{
+ Mutex::ScopedLock _lock(lock);
+
+ // Find the context associated with the sequence number
+ map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter == contextMap.end())
+ return;
+ AsyncContext::Ptr context = iter->second;
+
+ // Transform the authorize event into the real event
+ switch (context->authorizedEvent->kind) {
+ case AgentEvent::GET_AUTHORIZE : context->authorizedEvent->kind = AgentEvent::GET_QUERY; break;
+ case AgentEvent::METHOD_AUTHORIZE : context->authorizedEvent->kind = AgentEvent::METHOD_CALL; break;
+ case AgentEvent::SYNC_AUTHORIZE : context->authorizedEvent->kind = AgentEvent::START_SYNC; break;
+ default:
+ contextMap.erase(iter);
+ return;
+ }
+
+ // Re-issue the now-authorized action. If this is a data query (get or subscribe),
+ // and the agent is handling storage internally, redirect to the internal event
+ // queue for processing by the internal-storage thread.
+ if (internalStore) {
+ internalEventQueue.push_back(context->authorizedEvent);
+ cond.notify();
+ } else {
+ eventQueue.push_back(context->authorizedEvent);
+ notify();
+ }
+}
+
+void AgentImpl::authDeny(uint32_t sequence, const Data& exception)
+{
+ Mutex::ScopedLock _lock(lock);
+ map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter == contextMap.end())
+ return;
+ AsyncContext::Ptr context = iter->second;
+ contextMap.erase(iter);
+
+ // Return an exception message to the requestor
+ sendResponse(context->replyTo, context->correlationId, Protocol::OP_EXCEPTION, exception);
+}
+
+void AgentImpl::authDeny(uint32_t sequence, const string& error)
+{
+ Data exception;
+ exception.getValues()["status"] = "Access to this Operation Denied";
+ exception.getValues()["text"] = error;
+ authDeny(sequence, exception);
+}
+
void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& /*argMap*/)
{
Mutex::ScopedLock _lock(lock);
- map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
+ map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
if (iter == contextMap.end())
return;
- AgentQueryContext::Ptr context = iter->second;
+ AsyncContext::Ptr context = iter->second;
contextMap.erase(iter);
// TODO: Encode method response
- QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text);
+ QPID_LOG(trace, "SENT MethodResponse corr=" << context->correlationId << " status=" << status << " text=" << text);
}
-void AgentImpl::queryResponse(uint32_t sequence, Object&)
+void AgentImpl::queryResponse(uint32_t sequence, Data&)
{
Mutex::ScopedLock _lock(lock);
- map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
+ map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
if (iter == contextMap.end())
return;
- AgentQueryContext::Ptr context = iter->second;
+ AsyncContext::Ptr context = iter->second;
// TODO: accumulate data records and send response messages when we have "enough"
-
- QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence);
}
void AgentImpl::queryComplete(uint32_t sequence)
{
Mutex::ScopedLock _lock(lock);
- map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
+ map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
if (iter == contextMap.end())
return;
// TODO: send a response message if there are any unsent data records
- AgentQueryContext::Ptr context = iter->second;
+ AsyncContext::Ptr context = iter->second;
contextMap.erase(iter);
//sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK");
}
@@ -354,13 +461,13 @@ void AgentImpl::registerClass(SchemaClass* cls)
// TODO: Indicate this schema if connected.
}
-const char* AgentImpl::addObject(Object&, const char*)
+const char* AgentImpl::addObject(Data&, const char*)
{
Mutex::ScopedLock _lock(lock);
return 0;
}
-void AgentImpl::raiseEvent(Event&)
+void AgentImpl::raiseEvent(Data&)
{
Mutex::ScopedLock _lock(lock);
}
@@ -370,9 +477,14 @@ void AgentImpl::run()
qpid::sys::Duration duration = qpid::sys::TIME_MSEC * 500;
session = connection.newSession();
- directReceiver = session.createReceiver(directAddr);
+ QPID_LOG(trace, "Creating direct receiver to address: " << directAddr << directAddrParams);
+ directReceiver = session.createReceiver(directAddr + directAddrParams);
directReceiver.setCapacity(10);
+ QPID_LOG(trace, "Creating topic receiver to address: " << topicAddr << topicAddrParams);
+ topicReceiver = session.createReceiver(topicAddr + topicAddrParams);
+ topicReceiver.setCapacity(10);
+
Mutex::ScopedLock _lock(lock);
while (running) {
Receiver rcvr;
@@ -398,12 +510,29 @@ void AgentImpl::stop()
running = false;
}
+AgentEventImpl::Ptr AgentImpl::nextInternalEvent()
+{
+ Mutex::ScopedLock _lock(lock);
+ while (internalEventQueue.empty())
+ cond.wait(lock);
+
+ AgentEventImpl::Ptr event(internalEventQueue.front());
+ internalEventQueue.pop_front();
+ return event;
+
+ // TODO: make sure this function returns with a null pointer when the thread needs to stop.
+}
+
+
void AgentImpl::handleRcvMessageLH(const Message& message)
{
Variant::Map headers(message.getHeaders());
- cout << "AgentImpl::handleRcvMessageLH headers=" << headers << endl;
+ cout << "AgentImpl::handleRcvMessageLH contentType=" << message.getContentType() <<
+ " replyTo=" << message.getReplyTo() <<
+ " headers=" << headers << endl;
- if (message.getContentType() != Protocol::AMQP_CONTENT_MAP)
+ if (message.getContentType() != Protocol::AMQP_CONTENT_MAP &&
+ message.getContentType() != Protocol::AMQP_CONTENT_LIST)
return;
Variant::Map::const_iterator iter = headers.find(Protocol::APP_OPCODE);
@@ -421,16 +550,33 @@ void AgentImpl::handleRcvMessageLH(const Message& message)
void AgentImpl::handleAgentLocateLH(const Message& message)
{
- const MapView predicate(message);
-
- //if (predicateMatches(predicate, attrMap)) {
- // sendResponse(message, Protocol::OP_AGENT_LOCATE_RESPONSE, attrMap);
- //}
+ QPID_LOG(trace, "RCVD AgentLocateRequest replyTo=" << message.getReplyTo());
+ auto_ptr<Query> query(QueryImpl::factory(ListView(message)));
+ if (query->matches(attrMap)) {
+ Data data(0, attrMap);
+ sendResponse(message, Protocol::OP_AGENT_LOCATE_RESPONSE, data);
+ QPID_LOG(trace, "SENT AgentLocateResponse");
+ }
}
void AgentImpl::handleQueryRequestLH(const Message& message)
{
- const MapView map(message);
+ uint32_t contextNum = nextContextNum++;
+ AsyncContext::Ptr context(new AsyncContext(message.getCorrelationId(), message.getReplyTo()));
+ contextMap[contextNum] = context;
+
+ // Build the event for the get request
+ AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_AUTHORIZE));
+ event->sequence = contextNum;
+ event->authUserId = message.getUserId();
+ event->query.reset(QueryImpl::factory(MapView(message)));
+
+ // Put the not-yet-authorized event into the context for possible later use
+ context->authorizedEvent = event;
+
+ // Enqueue the event
+ eventQueue.push_back(event);
+ notify();
}
void AgentImpl::handleSubscribeRequest(const Message& message)
@@ -453,6 +599,22 @@ void AgentImpl::handleMethodRequest(const Message& message)
const MapView map(message);
}
+void AgentImpl::sendResponse(const Message& request, const string& opcode, const Data& data)
+{
+ sendResponse(request.getReplyTo(), request.getCorrelationId(), opcode, data);
+}
+
+void AgentImpl::sendResponse(const Address& address, const string& correlationId, const string& opcode, const Data& data)
+{
+ Message message;
+ MapContent content(message, data.asMap());
+
+ message.setCorrelationId(correlationId);
+ message.getHeaders()[Protocol::APP_OPCODE] = opcode;
+ content.encode();
+ session.createSender(address).send(message);
+}
+
AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string&, const string&, const string& key)
{
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
@@ -528,14 +690,8 @@ void AgentImpl::handleGetQuery(Message&, uint32_t, const string&, const string&)
Mutex::ScopedLock _lock(lock);
}
-void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t sequence, const string& /*replyTo*/, const string& /*userId*/)
+void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t, const string& /*replyTo*/, const string& /*userId*/)
{
- Mutex::ScopedLock _lock(lock);
- QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=");
-
- AgentQueryContext::Ptr context(new AgentQueryContext);
- uint32_t contextNum = nextContextNum++;
- contextMap[contextNum] = context;
}
//==================================================================
@@ -552,10 +708,13 @@ void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); }
bool Agent::getEvent(AgentEvent& event) const { return impl->getEvent(event); }
void Agent::popEvent() { impl->popEvent(); }
void Agent::setConnection(Connection& conn) { impl->setConnection(conn); }
+void Agent::authAllow(uint32_t sequence) { impl->authAllow(sequence); }
+void Agent::authDeny(uint32_t sequence, const Data& ex) { impl->authDeny(sequence, ex); }
+void Agent::authDeny(uint32_t sequence, const char* ex) { impl->authDeny(sequence, string(ex)); }
void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments) { impl->methodResponse(sequence, status, text, arguments); }
-void Agent::queryResponse(uint32_t sequence, Object& object) { impl->queryResponse(sequence, object); }
+void Agent::queryResponse(uint32_t sequence, Data& object) { impl->queryResponse(sequence, object); }
void Agent::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); }
void Agent::registerClass(SchemaClass* cls) { impl->registerClass(cls); }
-const char* Agent::addObject(Object& obj, const char* key) { return impl->addObject(obj, key); }
-void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); }
+const char* Agent::addObject(Data& obj, const char* key) { return impl->addObject(obj, key); }
+void Agent::raiseEvent(Data& event) { impl->raiseEvent(event); }
diff --git a/qpid/cpp/src/qmf/engine/DataImpl.cpp b/qpid/cpp/src/qmf/engine/DataImpl.cpp
new file mode 100644
index 0000000000..7eced7e504
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/DataImpl.cpp
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/Protocol.h"
+#include "qmf/engine/DataImpl.h"
+#include <qpid/sys/Time.h>
+
+using namespace std;
+using namespace qmf::engine;
+using namespace qpid::sys;
+using namespace qpid::messaging;
+
+DataImpl::DataImpl() :
+ objectClass(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
+{
+}
+
+
+DataImpl::DataImpl(SchemaClass* type, const qpid::messaging::Variant::Map& v) :
+ values(v), objectClass(type), createTime(uint64_t(Duration(now()))),
+ destroyTime(0), lastUpdatedTime(createTime)
+{
+}
+
+
+void DataImpl::touch()
+{
+ lastUpdatedTime = uint64_t(Duration(now()));
+}
+
+
+void DataImpl::destroy()
+{
+ destroyTime = uint64_t(Duration(now()));
+}
+
+Variant::Map DataImpl::asMap() const
+{
+ Variant::Map map;
+
+ map[Protocol::VALUES] = values;
+ if (!subtypes.empty())
+ map[Protocol::SUBTYPES] = subtypes;
+ // TODO: Add key, schema, and lifecycle data
+
+ return map;
+}
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+Data::Data() : impl(new DataImpl()) {}
+Data::Data(SchemaClass* type, const Variant::Map& m) : impl(new DataImpl(type, m)) {}
+Data::Data(const Data& from) : impl(new DataImpl(*(from.impl))) {}
+Data::~Data() { delete impl; }
+const Variant::Map& Data::getValues() const { return impl->getValues(); }
+Variant::Map& Data::getValues() { return impl->getValues(); }
+const Variant::Map& Data::getSubtypes() const { return impl->getSubtypes(); }
+Variant::Map& Data::getSubtypes() { return impl->getSubtypes(); }
+const SchemaClass* Data::getSchema() const { return impl->getSchema(); }
+void Data::setSchema(SchemaClass* schema) { impl->setSchema(schema); }
+const char* Data::getKey() const { return impl->getKey(); }
+void Data::setKey(const char* key) { impl->setKey(key); }
+void Data::touch() { impl->touch(); }
+void Data::destroy() { impl->destroy(); }
+Variant::Map Data::asMap() const { return impl->asMap(); }
diff --git a/qpid/cpp/src/qmf/engine/ObjectImpl.h b/qpid/cpp/src/qmf/engine/DataImpl.h
index 8be4ef655f..92559ae634 100644
--- a/qpid/cpp/src/qmf/engine/ObjectImpl.h
+++ b/qpid/cpp/src/qmf/engine/DataImpl.h
@@ -1,5 +1,5 @@
-#ifndef _QmfEngineObjectImpl_
-#define _QmfEngineObjectImpl_
+#ifndef _QmfEngineDataImpl_
+#define _QmfEngineDataImpl_
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,7 +20,7 @@
* under the License.
*/
-#include <qmf/engine/Object.h>
+#include <qmf/engine/Data.h>
#include <qpid/sys/Mutex.h>
#include <qpid/messaging/Variant.h>
#include <map>
@@ -33,13 +33,14 @@ namespace engine {
class SchemaClass;
- typedef boost::shared_ptr<Object> ObjectPtr;
+ typedef boost::shared_ptr<Data> DataPtr;
- struct ObjectImpl {
+ struct DataImpl {
/**
* Content of the object's data
*/
qpid::messaging::Variant::Map values;
+ qpid::messaging::Variant::Map subtypes;
/**
* Schema reference if this object is "described"
@@ -55,13 +56,16 @@ namespace engine {
uint64_t destroyTime;
uint64_t lastUpdatedTime;
- ObjectImpl();
- ObjectImpl(SchemaClass* type);
- ~ObjectImpl() {}
+ DataImpl();
+ DataImpl(SchemaClass* type, const qpid::messaging::Variant::Map&);
+ ~DataImpl() {}
const qpid::messaging::Variant::Map& getValues() const { return values; }
qpid::messaging::Variant::Map& getValues() { return values; }
+ const qpid::messaging::Variant::Map& getSubtypes() const { return subtypes; }
+ qpid::messaging::Variant::Map& getSubtypes() { return subtypes; }
+
const SchemaClass* getSchema() const { return objectClass; }
void setSchema(SchemaClass* schema) { objectClass = schema; }
@@ -70,6 +74,8 @@ namespace engine {
void touch();
void destroy();
+
+ qpid::messaging::Variant::Map asMap() const;
};
}
}
diff --git a/qpid/cpp/src/qmf/engine/EventImpl.cpp b/qpid/cpp/src/qmf/engine/EventImpl.cpp
deleted file mode 100644
index 27501cc396..0000000000
--- a/qpid/cpp/src/qmf/engine/EventImpl.cpp
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qmf/engine/EventImpl.h>
-#include <qmf/engine/ValueImpl.h>
-
-#include <sstream>
-
-using namespace std;
-using namespace qmf::engine;
-using qpid::framing::Buffer;
-
-EventImpl::EventImpl(const SchemaEventClass* type) : eventClass(type)
-{
- int argCount = eventClass->getPropertyCount();
- int idx;
-
- for (idx = 0; idx < argCount; idx++) {
- const SchemaProperty* arg = eventClass->getProperty(idx);
- properties[arg->getName()] = ValuePtr(new Value(arg->getType()));
- }
-}
-
-
-EventImpl::EventImpl(const SchemaEventClass* type, Buffer&) :
- eventClass(type)
-{
-}
-
-
-Event* EventImpl::factory(const SchemaEventClass* type, Buffer& buffer)
-{
- EventImpl* impl(new EventImpl(type, buffer));
- return new Event(impl);
-}
-
-
-Value* EventImpl::getValue(const char* key) const
-{
- map<string, ValuePtr>::const_iterator iter;
-
- iter = properties.find(key);
- if (iter != properties.end())
- return iter->second.get();
-
- return 0;
-}
-
-
-void EventImpl::encodeSchemaKey(Buffer& buffer) const
-{
- buffer.putShortString(eventClass->getClassKey()->getPackageName());
- buffer.putShortString(eventClass->getClassKey()->getClassName());
- buffer.putBin128(const_cast<uint8_t*>(eventClass->getClassKey()->getHashData()));
-}
-
-
-void EventImpl::encode(Buffer& buffer) const
-{
- buffer.putOctet((uint8_t) eventClass->getSeverity());
-
- int argCount = eventClass->getPropertyCount();
- for (int idx = 0; idx < argCount; idx++) {
- const SchemaProperty* arg = eventClass->getProperty(idx);
- ValuePtr value = properties[arg->getName()];
- value->impl->encode(buffer);
- }
-}
-
-
-string EventImpl::getRoutingKey(uint32_t brokerBank, uint32_t agentBank) const
-{
- stringstream key;
-
- key << "console.event." << brokerBank << "." << agentBank << "." <<
- eventClass->getClassKey()->getPackageName() << "." <<
- eventClass->getClassKey()->getClassName();
- return key.str();
-}
-
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-Event::Event(const SchemaEventClass* type) : impl(new EventImpl(type)) {}
-Event::Event(EventImpl* i) : impl(i) {}
-Event::Event(const Event& from) : impl(new EventImpl(*(from.impl))) {}
-Event::~Event() { delete impl; }
-const SchemaEventClass* Event::getClass() const { return impl->getClass(); }
-Value* Event::getValue(const char* key) const { return impl->getValue(key); }
-
diff --git a/qpid/cpp/src/qmf/engine/EventImpl.h b/qpid/cpp/src/qmf/engine/EventImpl.h
deleted file mode 100644
index ab790f08fa..0000000000
--- a/qpid/cpp/src/qmf/engine/EventImpl.h
+++ /dev/null
@@ -1,53 +0,0 @@
-#ifndef _QmfEngineEventImpl_
-#define _QmfEngineEventImpl_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qmf/engine/Event.h>
-#include <qmf/engine/Schema.h>
-#include <qpid/framing/Buffer.h>
-#include <boost/shared_ptr.hpp>
-#include <map>
-
-namespace qmf {
-namespace engine {
-
- struct EventImpl {
- typedef boost::shared_ptr<Value> ValuePtr;
- const SchemaEventClass* eventClass;
- mutable std::map<std::string, ValuePtr> properties;
-
- EventImpl(const SchemaEventClass* type);
- EventImpl(const SchemaEventClass* type, qpid::framing::Buffer& buffer);
- static Event* factory(const SchemaEventClass* type, qpid::framing::Buffer& buffer);
-
- const SchemaEventClass* getClass() const { return eventClass; }
- Value* getValue(const char* key) const;
-
- void encodeSchemaKey(qpid::framing::Buffer& buffer) const;
- void encode(qpid::framing::Buffer& buffer) const;
- std::string getRoutingKey(uint32_t brokerBank, uint32_t agentBank) const;
- };
-
-}
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp b/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp
deleted file mode 100644
index 9216f7bac0..0000000000
--- a/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/ObjectIdImpl.h"
-#include <stdlib.h>
-#include <sstream>
-
-using namespace std;
-using namespace qmf::engine;
-using qpid::framing::Buffer;
-
-void AgentAttachment::setBanks(uint32_t broker, uint32_t agent)
-{
- first =
- ((uint64_t) (broker & 0x000fffff)) << 28 |
- ((uint64_t) (agent & 0x0fffffff));
-}
-
-ObjectIdImpl::ObjectIdImpl(Buffer& buffer) : agent(0)
-{
- decode(buffer);
-}
-
-ObjectIdImpl::ObjectIdImpl(AgentAttachment* a, uint8_t flags, uint16_t seq, uint64_t object) : agent(a)
-{
- first =
- ((uint64_t) (flags & 0x0f)) << 60 |
- ((uint64_t) (seq & 0x0fff)) << 48;
- second = object;
-}
-
-ObjectId* ObjectIdImpl::factory(Buffer& buffer)
-{
- ObjectIdImpl* impl(new ObjectIdImpl(buffer));
- return new ObjectId(impl);
-}
-
-ObjectId* ObjectIdImpl::factory(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object)
-{
- ObjectIdImpl* impl(new ObjectIdImpl(agent, flags, seq, object));
- return new ObjectId(impl);
-}
-
-void ObjectIdImpl::decode(Buffer& buffer)
-{
- first = buffer.getLongLong();
- second = buffer.getLongLong();
-}
-
-void ObjectIdImpl::encode(Buffer& buffer) const
-{
- if (agent == 0)
- buffer.putLongLong(first);
- else
- buffer.putLongLong(first | agent->first);
- buffer.putLongLong(second);
-}
-
-void ObjectIdImpl::fromString(const std::string& repr)
-{
-#define FIELDS 5
-#if defined (_WIN32) && !defined (atoll)
-# define atoll(X) _atoi64(X)
-#endif
-
- std::string copy(repr.c_str());
- char* cText;
- char* field[FIELDS];
- bool atFieldStart = true;
- int idx = 0;
-
- cText = const_cast<char*>(copy.c_str());
- for (char* cursor = cText; *cursor; cursor++) {
- if (atFieldStart) {
- if (idx >= FIELDS)
- return; // TODO error
- field[idx++] = cursor;
- atFieldStart = false;
- } else {
- if (*cursor == '-') {
- *cursor = '\0';
- atFieldStart = true;
- }
- }
- }
-
- if (idx != FIELDS)
- return; // TODO error
-
- first = (atoll(field[0]) << 60) +
- (atoll(field[1]) << 48) +
- (atoll(field[2]) << 28) +
- atoll(field[3]);
- second = atoll(field[4]);
- agent = 0;
-}
-
-const string& ObjectIdImpl::asString() const
-{
- stringstream val;
-
- val << (int) getFlags() << "-" << getSequence() << "-" << getBrokerBank() << "-" <<
- getAgentBank() << "-" << getObjectNum();
- repr = val.str();
- return repr;
-}
-
-#define ACTUAL_FIRST (agent == 0 ? first : first | agent->first)
-#define ACTUAL_OTHER (other.agent == 0 ? other.first : other.first | other.agent->first)
-
-uint8_t ObjectIdImpl::getFlags() const
-{
- return (ACTUAL_FIRST & 0xF000000000000000LL) >> 60;
-}
-
-uint16_t ObjectIdImpl::getSequence() const
-{
- return (ACTUAL_FIRST & 0x0FFF000000000000LL) >> 48;
-}
-
-uint32_t ObjectIdImpl::getBrokerBank() const
-{
- return (ACTUAL_FIRST & 0x0000FFFFF0000000LL) >> 28;
-}
-
-uint32_t ObjectIdImpl::getAgentBank() const
-{
- return ACTUAL_FIRST & 0x000000000FFFFFFFLL;
-}
-
-uint64_t ObjectIdImpl::getObjectNum() const
-{
- return second;
-}
-
-uint32_t ObjectIdImpl::getObjectNumHi() const
-{
- return (uint32_t) (second >> 32);
-}
-
-uint32_t ObjectIdImpl::getObjectNumLo() const
-{
- return (uint32_t) (second & 0x00000000FFFFFFFFLL);
-}
-
-bool ObjectIdImpl::operator==(const ObjectIdImpl& other) const
-{
- return ACTUAL_FIRST == ACTUAL_OTHER && second == other.second;
-}
-
-bool ObjectIdImpl::operator<(const ObjectIdImpl& other) const
-{
- return (ACTUAL_FIRST < ACTUAL_OTHER) || ((ACTUAL_FIRST == ACTUAL_OTHER) && (second < other.second));
-}
-
-bool ObjectIdImpl::operator>(const ObjectIdImpl& other) const
-{
- return (ACTUAL_FIRST > ACTUAL_OTHER) || ((ACTUAL_FIRST == ACTUAL_OTHER) && (second > other.second));
-}
-
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-ObjectId::ObjectId() : impl(new ObjectIdImpl()) {}
-ObjectId::ObjectId(const ObjectId& from) : impl(new ObjectIdImpl(*(from.impl))) {}
-ObjectId::ObjectId(ObjectIdImpl* i) : impl(i) {}
-ObjectId::~ObjectId() { delete impl; }
-uint64_t ObjectId::getObjectNum() const { return impl->getObjectNum(); }
-uint32_t ObjectId::getObjectNumHi() const { return impl->getObjectNumHi(); }
-uint32_t ObjectId::getObjectNumLo() const { return impl->getObjectNumLo(); }
-bool ObjectId::isDurable() const { return impl->isDurable(); }
-const char* ObjectId::str() const { return impl->asString().c_str(); }
-uint8_t ObjectId::getFlags() const { return impl->getFlags(); }
-uint16_t ObjectId::getSequence() const { return impl->getSequence(); }
-uint32_t ObjectId::getBrokerBank() const { return impl->getBrokerBank(); }
-uint32_t ObjectId::getAgentBank() const { return impl->getAgentBank(); }
-bool ObjectId::operator==(const ObjectId& other) const { return *impl == *other.impl; }
-bool ObjectId::operator<(const ObjectId& other) const { return *impl < *other.impl; }
-bool ObjectId::operator>(const ObjectId& other) const { return *impl > *other.impl; }
-bool ObjectId::operator<=(const ObjectId& other) const { return !(*impl > *other.impl); }
-bool ObjectId::operator>=(const ObjectId& other) const { return !(*impl < *other.impl); }
-ObjectId& ObjectId::operator=(const ObjectId& other) {
- ObjectIdImpl *old;
- if (this != &other) {
- old = impl;
- impl = new ObjectIdImpl(*(other.impl));
- if (old)
- delete old;
- }
- return *this;
-}
-
diff --git a/qpid/cpp/src/qmf/engine/ObjectIdImpl.h b/qpid/cpp/src/qmf/engine/ObjectIdImpl.h
deleted file mode 100644
index d70c8efff4..0000000000
--- a/qpid/cpp/src/qmf/engine/ObjectIdImpl.h
+++ /dev/null
@@ -1,72 +0,0 @@
-#ifndef _QmfEngineObjectIdImpl_
-#define _QmfEngineObjectIdImpl_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qmf/engine/ObjectId.h>
-#include <qpid/framing/Buffer.h>
-
-namespace qmf {
-namespace engine {
-
- struct AgentAttachment {
- uint64_t first;
-
- AgentAttachment() : first(0) {}
- void setBanks(uint32_t broker, uint32_t bank);
- uint64_t getFirst() const { return first; }
- };
-
- struct ObjectIdImpl {
- AgentAttachment* agent;
- uint64_t first;
- uint64_t second;
- mutable std::string repr;
-
- ObjectIdImpl() : agent(0), first(0), second(0) {}
- ObjectIdImpl(qpid::framing::Buffer& buffer);
- ObjectIdImpl(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object);
-
- static ObjectId* factory(qpid::framing::Buffer& buffer);
- static ObjectId* factory(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object);
-
- void decode(qpid::framing::Buffer& buffer);
- void encode(qpid::framing::Buffer& buffer) const;
- void fromString(const std::string& repr);
- const std::string& asString() const;
- uint8_t getFlags() const;
- uint16_t getSequence() const;
- uint32_t getBrokerBank() const;
- uint32_t getAgentBank() const;
- uint64_t getObjectNum() const;
- uint32_t getObjectNumHi() const;
- uint32_t getObjectNumLo() const;
- bool isDurable() const { return getSequence() == 0; }
- void setValue(uint64_t f, uint64_t s) { first = f; second = s; agent = 0; }
-
- bool operator==(const ObjectIdImpl& other) const;
- bool operator<(const ObjectIdImpl& other) const;
- bool operator>(const ObjectIdImpl& other) const;
- };
-}
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/engine/ObjectImpl.cpp b/qpid/cpp/src/qmf/engine/ObjectImpl.cpp
deleted file mode 100644
index 353b16ee37..0000000000
--- a/qpid/cpp/src/qmf/engine/ObjectImpl.cpp
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/ObjectImpl.h"
-#include <qpid/sys/Time.h>
-
-using namespace std;
-using namespace qmf::engine;
-using namespace qpid::sys;
-using namespace qpid::messaging;
-
-ObjectImpl::ObjectImpl() :
- objectClass(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
-{
-}
-
-
-ObjectImpl::ObjectImpl(SchemaClass* type) :
- objectClass(type), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
-{
-}
-
-
-void ObjectImpl::touch()
-{
- lastUpdatedTime = uint64_t(Duration(now()));
-}
-
-
-void ObjectImpl::destroy()
-{
- destroyTime = uint64_t(Duration(now()));
-}
-
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-Object::Object() : impl(new ObjectImpl()) {}
-Object::Object(SchemaClass* type) : impl(new ObjectImpl(type)) {}
-Object::Object(const Object& from) : impl(new ObjectImpl(*(from.impl))) {}
-Object::~Object() { delete impl; }
-const Variant::Map& Object::getValues() const { return impl->getValues(); }
-Variant::Map& Object::getValues() { return impl->getValues(); }
-const SchemaClass* Object::getSchema() const { return impl->getSchema(); }
-void Object::setSchema(SchemaClass* schema) { impl->setSchema(schema); }
-const char* Object::getKey() const { return impl->getKey(); }
-void Object::setKey(const char* key) { impl->setKey(key); }
-void Object::touch() { impl->touch(); }
-void Object::destroy() { impl->destroy(); }
diff --git a/qpid/cpp/src/qmf/engine/QueryImpl.cpp b/qpid/cpp/src/qmf/engine/QueryImpl.cpp
index 0df49ff646..371f400f7b 100644
--- a/qpid/cpp/src/qmf/engine/QueryImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/QueryImpl.cpp
@@ -23,7 +23,17 @@ using namespace std;
using namespace qmf::engine;
using namespace qpid::messaging;
-bool QueryImpl::matches(const Object&) const
+QueryImpl::QueryImpl(const qpid::messaging::MapView&)
+{
+ // TODO
+}
+
+QueryImpl::QueryImpl(const qpid::messaging::ListView&)
+{
+ //TODO
+}
+
+bool QueryImpl::matches(const Variant::Map&) const
{
return true;
}
@@ -34,6 +44,17 @@ void QueryImpl::parsePredicate(const std::string&)
predicate.clear();
}
+Query* QueryImpl::factory(const qpid::messaging::MapView& map)
+{
+ QueryImpl* impl(new QueryImpl(map));
+ return new Query(impl);
+}
+
+Query* QueryImpl::factory(const qpid::messaging::ListView& pred)
+{
+ QueryImpl* impl(new QueryImpl(pred));
+ return new Query(impl);
+}
//==================================================================
// Wrappers
@@ -43,6 +64,7 @@ Query::Query(const char* target) : impl(new QueryImpl(target)) {}
Query::Query(const char* target, const Variant::List& predicate) : impl(new QueryImpl(target, predicate)) {}
Query::Query(const char* target, const char* expression) : impl(new QueryImpl(target, expression)) {}
Query::Query(const Query& from) : impl(new QueryImpl(*(from.impl))) {}
+Query::Query(QueryImpl* i) : impl(i) {}
Query::~Query() { delete impl; }
void Query::where(const Variant::List& predicate) { impl->where(predicate); }
void Query::where(const char* expression) { impl->where(expression); }
@@ -55,5 +77,5 @@ const Variant::List& Query::getPredicate() const { return impl->getPredicate();
uint32_t Query::getLimit() const { return impl->getLimit(); }
const char* Query::getOrderBy() const { return impl->getOrderBy(); }
bool Query::getDecreasing() const { return impl->getDecreasing(); }
-bool Query::matches(const Object& object) const { return impl->matches(object); }
+bool Query::matches(const Variant::Map& data) const { return impl->matches(data); }
diff --git a/qpid/cpp/src/qmf/engine/QueryImpl.h b/qpid/cpp/src/qmf/engine/QueryImpl.h
index 0ef8711f8e..326bbb7fa6 100644
--- a/qpid/cpp/src/qmf/engine/QueryImpl.h
+++ b/qpid/cpp/src/qmf/engine/QueryImpl.h
@@ -22,6 +22,8 @@
#include "qmf/engine/Query.h"
#include <qpid/messaging/Variant.h>
+#include <qpid/messaging/MapView.h>
+#include <qpid/messaging/ListView.h>
#include <string>
#include <boost/shared_ptr.hpp>
@@ -34,8 +36,13 @@ namespace engine {
target(_target), predicate(_predicate), resultLimit(0) {}
QueryImpl(const char* _target, const char* expression) :
target(_target), resultLimit(0) { parsePredicate(expression); }
+ QueryImpl(const qpid::messaging::MapView& map);
+ QueryImpl(const qpid::messaging::ListView& pred);
~QueryImpl() {}
+ static Query* factory(const qpid::messaging::MapView& map);
+ static Query* factory(const qpid::messaging::ListView& pred);
+
void where(const qpid::messaging::Variant::List& _predicate) { predicate = _predicate; }
void where(const char* expression) { parsePredicate(expression); }
void limit(uint32_t maxResults) { resultLimit = maxResults; }
@@ -48,7 +55,7 @@ namespace engine {
uint32_t getLimit() const { return resultLimit; }
const char* getOrderBy() const { return sortAttr.c_str(); }
bool getDecreasing() const { return orderDecreasing; }
- bool matches(const Object& object) const;
+ bool matches(const qpid::messaging::Variant::Map& data) const;
void parsePredicate(const std::string& expression);
diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.h b/qpid/cpp/src/qmf/engine/SchemaImpl.h
index 71a10559cf..29b327f6cd 100644
--- a/qpid/cpp/src/qmf/engine/SchemaImpl.h
+++ b/qpid/cpp/src/qmf/engine/SchemaImpl.h
@@ -41,7 +41,7 @@ namespace engine {
SchemaException(const std::string& context, const std::string& expected) {
text = context + ": Expected item with key " + expected;
}
- virtual ~SchemaException() throw();
+ virtual ~SchemaException() throw() {}
virtual const char* what() const throw() { return text.c_str(); }
private: