summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-02-26 23:11:19 +0000
committerTed Ross <tross@apache.org>2010-02-26 23:11:19 +0000
commit3296ad1ca8f77bf82fe9fd059c5e44580a4f2f4b (patch)
tree16f81a8ee4c6ff28ea4c3fd6e2aaf9f9a24e71ef /qpid/cpp/src
parentc1611f64a7c9dce39c19794dd3d887e3f1815b29 (diff)
downloadqpid-python-3296ad1ca8f77bf82fe9fd059c5e44580a4f2f4b.tar.gz
Checkpointing Agent engine code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7@916854 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qmf.mk6
-rw-r--r--qpid/cpp/src/qmf/Agent.cpp145
-rw-r--r--qpid/cpp/src/qmf/Protocol.cpp24
-rw-r--r--qpid/cpp/src/qmf/engine/Agent.cpp131
-rw-r--r--qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp136
-rw-r--r--qpid/cpp/src/qmf/engine/BrokerProxyImpl.h17
-rw-r--r--qpid/cpp/src/qmf/engine/ConsoleImpl.cpp14
-rw-r--r--qpid/cpp/src/qmf/engine/ConsoleImpl.h4
-rw-r--r--qpid/cpp/src/qpid/messaging/ListContent.cpp6
-rw-r--r--qpid/cpp/src/qpid/messaging/MapContent.cpp6
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp53
11 files changed, 358 insertions, 184 deletions
diff --git a/qpid/cpp/src/qmf.mk b/qpid/cpp/src/qmf.mk
index 34f93c63ed..80e4f5bc46 100644
--- a/qpid/cpp/src/qmf.mk
+++ b/qpid/cpp/src/qmf.mk
@@ -44,7 +44,9 @@ QMF_ENGINE_API = \
../include/qmf/engine/Object.h \
../include/qmf/engine/QmfEngineImportExport.h \
../include/qmf/engine/Query.h \
- ../include/qmf/engine/Schema.h
+ ../include/qmf/engine/Schema.h \
+ ../include/qmf/Agent.h \
+ ../include/qmf/Notifiable.h
# ../include/qmf/engine/ObjectId.h
@@ -58,6 +60,8 @@ libqmf_la_SOURCES = \
qpid/agent/ManagementAgentImpl.cpp \
qpid/agent/ManagementAgentImpl.h
+# qmf/Agent.cpp
+
libqmfengine_la_SOURCES = \
$(QMF_ENGINE_API) \
qmf/engine/Agent.cpp \
diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp
new file mode 100644
index 0000000000..bdaf064f0a
--- /dev/null
+++ b/qpid/cpp/src/qmf/Agent.cpp
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/Agent.h"
+#include "qmf/engine/Agent.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Condition.h"
+
+using namespace std;
+using namespace qmf;
+using namespace qpid::messaging;
+using qpid::sys::Duration;
+
+namespace qmf {
+ class AgentImpl {
+ public:
+ AgentImpl(const string& vendor, const string& product, const string& instance, const string& domain, bool internalStore,
+ AgentHandler* handler, Notifiable* notifiable);
+ ~AgentImpl();
+ void setAttribute(const string& name, const Variant& value) { agentEngine.setAttr(name.c_str(), value); }
+ void setStoreDir(const string& path) { agentEngine.setStoreDir(path.c_str()); }
+ void setConnection(Connection& conn) { agentEngine.setConnection(conn); }
+ void registerClass(SchemaClass* cls) { agentEngine.registerClass(cls); }
+ uint32_t invokeHandler(uint32_t limit, Duration timeout);
+ // const ObjectId* addObject(AgentObject& obj, bool persistent, uint64_t oid);
+ // const ObjectId* addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi);
+ void raiseEvent(Event& event);
+ void queryResponse(uint32_t context, AgentObject& object);
+ void queryComplete(uint32_t context);
+ void methodResponse(uint32_t context, const Variant::Map& args, const Variant& exception);
+ private:
+ const string vendor;
+ const string product;
+ const string instance;
+ const string domain;
+ const bool internalStore;
+ AgentHandler* handler;
+ Notifiable* notifiable;
+ engine::Agent agentEngine;
+ qpid::sys::Mutex lock;
+ qpid::sys::Condition cond;
+ };
+}
+
+AgentImpl::AgentImpl(const string& _vendor, const string& _product, const string& _instance, const string& _domain,
+ bool _internalStore, AgentHandler* _handler, Notifiable* _notifiable) :
+ vendor(_vendor), product(_product), instance(_instance.empty() ? "TODO" : _instance),
+ domain(_domain.empty() ? "default" : _domain), internalStore(_internalStore),
+ handler(_handler), notifiable(_notifiable),
+ agentEngine(vendor.c_str(), product.c_str(), instance.c_str(), domain.c_str(), internalStore)
+{
+}
+
+AgentImpl::~AgentImpl()
+{
+}
+
+void AgentImpl::registerClass(SchemaClass* /*cls*/)
+{
+}
+
+uint32_t AgentImpl::invokeHandler(uint32_t limit, Duration timeout)
+{
+ engine::AgentEvent event;
+ bool valid;
+ qpid::sys::AbsTime endTime(qpid::sys::now(), timeout);
+
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ valid = agentEngine.getEvent(event);
+ while (!valid) {
+ if (!cond.wait(lock, endTime))
+ return 0;
+ valid = agentEngine.getEvent(event);
+ }
+ }
+
+ uint32_t count = 0;
+ while (valid) {
+ // TODO: Process event
+ count++;
+ if (limit > 0 && count == limit)
+ break;
+ agentEngine.popEvent();
+ valid = agentEngine.getEvent(event);
+ }
+
+ return count;
+}
+
+// const ObjectId* AgentImpl::addObject(AgentObject& obj, bool persistent, uint64_t oid);
+// const ObjectId* AgentImpl::addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi);
+
+void AgentImpl::raiseEvent(Event& /*event*/)
+{
+}
+
+void AgentImpl::queryResponse(uint32_t /*context*/, AgentObject& /*object*/)
+{
+}
+
+void AgentImpl::queryComplete(uint32_t /*context*/)
+{
+}
+
+void AgentImpl::methodResponse(uint32_t /*context*/, const Variant::Map& /*args*/, const Variant& /*exception*/)
+{
+}
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+Agent::Agent(const string& vendor, const string& product, const string& instance, const string& domain,
+ bool internalStore, AgentHandler* handler, Notifiable* notifiable) {
+ impl = new AgentImpl(vendor, product, instance, domain, internalStore, handler, notifiable); }
+Agent::~Agent() { delete impl; }
+void Agent::setAttribute(const string& name, const qpid::messaging::Variant& value) { impl->setAttribute(name, value); }
+void Agent::setStoreDir(const string& path) { impl->setStoreDir(path); }
+void Agent::setConnection(qpid::messaging::Connection& conn) { impl->setConnection(conn); }
+void Agent::registerClass(SchemaClass* cls) { impl->registerClass(cls); }
+uint32_t Agent::invokeHandler(uint32_t limit, qpid::sys::Duration timeout) { return impl->invokeHandler(limit, timeout); }
+//const ObjectId* Agent::addObject(AgentObject& obj, bool persistent, uint64_t oid);
+//const ObjectId* Agent::addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi);
+void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); }
+void Agent::queryResponse(uint32_t context, AgentObject& object) { impl->queryResponse(context, object); }
+void Agent::queryComplete(uint32_t context) { impl->queryComplete(context); }
+void Agent::methodResponse(uint32_t context, const Variant::Map& args, const Variant& exception) { impl->methodResponse(context, args, exception); }
+
diff --git a/qpid/cpp/src/qmf/Protocol.cpp b/qpid/cpp/src/qmf/Protocol.cpp
index faaa4c567d..518d263080 100644
--- a/qpid/cpp/src/qmf/Protocol.cpp
+++ b/qpid/cpp/src/qmf/Protocol.cpp
@@ -43,6 +43,30 @@ const string Protocol::SUBTYPES("_subtypes");
const string Protocol::SUBTYPE_SCHEMA_PROPERTY("qmfProperty");
const string Protocol::SUBTYPE_SCHEMA_METHOD("qmfMethod");
+const string Protocol::AMQP_CONTENT_MAP("amqp/map");
+const string Protocol::AMQP_CONTENT_LIST("amqp/list");
+
+const string Protocol::APP_OPCODE("qmf.opcode");
+
+const string Protocol::OP_AGENT_LOCATE_REQUEST("_agent_locate_request");
+const string Protocol::OP_AGENT_LOCATE_RESPONSE("_agent_locate_response");
+const string Protocol::OP_AGENT_HEARTBEAT_INDICATION("_agent_heartbeat_indication");
+const string Protocol::OP_QUERY_REQUEST("_query_request");
+const string Protocol::OP_QUERY_RESPONSE("_query_response");
+const string Protocol::OP_SUBSCRIBE_REQUEST("_subscribe_request");
+const string Protocol::OP_SUBSCRIBE_RESPONSE("_subscribe_response");
+const string Protocol::OP_SUBSCRIBE_CANCEL_INDICATION("_subscribe_cancel_indication");
+const string Protocol::OP_SUBSCRIBE_REFRESH_REQUEST("_subscribe_refresh_request");
+const string Protocol::OP_DATA_INDICATION("_data_indication");
+const string Protocol::OP_METHOD_REQUEST("_method_request");
+const string Protocol::OP_METHOD_RESPONSE("_method_response");
+
+const string Protocol::CONTENT_PACKAGE("_schema_package");
+const string Protocol::CONTENT_SCHEMA_ID("_schema_id");
+const string Protocol::CONTENT_SCHEMA_CLASS("_schema_class");
+const string Protocol::CONTENT_OBJECT_ID("_object_id");
+const string Protocol::CONTENT_DATA("_data");
+const string Protocol::CONTENT_EVENT("_event");
#if 0
bool Protocol::checkHeader(const Message& /*msg*/, string& /*opcode*/, uint32_t* /*seq*/)
diff --git a/qpid/cpp/src/qmf/engine/Agent.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp
index 22d53e93b7..c0c8b69bc8 100644
--- a/qpid/cpp/src/qmf/engine/Agent.cpp
+++ b/qpid/cpp/src/qmf/engine/Agent.cpp
@@ -31,6 +31,7 @@
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Message.h>
+#include <qpid/messaging/MapView.h>
#include <string>
#include <deque>
#include <map>
@@ -83,6 +84,8 @@ namespace engine {
AgentImpl(const char* vendor, const char* product, const char* name, const char* domain, bool internalStore);
~AgentImpl();
+ void setNotifyCallback(Agent::notifyCb handler);
+ void setNotifyCallback(Notifiable* handler);
void setAttr(const char* key, const Variant& value);
void setStoreDir(const char* path);
void setTransferDir(const char* path);
@@ -107,10 +110,12 @@ namespace engine {
const string name;
const string domain;
string directAddr;
- map<string, Variant> attrMap;
+ Variant::Map attrMap;
string storeDir;
string transferDir;
bool internalStore;
+ Agent::notifyCb notifyHandler;
+ Notifiable* notifiable;
Uuid systemId;
uint16_t bootSequence;
uint32_t nextContextNum;
@@ -156,14 +161,20 @@ namespace engine {
AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method,
const string& key, boost::shared_ptr<Variant::Map> argMap,
const SchemaClass* cls);
- void handleRcvMessageLH(qpid::messaging::Message& message);
+ void notify();
+ void handleRcvMessageLH(const Message& message);
+ void handleAgentLocateLH(const Message& message);
+ void handleQueryRequestLH(const Message& message);
+ void handleSubscribeRequest(const Message& message);
+ void handleSubscribeCancel(const Message& message);
+ void handleSubscribeRefresh(const Message& message);
+ void handleMethodRequest(const Message& message);
void sendPackageIndicationLH(const string& packageName);
void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key);
void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq,
uint32_t code = 0, const string& text = "OK");
void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text="");
- void handleAttachResponse(Message& msg);
void handlePackageRequest(Message& msg);
void handleClassQuery(Message& msg);
void handleSchemaRequest(Message& msg, uint32_t sequence,
@@ -198,12 +209,16 @@ AgentEvent AgentEventImpl::copy()
AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) :
vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i),
+ notifyHandler(0), notifiable(0),
bootSequence(1), nextContextNum(1), running(true), thread(0)
{
directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name;
if (_d == 0) {
directAddr += " { create:always }";
}
+ attrMap["vendor"] = vendor;
+ attrMap["product"] = product;
+ attrMap["name"] = name;
}
@@ -211,9 +226,22 @@ AgentImpl::~AgentImpl()
{
}
+void AgentImpl::setNotifyCallback(Agent::notifyCb handler)
+{
+ Mutex::ScopedLock _lock(lock);
+ notifyHandler = handler;
+}
+
+void AgentImpl::setNotifyCallback(Notifiable* handler)
+{
+ Mutex::ScopedLock _lock(lock);
+ notifiable = handler;
+}
+
void AgentImpl::setAttr(const char* key, const Variant& value)
{
- attrMap.insert(pair<string, Variant>(key, value));
+ Mutex::ScopedLock _lock(lock);
+ attrMap[key] = value;
}
void AgentImpl::setStoreDir(const char* path)
@@ -234,29 +262,6 @@ void AgentImpl::setTransferDir(const char* path)
transferDir.clear();
}
-/*
-void AgentImpl::handleRcvMessage(Message& message)
-{
- Buffer inBuffer(message.body, message.length);
- uint8_t opcode;
- uint32_t sequence;
- string replyToExchange(message.replyExchange ? message.replyExchange : "");
- string replyToKey(message.replyKey ? message.replyKey : "");
- string userId(message.userId ? message.userId : "");
-
- while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) {
- if (opcode == Protocol::OP_ATTACH_RESPONSE) handleAttachResponse(inBuffer);
- else if (opcode == Protocol::OP_SCHEMA_REQUEST) handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
- else if (opcode == Protocol::OP_CONSOLE_ADDED_INDICATION) handleConsoleAddedIndication();
- else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId);
- else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId);
- else {
- QPID_LOG(error, "AgentImpl::handleRcvMessage invalid opcode=" << opcode);
- break;
- }
- }
-}
-*/
bool AgentImpl::getEvent(AgentEvent& event) const
{
@@ -277,9 +282,18 @@ void AgentImpl::popEvent()
void AgentImpl::setConnection(Connection& conn)
{
Mutex::ScopedLock _lock(lock);
+
+ //
+ // Don't permit the overwriting of an existing connection
+ // TODO: return an error or throw an exception if an overwrite is attempted.
+ //
if (connection == 0)
return;
connection = conn;
+
+ //
+ // Start the Agent thread now that we have a connection to work with.
+ //
thread = new qpid::sys::Thread(*this);
}
@@ -384,6 +398,61 @@ void AgentImpl::stop()
running = false;
}
+void AgentImpl::handleRcvMessageLH(const Message& message)
+{
+ Variant::Map headers(message.getHeaders());
+ cout << "AgentImpl::handleRcvMessageLH headers=" << headers << endl;
+
+ if (message.getContentType() != Protocol::AMQP_CONTENT_MAP)
+ return;
+
+ Variant::Map::const_iterator iter = headers.find(Protocol::APP_OPCODE);
+ if (iter == headers.end())
+ return;
+ string opcode = iter->second.asString();
+
+ if (opcode == Protocol::OP_AGENT_LOCATE_REQUEST) handleAgentLocateLH(message);
+ if (opcode == Protocol::OP_QUERY_REQUEST) handleQueryRequestLH(message);
+ if (opcode == Protocol::OP_SUBSCRIBE_REQUEST) handleSubscribeRequest(message);
+ if (opcode == Protocol::OP_SUBSCRIBE_CANCEL_INDICATION) handleSubscribeCancel(message);
+ if (opcode == Protocol::OP_SUBSCRIBE_REFRESH_REQUEST) handleSubscribeRefresh(message);
+ if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(message);
+}
+
+void AgentImpl::handleAgentLocateLH(const Message& message)
+{
+ const MapView predicate(message);
+
+ //if (predicateMatches(predicate, attrMap)) {
+ // sendResponse(message, Protocol::OP_AGENT_LOCATE_RESPONSE, attrMap);
+ //}
+}
+
+void AgentImpl::handleQueryRequestLH(const Message& message)
+{
+ const MapView map(message);
+}
+
+void AgentImpl::handleSubscribeRequest(const Message& message)
+{
+ const MapView map(message);
+}
+
+void AgentImpl::handleSubscribeCancel(const Message& message)
+{
+ const MapView map(message);
+}
+
+void AgentImpl::handleSubscribeRefresh(const Message& message)
+{
+ const MapView map(message);
+}
+
+void AgentImpl::handleMethodRequest(const Message& message)
+{
+ const MapView map(message);
+}
+
AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string&, const string&, const string& key)
{
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
@@ -407,8 +476,12 @@ AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, c
return event;
}
-void AgentImpl::handleRcvMessageLH(qpid::messaging::Message& /*msg*/)
+void AgentImpl::notify()
{
+ if (notifyHandler != 0)
+ notifyHandler();
+ if (notifiable != 0)
+ notifiable->notify();
}
void AgentImpl::sendPackageIndicationLH(const string& packageName)
@@ -471,6 +544,8 @@ void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t sequence, const s
Agent::Agent(const char* v, const char* p, const char* n, const char* d, bool i) { impl = new AgentImpl(v, p, n, d, i); }
Agent::~Agent() { delete impl; }
+void Agent::setNotifyCallback(notifyCb handler) { impl->setNotifyCallback(handler); }
+void Agent::setNotifyCallback(Notifiable* handler) { impl->setNotifyCallback(handler); }
void Agent::setAttr(const char* key, const Variant& value) { impl->setAttr(key, value); }
void Agent::setStoreDir(const char* path) { impl->setStoreDir(path); }
void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); }
diff --git a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp
index 46ed653576..f76c69b446 100644
--- a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp
@@ -19,7 +19,7 @@
#include "qmf/engine/BrokerProxyImpl.h"
#include "qmf/engine/ConsoleImpl.h"
-#include "qmf/engine/Protocol.h"
+#include "qmf/Protocol.h"
#include "qpid/Address.h"
#include "qpid/sys/SystemInfo.h"
#include <qpid/log/Statement.h>
@@ -30,7 +30,7 @@
using namespace std;
using namespace qmf::engine;
-using namespace qpid::framing;
+using namespace qpid::messaging;
using namespace qpid::sys;
namespace {
@@ -64,10 +64,6 @@ BrokerEvent BrokerEventImpl::copy()
::memset(&item, 0, sizeof(BrokerEvent));
item.kind = kind;
-
- STRING_REF(name);
- STRING_REF(exchange);
- STRING_REF(bindingKey);
item.context = context;
item.queryResponse = queryResponse.get();
item.methodResponse = methodResponse.get();
@@ -87,63 +83,12 @@ BrokerProxyImpl::BrokerProxyImpl(BrokerProxy& pub, Console& _console) : publicOb
seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this)));
}
-void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
-{
- Mutex::ScopedLock _lock(lock);
- agentList.clear();
- eventQueue.clear();
- xmtQueue.clear();
- eventQueue.push_back(eventDeclareQueue(queueName));
- eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName));
- eventQueue.push_back(eventSetupComplete());
-
- // TODO: Store session handle
-}
-
-void BrokerProxyImpl::sessionClosed()
-{
- Mutex::ScopedLock _lock(lock);
- agentList.clear();
- eventQueue.clear();
- xmtQueue.clear();
-}
-
-void BrokerProxyImpl::startProtocol()
-{
- AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker"));
- {
- Mutex::ScopedLock _lock(lock);
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
-
- agentList[0] = agent;
-
- requestsOutstanding = 1;
- topicBound = false;
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
- }
-
- console.impl->eventAgentAdded(agent);
-}
-
-void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
+void BrokerProxyImpl::sendBufferLH(Buffer&, const string&, const string&)
{
- uint32_t length = buf.getPosition();
- MessageImpl::Ptr message(new MessageImpl);
-
- buf.reset();
- buf.getRawData(message->body, length);
- message->destination = destination;
- message->routingKey = routingKey;
- message->replyExchange = DIR_EXCHANGE;
- message->replyKey = queueName;
-
- xmtQueue.push_back(message);
+ // TODO
}
+/*
void BrokerProxyImpl::handleRcvMessage(Message& message)
{
Buffer inBuffer(message.body, message.length);
@@ -153,22 +98,7 @@ void BrokerProxyImpl::handleRcvMessage(Message& message)
while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
seqMgr.dispatch(opcode, sequence, message.routingKey ? string(message.routingKey) : string(), inBuffer);
}
-
-bool BrokerProxyImpl::getXmtMessage(Message& item) const
-{
- Mutex::ScopedLock _lock(lock);
- if (xmtQueue.empty())
- return false;
- item = xmtQueue.front()->copy();
- return true;
-}
-
-void BrokerProxyImpl::popXmt()
-{
- Mutex::ScopedLock _lock(lock);
- if (!xmtQueue.empty())
- xmtQueue.pop_front();
-}
+*/
bool BrokerProxyImpl::getEvent(BrokerEvent& event) const
{
@@ -227,10 +157,6 @@ void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentPr
bool BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent)
{
- if (query.impl->singleAgent()) {
- if (query.impl->agentBank() != agent->getAgentBank())
- return false;
- }
stringstream key;
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t sequence(seqMgr.reserve(queryContext));
@@ -269,7 +195,7 @@ string BrokerProxyImpl::encodeMethodArguments(const SchemaMethod* schema, const
return string();
}
-void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls,
+void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const SchemaClass* cls,
const string& methodName, const Value* args, void* userContext)
{
int methodCount = cls->getMethodCount();
@@ -452,43 +378,13 @@ void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq
void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
{
- SchemaObjectClass* oClassPtr;
- SchemaEventClass* eClassPtr;
+ SchemaClass* classPtr;
uint8_t kind = inBuffer.getOctet();
const SchemaClassKey* key;
- if (kind == CLASS_OBJECT) {
- oClassPtr = SchemaObjectClassImpl::factory(inBuffer);
- console.impl->learnClass(oClassPtr);
- key = oClassPtr->getClassKey();
- QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->impl->str());
-
- //
- // If we have just learned about the org.apache.qpid.broker:agent class, send a get
- // request for the current list of agents so we can have it on-hand before we declare
- // this session "stable".
- //
- if (key->impl->getClassName() == AGENT_CLASS && key->impl->getPackageName() == BROKER_PACKAGE) {
- Mutex::ScopedLock _lock(lock);
- incOutstandingLH();
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
- FieldTable ft;
- ft.setString("_class", AGENT_CLASS);
- ft.setString("_package", BROKER_PACKAGE);
- ft.encode(outBuffer);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY);
- QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY);
- }
- } else if (kind == CLASS_EVENT) {
- eClassPtr = SchemaEventClassImpl::factory(inBuffer);
- console.impl->learnClass(eClassPtr);
- key = eClassPtr->getClassKey();
- QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->impl->str());
- }
- else {
- QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
- }
+ classPtr = SchemaClassImpl::factory(inBuffer);
+ console.impl->learnClass(classPtr);
+ key = classPtr->getClassKey();
+ QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->impl->str());
}
ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
@@ -496,7 +392,7 @@ ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq
auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer));
QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey->impl->str());
- SchemaObjectClass* schema = console.impl->getSchema(classKey.get());
+ SchemaClass* schema = console.impl->getSchema(classKey.get());
if (schema == 0) {
QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey->impl->str());
return ObjectPtr();
@@ -749,12 +645,6 @@ uint32_t AgentProxy::getAgentBank() const { return impl->getAgentBank(); }
BrokerProxy::BrokerProxy(Console& console) : impl(new BrokerProxyImpl(*this, console)) {}
BrokerProxy::~BrokerProxy() { delete impl; }
-void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); }
-void BrokerProxy::sessionClosed() { impl->sessionClosed(); }
-void BrokerProxy::startProtocol() { impl->startProtocol(); }
-void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
-bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
-void BrokerProxy::popXmt() { impl->popXmt(); }
bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); }
void BrokerProxy::popEvent() { impl->popEvent(); }
uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); }
diff --git a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
index 031eb698e0..ab45982dfe 100644
--- a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
+++ b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
@@ -25,8 +25,6 @@
#include "qmf/engine/SchemaImpl.h"
#include "qmf/engine/QueryImpl.h"
#include "qmf/engine/SequenceManager.h"
-#include "qmf/engine/MessageImpl.h"
-#include "qpid/framing/Uuid.h"
#include "qpid/messaging/Variant.h"
#include "qpid/sys/Mutex.h"
#include "boost/shared_ptr.hpp"
@@ -80,9 +78,6 @@ namespace engine {
struct BrokerEventImpl {
typedef boost::shared_ptr<BrokerEventImpl> Ptr;
BrokerEvent::EventKind kind;
- std::string name;
- std::string exchange;
- std::string bindingKey;
void* context;
QueryResponsePtr queryResponse;
MethodResponsePtr methodResponse;
@@ -123,14 +118,7 @@ namespace engine {
BrokerProxyImpl(BrokerProxy& pub, Console& _console);
~BrokerProxyImpl() {}
- void sessionOpened(SessionHandle& sh);
- void sessionClosed();
- void startProtocol();
-
void sendBufferLH(qpid::framing::Buffer& buf, const std::string& destination, const std::string& routingKey);
- void handleRcvMessage(Message& message);
- bool getXmtMessage(Message& item) const;
- void popXmt();
bool getEvent(BrokerEvent& event) const;
void popEvent();
@@ -140,7 +128,7 @@ namespace engine {
void sendQuery(const Query& query, void* context, const AgentProxy* agent);
bool sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent);
std::string encodeMethodArguments(const SchemaMethod* schema, const qpid::messaging::Variant::Map* args, qpid::framing::Buffer& buffer);
- void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const std::string& method, const qpid::messaging::Variant::Map* args, void* context);
+ void sendMethodRequest(ObjectId* oid, const SchemaClass* cls, const std::string& method, const qpid::messaging::Variant::Map* args, void* context);
void addBinding(const std::string& exchange, const std::string& key);
void staticRelease() { decOutstanding(); }
@@ -153,12 +141,11 @@ namespace engine {
mutable qpid::sys::Mutex lock;
Console& console;
std::string queueName;
- qpid::framing::Uuid brokerId;
+ qpid::messaging::Uuid brokerId;
SequenceManager seqMgr;
uint32_t requestsOutstanding;
bool topicBound;
std::map<uint32_t, AgentProxyPtr> agentList;
- std::deque<MessageImpl::Ptr> xmtQueue;
std::deque<BrokerEventImpl::Ptr> eventQueue;
# define MA_BUFFER_SIZE 65536
diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp
index c2d1f51f2b..75cfbed822 100644
--- a/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp
@@ -18,20 +18,6 @@
*/
#include "qmf/engine/ConsoleImpl.h"
-#include "qmf/engine/MessageImpl.h"
-#include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/Typecode.h"
-#include "qmf/engine/ObjectImpl.h"
-#include "qmf/engine/ObjectIdImpl.h"
-#include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/ValueImpl.h"
-#include "qmf/engine/Protocol.h"
-#include "qmf/engine/SequenceManager.h"
-#include "qmf/engine/BrokerProxyImpl.h"
-#include <qpid/framing/Buffer.h>
-#include <qpid/framing/Uuid.h>
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/FieldValue.h>
#include <qpid/log/Statement.h>
#include <qpid/sys/Time.h>
#include <qpid/sys/SystemInfo.h>
diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.h b/qpid/cpp/src/qmf/engine/ConsoleImpl.h
index d459e128f9..30b565058b 100644
--- a/qpid/cpp/src/qmf/engine/ConsoleImpl.h
+++ b/qpid/cpp/src/qmf/engine/ConsoleImpl.h
@@ -21,15 +21,13 @@
*/
#include "qmf/engine/Console.h"
-#include "qmf/engine/MessageImpl.h"
#include "qmf/engine/SchemaImpl.h"
#include "qmf/engine/ObjectImpl.h"
#include "qmf/engine/ObjectIdImpl.h"
#include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/Protocol.h"
+#include "qmf/Protocol.h"
#include "qmf/engine/SequenceManager.h"
#include "qmf/engine/BrokerProxyImpl.h"
-#include <qpid/framing/Uuid.h>
#include <qpid/sys/Mutex.h>
#include <qpid/sys/Time.h>
#include <qpid/sys/SystemInfo.h>
diff --git a/qpid/cpp/src/qpid/messaging/ListContent.cpp b/qpid/cpp/src/qpid/messaging/ListContent.cpp
index 0c3ca5fc62..038c1fad0b 100644
--- a/qpid/cpp/src/qpid/messaging/ListContent.cpp
+++ b/qpid/cpp/src/qpid/messaging/ListContent.cpp
@@ -37,6 +37,11 @@ class ListContentImpl : public Variant
}
}
+ ListContentImpl(Message& m, const Variant::List& i) : Variant(i), msg(&m)
+ {
+ msg->getContent().clear();
+ }
+
void encode()
{
qpid::client::amqp0_10::ListCodec codec;
@@ -45,6 +50,7 @@ class ListContentImpl : public Variant
};
ListContent::ListContent(Message& m) : impl(new ListContentImpl(m)) {}
+ListContent::ListContent(Message& m, const Variant::List& i) : impl(new ListContentImpl(m, i)) {}
ListContent::~ListContent() { delete impl; }
ListContent& ListContent::operator=(const ListContent& l) { *impl = *l.impl; return *this; }
diff --git a/qpid/cpp/src/qpid/messaging/MapContent.cpp b/qpid/cpp/src/qpid/messaging/MapContent.cpp
index 6dba22be99..1f190b85aa 100644
--- a/qpid/cpp/src/qpid/messaging/MapContent.cpp
+++ b/qpid/cpp/src/qpid/messaging/MapContent.cpp
@@ -37,6 +37,11 @@ class MapContentImpl : public Variant
}
}
+ MapContentImpl(Message& m, const Variant::Map& i) : Variant(i), msg(&m)
+ {
+ msg->getContent().clear();
+ }
+
void encode()
{
qpid::client::amqp0_10::MapCodec codec;
@@ -46,6 +51,7 @@ class MapContentImpl : public Variant
};
MapContent::MapContent(Message& m) : impl(new MapContentImpl(m)) {}
+MapContent::MapContent(Message& m, const Variant::Map& i) : impl(new MapContentImpl(m, i)) {}
MapContent::~MapContent() { delete impl; }
MapContent& MapContent::operator=(const MapContent& m) { *impl = *m.impl; return *this; }
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index 9c6f066d64..98b92e809c 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -346,6 +346,25 @@ QPID_AUTO_TEST_CASE(testMapMessage)
fix.session.acknowledge();
}
+QPID_AUTO_TEST_CASE(testMapMessageWithInitial)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ Message out;
+ Variant::Map imap;
+ imap["abc"] = "def";
+ imap["pi"] = 3.14f;
+ MapContent content(out, imap);
+ content.encode();
+ sender.send(out);
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+ MapView view(in);
+ BOOST_CHECK_EQUAL(view["abc"].asString(), "def");
+ BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f);
+ fix.session.acknowledge();
+}
+
QPID_AUTO_TEST_CASE(testListMessage)
{
QueueFixture fix;
@@ -379,6 +398,40 @@ QPID_AUTO_TEST_CASE(testListMessage)
fix.session.acknowledge();
}
+QPID_AUTO_TEST_CASE(testListMessageWithInitial)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ Message out;
+ Variant::List ilist;
+ ilist.push_back(Variant("abc"));
+ ilist.push_back(Variant(1234));
+ ilist.push_back(Variant("def"));
+ ilist.push_back(Variant(56.789));
+ ListContent content(out, ilist);
+ content.encode();
+ sender.send(out);
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+ ListView view(in);
+ BOOST_CHECK_EQUAL(view.size(), content.size());
+ BOOST_CHECK_EQUAL(view.front().asString(), "abc");
+ BOOST_CHECK_EQUAL(view.back().asDouble(), 56.789);
+
+ ListView::const_iterator i = view.begin();
+ BOOST_CHECK(i != view.end());
+ BOOST_CHECK_EQUAL(i->asString(), "abc");
+ BOOST_CHECK(++i != view.end());
+ BOOST_CHECK_EQUAL(i->asInt64(), 1234);
+ BOOST_CHECK(++i != view.end());
+ BOOST_CHECK_EQUAL(i->asString(), "def");
+ BOOST_CHECK(++i != view.end());
+ BOOST_CHECK_EQUAL(i->asDouble(), 56.789);
+ BOOST_CHECK(++i == view.end());
+
+ fix.session.acknowledge();
+}
+
QPID_AUTO_TEST_CASE(testReject)
{
QueueFixture fix;