summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-09-29 03:21:49 +0000
committerTed Ross <tross@apache.org>2009-09-29 03:21:49 +0000
commit7661c82fc7aaca543582ef45582d87de3c5de5b7 (patch)
tree9de25825187c0a45df5880ce74e58befb6c4ec50 /cpp/src
parent576b578d61d0d31082587bf77a25a59da2ba738f (diff)
downloadqpid-python-7661c82fc7aaca543582ef45582d87de3c5de5b7.tar.gz
QMF Engine updates:
- Connected console handler callbacks - Added string representations for a number of object classes - Added a feature that completes query requests sent to disconnected agents git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@819819 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qmf/engine/BrokerProxyImpl.cpp109
-rw-r--r--cpp/src/qmf/engine/BrokerProxyImpl.h17
-rw-r--r--cpp/src/qmf/engine/ConsoleImpl.cpp69
-rw-r--r--cpp/src/qmf/engine/ConsoleImpl.h16
-rw-r--r--cpp/src/qmf/engine/ObjectIdImpl.cpp6
-rw-r--r--cpp/src/qmf/engine/ObjectIdImpl.h3
-rw-r--r--cpp/src/qmf/engine/ResilientConnection.cpp15
-rw-r--r--cpp/src/qmf/engine/SchemaImpl.cpp6
-rw-r--r--cpp/src/qmf/engine/SchemaImpl.h3
-rw-r--r--cpp/src/qmf/engine/SequenceManager.cpp6
-rw-r--r--cpp/src/qmf/engine/SequenceManager.h4
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp2
12 files changed, 201 insertions, 55 deletions
diff --git a/cpp/src/qmf/engine/BrokerProxyImpl.cpp b/cpp/src/qmf/engine/BrokerProxyImpl.cpp
index 36d3ffe361..e296254bf8 100644
--- a/cpp/src/qmf/engine/BrokerProxyImpl.cpp
+++ b/cpp/src/qmf/engine/BrokerProxyImpl.cpp
@@ -23,6 +23,7 @@
#include "qpid/Address.h"
#include "qpid/sys/SystemInfo.h"
#include <qpid/log/Statement.h>
+#include <qpid/StringUtils.h>
#include <string.h>
#include <iostream>
#include <fstream>
@@ -109,18 +110,23 @@ void BrokerProxyImpl::sessionClosed()
void BrokerProxyImpl::startProtocol()
{
- Mutex::ScopedLock _lock(lock);
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
+ AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker"));
+ {
+ Mutex::ScopedLock _lock(lock);
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
- agentList[0] = AgentProxyPtr(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker"));
+ agentList[0] = agent;
- requestsOutstanding = 1;
- topicBound = false;
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
+ requestsOutstanding = 1;
+ topicBound = false;
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
+ sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
+ }
+
+ console.impl->eventAgentAdded(agent);
}
void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
@@ -145,7 +151,7 @@ void BrokerProxyImpl::handleRcvMessage(Message& message)
uint32_t sequence;
while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
- seqMgr.dispatch(opcode, sequence, inBuffer);
+ seqMgr.dispatch(opcode, sequence, message.routingKey ? string(message.routingKey) : string(), inBuffer);
}
bool BrokerProxyImpl::getXmtMessage(Message& item) const
@@ -216,6 +222,7 @@ void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const
stringstream key;
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t sequence(seqMgr.reserve(queryContext));
+ agent->impl->addSequence(sequence);
Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
query.impl->encode(outBuffer);
@@ -406,9 +413,23 @@ MethodResponsePtr BrokerProxyImpl::handleMethodResponse(Buffer& inBuffer, uint32
return response;
}
-void BrokerProxyImpl::handleHeartbeatIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+void BrokerProxyImpl::handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq, const string& routingKey)
{
- // TODO
+ vector<string> tokens = qpid::split(routingKey, ".");
+ uint32_t agentBank;
+ uint64_t timestamp;
+
+ if (routingKey.empty() || tokens.size() != 4)
+ agentBank = 0;
+ else
+ agentBank = ::atoi(tokens[3].c_str());
+
+ timestamp = inBuffer.getLongLong();
+ map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank);
+ if (iter != agentList.end()) {
+ console.impl->eventAgentHeartbeat(iter->second, timestamp);
+ }
+ QPID_LOG(trace, "RCVD HeartbeatIndication seq=" << seq << " agentBank=" << agentBank);
}
void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
@@ -481,11 +502,24 @@ ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq
void BrokerProxyImpl::updateAgentList(ObjectPtr obj)
{
Value* value = obj->getValue("agentBank");
+ Mutex::ScopedLock _lock(lock);
if (value != 0 && value->isUint()) {
uint32_t agentBank = value->asUint();
if (obj->isDeleted()) {
- agentList.erase(agentBank);
- QPID_LOG(trace, "Agent at bank " << agentBank << " removed from agent list");
+ map<uint32_t, AgentProxyPtr>::iterator iter = agentList.find(agentBank);
+ if (iter != agentList.end()) {
+ AgentProxyPtr agent(iter->second);
+ console.impl->eventAgentDeleted(agent);
+ agentList.erase(agentBank);
+ QPID_LOG(trace, "Agent at bank " << agentBank << " removed from agent list");
+
+ //
+ // Release all sequence numbers for requests in-flight to this agent.
+ // Since the agent is no longer connected, these requests would not
+ // otherwise complete.
+ //
+ agent->impl->releaseInFlight(seqMgr);
+ }
} else {
Value* str = obj->getValue("label");
string label;
@@ -493,7 +527,9 @@ void BrokerProxyImpl::updateAgentList(ObjectPtr obj)
label = str->asString();
map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank);
if (iter == agentList.end()) {
- agentList[agentBank] = AgentProxyPtr(AgentProxyImpl::factory(console, publicObject, agentBank, label));
+ AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, agentBank, label));
+ agentList[agentBank] = agent;
+ console.impl->eventAgentAdded(agent);
QPID_LOG(trace, "Agent '" << label << "' found at bank " << agentBank);
}
}
@@ -572,9 +608,11 @@ MethodResponse* MethodResponseImpl::factory(uint32_t status, const std::string&
return new MethodResponse(impl);
}
-bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& routingKey, Buffer& buffer)
{
+ ObjectPtr object;
bool completeContext = false;
+
if (opcode == Protocol::OP_BROKER_RESPONSE) {
broker.handleBrokerResponse(buffer, sequence);
completeContext = true;
@@ -592,15 +630,21 @@ bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buf
else if (opcode == Protocol::OP_CLASS_INDICATION)
broker.handleClassIndication(buffer, sequence);
else if (opcode == Protocol::OP_HEARTBEAT_INDICATION)
- broker.handleHeartbeatIndication(buffer, sequence);
+ broker.handleHeartbeatIndication(buffer, sequence, routingKey);
else if (opcode == Protocol::OP_EVENT_INDICATION)
broker.handleEventIndication(buffer, sequence);
- else if (opcode == Protocol::OP_PROPERTY_INDICATION)
- broker.handleObjectIndication(buffer, sequence, true, false);
- else if (opcode == Protocol::OP_STATISTIC_INDICATION)
- broker.handleObjectIndication(buffer, sequence, false, true);
- else if (opcode == Protocol::OP_OBJECT_INDICATION)
- broker.handleObjectIndication(buffer, sequence, true, true);
+ else if (opcode == Protocol::OP_PROPERTY_INDICATION) {
+ object = broker.handleObjectIndication(buffer, sequence, true, false);
+ broker.console.impl->eventObjectUpdate(object, true, false);
+ }
+ else if (opcode == Protocol::OP_STATISTIC_INDICATION) {
+ object = broker.handleObjectIndication(buffer, sequence, false, true);
+ broker.console.impl->eventObjectUpdate(object, false, true);
+ }
+ else if (opcode == Protocol::OP_OBJECT_INDICATION) {
+ object = broker.handleObjectIndication(buffer, sequence, true, true);
+ broker.console.impl->eventObjectUpdate(object, true, true);
+ }
else {
QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode);
completeContext = true;
@@ -627,7 +671,7 @@ void QueryContext::release()
broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse));
}
-bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer)
{
bool completeContext = false;
ObjectPtr object;
@@ -635,6 +679,19 @@ bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buff
if (opcode == Protocol::OP_COMMAND_COMPLETE) {
broker.handleCommandComplete(buffer, sequence);
completeContext = true;
+
+ //
+ // Visit each agent and remove the sequence from that agent's in-flight list.
+ // This could be made more efficient because only one agent will have this sequence
+ // in its list.
+ //
+ map<uint32_t, AgentProxyPtr> copy;
+ {
+ Mutex::ScopedLock _block(broker.lock);
+ copy = broker.agentList;
+ }
+ for (map<uint32_t, AgentProxyPtr>::iterator iter = copy.begin(); iter != copy.end(); iter++)
+ iter->second->impl->delSequence(sequence);
}
else if (opcode == Protocol::OP_OBJECT_INDICATION) {
object = broker.handleObjectIndication(buffer, sequence, true, true);
@@ -655,7 +712,7 @@ void MethodContext::release()
broker.eventQueue.push_back(broker.eventMethodResponse(userContext, methodResponse));
}
-bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer)
{
if (opcode == Protocol::OP_METHOD_RESPONSE)
methodResponse = broker.handleMethodResponse(buffer, sequence, schema);
diff --git a/cpp/src/qmf/engine/BrokerProxyImpl.h b/cpp/src/qmf/engine/BrokerProxyImpl.h
index 660cb86c61..738424bce1 100644
--- a/cpp/src/qmf/engine/BrokerProxyImpl.h
+++ b/cpp/src/qmf/engine/BrokerProxyImpl.h
@@ -36,6 +36,7 @@
#include <string>
#include <deque>
#include <map>
+#include <set>
#include <vector>
namespace qmf {
@@ -98,6 +99,7 @@ namespace engine {
BrokerProxy& broker;
uint32_t agentBank;
std::string label;
+ std::set<uint32_t> inFlightSequences;
AgentProxyImpl(Console& c, BrokerProxy& b, uint32_t ab, const std::string& l) : console(c), broker(b), agentBank(ab), label(l) {}
static AgentProxy* factory(Console& c, BrokerProxy& b, uint32_t ab, const std::string& l) {
@@ -106,6 +108,13 @@ namespace engine {
}
~AgentProxyImpl() {}
const std::string& getLabel() const { return label; }
+ void addSequence(uint32_t seq) { inFlightSequences.insert(seq); }
+ void delSequence(uint32_t seq) { inFlightSequences.erase(seq); }
+ void releaseInFlight(SequenceManager& seqMgr) {
+ for (std::set<uint32_t>::iterator iter = inFlightSequences.begin(); iter != inFlightSequences.end(); iter++)
+ seqMgr.release(*iter);
+ inFlightSequences.clear();
+ }
};
class BrokerProxyImpl : public boost::noncopyable {
@@ -166,7 +175,7 @@ namespace engine {
void handleCommandComplete(qpid::framing::Buffer& inBuffer, uint32_t seq);
void handleClassIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
MethodResponsePtr handleMethodResponse(qpid::framing::Buffer& inBuffer, uint32_t seq, const SchemaMethod* schema);
- void handleHeartbeatIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ void handleHeartbeatIndication(qpid::framing::Buffer& inBuffer, uint32_t seq, const std::string& routingKey);
void handleEventIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
void handleSchemaResponse(qpid::framing::Buffer& inBuffer, uint32_t seq);
ObjectPtr handleObjectIndication(qpid::framing::Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
@@ -186,7 +195,7 @@ namespace engine {
virtual ~StaticContext() {}
void reserve() {}
void release() { broker.staticRelease(); }
- bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer);
+ bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
BrokerProxyImpl& broker;
};
@@ -199,7 +208,7 @@ namespace engine {
virtual ~QueryContext() {}
void reserve();
void release();
- bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer);
+ bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
mutable qpid::sys::Mutex lock;
BrokerProxyImpl& broker;
@@ -213,7 +222,7 @@ namespace engine {
virtual ~MethodContext() {}
void reserve() {}
void release();
- bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer);
+ bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
BrokerProxyImpl& broker;
void* userContext;
diff --git a/cpp/src/qmf/engine/ConsoleImpl.cpp b/cpp/src/qmf/engine/ConsoleImpl.cpp
index c856f04c51..c2d1f51f2b 100644
--- a/cpp/src/qmf/engine/ConsoleImpl.cpp
+++ b/cpp/src/qmf/engine/ConsoleImpl.cpp
@@ -57,11 +57,13 @@ ConsoleEvent ConsoleEventImpl::copy()
::memset(&item, 0, sizeof(ConsoleEvent));
item.kind = kind;
item.agent = agent.get();
- item.classKey = classKey.get();
- item.object = object;
+ item.classKey = classKey;
+ item.object = object.get();
item.context = context;
item.event = event;
item.timestamp = timestamp;
+ item.hasProps = hasProps;
+ item.hasStats = hasStats;
STRING_REF(name);
@@ -274,9 +276,11 @@ void ConsoleImpl::endSync(SyncQuery& sync)
void ConsoleImpl::learnPackage(const string& packageName)
{
Mutex::ScopedLock _lock(lock);
- if (packages.find(packageName) == packages.end())
+ if (packages.find(packageName) == packages.end()) {
packages.insert(pair<string, pair<ObjectClassList, EventClassList> >
(packageName, pair<ObjectClassList, EventClassList>(ObjectClassList(), EventClassList())));
+ eventNewPackage(packageName);
+ }
}
void ConsoleImpl::learnClass(SchemaObjectClass* cls)
@@ -288,8 +292,10 @@ void ConsoleImpl::learnClass(SchemaObjectClass* cls)
return;
ObjectClassList& list = pIter->second.first;
- if (list.find(key) == list.end())
+ if (list.find(key) == list.end()) {
list[key] = cls;
+ eventNewClass(key);
+ }
}
void ConsoleImpl::learnClass(SchemaEventClass* cls)
@@ -301,8 +307,10 @@ void ConsoleImpl::learnClass(SchemaEventClass* cls)
return;
EventClassList& list = pIter->second.second;
- if (list.find(key) == list.end())
+ if (list.find(key) == list.end()) {
list[key] = cls;
+ eventNewClass(key);
+ }
}
bool ConsoleImpl::haveClass(const SchemaClassKey* key) const
@@ -333,6 +341,57 @@ SchemaObjectClass* ConsoleImpl::getSchema(const SchemaClassKey* key) const
return iter->second;
}
+void ConsoleImpl::eventAgentAdded(boost::shared_ptr<AgentProxy> agent)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_ADDED));
+ event->agent = agent;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventAgentDeleted(boost::shared_ptr<AgentProxy> agent)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_DELETED));
+ event->agent = agent;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventNewPackage(const string& packageName)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_PACKAGE));
+ event->name = packageName;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventNewClass(const SchemaClassKey* key)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_CLASS));
+ event->classKey = key;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventObjectUpdate(ObjectPtr object, bool prop, bool stat)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::OBJECT_UPDATE));
+ event->object = object;
+ event->hasProps = prop;
+ event->hasStats = stat;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventAgentHeartbeat(boost::shared_ptr<AgentProxy> agent, uint64_t timestamp)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_HEARTBEAT));
+ event->agent = agent;
+ event->timestamp = timestamp;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
//==================================================================
// Wrappers
//==================================================================
diff --git a/cpp/src/qmf/engine/ConsoleImpl.h b/cpp/src/qmf/engine/ConsoleImpl.h
index 2c4ee48a02..8f99c5e6b9 100644
--- a/cpp/src/qmf/engine/ConsoleImpl.h
+++ b/cpp/src/qmf/engine/ConsoleImpl.h
@@ -56,14 +56,16 @@ namespace engine {
ConsoleEvent::EventKind kind;
boost::shared_ptr<AgentProxy> agent;
std::string name;
- boost::shared_ptr<SchemaClassKey> classKey;
- Object* object;
+ const SchemaClassKey* classKey;
+ boost::shared_ptr<Object> object;
void* context;
Event* event;
uint64_t timestamp;
+ bool hasProps;
+ bool hasStats;
ConsoleEventImpl(ConsoleEvent::EventKind k) :
- kind(k), object(0), context(0), event(0), timestamp(0) {}
+ kind(k), classKey(0), context(0), event(0), timestamp(0) {}
~ConsoleEventImpl() {}
ConsoleEvent copy();
};
@@ -101,6 +103,7 @@ namespace engine {
private:
friend class BrokerProxyImpl;
+ friend struct StaticContext;
const ConsoleSettings& settings;
mutable qpid::sys::Mutex lock;
std::deque<ConsoleEventImpl::Ptr> eventQueue;
@@ -127,6 +130,13 @@ namespace engine {
void learnClass(SchemaEventClass* cls);
bool haveClass(const SchemaClassKey* key) const;
SchemaObjectClass* getSchema(const SchemaClassKey* key) const;
+
+ void eventAgentAdded(boost::shared_ptr<AgentProxy> agent);
+ void eventAgentDeleted(boost::shared_ptr<AgentProxy> agent);
+ void eventNewPackage(const std::string& packageName);
+ void eventNewClass(const SchemaClassKey* key);
+ void eventObjectUpdate(ObjectPtr object, bool prop, bool stat);
+ void eventAgentHeartbeat(boost::shared_ptr<AgentProxy> agent, uint64_t timestamp);
};
}
}
diff --git a/cpp/src/qmf/engine/ObjectIdImpl.cpp b/cpp/src/qmf/engine/ObjectIdImpl.cpp
index 032bc557c0..5b925045bf 100644
--- a/cpp/src/qmf/engine/ObjectIdImpl.cpp
+++ b/cpp/src/qmf/engine/ObjectIdImpl.cpp
@@ -111,13 +111,14 @@ void ObjectIdImpl::fromString(const std::string& repr)
agent = 0;
}
-std::string ObjectIdImpl::asString() const
+const string& ObjectIdImpl::asString() const
{
stringstream val;
val << (int) getFlags() << "-" << getSequence() << "-" << getBrokerBank() << "-" <<
getAgentBank() << "-" << getObjectNum();
- return val.str();
+ repr = val.str();
+ return repr;
}
bool ObjectIdImpl::operator==(const ObjectIdImpl& other) const
@@ -154,6 +155,7 @@ uint64_t ObjectId::getObjectNum() const { return impl->getObjectNum(); }
uint32_t ObjectId::getObjectNumHi() const { return impl->getObjectNumHi(); }
uint32_t ObjectId::getObjectNumLo() const { return impl->getObjectNumLo(); }
bool ObjectId::isDurable() const { return impl->isDurable(); }
+const char* ObjectId::str() const { return impl->asString().c_str(); }
bool ObjectId::operator==(const ObjectId& other) const { return *impl == *other.impl; }
bool ObjectId::operator<(const ObjectId& other) const { return *impl < *other.impl; }
bool ObjectId::operator>(const ObjectId& other) const { return *impl > *other.impl; }
diff --git a/cpp/src/qmf/engine/ObjectIdImpl.h b/cpp/src/qmf/engine/ObjectIdImpl.h
index 44fa8adffc..d9871ac217 100644
--- a/cpp/src/qmf/engine/ObjectIdImpl.h
+++ b/cpp/src/qmf/engine/ObjectIdImpl.h
@@ -38,6 +38,7 @@ namespace engine {
AgentAttachment* agent;
uint64_t first;
uint64_t second;
+ mutable std::string repr;
ObjectIdImpl() : agent(0), first(0), second(0) {}
ObjectIdImpl(qpid::framing::Buffer& buffer);
@@ -49,7 +50,7 @@ namespace engine {
void decode(qpid::framing::Buffer& buffer);
void encode(qpid::framing::Buffer& buffer) const;
void fromString(const std::string& repr);
- std::string asString() const;
+ const std::string& asString() const;
uint8_t getFlags() const { return (first & 0xF000000000000000LL) >> 60; }
uint16_t getSequence() const { return (first & 0x0FFF000000000000LL) >> 48; }
uint32_t getBrokerBank() const { return (first & 0x0000FFFFF0000000LL) >> 28; }
diff --git a/cpp/src/qmf/engine/ResilientConnection.cpp b/cpp/src/qmf/engine/ResilientConnection.cpp
index 709cfd1236..9502130288 100644
--- a/cpp/src/qmf/engine/ResilientConnection.cpp
+++ b/cpp/src/qmf/engine/ResilientConnection.cpp
@@ -171,15 +171,20 @@ void RCSession::received(client::Message& msg)
MessageImpl qmsg;
qmsg.body = msg.getData();
- qpid::framing::MessageProperties p = msg.getMessageProperties();
- if (p.hasReplyTo()) {
- const qpid::framing::ReplyTo& rt = p.getReplyTo();
+ qpid::framing::DeliveryProperties dp = msg.getDeliveryProperties();
+ if (dp.hasRoutingKey()) {
+ qmsg.routingKey = dp.getRoutingKey();
+ }
+
+ qpid::framing::MessageProperties mp = msg.getMessageProperties();
+ if (mp.hasReplyTo()) {
+ const qpid::framing::ReplyTo& rt = mp.getReplyTo();
qmsg.replyExchange = rt.getExchange();
qmsg.replyKey = rt.getRoutingKey();
}
- if (p.hasUserId()) {
- qmsg.userId = p.getUserId();
+ if (mp.hasUserId()) {
+ qmsg.userId = mp.getUserId();
}
connImpl.EnqueueEvent(ResilientConnectionEvent::RECV, userContext, qmsg);
diff --git a/cpp/src/qmf/engine/SchemaImpl.cpp b/cpp/src/qmf/engine/SchemaImpl.cpp
index fb09980680..e366a66826 100644
--- a/cpp/src/qmf/engine/SchemaImpl.cpp
+++ b/cpp/src/qmf/engine/SchemaImpl.cpp
@@ -326,12 +326,13 @@ bool SchemaClassKeyImpl::operator<(const SchemaClassKeyImpl& other) const
return hash < other.hash;
}
-string SchemaClassKeyImpl::str() const
+const string& SchemaClassKeyImpl::str() const
{
Uuid printableHash(hash.get());
stringstream str;
str << package << ":" << name << "(" << printableHash << ")";
- return str.str();
+ repr = str.str();
+ return repr;
}
SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) : hasHash(true), classKey(SchemaClassKeyImpl::factory(package, name, hash))
@@ -579,6 +580,7 @@ SchemaClassKey::~SchemaClassKey() { delete impl; }
const char* SchemaClassKey::getPackageName() const { return impl->getPackageName().c_str(); }
const char* SchemaClassKey::getClassName() const { return impl->getClassName().c_str(); }
const uint8_t* SchemaClassKey::getHash() const { return impl->getHash(); }
+const char* SchemaClassKey::asString() const { return impl->str().c_str(); }
bool SchemaClassKey::operator==(const SchemaClassKey& other) const { return *impl == *(other.impl); }
bool SchemaClassKey::operator<(const SchemaClassKey& other) const { return *impl < *(other.impl); }
diff --git a/cpp/src/qmf/engine/SchemaImpl.h b/cpp/src/qmf/engine/SchemaImpl.h
index 865556f076..af3a1d98e4 100644
--- a/cpp/src/qmf/engine/SchemaImpl.h
+++ b/cpp/src/qmf/engine/SchemaImpl.h
@@ -142,6 +142,7 @@ namespace engine {
const std::string& package;
const std::string& name;
const SchemaHash& hash;
+ mutable std::string repr;
// The *Container elements are only used if there isn't an external place to
// store these values.
@@ -161,7 +162,7 @@ namespace engine {
void encode(qpid::framing::Buffer& buffer) const;
bool operator==(const SchemaClassKeyImpl& other) const;
bool operator<(const SchemaClassKeyImpl& other) const;
- std::string str() const;
+ const std::string& str() const;
};
struct SchemaObjectClassImpl {
diff --git a/cpp/src/qmf/engine/SequenceManager.cpp b/cpp/src/qmf/engine/SequenceManager.cpp
index 3708105b46..4a4644a8b9 100644
--- a/cpp/src/qmf/engine/SequenceManager.cpp
+++ b/cpp/src/qmf/engine/SequenceManager.cpp
@@ -68,14 +68,14 @@ void SequenceManager::releaseAll()
contextMap.clear();
}
-void SequenceManager::dispatch(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer)
+void SequenceManager::dispatch(uint8_t opcode, uint32_t sequence, const string& routingKey, qpid::framing::Buffer& buffer)
{
Mutex::ScopedLock _lock(lock);
bool done;
if (sequence == 0) {
if (unsolicitedContext.get() != 0) {
- done = unsolicitedContext->handleMessage(opcode, sequence, buffer);
+ done = unsolicitedContext->handleMessage(opcode, sequence, routingKey, buffer);
if (done)
unsolicitedContext->release();
}
@@ -85,7 +85,7 @@ void SequenceManager::dispatch(uint8_t opcode, uint32_t sequence, qpid::framing:
map<uint32_t, SequenceContext::Ptr>::iterator iter = contextMap.find(sequence);
if (iter != contextMap.end()) {
if (iter->second != 0) {
- done = iter->second->handleMessage(opcode, sequence, buffer);
+ done = iter->second->handleMessage(opcode, sequence, routingKey, buffer);
if (done) {
iter->second->release();
contextMap.erase(iter);
diff --git a/cpp/src/qmf/engine/SequenceManager.h b/cpp/src/qmf/engine/SequenceManager.h
index 5f7db8bdb3..9e47e38610 100644
--- a/cpp/src/qmf/engine/SequenceManager.h
+++ b/cpp/src/qmf/engine/SequenceManager.h
@@ -40,7 +40,7 @@ namespace engine {
virtual ~SequenceContext() {}
virtual void reserve() = 0;
- virtual bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer) = 0;
+ virtual bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer) = 0;
virtual void release() = 0;
};
@@ -52,7 +52,7 @@ namespace engine {
uint32_t reserve(SequenceContext::Ptr ctx = SequenceContext::Ptr());
void release(uint32_t sequence);
void releaseAll();
- void dispatch(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer);
+ void dispatch(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
private:
mutable qpid::sys::Mutex lock;
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index a59d29c3cc..ed9b6653c3 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -65,7 +65,7 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
tagGenerator("sgen"),
dtxSelected(false),
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()),
- userID(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@')))
+ userID(getSession().getConnection().getUserId())
{
acl = getSession().getBroker().getAcl();
}