summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2012-03-05 19:12:32 +0000
committerAndrew Stitcher <astitcher@apache.org>2012-03-05 19:12:32 +0000
commit91361618c09bc49e7fdf0a0909ee3d6df8e7d495 (patch)
tree52ab92ee925fa1ac2894f5ba09c5c28dc688fd01
parent08b6ab7e60a3c8cce985e9f5362a785b75e9ef43 (diff)
downloadqpid-python-91361618c09bc49e7fdf0a0909ee3d6df8e7d495.tar.gz
QPID-3883: Using application headers in messages causes a very large slowdown
Change Exchange route interface not to require a fieldtable - Exchanges that actually use the fieldtable for routing need to extract it directly from the message themselves. This avoids the need to extract the fieldtable from the message unnecessarily. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1297183 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/DirectExchange.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/DirectExchange.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.h2
-rw-r--r--qpid/cpp/src/qpid/broker/FanOutExchange.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/FanOutExchange.h4
-rw-r--r--qpid/cpp/src/qpid/broker/HeadersExchange.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/HeadersExchange.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/TopicExchange.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/TopicExchange.h4
-rw-r--r--qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp3
-rw-r--r--qpid/cpp/src/qpid/cluster/CredentialsExchange.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/FailoverExchange.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/FailoverExchange.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateDataExchange.h3
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h2
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h2
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp4
-rw-r--r--qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp8
-rw-r--r--qpid/cpp/src/qpid/management/ManagementDirectExchange.h4
-rw-r--r--qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp8
-rw-r--r--qpid/cpp/src/qpid/management/ManagementTopicExchange.h4
-rw-r--r--qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp2
-rw-r--r--qpid/cpp/src/qpid/replication/ReplicationExchange.cpp3
-rw-r--r--qpid/cpp/src/qpid/replication/ReplicationExchange.h2
-rw-r--r--qpid/cpp/src/qpid/xml/XmlExchange.cpp4
-rw-r--r--qpid/cpp/src/qpid/xml/XmlExchange.h2
-rw-r--r--qpid/cpp/src/tests/ExchangeTest.cpp46
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp52
34 files changed, 102 insertions, 104 deletions
diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp
index 5591539853..5d9aea7509 100644
--- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp
@@ -153,8 +153,9 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
return true;
}
-void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
+void DirectExchange::route(Deliverable& msg)
{
+ const string& routingKey = msg.getMessage().getRoutingKey();
PreRoute pr(msg, this);
ConstBindingList b;
{
diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.h b/qpid/cpp/src/qpid/broker/DirectExchange.h
index a6f9cf91af..833be52c1c 100644
--- a/qpid/cpp/src/qpid/broker/DirectExchange.h
+++ b/qpid/cpp/src/qpid/broker/DirectExchange.h
@@ -57,9 +57,7 @@ public:
const std::string& routingKey,
const qpid::framing::FieldTable* args);
virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- QPID_BROKER_EXTERN virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
QPID_BROKER_EXTERN virtual bool isBound(boost::shared_ptr<Queue> queue,
const std::string* const routingKey,
const qpid::framing::FieldTable* const args);
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index 16a6427a65..f311b79578 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -159,7 +159,7 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
void Exchange::routeIVE(){
if (ive && lastMsg.get()){
DeliverableMessage dmsg(lastMsg);
- route(dmsg, lastMsg->getRoutingKey(), lastMsg->getApplicationHeaders());
+ route(dmsg);
}
}
@@ -402,9 +402,9 @@ void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) {
bool Exchange::routeWithAlternate(Deliverable& msg)
{
- route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders());
+ route(msg);
if (!msg.delivered && alternate) {
- alternate->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders());
+ alternate->route(msg);
}
return msg.delivered;
}
diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h
index 9179dd5c7c..7376f814ed 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.h
+++ b/qpid/cpp/src/qpid/broker/Exchange.h
@@ -196,7 +196,7 @@ public:
virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
QPID_BROKER_EXTERN virtual void setProperties(const boost::intrusive_ptr<Message>&);
- virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ virtual void route(Deliverable& msg) = 0;
//PersistableExchange:
QPID_BROKER_EXTERN void setPersistenceId(uint64_t id) const;
diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
index 5879fa0892..2bce99b6fe 100644
--- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -101,7 +101,7 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, cons
return true;
}
-void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/)
+void FanOutExchange::route(Deliverable& msg)
{
PreRoute pr(msg, this);
doRoute(msg, bindings.snapshot());
diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.h b/qpid/cpp/src/qpid/broker/FanOutExchange.h
index 1a7d486796..c979fdca25 100644
--- a/qpid/cpp/src/qpid/broker/FanOutExchange.h
+++ b/qpid/cpp/src/qpid/broker/FanOutExchange.h
@@ -54,9 +54,7 @@ class FanOutExchange : public virtual Exchange {
virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- QPID_BROKER_EXTERN virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue,
const std::string* const routingKey,
diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
index 142c23f276..6648ae0422 100644
--- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -191,8 +191,9 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey,
}
-void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args)
+void HeadersExchange::route(Deliverable& msg)
{
+ const FieldTable* args = msg.getMessage().getApplicationHeaders();
if (!args) {
//can't match if there were no headers passed in
if (mgmtExchange != 0) {
diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.h b/qpid/cpp/src/qpid/broker/HeadersExchange.h
index 3b939d6851..d10892b9cc 100644
--- a/qpid/cpp/src/qpid/broker/HeadersExchange.h
+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.h
@@ -98,9 +98,7 @@ class HeadersExchange : public virtual Exchange {
virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- QPID_BROKER_EXTERN virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue,
const std::string* const routingKey,
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 226ba77fb2..01ce017289 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -215,7 +215,7 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
- alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders());
+ alternateExchange->route(deliverable);
}
} else if (isLocal(msg)) {
//drop message
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index e7d2259c80..64924bdd4c 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -489,14 +489,14 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
exchangeName << " with routing-key " << msg->getRoutingKey()));
}
- cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
+ cacheExchange->route(strategy);
if (!strategy.delivered) {
//TODO:if discard-unroutable, just drop it
//TODO:else if accept-mode is explicit, reject it
//else route it to alternate exchange
if (cacheExchange->getAlternate()) {
- cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
+ cacheExchange->getAlternate()->route(strategy);
}
if (!strategy.delivered) {
msg->destroy();
diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
index 644a3d628e..dd3ec13019 100644
--- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
@@ -350,8 +350,9 @@ TopicExchange::BindingKey *TopicExchange::getQueueBinding(Queue::shared_ptr queu
return (q != qv.end()) ? bk : 0;
}
-void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
+void TopicExchange::route(Deliverable& msg)
{
+ const string& routingKey = msg.getMessage().getRoutingKey();
// Note: PERFORMANCE CRITICAL!!!
BindingList b;
std::map<std::string, BindingList>::iterator it;
diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.h b/qpid/cpp/src/qpid/broker/TopicExchange.h
index 636918f8a1..cc24e1411e 100644
--- a/qpid/cpp/src/qpid/broker/TopicExchange.h
+++ b/qpid/cpp/src/qpid/broker/TopicExchange.h
@@ -185,9 +185,7 @@ class TopicExchange : public virtual Exchange {
virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- QPID_BROKER_EXTERN virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue,
const std::string* const routingKey,
diff --git a/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp b/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp
index 0fafc521cd..416a3636e9 100644
--- a/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp
+++ b/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp
@@ -62,7 +62,8 @@ bool CredentialsExchange::check(MemberId member) {
return valid;
}
-void CredentialsExchange::route(broker::Deliverable& msg, const string& /*routingKey*/, const framing::FieldTable* args) {
+void CredentialsExchange::route(broker::Deliverable& msg) {
+ const framing::FieldTable* args = msg.getMessage().getApplicationHeaders();
sys::Mutex::ScopedLock l(lock);
const broker::ConnectionState* connection =
static_cast<const broker::ConnectionState*>(msg.getMessage().getPublisher());
diff --git a/qpid/cpp/src/qpid/cluster/CredentialsExchange.h b/qpid/cpp/src/qpid/cluster/CredentialsExchange.h
index 90fd188271..74cf8350a6 100644
--- a/qpid/cpp/src/qpid/cluster/CredentialsExchange.h
+++ b/qpid/cpp/src/qpid/cluster/CredentialsExchange.h
@@ -50,7 +50,7 @@ class CredentialsExchange : public broker::Exchange
bool check(MemberId member);
/** Throw an exception if the calling connection is not the cluster user. Store credentials in msg. */
- void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args);
+ void route(broker::Deliverable& msg);
// Exchange overrides
std::string getType() const;
diff --git a/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp b/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
index 43ec27cf2c..87202a887c 100644
--- a/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
+++ b/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
@@ -80,7 +80,7 @@ bool FailoverExchange::isBound(Queue::shared_ptr queue, const string* const, con
return queues.find(queue) != queues.end();
}
-void FailoverExchange::route(Deliverable&, const string& , const framing::FieldTable* ) {
+void FailoverExchange::route(Deliverable&) {
QPID_LOG(warning, "Message received by exchange " << typeName << " ignoring");
}
diff --git a/qpid/cpp/src/qpid/cluster/FailoverExchange.h b/qpid/cpp/src/qpid/cluster/FailoverExchange.h
index c3e50c6929..5ac734a7ac 100644
--- a/qpid/cpp/src/qpid/cluster/FailoverExchange.h
+++ b/qpid/cpp/src/qpid/cluster/FailoverExchange.h
@@ -54,7 +54,7 @@ class FailoverExchange : public broker::Exchange
bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args);
bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args);
bool isBound(boost::shared_ptr<broker::Queue> queue, const std::string* const routingKey, const framing::FieldTable* const args);
- void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args);
+ void route(broker::Deliverable& msg);
private:
void sendUpdate(const boost::shared_ptr<broker::Queue>&);
diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
index e5cd82e3d3..31d96c67ca 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
@@ -40,9 +40,9 @@ UpdateDataExchange::UpdateDataExchange(Cluster& cluster) :
Exchange(EXCHANGE_NAME, &cluster)
{}
-void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey,
- const qpid::framing::FieldTable* )
+void UpdateDataExchange::route(broker::Deliverable& msg)
{
+ const std::string& routingKey = msg.getMessage().getRoutingKey();
std::string data = msg.getMessage().getFrames().getContent();
if (routingKey == MANAGEMENT_AGENTS_KEY) managementAgents = data;
else if (routingKey == MANAGEMENT_SCHEMAS_KEY) managementSchemas = data;
diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
index d2f6c35ad0..f79430f111 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
@@ -50,8 +50,7 @@ class UpdateDataExchange : public broker::Exchange
UpdateDataExchange(Cluster& parent);
- void route(broker::Deliverable& msg, const std::string& routingKey,
- const framing::FieldTable* args);
+ void route(broker::Deliverable& msg);
// Not implemented
std::string getType() const { return EXCHANGE_TYPE; }
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 75e4ed893d..85b97e7e3e 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -226,7 +226,8 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
}
// FIXME aconway 2011-12-02: error handling in route.
-void BrokerReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) {
+void BrokerReplicator::route(Deliverable& msg) {
+ const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
Variant::List list;
try {
if (!isQMFv2(msg.getMessage()) || !headers)
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index cfb6cf9a28..483c251126 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -58,7 +58,7 @@ class BrokerReplicator : public broker::Exchange
// Exchange methods
bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
- void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
+ void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
private:
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 7351a8d74d..6aff4879e3 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -138,8 +138,9 @@ void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&)
}
// Called in connection thread of the queues bridge to primary.
-void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable*)
+void QueueReplicator::route(Deliverable& msg)
{
+ const std::string& key = msg.getMessage().getRoutingKey();
sys::Mutex::ScopedLock l(lock);
if (key == DEQUEUE_EVENT_KEY) {
SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 9de7dd480c..a1ebbd788a 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -66,7 +66,7 @@ class QueueReplicator : public broker::Exchange,
bool bind(boost::shared_ptr<broker::Queue
>, const std::string&, const framing::FieldTable*);
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
- void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
+ void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
private:
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 372393886d..8c2cb95faa 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -564,7 +564,7 @@ void ManagementAgent::sendBufferLH(Buffer& buf,
DeliverableMessage deliverable (msg);
try {
- exchange->route(deliverable, routingKey, 0);
+ exchange->route(deliverable);
} catch(exception&) {}
}
buf.reset();
@@ -641,7 +641,7 @@ void ManagementAgent::sendBufferLH(const string& data,
DeliverableMessage deliverable (msg);
try {
- exchange->route(deliverable, routingKey, 0);
+ exchange->route(deliverable);
} catch(exception&) {}
}
}
diff --git a/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp b/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp
index 1d5f8bbd6b..312eacf462 100644
--- a/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp
@@ -40,17 +40,17 @@ ManagementDirectExchange::ManagementDirectExchange(const std::string& _name,
DirectExchange(_name, _durable, _args, _parent, b),
managementAgent(0) {}
-void ManagementDirectExchange::route(Deliverable& msg,
- const string& routingKey,
- const FieldTable* args)
+void ManagementDirectExchange::route(Deliverable& msg)
{
bool routeIt = true;
+ const string& routingKey = msg.getMessage().getRoutingKey();
+ const FieldTable* args = msg.getMessage().getApplicationHeaders();
if (managementAgent)
routeIt = managementAgent->dispatchCommand(msg, routingKey, args, false, qmfVersion);
if (routeIt)
- DirectExchange::route(msg, routingKey, args);
+ DirectExchange::route(msg);
}
void ManagementDirectExchange::setManagmentAgent(ManagementAgent* agent, int qv)
diff --git a/qpid/cpp/src/qpid/management/ManagementDirectExchange.h b/qpid/cpp/src/qpid/management/ManagementDirectExchange.h
index 7507179c06..582354d723 100644
--- a/qpid/cpp/src/qpid/management/ManagementDirectExchange.h
+++ b/qpid/cpp/src/qpid/management/ManagementDirectExchange.h
@@ -43,9 +43,7 @@ class ManagementDirectExchange : public virtual DirectExchange
virtual std::string getType() const { return typeName; }
- virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg);
void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion);
diff --git a/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp b/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp
index ee8657646f..587cc660df 100644
--- a/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp
@@ -39,18 +39,18 @@ ManagementTopicExchange::ManagementTopicExchange(const std::string& _name,
TopicExchange(_name, _durable, _args, _parent, b),
managementAgent(0) {}
-void ManagementTopicExchange::route(Deliverable& msg,
- const string& routingKey,
- const FieldTable* args)
+void ManagementTopicExchange::route(Deliverable& msg)
{
bool routeIt = true;
+ const string& routingKey = msg.getMessage().getRoutingKey();
+ const FieldTable* args = msg.getMessage().getApplicationHeaders();
// Intercept management agent commands
if (managementAgent)
routeIt = managementAgent->dispatchCommand(msg, routingKey, args, true, qmfVersion);
if (routeIt)
- TopicExchange::route(msg, routingKey, args);
+ TopicExchange::route(msg);
}
bool ManagementTopicExchange::bind(Queue::shared_ptr queue,
diff --git a/qpid/cpp/src/qpid/management/ManagementTopicExchange.h b/qpid/cpp/src/qpid/management/ManagementTopicExchange.h
index 232300265e..eff01a8552 100644
--- a/qpid/cpp/src/qpid/management/ManagementTopicExchange.h
+++ b/qpid/cpp/src/qpid/management/ManagementTopicExchange.h
@@ -43,9 +43,7 @@ class ManagementTopicExchange : public virtual TopicExchange
virtual std::string getType() const { return typeName; }
- virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg);
virtual bool bind(Queue::shared_ptr queue,
const std::string& routingKey,
diff --git a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
index 0ced4d9161..3d84a1ce3c 100644
--- a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
+++ b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
@@ -80,7 +80,7 @@ void ReplicatingEventListener::route(boost::intrusive_ptr<qpid::broker::Message>
try {
if (exchange) {
DeliverableMessage deliverable(msg);
- exchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders());
+ exchange->route(deliverable);
} else if (queue) {
queue->deliver(msg);
} else {
diff --git a/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp b/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
index 89a2bf516d..dce0d750a4 100644
--- a/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
+++ b/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
@@ -50,8 +50,9 @@ ReplicationExchange::ReplicationExchange(const std::string& name, bool durable,
std::string ReplicationExchange::getType() const { return typeName; }
-void ReplicationExchange::route(Deliverable& msg, const std::string& /*routingKey*/, const FieldTable* args)
+void ReplicationExchange::route(Deliverable& msg)
{
+ const FieldTable* args = msg.getMessage().getApplicationHeaders();
if (mgmtExchange != 0) {
mgmtExchange->inc_msgReceives();
mgmtExchange->inc_byteReceives(msg.contentSize());
diff --git a/qpid/cpp/src/qpid/replication/ReplicationExchange.h b/qpid/cpp/src/qpid/replication/ReplicationExchange.h
index 4b34e0df13..ff0a98c48e 100644
--- a/qpid/cpp/src/qpid/replication/ReplicationExchange.h
+++ b/qpid/cpp/src/qpid/replication/ReplicationExchange.h
@@ -52,7 +52,7 @@ class ReplicationExchange : public qpid::broker::Exchange
std::string getType() const;
- void route(qpid::broker::Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ void route(qpid::broker::Deliverable& msg);
bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
diff --git a/qpid/cpp/src/qpid/xml/XmlExchange.cpp b/qpid/cpp/src/qpid/xml/XmlExchange.cpp
index b7ff5d211d..01770e22a6 100644
--- a/qpid/cpp/src/qpid/xml/XmlExchange.cpp
+++ b/qpid/cpp/src/qpid/xml/XmlExchange.cpp
@@ -283,8 +283,10 @@ bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::F
// But for very large messages, if all these queries are on the first part of the data,
// it could still be a big win.
-void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* args)
+void XmlExchange::route(Deliverable& msg)
{
+ const string& routingKey = msg.getMessage().getRoutingKey();
+ const FieldTable* args = msg.getMessage().getApplicationHeaders();
PreRoute pr(msg, this);
try {
XmlBinding::vector::ConstPtr p;
diff --git a/qpid/cpp/src/qpid/xml/XmlExchange.h b/qpid/cpp/src/qpid/xml/XmlExchange.h
index 958bad4931..9ef389d9bf 100644
--- a/qpid/cpp/src/qpid/xml/XmlExchange.h
+++ b/qpid/cpp/src/qpid/xml/XmlExchange.h
@@ -82,7 +82,7 @@ class XmlExchange : public virtual Exchange {
virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg);
virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args);
diff --git a/qpid/cpp/src/tests/ExchangeTest.cpp b/qpid/cpp/src/tests/ExchangeTest.cpp
index fe72f42a46..2fb284741a 100644
--- a/qpid/cpp/src/tests/ExchangeTest.cpp
+++ b/qpid/cpp/src/tests/ExchangeTest.cpp
@@ -60,10 +60,10 @@ QPID_AUTO_TEST_CASE(testMe)
queue.reset();
queue2.reset();
- intrusive_ptr<Message> msgPtr(MessageUtils::createMessage("exchange", "key", false, "id"));
+ intrusive_ptr<Message> msgPtr(MessageUtils::createMessage("exchange", "abc", false, "id"));
DeliverableMessage msg(msgPtr);
- topic.route(msg, "abc", 0);
- direct.route(msg, "abc", 0);
+ topic.route(msg);
+ direct.route(msg);
}
@@ -187,17 +187,17 @@ QPID_AUTO_TEST_CASE(testSequenceOptions)
{
DirectExchange direct("direct1", false, args);
- intrusive_ptr<Message> msg1 = cmessage("e", "A");
- intrusive_ptr<Message> msg2 = cmessage("e", "B");
- intrusive_ptr<Message> msg3 = cmessage("e", "C");
+ intrusive_ptr<Message> msg1 = cmessage("e", "abc");
+ intrusive_ptr<Message> msg2 = cmessage("e", "abc");
+ intrusive_ptr<Message> msg3 = cmessage("e", "abc");
DeliverableMessage dmsg1(msg1);
DeliverableMessage dmsg2(msg2);
DeliverableMessage dmsg3(msg3);
- direct.route(dmsg1, "abc", 0);
- direct.route(dmsg2, "abc", 0);
- direct.route(dmsg3, "abc", 0);
+ direct.route(dmsg1);
+ direct.route(dmsg2);
+ direct.route(dmsg3);
BOOST_CHECK_EQUAL(1, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
BOOST_CHECK_EQUAL(2, msg2->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
@@ -208,22 +208,24 @@ QPID_AUTO_TEST_CASE(testSequenceOptions)
TopicExchange topic ("topic1", false, args);
// check other exchanges, that they preroute
- intrusive_ptr<Message> msg4 = cmessage("e", "A");
- intrusive_ptr<Message> msg5 = cmessage("e", "B");
- intrusive_ptr<Message> msg6 = cmessage("e", "C");
+ intrusive_ptr<Message> msg4 = cmessage("e", "abc");
+ intrusive_ptr<Message> msg5 = cmessage("e", "abc");
+
+ // Need at least empty header for the HeadersExchange to route at all
+ msg5->insertCustomProperty("", "");
+ intrusive_ptr<Message> msg6 = cmessage("e", "abc");
DeliverableMessage dmsg4(msg4);
DeliverableMessage dmsg5(msg5);
DeliverableMessage dmsg6(msg6);
- fanout.route(dmsg4, "abc", 0);
+ fanout.route(dmsg4);
BOOST_CHECK_EQUAL(1, msg4->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
- FieldTable headers;
- header.route(dmsg5, "abc", &headers);
+ header.route(dmsg5);
BOOST_CHECK_EQUAL(1, msg5->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
- topic.route(dmsg6, "abc", 0);
+ topic.route(dmsg6);
BOOST_CHECK_EQUAL(1, msg6->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
direct.encode(buffer);
}
@@ -233,9 +235,9 @@ QPID_AUTO_TEST_CASE(testSequenceOptions)
buffer.reset();
DirectExchange::shared_ptr exch_dec = Exchange::decode(exchanges, buffer);
- intrusive_ptr<Message> msg1 = cmessage("e", "A");
+ intrusive_ptr<Message> msg1 = cmessage("e", "abc");
DeliverableMessage dmsg1(msg1);
- exch_dec->route(dmsg1, "abc", 0);
+ exch_dec->route(dmsg1);
BOOST_CHECK_EQUAL(4, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
@@ -260,10 +262,10 @@ QPID_AUTO_TEST_CASE(testIVEOption)
args2.setString("x-match", "any");
args2.setString("a", "abc");
- direct.route(dmsg1, "abc", 0);
- fanout.route(dmsg1, "abc", 0);
- header.route(dmsg1, "abc", &args2);
- topic.route(dmsg1, "abc", 0);
+ direct.route(dmsg1);
+ fanout.route(dmsg1);
+ header.route(dmsg1);
+ topic.route(dmsg1);
Queue::shared_ptr queue(new Queue("queue", true));
Queue::shared_ptr queue1(new Queue("queue1", true));
Queue::shared_ptr queue2(new Queue("queue2", true));
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index bb4f7b9f4b..0058aa5133 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -254,8 +254,8 @@ QPID_AUTO_TEST_CASE(testBound){
//ensure the remaining exchanges don't still have the queue bound to them:
FailOnDeliver deliverable;
- exchange1->route(deliverable, key, &args);
- exchange3->route(deliverable, key, &args);
+ exchange1->route(deliverable);
+ exchange3->route(deliverable);
}
QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
@@ -1151,7 +1151,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
intrusive_ptr<Message> msg01 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg01(msg01);
- sbtFanout1.route(dmsg01, "", 0); // Brings queue 1 to capacity limit
+ sbtFanout1.route(dmsg01); // Brings queue 1 to capacity limit
msg01->tryReleaseContent();
BOOST_CHECK_EQUAL(msg01->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
@@ -1160,7 +1160,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
DeliverableMessage dmsg02(msg02);
{
ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException);
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg02), ResourceLimitExceededException);
}
msg02->tryReleaseContent();
BOOST_CHECK_EQUAL(msg02->isContentReleased(), false);
@@ -1170,7 +1170,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
DeliverableMessage dmsg03(msg03);
{
ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException);
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg03), ResourceLimitExceededException);
}
msg03->tryReleaseContent();
BOOST_CHECK_EQUAL(msg03->isContentReleased(), false);
@@ -1180,7 +1180,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
DeliverableMessage dmsg04(msg04);
{
ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException);
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg04), ResourceLimitExceededException);
}
msg04->tryReleaseContent();
BOOST_CHECK_EQUAL(msg04->isContentReleased(), false);
@@ -1190,7 +1190,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
DeliverableMessage dmsg05(msg05);
{
ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException);
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg05), ResourceLimitExceededException);
}
msg05->tryReleaseContent();
BOOST_CHECK_EQUAL(msg05->isContentReleased(), false);
@@ -1205,35 +1205,35 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
intrusive_ptr<Message> msg06 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg06(msg06);
- sbdFanout2.route(dmsg06, "", 0); // Brings queue 2 to capacity limit
+ sbdFanout2.route(dmsg06); // Brings queue 2 to capacity limit
msg06->tryReleaseContent();
BOOST_CHECK_EQUAL(msg06->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, dq2->getMessageCount());
intrusive_ptr<Message> msg07 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg07(msg07);
- sbdFanout2.route(dmsg07, "", 0);
+ sbdFanout2.route(dmsg07);
msg07->tryReleaseContent();
BOOST_CHECK_EQUAL(msg07->isContentReleased(), true);
BOOST_CHECK_EQUAL(2u, dq2->getMessageCount());
intrusive_ptr<Message> msg08 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
DeliverableMessage dmsg08(msg08);
- sbdFanout2.route(dmsg08, "", 0);
+ sbdFanout2.route(dmsg08);
msg08->tryReleaseContent();
BOOST_CHECK_EQUAL(msg08->isContentReleased(), true);
BOOST_CHECK_EQUAL(3u, dq2->getMessageCount());
intrusive_ptr<Message> msg09 = mkMsg(testStore); // transient no content
DeliverableMessage dmsg09(msg09);
- sbdFanout2.route(dmsg09, "", 0);
+ sbdFanout2.route(dmsg09);
msg09->tryReleaseContent();
BOOST_CHECK_EQUAL(msg09->isContentReleased(), true);
BOOST_CHECK_EQUAL(4u, dq2->getMessageCount());
intrusive_ptr<Message> msg10 = mkMsg(testStore, "", true); // durable no content
DeliverableMessage dmsg10(msg10);
- sbdFanout2.route(dmsg10, "", 0);
+ sbdFanout2.route(dmsg10);
msg10->tryReleaseContent();
BOOST_CHECK_EQUAL(msg10->isContentReleased(), true);
BOOST_CHECK_EQUAL(5u, dq2->getMessageCount());
@@ -1253,7 +1253,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
intrusive_ptr<Message> msg11 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg11(msg11);
- mbdFanout3.route(dmsg11, "", 0); // Brings queues 3 and 4 to capacity limit
+ mbdFanout3.route(dmsg11); // Brings queues 3 and 4 to capacity limit
msg11->tryReleaseContent();
BOOST_CHECK_EQUAL(msg11->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, dq3->getMessageCount());
@@ -1262,7 +1262,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
intrusive_ptr<Message> msg12 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg12(msg12);
- mbdFanout3.route(dmsg12, "", 0);
+ mbdFanout3.route(dmsg12);
msg12->tryReleaseContent();
BOOST_CHECK_EQUAL(msg12->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point!
BOOST_CHECK_EQUAL(2u, dq3->getMessageCount());
@@ -1271,7 +1271,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
intrusive_ptr<Message> msg13 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
DeliverableMessage dmsg13(msg13);
- mbdFanout3.route(dmsg13, "", 0);
+ mbdFanout3.route(dmsg13);
msg13->tryReleaseContent();
BOOST_CHECK_EQUAL(msg13->isContentReleased(), true);
BOOST_CHECK_EQUAL(3u, dq3->getMessageCount());
@@ -1280,7 +1280,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
intrusive_ptr<Message> msg14 = mkMsg(testStore); // transient no content
DeliverableMessage dmsg14(msg14);
- mbdFanout3.route(dmsg14, "", 0);
+ mbdFanout3.route(dmsg14);
msg14->tryReleaseContent();
BOOST_CHECK_EQUAL(msg14->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point!
BOOST_CHECK_EQUAL(4u, dq3->getMessageCount());
@@ -1289,7 +1289,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
intrusive_ptr<Message> msg15 = mkMsg(testStore, "", true); // durable no content
DeliverableMessage dmsg15(msg15);
- mbdFanout3.route(dmsg15, "", 0);
+ mbdFanout3.route(dmsg15);
msg15->tryReleaseContent();
BOOST_CHECK_EQUAL(msg15->isContentReleased(), true);
BOOST_CHECK_EQUAL(5u, dq3->getMessageCount());
@@ -1307,7 +1307,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
intrusive_ptr<Message> msg16 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg16(msg16);
- mbdFanout3.route(dmsg16, "", 0);
+ mbdFanout3.route(dmsg16);
msg16->tryReleaseContent();
BOOST_CHECK_EQUAL(msg16->isContentReleased(), false);
BOOST_CHECK_EQUAL(6u, dq3->getMessageCount());
@@ -1316,7 +1316,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
intrusive_ptr<Message> msg17 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
DeliverableMessage dmsg17(msg17);
- mbdFanout3.route(dmsg17, "", 0);
+ mbdFanout3.route(dmsg17);
msg17->tryReleaseContent();
BOOST_CHECK_EQUAL(msg17->isContentReleased(), false);
BOOST_CHECK_EQUAL(7u, dq3->getMessageCount());
@@ -1325,7 +1325,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
intrusive_ptr<Message> msg18 = mkMsg(testStore); // transient no content
DeliverableMessage dmsg18(msg18);
- mbdFanout3.route(dmsg18, "", 0);
+ mbdFanout3.route(dmsg18);
msg18->tryReleaseContent();
BOOST_CHECK_EQUAL(msg18->isContentReleased(), false);
BOOST_CHECK_EQUAL(8u, dq3->getMessageCount());
@@ -1334,7 +1334,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
intrusive_ptr<Message> msg19 = mkMsg(testStore, "", true); // durable no content
DeliverableMessage dmsg19(msg19);
- mbdFanout3.route(dmsg19, "", 0);
+ mbdFanout3.route(dmsg19);
msg19->tryReleaseContent();
BOOST_CHECK_EQUAL(msg19->isContentReleased(), false);
BOOST_CHECK_EQUAL(9u, dq3->getMessageCount());
@@ -1357,7 +1357,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
intrusive_ptr<Message> msg20 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg20(msg20);
- mbmFanout4.route(dmsg20, "", 0); // Brings queue 7 to capacity limit
+ mbmFanout4.route(dmsg20); // Brings queue 7 to capacity limit
msg20->tryReleaseContent();
BOOST_CHECK_EQUAL(msg20->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, dq7->getMessageCount());
@@ -1366,7 +1366,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
intrusive_ptr<Message> msg21 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg21(msg21);
- mbmFanout4.route(dmsg21, "", 0);
+ mbmFanout4.route(dmsg21);
msg21->tryReleaseContent();
BOOST_CHECK_EQUAL(msg21->isContentReleased(), false);
BOOST_CHECK_EQUAL(2u, dq7->getMessageCount()); // over limit
@@ -1375,7 +1375,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
intrusive_ptr<Message> msg22 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
DeliverableMessage dmsg22(msg22);
- mbmFanout4.route(dmsg22, "", 0);
+ mbmFanout4.route(dmsg22);
msg22->tryReleaseContent();
BOOST_CHECK_EQUAL(msg22->isContentReleased(), false);
BOOST_CHECK_EQUAL(3u, dq7->getMessageCount()); // over limit
@@ -1384,7 +1384,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
intrusive_ptr<Message> msg23 = mkMsg(testStore); // transient no content
DeliverableMessage dmsg23(msg23);
- mbmFanout4.route(dmsg23, "", 0);
+ mbmFanout4.route(dmsg23);
msg23->tryReleaseContent();
BOOST_CHECK_EQUAL(msg23->isContentReleased(), false);
BOOST_CHECK_EQUAL(4u, dq7->getMessageCount()); // over limit
@@ -1393,7 +1393,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
intrusive_ptr<Message> msg24 = mkMsg(testStore, "", true); // durable no content
DeliverableMessage dmsg24(msg24);
- mbmFanout4.route(dmsg24, "", 0);
+ mbmFanout4.route(dmsg24);
msg24->tryReleaseContent();
BOOST_CHECK_EQUAL(msg24->isContentReleased(), false);
BOOST_CHECK_EQUAL(5u, dq7->getMessageCount()); // over limit