diff options
author | Ted Ross <tross@apache.org> | 2010-02-26 23:11:19 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2010-02-26 23:11:19 +0000 |
commit | 3296ad1ca8f77bf82fe9fd059c5e44580a4f2f4b (patch) | |
tree | 16f81a8ee4c6ff28ea4c3fd6e2aaf9f9a24e71ef /qpid/cpp/src | |
parent | c1611f64a7c9dce39c19794dd3d887e3f1815b29 (diff) | |
download | qpid-python-3296ad1ca8f77bf82fe9fd059c5e44580a4f2f4b.tar.gz |
Checkpointing Agent engine code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7@916854 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qmf.mk | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/Agent.cpp | 145 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/Protocol.cpp | 24 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/Agent.cpp | 131 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp | 136 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/BrokerProxyImpl.h | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/ConsoleImpl.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/ConsoleImpl.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/ListContent.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/MapContent.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/tests/MessagingSessionTests.cpp | 53 |
11 files changed, 358 insertions, 184 deletions
diff --git a/qpid/cpp/src/qmf.mk b/qpid/cpp/src/qmf.mk index 34f93c63ed..80e4f5bc46 100644 --- a/qpid/cpp/src/qmf.mk +++ b/qpid/cpp/src/qmf.mk @@ -44,7 +44,9 @@ QMF_ENGINE_API = \ ../include/qmf/engine/Object.h \ ../include/qmf/engine/QmfEngineImportExport.h \ ../include/qmf/engine/Query.h \ - ../include/qmf/engine/Schema.h + ../include/qmf/engine/Schema.h \ + ../include/qmf/Agent.h \ + ../include/qmf/Notifiable.h # ../include/qmf/engine/ObjectId.h @@ -58,6 +60,8 @@ libqmf_la_SOURCES = \ qpid/agent/ManagementAgentImpl.cpp \ qpid/agent/ManagementAgentImpl.h +# qmf/Agent.cpp + libqmfengine_la_SOURCES = \ $(QMF_ENGINE_API) \ qmf/engine/Agent.cpp \ diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp new file mode 100644 index 0000000000..bdaf064f0a --- /dev/null +++ b/qpid/cpp/src/qmf/Agent.cpp @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/Agent.h" +#include "qmf/engine/Agent.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Condition.h" + +using namespace std; +using namespace qmf; +using namespace qpid::messaging; +using qpid::sys::Duration; + +namespace qmf { + class AgentImpl { + public: + AgentImpl(const string& vendor, const string& product, const string& instance, const string& domain, bool internalStore, + AgentHandler* handler, Notifiable* notifiable); + ~AgentImpl(); + void setAttribute(const string& name, const Variant& value) { agentEngine.setAttr(name.c_str(), value); } + void setStoreDir(const string& path) { agentEngine.setStoreDir(path.c_str()); } + void setConnection(Connection& conn) { agentEngine.setConnection(conn); } + void registerClass(SchemaClass* cls) { agentEngine.registerClass(cls); } + uint32_t invokeHandler(uint32_t limit, Duration timeout); + // const ObjectId* addObject(AgentObject& obj, bool persistent, uint64_t oid); + // const ObjectId* addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi); + void raiseEvent(Event& event); + void queryResponse(uint32_t context, AgentObject& object); + void queryComplete(uint32_t context); + void methodResponse(uint32_t context, const Variant::Map& args, const Variant& exception); + private: + const string vendor; + const string product; + const string instance; + const string domain; + const bool internalStore; + AgentHandler* handler; + Notifiable* notifiable; + engine::Agent agentEngine; + qpid::sys::Mutex lock; + qpid::sys::Condition cond; + }; +} + +AgentImpl::AgentImpl(const string& _vendor, const string& _product, const string& _instance, const string& _domain, + bool _internalStore, AgentHandler* _handler, Notifiable* _notifiable) : + vendor(_vendor), product(_product), instance(_instance.empty() ? "TODO" : _instance), + domain(_domain.empty() ? "default" : _domain), internalStore(_internalStore), + handler(_handler), notifiable(_notifiable), + agentEngine(vendor.c_str(), product.c_str(), instance.c_str(), domain.c_str(), internalStore) +{ +} + +AgentImpl::~AgentImpl() +{ +} + +void AgentImpl::registerClass(SchemaClass* /*cls*/) +{ +} + +uint32_t AgentImpl::invokeHandler(uint32_t limit, Duration timeout) +{ + engine::AgentEvent event; + bool valid; + qpid::sys::AbsTime endTime(qpid::sys::now(), timeout); + + { + qpid::sys::Mutex::ScopedLock l(lock); + valid = agentEngine.getEvent(event); + while (!valid) { + if (!cond.wait(lock, endTime)) + return 0; + valid = agentEngine.getEvent(event); + } + } + + uint32_t count = 0; + while (valid) { + // TODO: Process event + count++; + if (limit > 0 && count == limit) + break; + agentEngine.popEvent(); + valid = agentEngine.getEvent(event); + } + + return count; +} + +// const ObjectId* AgentImpl::addObject(AgentObject& obj, bool persistent, uint64_t oid); +// const ObjectId* AgentImpl::addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi); + +void AgentImpl::raiseEvent(Event& /*event*/) +{ +} + +void AgentImpl::queryResponse(uint32_t /*context*/, AgentObject& /*object*/) +{ +} + +void AgentImpl::queryComplete(uint32_t /*context*/) +{ +} + +void AgentImpl::methodResponse(uint32_t /*context*/, const Variant::Map& /*args*/, const Variant& /*exception*/) +{ +} + + +//================================================================== +// Wrappers +//================================================================== +Agent::Agent(const string& vendor, const string& product, const string& instance, const string& domain, + bool internalStore, AgentHandler* handler, Notifiable* notifiable) { + impl = new AgentImpl(vendor, product, instance, domain, internalStore, handler, notifiable); } +Agent::~Agent() { delete impl; } +void Agent::setAttribute(const string& name, const qpid::messaging::Variant& value) { impl->setAttribute(name, value); } +void Agent::setStoreDir(const string& path) { impl->setStoreDir(path); } +void Agent::setConnection(qpid::messaging::Connection& conn) { impl->setConnection(conn); } +void Agent::registerClass(SchemaClass* cls) { impl->registerClass(cls); } +uint32_t Agent::invokeHandler(uint32_t limit, qpid::sys::Duration timeout) { return impl->invokeHandler(limit, timeout); } +//const ObjectId* Agent::addObject(AgentObject& obj, bool persistent, uint64_t oid); +//const ObjectId* Agent::addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi); +void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); } +void Agent::queryResponse(uint32_t context, AgentObject& object) { impl->queryResponse(context, object); } +void Agent::queryComplete(uint32_t context) { impl->queryComplete(context); } +void Agent::methodResponse(uint32_t context, const Variant::Map& args, const Variant& exception) { impl->methodResponse(context, args, exception); } + diff --git a/qpid/cpp/src/qmf/Protocol.cpp b/qpid/cpp/src/qmf/Protocol.cpp index faaa4c567d..518d263080 100644 --- a/qpid/cpp/src/qmf/Protocol.cpp +++ b/qpid/cpp/src/qmf/Protocol.cpp @@ -43,6 +43,30 @@ const string Protocol::SUBTYPES("_subtypes"); const string Protocol::SUBTYPE_SCHEMA_PROPERTY("qmfProperty"); const string Protocol::SUBTYPE_SCHEMA_METHOD("qmfMethod"); +const string Protocol::AMQP_CONTENT_MAP("amqp/map"); +const string Protocol::AMQP_CONTENT_LIST("amqp/list"); + +const string Protocol::APP_OPCODE("qmf.opcode"); + +const string Protocol::OP_AGENT_LOCATE_REQUEST("_agent_locate_request"); +const string Protocol::OP_AGENT_LOCATE_RESPONSE("_agent_locate_response"); +const string Protocol::OP_AGENT_HEARTBEAT_INDICATION("_agent_heartbeat_indication"); +const string Protocol::OP_QUERY_REQUEST("_query_request"); +const string Protocol::OP_QUERY_RESPONSE("_query_response"); +const string Protocol::OP_SUBSCRIBE_REQUEST("_subscribe_request"); +const string Protocol::OP_SUBSCRIBE_RESPONSE("_subscribe_response"); +const string Protocol::OP_SUBSCRIBE_CANCEL_INDICATION("_subscribe_cancel_indication"); +const string Protocol::OP_SUBSCRIBE_REFRESH_REQUEST("_subscribe_refresh_request"); +const string Protocol::OP_DATA_INDICATION("_data_indication"); +const string Protocol::OP_METHOD_REQUEST("_method_request"); +const string Protocol::OP_METHOD_RESPONSE("_method_response"); + +const string Protocol::CONTENT_PACKAGE("_schema_package"); +const string Protocol::CONTENT_SCHEMA_ID("_schema_id"); +const string Protocol::CONTENT_SCHEMA_CLASS("_schema_class"); +const string Protocol::CONTENT_OBJECT_ID("_object_id"); +const string Protocol::CONTENT_DATA("_data"); +const string Protocol::CONTENT_EVENT("_event"); #if 0 bool Protocol::checkHeader(const Message& /*msg*/, string& /*opcode*/, uint32_t* /*seq*/) diff --git a/qpid/cpp/src/qmf/engine/Agent.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp index 22d53e93b7..c0c8b69bc8 100644 --- a/qpid/cpp/src/qmf/engine/Agent.cpp +++ b/qpid/cpp/src/qmf/engine/Agent.cpp @@ -31,6 +31,7 @@ #include <qpid/messaging/Receiver.h> #include <qpid/messaging/Sender.h> #include <qpid/messaging/Message.h> +#include <qpid/messaging/MapView.h> #include <string> #include <deque> #include <map> @@ -83,6 +84,8 @@ namespace engine { AgentImpl(const char* vendor, const char* product, const char* name, const char* domain, bool internalStore); ~AgentImpl(); + void setNotifyCallback(Agent::notifyCb handler); + void setNotifyCallback(Notifiable* handler); void setAttr(const char* key, const Variant& value); void setStoreDir(const char* path); void setTransferDir(const char* path); @@ -107,10 +110,12 @@ namespace engine { const string name; const string domain; string directAddr; - map<string, Variant> attrMap; + Variant::Map attrMap; string storeDir; string transferDir; bool internalStore; + Agent::notifyCb notifyHandler; + Notifiable* notifiable; Uuid systemId; uint16_t bootSequence; uint32_t nextContextNum; @@ -156,14 +161,20 @@ namespace engine { AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method, const string& key, boost::shared_ptr<Variant::Map> argMap, const SchemaClass* cls); - void handleRcvMessageLH(qpid::messaging::Message& message); + void notify(); + void handleRcvMessageLH(const Message& message); + void handleAgentLocateLH(const Message& message); + void handleQueryRequestLH(const Message& message); + void handleSubscribeRequest(const Message& message); + void handleSubscribeCancel(const Message& message); + void handleSubscribeRefresh(const Message& message); + void handleMethodRequest(const Message& message); void sendPackageIndicationLH(const string& packageName); void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key); void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq, uint32_t code = 0, const string& text = "OK"); void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text=""); - void handleAttachResponse(Message& msg); void handlePackageRequest(Message& msg); void handleClassQuery(Message& msg); void handleSchemaRequest(Message& msg, uint32_t sequence, @@ -198,12 +209,16 @@ AgentEvent AgentEventImpl::copy() AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) : vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i), + notifyHandler(0), notifiable(0), bootSequence(1), nextContextNum(1), running(true), thread(0) { directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name; if (_d == 0) { directAddr += " { create:always }"; } + attrMap["vendor"] = vendor; + attrMap["product"] = product; + attrMap["name"] = name; } @@ -211,9 +226,22 @@ AgentImpl::~AgentImpl() { } +void AgentImpl::setNotifyCallback(Agent::notifyCb handler) +{ + Mutex::ScopedLock _lock(lock); + notifyHandler = handler; +} + +void AgentImpl::setNotifyCallback(Notifiable* handler) +{ + Mutex::ScopedLock _lock(lock); + notifiable = handler; +} + void AgentImpl::setAttr(const char* key, const Variant& value) { - attrMap.insert(pair<string, Variant>(key, value)); + Mutex::ScopedLock _lock(lock); + attrMap[key] = value; } void AgentImpl::setStoreDir(const char* path) @@ -234,29 +262,6 @@ void AgentImpl::setTransferDir(const char* path) transferDir.clear(); } -/* -void AgentImpl::handleRcvMessage(Message& message) -{ - Buffer inBuffer(message.body, message.length); - uint8_t opcode; - uint32_t sequence; - string replyToExchange(message.replyExchange ? message.replyExchange : ""); - string replyToKey(message.replyKey ? message.replyKey : ""); - string userId(message.userId ? message.userId : ""); - - while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) { - if (opcode == Protocol::OP_ATTACH_RESPONSE) handleAttachResponse(inBuffer); - else if (opcode == Protocol::OP_SCHEMA_REQUEST) handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey); - else if (opcode == Protocol::OP_CONSOLE_ADDED_INDICATION) handleConsoleAddedIndication(); - else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId); - else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId); - else { - QPID_LOG(error, "AgentImpl::handleRcvMessage invalid opcode=" << opcode); - break; - } - } -} -*/ bool AgentImpl::getEvent(AgentEvent& event) const { @@ -277,9 +282,18 @@ void AgentImpl::popEvent() void AgentImpl::setConnection(Connection& conn) { Mutex::ScopedLock _lock(lock); + + // + // Don't permit the overwriting of an existing connection + // TODO: return an error or throw an exception if an overwrite is attempted. + // if (connection == 0) return; connection = conn; + + // + // Start the Agent thread now that we have a connection to work with. + // thread = new qpid::sys::Thread(*this); } @@ -384,6 +398,61 @@ void AgentImpl::stop() running = false; } +void AgentImpl::handleRcvMessageLH(const Message& message) +{ + Variant::Map headers(message.getHeaders()); + cout << "AgentImpl::handleRcvMessageLH headers=" << headers << endl; + + if (message.getContentType() != Protocol::AMQP_CONTENT_MAP) + return; + + Variant::Map::const_iterator iter = headers.find(Protocol::APP_OPCODE); + if (iter == headers.end()) + return; + string opcode = iter->second.asString(); + + if (opcode == Protocol::OP_AGENT_LOCATE_REQUEST) handleAgentLocateLH(message); + if (opcode == Protocol::OP_QUERY_REQUEST) handleQueryRequestLH(message); + if (opcode == Protocol::OP_SUBSCRIBE_REQUEST) handleSubscribeRequest(message); + if (opcode == Protocol::OP_SUBSCRIBE_CANCEL_INDICATION) handleSubscribeCancel(message); + if (opcode == Protocol::OP_SUBSCRIBE_REFRESH_REQUEST) handleSubscribeRefresh(message); + if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(message); +} + +void AgentImpl::handleAgentLocateLH(const Message& message) +{ + const MapView predicate(message); + + //if (predicateMatches(predicate, attrMap)) { + // sendResponse(message, Protocol::OP_AGENT_LOCATE_RESPONSE, attrMap); + //} +} + +void AgentImpl::handleQueryRequestLH(const Message& message) +{ + const MapView map(message); +} + +void AgentImpl::handleSubscribeRequest(const Message& message) +{ + const MapView map(message); +} + +void AgentImpl::handleSubscribeCancel(const Message& message) +{ + const MapView map(message); +} + +void AgentImpl::handleSubscribeRefresh(const Message& message) +{ + const MapView map(message); +} + +void AgentImpl::handleMethodRequest(const Message& message) +{ + const MapView map(message); +} + AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string&, const string&, const string& key) { AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY)); @@ -407,8 +476,12 @@ AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, c return event; } -void AgentImpl::handleRcvMessageLH(qpid::messaging::Message& /*msg*/) +void AgentImpl::notify() { + if (notifyHandler != 0) + notifyHandler(); + if (notifiable != 0) + notifiable->notify(); } void AgentImpl::sendPackageIndicationLH(const string& packageName) @@ -471,6 +544,8 @@ void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t sequence, const s Agent::Agent(const char* v, const char* p, const char* n, const char* d, bool i) { impl = new AgentImpl(v, p, n, d, i); } Agent::~Agent() { delete impl; } +void Agent::setNotifyCallback(notifyCb handler) { impl->setNotifyCallback(handler); } +void Agent::setNotifyCallback(Notifiable* handler) { impl->setNotifyCallback(handler); } void Agent::setAttr(const char* key, const Variant& value) { impl->setAttr(key, value); } void Agent::setStoreDir(const char* path) { impl->setStoreDir(path); } void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); } diff --git a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp index 46ed653576..f76c69b446 100644 --- a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp +++ b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp @@ -19,7 +19,7 @@ #include "qmf/engine/BrokerProxyImpl.h" #include "qmf/engine/ConsoleImpl.h" -#include "qmf/engine/Protocol.h" +#include "qmf/Protocol.h" #include "qpid/Address.h" #include "qpid/sys/SystemInfo.h" #include <qpid/log/Statement.h> @@ -30,7 +30,7 @@ using namespace std; using namespace qmf::engine; -using namespace qpid::framing; +using namespace qpid::messaging; using namespace qpid::sys; namespace { @@ -64,10 +64,6 @@ BrokerEvent BrokerEventImpl::copy() ::memset(&item, 0, sizeof(BrokerEvent)); item.kind = kind; - - STRING_REF(name); - STRING_REF(exchange); - STRING_REF(bindingKey); item.context = context; item.queryResponse = queryResponse.get(); item.methodResponse = methodResponse.get(); @@ -87,63 +83,12 @@ BrokerProxyImpl::BrokerProxyImpl(BrokerProxy& pub, Console& _console) : publicOb seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this))); } -void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/) -{ - Mutex::ScopedLock _lock(lock); - agentList.clear(); - eventQueue.clear(); - xmtQueue.clear(); - eventQueue.push_back(eventDeclareQueue(queueName)); - eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName)); - eventQueue.push_back(eventSetupComplete()); - - // TODO: Store session handle -} - -void BrokerProxyImpl::sessionClosed() -{ - Mutex::ScopedLock _lock(lock); - agentList.clear(); - eventQueue.clear(); - xmtQueue.clear(); -} - -void BrokerProxyImpl::startProtocol() -{ - AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker")); - { - Mutex::ScopedLock _lock(lock); - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); - - agentList[0] = agent; - - requestsOutstanding = 1; - topicBound = false; - uint32_t sequence(seqMgr.reserve()); - Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence); - sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence); - } - - console.impl->eventAgentAdded(agent); -} - -void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) +void BrokerProxyImpl::sendBufferLH(Buffer&, const string&, const string&) { - uint32_t length = buf.getPosition(); - MessageImpl::Ptr message(new MessageImpl); - - buf.reset(); - buf.getRawData(message->body, length); - message->destination = destination; - message->routingKey = routingKey; - message->replyExchange = DIR_EXCHANGE; - message->replyKey = queueName; - - xmtQueue.push_back(message); + // TODO } +/* void BrokerProxyImpl::handleRcvMessage(Message& message) { Buffer inBuffer(message.body, message.length); @@ -153,22 +98,7 @@ void BrokerProxyImpl::handleRcvMessage(Message& message) while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) seqMgr.dispatch(opcode, sequence, message.routingKey ? string(message.routingKey) : string(), inBuffer); } - -bool BrokerProxyImpl::getXmtMessage(Message& item) const -{ - Mutex::ScopedLock _lock(lock); - if (xmtQueue.empty()) - return false; - item = xmtQueue.front()->copy(); - return true; -} - -void BrokerProxyImpl::popXmt() -{ - Mutex::ScopedLock _lock(lock); - if (!xmtQueue.empty()) - xmtQueue.pop_front(); -} +*/ bool BrokerProxyImpl::getEvent(BrokerEvent& event) const { @@ -227,10 +157,6 @@ void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentPr bool BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent) { - if (query.impl->singleAgent()) { - if (query.impl->agentBank() != agent->getAgentBank()) - return false; - } stringstream key; Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t sequence(seqMgr.reserve(queryContext)); @@ -269,7 +195,7 @@ string BrokerProxyImpl::encodeMethodArguments(const SchemaMethod* schema, const return string(); } -void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, +void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const SchemaClass* cls, const string& methodName, const Value* args, void* userContext) { int methodCount = cls->getMethodCount(); @@ -452,43 +378,13 @@ void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq) { - SchemaObjectClass* oClassPtr; - SchemaEventClass* eClassPtr; + SchemaClass* classPtr; uint8_t kind = inBuffer.getOctet(); const SchemaClassKey* key; - if (kind == CLASS_OBJECT) { - oClassPtr = SchemaObjectClassImpl::factory(inBuffer); - console.impl->learnClass(oClassPtr); - key = oClassPtr->getClassKey(); - QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->impl->str()); - - // - // If we have just learned about the org.apache.qpid.broker:agent class, send a get - // request for the current list of agents so we can have it on-hand before we declare - // this session "stable". - // - if (key->impl->getClassName() == AGENT_CLASS && key->impl->getPackageName() == BROKER_PACKAGE) { - Mutex::ScopedLock _lock(lock); - incOutstandingLH(); - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve()); - Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence); - FieldTable ft; - ft.setString("_class", AGENT_CLASS); - ft.setString("_package", BROKER_PACKAGE); - ft.encode(outBuffer); - sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY); - QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY); - } - } else if (kind == CLASS_EVENT) { - eClassPtr = SchemaEventClassImpl::factory(inBuffer); - console.impl->learnClass(eClassPtr); - key = eClassPtr->getClassKey(); - QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->impl->str()); - } - else { - QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind); - } + classPtr = SchemaClassImpl::factory(inBuffer); + console.impl->learnClass(classPtr); + key = classPtr->getClassKey(); + QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->impl->str()); } ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat) @@ -496,7 +392,7 @@ ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer)); QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey->impl->str()); - SchemaObjectClass* schema = console.impl->getSchema(classKey.get()); + SchemaClass* schema = console.impl->getSchema(classKey.get()); if (schema == 0) { QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey->impl->str()); return ObjectPtr(); @@ -749,12 +645,6 @@ uint32_t AgentProxy::getAgentBank() const { return impl->getAgentBank(); } BrokerProxy::BrokerProxy(Console& console) : impl(new BrokerProxyImpl(*this, console)) {} BrokerProxy::~BrokerProxy() { delete impl; } -void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); } -void BrokerProxy::sessionClosed() { impl->sessionClosed(); } -void BrokerProxy::startProtocol() { impl->startProtocol(); } -void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); } -bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); } -void BrokerProxy::popXmt() { impl->popXmt(); } bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); } void BrokerProxy::popEvent() { impl->popEvent(); } uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); } diff --git a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h index 031eb698e0..ab45982dfe 100644 --- a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h +++ b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h @@ -25,8 +25,6 @@ #include "qmf/engine/SchemaImpl.h" #include "qmf/engine/QueryImpl.h" #include "qmf/engine/SequenceManager.h" -#include "qmf/engine/MessageImpl.h" -#include "qpid/framing/Uuid.h" #include "qpid/messaging/Variant.h" #include "qpid/sys/Mutex.h" #include "boost/shared_ptr.hpp" @@ -80,9 +78,6 @@ namespace engine { struct BrokerEventImpl { typedef boost::shared_ptr<BrokerEventImpl> Ptr; BrokerEvent::EventKind kind; - std::string name; - std::string exchange; - std::string bindingKey; void* context; QueryResponsePtr queryResponse; MethodResponsePtr methodResponse; @@ -123,14 +118,7 @@ namespace engine { BrokerProxyImpl(BrokerProxy& pub, Console& _console); ~BrokerProxyImpl() {} - void sessionOpened(SessionHandle& sh); - void sessionClosed(); - void startProtocol(); - void sendBufferLH(qpid::framing::Buffer& buf, const std::string& destination, const std::string& routingKey); - void handleRcvMessage(Message& message); - bool getXmtMessage(Message& item) const; - void popXmt(); bool getEvent(BrokerEvent& event) const; void popEvent(); @@ -140,7 +128,7 @@ namespace engine { void sendQuery(const Query& query, void* context, const AgentProxy* agent); bool sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent); std::string encodeMethodArguments(const SchemaMethod* schema, const qpid::messaging::Variant::Map* args, qpid::framing::Buffer& buffer); - void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const std::string& method, const qpid::messaging::Variant::Map* args, void* context); + void sendMethodRequest(ObjectId* oid, const SchemaClass* cls, const std::string& method, const qpid::messaging::Variant::Map* args, void* context); void addBinding(const std::string& exchange, const std::string& key); void staticRelease() { decOutstanding(); } @@ -153,12 +141,11 @@ namespace engine { mutable qpid::sys::Mutex lock; Console& console; std::string queueName; - qpid::framing::Uuid brokerId; + qpid::messaging::Uuid brokerId; SequenceManager seqMgr; uint32_t requestsOutstanding; bool topicBound; std::map<uint32_t, AgentProxyPtr> agentList; - std::deque<MessageImpl::Ptr> xmtQueue; std::deque<BrokerEventImpl::Ptr> eventQueue; # define MA_BUFFER_SIZE 65536 diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp index c2d1f51f2b..75cfbed822 100644 --- a/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp +++ b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp @@ -18,20 +18,6 @@ */ #include "qmf/engine/ConsoleImpl.h" -#include "qmf/engine/MessageImpl.h" -#include "qmf/engine/SchemaImpl.h" -#include "qmf/engine/Typecode.h" -#include "qmf/engine/ObjectImpl.h" -#include "qmf/engine/ObjectIdImpl.h" -#include "qmf/engine/QueryImpl.h" -#include "qmf/engine/ValueImpl.h" -#include "qmf/engine/Protocol.h" -#include "qmf/engine/SequenceManager.h" -#include "qmf/engine/BrokerProxyImpl.h" -#include <qpid/framing/Buffer.h> -#include <qpid/framing/Uuid.h> -#include <qpid/framing/FieldTable.h> -#include <qpid/framing/FieldValue.h> #include <qpid/log/Statement.h> #include <qpid/sys/Time.h> #include <qpid/sys/SystemInfo.h> diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.h b/qpid/cpp/src/qmf/engine/ConsoleImpl.h index d459e128f9..30b565058b 100644 --- a/qpid/cpp/src/qmf/engine/ConsoleImpl.h +++ b/qpid/cpp/src/qmf/engine/ConsoleImpl.h @@ -21,15 +21,13 @@ */ #include "qmf/engine/Console.h" -#include "qmf/engine/MessageImpl.h" #include "qmf/engine/SchemaImpl.h" #include "qmf/engine/ObjectImpl.h" #include "qmf/engine/ObjectIdImpl.h" #include "qmf/engine/QueryImpl.h" -#include "qmf/engine/Protocol.h" +#include "qmf/Protocol.h" #include "qmf/engine/SequenceManager.h" #include "qmf/engine/BrokerProxyImpl.h" -#include <qpid/framing/Uuid.h> #include <qpid/sys/Mutex.h> #include <qpid/sys/Time.h> #include <qpid/sys/SystemInfo.h> diff --git a/qpid/cpp/src/qpid/messaging/ListContent.cpp b/qpid/cpp/src/qpid/messaging/ListContent.cpp index 0c3ca5fc62..038c1fad0b 100644 --- a/qpid/cpp/src/qpid/messaging/ListContent.cpp +++ b/qpid/cpp/src/qpid/messaging/ListContent.cpp @@ -37,6 +37,11 @@ class ListContentImpl : public Variant } } + ListContentImpl(Message& m, const Variant::List& i) : Variant(i), msg(&m) + { + msg->getContent().clear(); + } + void encode() { qpid::client::amqp0_10::ListCodec codec; @@ -45,6 +50,7 @@ class ListContentImpl : public Variant }; ListContent::ListContent(Message& m) : impl(new ListContentImpl(m)) {} +ListContent::ListContent(Message& m, const Variant::List& i) : impl(new ListContentImpl(m, i)) {} ListContent::~ListContent() { delete impl; } ListContent& ListContent::operator=(const ListContent& l) { *impl = *l.impl; return *this; } diff --git a/qpid/cpp/src/qpid/messaging/MapContent.cpp b/qpid/cpp/src/qpid/messaging/MapContent.cpp index 6dba22be99..1f190b85aa 100644 --- a/qpid/cpp/src/qpid/messaging/MapContent.cpp +++ b/qpid/cpp/src/qpid/messaging/MapContent.cpp @@ -37,6 +37,11 @@ class MapContentImpl : public Variant } } + MapContentImpl(Message& m, const Variant::Map& i) : Variant(i), msg(&m) + { + msg->getContent().clear(); + } + void encode() { qpid::client::amqp0_10::MapCodec codec; @@ -46,6 +51,7 @@ class MapContentImpl : public Variant }; MapContent::MapContent(Message& m) : impl(new MapContentImpl(m)) {} +MapContent::MapContent(Message& m, const Variant::Map& i) : impl(new MapContentImpl(m, i)) {} MapContent::~MapContent() { delete impl; } MapContent& MapContent::operator=(const MapContent& m) { *impl = *m.impl; return *this; } diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index 9c6f066d64..98b92e809c 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -346,6 +346,25 @@ QPID_AUTO_TEST_CASE(testMapMessage) fix.session.acknowledge(); } +QPID_AUTO_TEST_CASE(testMapMessageWithInitial) +{ + QueueFixture fix; + Sender sender = fix.session.createSender(fix.queue); + Message out; + Variant::Map imap; + imap["abc"] = "def"; + imap["pi"] = 3.14f; + MapContent content(out, imap); + content.encode(); + sender.send(out); + Receiver receiver = fix.session.createReceiver(fix.queue); + Message in = receiver.fetch(5 * qpid::sys::TIME_SEC); + MapView view(in); + BOOST_CHECK_EQUAL(view["abc"].asString(), "def"); + BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f); + fix.session.acknowledge(); +} + QPID_AUTO_TEST_CASE(testListMessage) { QueueFixture fix; @@ -379,6 +398,40 @@ QPID_AUTO_TEST_CASE(testListMessage) fix.session.acknowledge(); } +QPID_AUTO_TEST_CASE(testListMessageWithInitial) +{ + QueueFixture fix; + Sender sender = fix.session.createSender(fix.queue); + Message out; + Variant::List ilist; + ilist.push_back(Variant("abc")); + ilist.push_back(Variant(1234)); + ilist.push_back(Variant("def")); + ilist.push_back(Variant(56.789)); + ListContent content(out, ilist); + content.encode(); + sender.send(out); + Receiver receiver = fix.session.createReceiver(fix.queue); + Message in = receiver.fetch(5 * qpid::sys::TIME_SEC); + ListView view(in); + BOOST_CHECK_EQUAL(view.size(), content.size()); + BOOST_CHECK_EQUAL(view.front().asString(), "abc"); + BOOST_CHECK_EQUAL(view.back().asDouble(), 56.789); + + ListView::const_iterator i = view.begin(); + BOOST_CHECK(i != view.end()); + BOOST_CHECK_EQUAL(i->asString(), "abc"); + BOOST_CHECK(++i != view.end()); + BOOST_CHECK_EQUAL(i->asInt64(), 1234); + BOOST_CHECK(++i != view.end()); + BOOST_CHECK_EQUAL(i->asString(), "def"); + BOOST_CHECK(++i != view.end()); + BOOST_CHECK_EQUAL(i->asDouble(), 56.789); + BOOST_CHECK(++i == view.end()); + + fix.session.acknowledge(); +} + QPID_AUTO_TEST_CASE(testReject) { QueueFixture fix; |