diff options
author | Ted Ross <tross@apache.org> | 2009-09-29 03:21:49 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-09-29 03:21:49 +0000 |
commit | 7661c82fc7aaca543582ef45582d87de3c5de5b7 (patch) | |
tree | 9de25825187c0a45df5880ce74e58befb6c4ec50 /cpp/src | |
parent | 576b578d61d0d31082587bf77a25a59da2ba738f (diff) | |
download | qpid-python-7661c82fc7aaca543582ef45582d87de3c5de5b7.tar.gz |
QMF Engine updates:
- Connected console handler callbacks
- Added string representations for a number of object classes
- Added a feature that completes query requests sent to disconnected agents
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@819819 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qmf/engine/BrokerProxyImpl.cpp | 109 | ||||
-rw-r--r-- | cpp/src/qmf/engine/BrokerProxyImpl.h | 17 | ||||
-rw-r--r-- | cpp/src/qmf/engine/ConsoleImpl.cpp | 69 | ||||
-rw-r--r-- | cpp/src/qmf/engine/ConsoleImpl.h | 16 | ||||
-rw-r--r-- | cpp/src/qmf/engine/ObjectIdImpl.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qmf/engine/ObjectIdImpl.h | 3 | ||||
-rw-r--r-- | cpp/src/qmf/engine/ResilientConnection.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qmf/engine/SchemaImpl.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qmf/engine/SchemaImpl.h | 3 | ||||
-rw-r--r-- | cpp/src/qmf/engine/SequenceManager.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qmf/engine/SequenceManager.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 2 |
12 files changed, 201 insertions, 55 deletions
diff --git a/cpp/src/qmf/engine/BrokerProxyImpl.cpp b/cpp/src/qmf/engine/BrokerProxyImpl.cpp index 36d3ffe361..e296254bf8 100644 --- a/cpp/src/qmf/engine/BrokerProxyImpl.cpp +++ b/cpp/src/qmf/engine/BrokerProxyImpl.cpp @@ -23,6 +23,7 @@ #include "qpid/Address.h" #include "qpid/sys/SystemInfo.h" #include <qpid/log/Statement.h> +#include <qpid/StringUtils.h> #include <string.h> #include <iostream> #include <fstream> @@ -109,18 +110,23 @@ void BrokerProxyImpl::sessionClosed() void BrokerProxyImpl::startProtocol() { - Mutex::ScopedLock _lock(lock); - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); + AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker")); + { + Mutex::ScopedLock _lock(lock); + char rawbuffer[512]; + Buffer buffer(rawbuffer, 512); - agentList[0] = AgentProxyPtr(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker")); + agentList[0] = agent; - requestsOutstanding = 1; - topicBound = false; - uint32_t sequence(seqMgr.reserve()); - Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence); - sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence); + requestsOutstanding = 1; + topicBound = false; + uint32_t sequence(seqMgr.reserve()); + Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence); + sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); + QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence); + } + + console.impl->eventAgentAdded(agent); } void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) @@ -145,7 +151,7 @@ void BrokerProxyImpl::handleRcvMessage(Message& message) uint32_t sequence; while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) - seqMgr.dispatch(opcode, sequence, inBuffer); + seqMgr.dispatch(opcode, sequence, message.routingKey ? string(message.routingKey) : string(), inBuffer); } bool BrokerProxyImpl::getXmtMessage(Message& item) const @@ -216,6 +222,7 @@ void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const stringstream key; Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t sequence(seqMgr.reserve(queryContext)); + agent->impl->addSequence(sequence); Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence); query.impl->encode(outBuffer); @@ -406,9 +413,23 @@ MethodResponsePtr BrokerProxyImpl::handleMethodResponse(Buffer& inBuffer, uint32 return response; } -void BrokerProxyImpl::handleHeartbeatIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/) +void BrokerProxyImpl::handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq, const string& routingKey) { - // TODO + vector<string> tokens = qpid::split(routingKey, "."); + uint32_t agentBank; + uint64_t timestamp; + + if (routingKey.empty() || tokens.size() != 4) + agentBank = 0; + else + agentBank = ::atoi(tokens[3].c_str()); + + timestamp = inBuffer.getLongLong(); + map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank); + if (iter != agentList.end()) { + console.impl->eventAgentHeartbeat(iter->second, timestamp); + } + QPID_LOG(trace, "RCVD HeartbeatIndication seq=" << seq << " agentBank=" << agentBank); } void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/) @@ -481,11 +502,24 @@ ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq void BrokerProxyImpl::updateAgentList(ObjectPtr obj) { Value* value = obj->getValue("agentBank"); + Mutex::ScopedLock _lock(lock); if (value != 0 && value->isUint()) { uint32_t agentBank = value->asUint(); if (obj->isDeleted()) { - agentList.erase(agentBank); - QPID_LOG(trace, "Agent at bank " << agentBank << " removed from agent list"); + map<uint32_t, AgentProxyPtr>::iterator iter = agentList.find(agentBank); + if (iter != agentList.end()) { + AgentProxyPtr agent(iter->second); + console.impl->eventAgentDeleted(agent); + agentList.erase(agentBank); + QPID_LOG(trace, "Agent at bank " << agentBank << " removed from agent list"); + + // + // Release all sequence numbers for requests in-flight to this agent. + // Since the agent is no longer connected, these requests would not + // otherwise complete. + // + agent->impl->releaseInFlight(seqMgr); + } } else { Value* str = obj->getValue("label"); string label; @@ -493,7 +527,9 @@ void BrokerProxyImpl::updateAgentList(ObjectPtr obj) label = str->asString(); map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank); if (iter == agentList.end()) { - agentList[agentBank] = AgentProxyPtr(AgentProxyImpl::factory(console, publicObject, agentBank, label)); + AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, agentBank, label)); + agentList[agentBank] = agent; + console.impl->eventAgentAdded(agent); QPID_LOG(trace, "Agent '" << label << "' found at bank " << agentBank); } } @@ -572,9 +608,11 @@ MethodResponse* MethodResponseImpl::factory(uint32_t status, const std::string& return new MethodResponse(impl); } -bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer) +bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& routingKey, Buffer& buffer) { + ObjectPtr object; bool completeContext = false; + if (opcode == Protocol::OP_BROKER_RESPONSE) { broker.handleBrokerResponse(buffer, sequence); completeContext = true; @@ -592,15 +630,21 @@ bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buf else if (opcode == Protocol::OP_CLASS_INDICATION) broker.handleClassIndication(buffer, sequence); else if (opcode == Protocol::OP_HEARTBEAT_INDICATION) - broker.handleHeartbeatIndication(buffer, sequence); + broker.handleHeartbeatIndication(buffer, sequence, routingKey); else if (opcode == Protocol::OP_EVENT_INDICATION) broker.handleEventIndication(buffer, sequence); - else if (opcode == Protocol::OP_PROPERTY_INDICATION) - broker.handleObjectIndication(buffer, sequence, true, false); - else if (opcode == Protocol::OP_STATISTIC_INDICATION) - broker.handleObjectIndication(buffer, sequence, false, true); - else if (opcode == Protocol::OP_OBJECT_INDICATION) - broker.handleObjectIndication(buffer, sequence, true, true); + else if (opcode == Protocol::OP_PROPERTY_INDICATION) { + object = broker.handleObjectIndication(buffer, sequence, true, false); + broker.console.impl->eventObjectUpdate(object, true, false); + } + else if (opcode == Protocol::OP_STATISTIC_INDICATION) { + object = broker.handleObjectIndication(buffer, sequence, false, true); + broker.console.impl->eventObjectUpdate(object, false, true); + } + else if (opcode == Protocol::OP_OBJECT_INDICATION) { + object = broker.handleObjectIndication(buffer, sequence, true, true); + broker.console.impl->eventObjectUpdate(object, true, true); + } else { QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode); completeContext = true; @@ -627,7 +671,7 @@ void QueryContext::release() broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse)); } -bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer) +bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer) { bool completeContext = false; ObjectPtr object; @@ -635,6 +679,19 @@ bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buff if (opcode == Protocol::OP_COMMAND_COMPLETE) { broker.handleCommandComplete(buffer, sequence); completeContext = true; + + // + // Visit each agent and remove the sequence from that agent's in-flight list. + // This could be made more efficient because only one agent will have this sequence + // in its list. + // + map<uint32_t, AgentProxyPtr> copy; + { + Mutex::ScopedLock _block(broker.lock); + copy = broker.agentList; + } + for (map<uint32_t, AgentProxyPtr>::iterator iter = copy.begin(); iter != copy.end(); iter++) + iter->second->impl->delSequence(sequence); } else if (opcode == Protocol::OP_OBJECT_INDICATION) { object = broker.handleObjectIndication(buffer, sequence, true, true); @@ -655,7 +712,7 @@ void MethodContext::release() broker.eventQueue.push_back(broker.eventMethodResponse(userContext, methodResponse)); } -bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer) +bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer) { if (opcode == Protocol::OP_METHOD_RESPONSE) methodResponse = broker.handleMethodResponse(buffer, sequence, schema); diff --git a/cpp/src/qmf/engine/BrokerProxyImpl.h b/cpp/src/qmf/engine/BrokerProxyImpl.h index 660cb86c61..738424bce1 100644 --- a/cpp/src/qmf/engine/BrokerProxyImpl.h +++ b/cpp/src/qmf/engine/BrokerProxyImpl.h @@ -36,6 +36,7 @@ #include <string> #include <deque> #include <map> +#include <set> #include <vector> namespace qmf { @@ -98,6 +99,7 @@ namespace engine { BrokerProxy& broker; uint32_t agentBank; std::string label; + std::set<uint32_t> inFlightSequences; AgentProxyImpl(Console& c, BrokerProxy& b, uint32_t ab, const std::string& l) : console(c), broker(b), agentBank(ab), label(l) {} static AgentProxy* factory(Console& c, BrokerProxy& b, uint32_t ab, const std::string& l) { @@ -106,6 +108,13 @@ namespace engine { } ~AgentProxyImpl() {} const std::string& getLabel() const { return label; } + void addSequence(uint32_t seq) { inFlightSequences.insert(seq); } + void delSequence(uint32_t seq) { inFlightSequences.erase(seq); } + void releaseInFlight(SequenceManager& seqMgr) { + for (std::set<uint32_t>::iterator iter = inFlightSequences.begin(); iter != inFlightSequences.end(); iter++) + seqMgr.release(*iter); + inFlightSequences.clear(); + } }; class BrokerProxyImpl : public boost::noncopyable { @@ -166,7 +175,7 @@ namespace engine { void handleCommandComplete(qpid::framing::Buffer& inBuffer, uint32_t seq); void handleClassIndication(qpid::framing::Buffer& inBuffer, uint32_t seq); MethodResponsePtr handleMethodResponse(qpid::framing::Buffer& inBuffer, uint32_t seq, const SchemaMethod* schema); - void handleHeartbeatIndication(qpid::framing::Buffer& inBuffer, uint32_t seq); + void handleHeartbeatIndication(qpid::framing::Buffer& inBuffer, uint32_t seq, const std::string& routingKey); void handleEventIndication(qpid::framing::Buffer& inBuffer, uint32_t seq); void handleSchemaResponse(qpid::framing::Buffer& inBuffer, uint32_t seq); ObjectPtr handleObjectIndication(qpid::framing::Buffer& inBuffer, uint32_t seq, bool prop, bool stat); @@ -186,7 +195,7 @@ namespace engine { virtual ~StaticContext() {} void reserve() {} void release() { broker.staticRelease(); } - bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer); + bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer); BrokerProxyImpl& broker; }; @@ -199,7 +208,7 @@ namespace engine { virtual ~QueryContext() {} void reserve(); void release(); - bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer); + bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer); mutable qpid::sys::Mutex lock; BrokerProxyImpl& broker; @@ -213,7 +222,7 @@ namespace engine { virtual ~MethodContext() {} void reserve() {} void release(); - bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer); + bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer); BrokerProxyImpl& broker; void* userContext; diff --git a/cpp/src/qmf/engine/ConsoleImpl.cpp b/cpp/src/qmf/engine/ConsoleImpl.cpp index c856f04c51..c2d1f51f2b 100644 --- a/cpp/src/qmf/engine/ConsoleImpl.cpp +++ b/cpp/src/qmf/engine/ConsoleImpl.cpp @@ -57,11 +57,13 @@ ConsoleEvent ConsoleEventImpl::copy() ::memset(&item, 0, sizeof(ConsoleEvent)); item.kind = kind; item.agent = agent.get(); - item.classKey = classKey.get(); - item.object = object; + item.classKey = classKey; + item.object = object.get(); item.context = context; item.event = event; item.timestamp = timestamp; + item.hasProps = hasProps; + item.hasStats = hasStats; STRING_REF(name); @@ -274,9 +276,11 @@ void ConsoleImpl::endSync(SyncQuery& sync) void ConsoleImpl::learnPackage(const string& packageName) { Mutex::ScopedLock _lock(lock); - if (packages.find(packageName) == packages.end()) + if (packages.find(packageName) == packages.end()) { packages.insert(pair<string, pair<ObjectClassList, EventClassList> > (packageName, pair<ObjectClassList, EventClassList>(ObjectClassList(), EventClassList()))); + eventNewPackage(packageName); + } } void ConsoleImpl::learnClass(SchemaObjectClass* cls) @@ -288,8 +292,10 @@ void ConsoleImpl::learnClass(SchemaObjectClass* cls) return; ObjectClassList& list = pIter->second.first; - if (list.find(key) == list.end()) + if (list.find(key) == list.end()) { list[key] = cls; + eventNewClass(key); + } } void ConsoleImpl::learnClass(SchemaEventClass* cls) @@ -301,8 +307,10 @@ void ConsoleImpl::learnClass(SchemaEventClass* cls) return; EventClassList& list = pIter->second.second; - if (list.find(key) == list.end()) + if (list.find(key) == list.end()) { list[key] = cls; + eventNewClass(key); + } } bool ConsoleImpl::haveClass(const SchemaClassKey* key) const @@ -333,6 +341,57 @@ SchemaObjectClass* ConsoleImpl::getSchema(const SchemaClassKey* key) const return iter->second; } +void ConsoleImpl::eventAgentAdded(boost::shared_ptr<AgentProxy> agent) +{ + ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_ADDED)); + event->agent = agent; + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(event); +} + +void ConsoleImpl::eventAgentDeleted(boost::shared_ptr<AgentProxy> agent) +{ + ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_DELETED)); + event->agent = agent; + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(event); +} + +void ConsoleImpl::eventNewPackage(const string& packageName) +{ + ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_PACKAGE)); + event->name = packageName; + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(event); +} + +void ConsoleImpl::eventNewClass(const SchemaClassKey* key) +{ + ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_CLASS)); + event->classKey = key; + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(event); +} + +void ConsoleImpl::eventObjectUpdate(ObjectPtr object, bool prop, bool stat) +{ + ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::OBJECT_UPDATE)); + event->object = object; + event->hasProps = prop; + event->hasStats = stat; + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(event); +} + +void ConsoleImpl::eventAgentHeartbeat(boost::shared_ptr<AgentProxy> agent, uint64_t timestamp) +{ + ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_HEARTBEAT)); + event->agent = agent; + event->timestamp = timestamp; + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(event); +} + //================================================================== // Wrappers //================================================================== diff --git a/cpp/src/qmf/engine/ConsoleImpl.h b/cpp/src/qmf/engine/ConsoleImpl.h index 2c4ee48a02..8f99c5e6b9 100644 --- a/cpp/src/qmf/engine/ConsoleImpl.h +++ b/cpp/src/qmf/engine/ConsoleImpl.h @@ -56,14 +56,16 @@ namespace engine { ConsoleEvent::EventKind kind; boost::shared_ptr<AgentProxy> agent; std::string name; - boost::shared_ptr<SchemaClassKey> classKey; - Object* object; + const SchemaClassKey* classKey; + boost::shared_ptr<Object> object; void* context; Event* event; uint64_t timestamp; + bool hasProps; + bool hasStats; ConsoleEventImpl(ConsoleEvent::EventKind k) : - kind(k), object(0), context(0), event(0), timestamp(0) {} + kind(k), classKey(0), context(0), event(0), timestamp(0) {} ~ConsoleEventImpl() {} ConsoleEvent copy(); }; @@ -101,6 +103,7 @@ namespace engine { private: friend class BrokerProxyImpl; + friend struct StaticContext; const ConsoleSettings& settings; mutable qpid::sys::Mutex lock; std::deque<ConsoleEventImpl::Ptr> eventQueue; @@ -127,6 +130,13 @@ namespace engine { void learnClass(SchemaEventClass* cls); bool haveClass(const SchemaClassKey* key) const; SchemaObjectClass* getSchema(const SchemaClassKey* key) const; + + void eventAgentAdded(boost::shared_ptr<AgentProxy> agent); + void eventAgentDeleted(boost::shared_ptr<AgentProxy> agent); + void eventNewPackage(const std::string& packageName); + void eventNewClass(const SchemaClassKey* key); + void eventObjectUpdate(ObjectPtr object, bool prop, bool stat); + void eventAgentHeartbeat(boost::shared_ptr<AgentProxy> agent, uint64_t timestamp); }; } } diff --git a/cpp/src/qmf/engine/ObjectIdImpl.cpp b/cpp/src/qmf/engine/ObjectIdImpl.cpp index 032bc557c0..5b925045bf 100644 --- a/cpp/src/qmf/engine/ObjectIdImpl.cpp +++ b/cpp/src/qmf/engine/ObjectIdImpl.cpp @@ -111,13 +111,14 @@ void ObjectIdImpl::fromString(const std::string& repr) agent = 0; } -std::string ObjectIdImpl::asString() const +const string& ObjectIdImpl::asString() const { stringstream val; val << (int) getFlags() << "-" << getSequence() << "-" << getBrokerBank() << "-" << getAgentBank() << "-" << getObjectNum(); - return val.str(); + repr = val.str(); + return repr; } bool ObjectIdImpl::operator==(const ObjectIdImpl& other) const @@ -154,6 +155,7 @@ uint64_t ObjectId::getObjectNum() const { return impl->getObjectNum(); } uint32_t ObjectId::getObjectNumHi() const { return impl->getObjectNumHi(); } uint32_t ObjectId::getObjectNumLo() const { return impl->getObjectNumLo(); } bool ObjectId::isDurable() const { return impl->isDurable(); } +const char* ObjectId::str() const { return impl->asString().c_str(); } bool ObjectId::operator==(const ObjectId& other) const { return *impl == *other.impl; } bool ObjectId::operator<(const ObjectId& other) const { return *impl < *other.impl; } bool ObjectId::operator>(const ObjectId& other) const { return *impl > *other.impl; } diff --git a/cpp/src/qmf/engine/ObjectIdImpl.h b/cpp/src/qmf/engine/ObjectIdImpl.h index 44fa8adffc..d9871ac217 100644 --- a/cpp/src/qmf/engine/ObjectIdImpl.h +++ b/cpp/src/qmf/engine/ObjectIdImpl.h @@ -38,6 +38,7 @@ namespace engine { AgentAttachment* agent; uint64_t first; uint64_t second; + mutable std::string repr; ObjectIdImpl() : agent(0), first(0), second(0) {} ObjectIdImpl(qpid::framing::Buffer& buffer); @@ -49,7 +50,7 @@ namespace engine { void decode(qpid::framing::Buffer& buffer); void encode(qpid::framing::Buffer& buffer) const; void fromString(const std::string& repr); - std::string asString() const; + const std::string& asString() const; uint8_t getFlags() const { return (first & 0xF000000000000000LL) >> 60; } uint16_t getSequence() const { return (first & 0x0FFF000000000000LL) >> 48; } uint32_t getBrokerBank() const { return (first & 0x0000FFFFF0000000LL) >> 28; } diff --git a/cpp/src/qmf/engine/ResilientConnection.cpp b/cpp/src/qmf/engine/ResilientConnection.cpp index 709cfd1236..9502130288 100644 --- a/cpp/src/qmf/engine/ResilientConnection.cpp +++ b/cpp/src/qmf/engine/ResilientConnection.cpp @@ -171,15 +171,20 @@ void RCSession::received(client::Message& msg) MessageImpl qmsg; qmsg.body = msg.getData(); - qpid::framing::MessageProperties p = msg.getMessageProperties(); - if (p.hasReplyTo()) { - const qpid::framing::ReplyTo& rt = p.getReplyTo(); + qpid::framing::DeliveryProperties dp = msg.getDeliveryProperties(); + if (dp.hasRoutingKey()) { + qmsg.routingKey = dp.getRoutingKey(); + } + + qpid::framing::MessageProperties mp = msg.getMessageProperties(); + if (mp.hasReplyTo()) { + const qpid::framing::ReplyTo& rt = mp.getReplyTo(); qmsg.replyExchange = rt.getExchange(); qmsg.replyKey = rt.getRoutingKey(); } - if (p.hasUserId()) { - qmsg.userId = p.getUserId(); + if (mp.hasUserId()) { + qmsg.userId = mp.getUserId(); } connImpl.EnqueueEvent(ResilientConnectionEvent::RECV, userContext, qmsg); diff --git a/cpp/src/qmf/engine/SchemaImpl.cpp b/cpp/src/qmf/engine/SchemaImpl.cpp index fb09980680..e366a66826 100644 --- a/cpp/src/qmf/engine/SchemaImpl.cpp +++ b/cpp/src/qmf/engine/SchemaImpl.cpp @@ -326,12 +326,13 @@ bool SchemaClassKeyImpl::operator<(const SchemaClassKeyImpl& other) const return hash < other.hash; } -string SchemaClassKeyImpl::str() const +const string& SchemaClassKeyImpl::str() const { Uuid printableHash(hash.get()); stringstream str; str << package << ":" << name << "(" << printableHash << ")"; - return str.str(); + repr = str.str(); + return repr; } SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) : hasHash(true), classKey(SchemaClassKeyImpl::factory(package, name, hash)) @@ -579,6 +580,7 @@ SchemaClassKey::~SchemaClassKey() { delete impl; } const char* SchemaClassKey::getPackageName() const { return impl->getPackageName().c_str(); } const char* SchemaClassKey::getClassName() const { return impl->getClassName().c_str(); } const uint8_t* SchemaClassKey::getHash() const { return impl->getHash(); } +const char* SchemaClassKey::asString() const { return impl->str().c_str(); } bool SchemaClassKey::operator==(const SchemaClassKey& other) const { return *impl == *(other.impl); } bool SchemaClassKey::operator<(const SchemaClassKey& other) const { return *impl < *(other.impl); } diff --git a/cpp/src/qmf/engine/SchemaImpl.h b/cpp/src/qmf/engine/SchemaImpl.h index 865556f076..af3a1d98e4 100644 --- a/cpp/src/qmf/engine/SchemaImpl.h +++ b/cpp/src/qmf/engine/SchemaImpl.h @@ -142,6 +142,7 @@ namespace engine { const std::string& package; const std::string& name; const SchemaHash& hash; + mutable std::string repr; // The *Container elements are only used if there isn't an external place to // store these values. @@ -161,7 +162,7 @@ namespace engine { void encode(qpid::framing::Buffer& buffer) const; bool operator==(const SchemaClassKeyImpl& other) const; bool operator<(const SchemaClassKeyImpl& other) const; - std::string str() const; + const std::string& str() const; }; struct SchemaObjectClassImpl { diff --git a/cpp/src/qmf/engine/SequenceManager.cpp b/cpp/src/qmf/engine/SequenceManager.cpp index 3708105b46..4a4644a8b9 100644 --- a/cpp/src/qmf/engine/SequenceManager.cpp +++ b/cpp/src/qmf/engine/SequenceManager.cpp @@ -68,14 +68,14 @@ void SequenceManager::releaseAll() contextMap.clear(); } -void SequenceManager::dispatch(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer) +void SequenceManager::dispatch(uint8_t opcode, uint32_t sequence, const string& routingKey, qpid::framing::Buffer& buffer) { Mutex::ScopedLock _lock(lock); bool done; if (sequence == 0) { if (unsolicitedContext.get() != 0) { - done = unsolicitedContext->handleMessage(opcode, sequence, buffer); + done = unsolicitedContext->handleMessage(opcode, sequence, routingKey, buffer); if (done) unsolicitedContext->release(); } @@ -85,7 +85,7 @@ void SequenceManager::dispatch(uint8_t opcode, uint32_t sequence, qpid::framing: map<uint32_t, SequenceContext::Ptr>::iterator iter = contextMap.find(sequence); if (iter != contextMap.end()) { if (iter->second != 0) { - done = iter->second->handleMessage(opcode, sequence, buffer); + done = iter->second->handleMessage(opcode, sequence, routingKey, buffer); if (done) { iter->second->release(); contextMap.erase(iter); diff --git a/cpp/src/qmf/engine/SequenceManager.h b/cpp/src/qmf/engine/SequenceManager.h index 5f7db8bdb3..9e47e38610 100644 --- a/cpp/src/qmf/engine/SequenceManager.h +++ b/cpp/src/qmf/engine/SequenceManager.h @@ -40,7 +40,7 @@ namespace engine { virtual ~SequenceContext() {} virtual void reserve() = 0; - virtual bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer) = 0; + virtual bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer) = 0; virtual void release() = 0; }; @@ -52,7 +52,7 @@ namespace engine { uint32_t reserve(SequenceContext::Ptr ctx = SequenceContext::Ptr()); void release(uint32_t sequence); void releaseAll(); - void dispatch(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer); + void dispatch(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer); private: mutable qpid::sys::Mutex lock; diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index a59d29c3cc..ed9b6653c3 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -65,7 +65,7 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) tagGenerator("sgen"), dtxSelected(false), authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()), - userID(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))) + userID(getSession().getConnection().getUserId()) { acl = getSession().getBroker().getAcl(); } |