diff options
author | Andrew Stitcher <astitcher@apache.org> | 2012-03-05 19:12:32 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2012-03-05 19:12:32 +0000 |
commit | 91361618c09bc49e7fdf0a0909ee3d6df8e7d495 (patch) | |
tree | 52ab92ee925fa1ac2894f5ba09c5c28dc688fd01 | |
parent | 08b6ab7e60a3c8cce985e9f5362a785b75e9ef43 (diff) | |
download | qpid-python-91361618c09bc49e7fdf0a0909ee3d6df8e7d495.tar.gz |
QPID-3883: Using application headers in messages causes a very large slowdown
Change Exchange route interface not to require a fieldtable
- Exchanges that actually use the fieldtable for routing
need to extract it directly from the message themselves.
This avoids the need to extract the fieldtable from the message
unnecessarily.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1297183 13f79535-47bb-0310-9956-ffa450edef68
34 files changed, 102 insertions, 104 deletions
diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp index 5591539853..5d9aea7509 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp +++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp @@ -153,8 +153,9 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c return true; } -void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/) +void DirectExchange::route(Deliverable& msg) { + const string& routingKey = msg.getMessage().getRoutingKey(); PreRoute pr(msg, this); ConstBindingList b; { diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.h b/qpid/cpp/src/qpid/broker/DirectExchange.h index a6f9cf91af..833be52c1c 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.h +++ b/qpid/cpp/src/qpid/broker/DirectExchange.h @@ -57,9 +57,7 @@ public: const std::string& routingKey, const qpid::framing::FieldTable* args); virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - QPID_BROKER_EXTERN virtual void route(Deliverable& msg, - const std::string& routingKey, - const qpid::framing::FieldTable* args); + QPID_BROKER_EXTERN virtual void route(Deliverable& msg); QPID_BROKER_EXTERN virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args); diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 16a6427a65..f311b79578 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -159,7 +159,7 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b) void Exchange::routeIVE(){ if (ive && lastMsg.get()){ DeliverableMessage dmsg(lastMsg); - route(dmsg, lastMsg->getRoutingKey(), lastMsg->getApplicationHeaders()); + route(dmsg); } } @@ -402,9 +402,9 @@ void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) { bool Exchange::routeWithAlternate(Deliverable& msg) { - route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders()); + route(msg); if (!msg.delivered && alternate) { - alternate->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders()); + alternate->route(msg); } return msg.delivered; } diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index 9179dd5c7c..7376f814ed 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -196,7 +196,7 @@ public: virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0; QPID_BROKER_EXTERN virtual void setProperties(const boost::intrusive_ptr<Message>&); - virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; + virtual void route(Deliverable& msg) = 0; //PersistableExchange: QPID_BROKER_EXTERN void setPersistenceId(uint64_t id) const; diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp index 5879fa0892..2bce99b6fe 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp @@ -101,7 +101,7 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, cons return true; } -void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/) +void FanOutExchange::route(Deliverable& msg) { PreRoute pr(msg, this); doRoute(msg, bindings.snapshot()); diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.h b/qpid/cpp/src/qpid/broker/FanOutExchange.h index 1a7d486796..c979fdca25 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.h +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.h @@ -54,9 +54,7 @@ class FanOutExchange : public virtual Exchange { virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - QPID_BROKER_EXTERN virtual void route(Deliverable& msg, - const std::string& routingKey, - const qpid::framing::FieldTable* args); + QPID_BROKER_EXTERN virtual void route(Deliverable& msg); QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp index 142c23f276..6648ae0422 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp @@ -191,8 +191,9 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, } -void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args) +void HeadersExchange::route(Deliverable& msg) { + const FieldTable* args = msg.getMessage().getApplicationHeaders(); if (!args) { //can't match if there were no headers passed in if (mgmtExchange != 0) { diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.h b/qpid/cpp/src/qpid/broker/HeadersExchange.h index 3b939d6851..d10892b9cc 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.h +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.h @@ -98,9 +98,7 @@ class HeadersExchange : public virtual Exchange { virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - QPID_BROKER_EXTERN virtual void route(Deliverable& msg, - const std::string& routingKey, - const qpid::framing::FieldTable* args); + QPID_BROKER_EXTERN virtual void route(Deliverable& msg); QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 226ba77fb2..01ce017289 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -215,7 +215,7 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); - alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders()); + alternateExchange->route(deliverable); } } else if (isLocal(msg)) { //drop message diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index e7d2259c80..64924bdd4c 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -489,14 +489,14 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { exchangeName << " with routing-key " << msg->getRoutingKey())); } - cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); + cacheExchange->route(strategy); if (!strategy.delivered) { //TODO:if discard-unroutable, just drop it //TODO:else if accept-mode is explicit, reject it //else route it to alternate exchange if (cacheExchange->getAlternate()) { - cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); + cacheExchange->getAlternate()->route(strategy); } if (!strategy.delivered) { msg->destroy(); diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp index 644a3d628e..dd3ec13019 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp +++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp @@ -350,8 +350,9 @@ TopicExchange::BindingKey *TopicExchange::getQueueBinding(Queue::shared_ptr queu return (q != qv.end()) ? bk : 0; } -void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/) +void TopicExchange::route(Deliverable& msg) { + const string& routingKey = msg.getMessage().getRoutingKey(); // Note: PERFORMANCE CRITICAL!!! BindingList b; std::map<std::string, BindingList>::iterator it; diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.h b/qpid/cpp/src/qpid/broker/TopicExchange.h index 636918f8a1..cc24e1411e 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.h +++ b/qpid/cpp/src/qpid/broker/TopicExchange.h @@ -185,9 +185,7 @@ class TopicExchange : public virtual Exchange { virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - QPID_BROKER_EXTERN virtual void route(Deliverable& msg, - const std::string& routingKey, - const qpid::framing::FieldTable* args); + QPID_BROKER_EXTERN virtual void route(Deliverable& msg); QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, diff --git a/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp b/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp index 0fafc521cd..416a3636e9 100644 --- a/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp +++ b/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp @@ -62,7 +62,8 @@ bool CredentialsExchange::check(MemberId member) { return valid; } -void CredentialsExchange::route(broker::Deliverable& msg, const string& /*routingKey*/, const framing::FieldTable* args) { +void CredentialsExchange::route(broker::Deliverable& msg) { + const framing::FieldTable* args = msg.getMessage().getApplicationHeaders(); sys::Mutex::ScopedLock l(lock); const broker::ConnectionState* connection = static_cast<const broker::ConnectionState*>(msg.getMessage().getPublisher()); diff --git a/qpid/cpp/src/qpid/cluster/CredentialsExchange.h b/qpid/cpp/src/qpid/cluster/CredentialsExchange.h index 90fd188271..74cf8350a6 100644 --- a/qpid/cpp/src/qpid/cluster/CredentialsExchange.h +++ b/qpid/cpp/src/qpid/cluster/CredentialsExchange.h @@ -50,7 +50,7 @@ class CredentialsExchange : public broker::Exchange bool check(MemberId member); /** Throw an exception if the calling connection is not the cluster user. Store credentials in msg. */ - void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); + void route(broker::Deliverable& msg); // Exchange overrides std::string getType() const; diff --git a/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp b/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp index 43ec27cf2c..87202a887c 100644 --- a/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp +++ b/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp @@ -80,7 +80,7 @@ bool FailoverExchange::isBound(Queue::shared_ptr queue, const string* const, con return queues.find(queue) != queues.end(); } -void FailoverExchange::route(Deliverable&, const string& , const framing::FieldTable* ) { +void FailoverExchange::route(Deliverable&) { QPID_LOG(warning, "Message received by exchange " << typeName << " ignoring"); } diff --git a/qpid/cpp/src/qpid/cluster/FailoverExchange.h b/qpid/cpp/src/qpid/cluster/FailoverExchange.h index c3e50c6929..5ac734a7ac 100644 --- a/qpid/cpp/src/qpid/cluster/FailoverExchange.h +++ b/qpid/cpp/src/qpid/cluster/FailoverExchange.h @@ -54,7 +54,7 @@ class FailoverExchange : public broker::Exchange bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args); bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args); bool isBound(boost::shared_ptr<broker::Queue> queue, const std::string* const routingKey, const framing::FieldTable* const args); - void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); + void route(broker::Deliverable& msg); private: void sendUpdate(const boost::shared_ptr<broker::Queue>&); diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp index e5cd82e3d3..31d96c67ca 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp @@ -40,9 +40,9 @@ UpdateDataExchange::UpdateDataExchange(Cluster& cluster) : Exchange(EXCHANGE_NAME, &cluster) {} -void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey, - const qpid::framing::FieldTable* ) +void UpdateDataExchange::route(broker::Deliverable& msg) { + const std::string& routingKey = msg.getMessage().getRoutingKey(); std::string data = msg.getMessage().getFrames().getContent(); if (routingKey == MANAGEMENT_AGENTS_KEY) managementAgents = data; else if (routingKey == MANAGEMENT_SCHEMAS_KEY) managementSchemas = data; diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h index d2f6c35ad0..f79430f111 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h +++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h @@ -50,8 +50,7 @@ class UpdateDataExchange : public broker::Exchange UpdateDataExchange(Cluster& parent); - void route(broker::Deliverable& msg, const std::string& routingKey, - const framing::FieldTable* args); + void route(broker::Deliverable& msg); // Not implemented std::string getType() const { return EXCHANGE_TYPE; } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 75e4ed893d..85b97e7e3e 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -226,7 +226,8 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH } // FIXME aconway 2011-12-02: error handling in route. -void BrokerReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) { +void BrokerReplicator::route(Deliverable& msg) { + const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders(); Variant::List list; try { if (!isQMFv2(msg.getMessage()) || !headers) diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index cfb6cf9a28..483c251126 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -58,7 +58,7 @@ class BrokerReplicator : public broker::Exchange // Exchange methods bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); - void route(broker::Deliverable&, const std::string&, const framing::FieldTable*); + void route(broker::Deliverable&); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); private: diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 7351a8d74d..6aff4879e3 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -138,8 +138,9 @@ void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) } // Called in connection thread of the queues bridge to primary. -void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable*) +void QueueReplicator::route(Deliverable& msg) { + const std::string& key = msg.getMessage().getRoutingKey(); sys::Mutex::ScopedLock l(lock); if (key == DEQUEUE_EVENT_KEY) { SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage()); diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 9de7dd480c..a1ebbd788a 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -66,7 +66,7 @@ class QueueReplicator : public broker::Exchange, bool bind(boost::shared_ptr<broker::Queue >, const std::string&, const framing::FieldTable*); bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); - void route(broker::Deliverable&, const std::string&, const framing::FieldTable*); + void route(broker::Deliverable&); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); private: diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 372393886d..8c2cb95faa 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -564,7 +564,7 @@ void ManagementAgent::sendBufferLH(Buffer& buf, DeliverableMessage deliverable (msg); try { - exchange->route(deliverable, routingKey, 0); + exchange->route(deliverable); } catch(exception&) {} } buf.reset(); @@ -641,7 +641,7 @@ void ManagementAgent::sendBufferLH(const string& data, DeliverableMessage deliverable (msg); try { - exchange->route(deliverable, routingKey, 0); + exchange->route(deliverable); } catch(exception&) {} } } diff --git a/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp b/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp index 1d5f8bbd6b..312eacf462 100644 --- a/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp +++ b/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp @@ -40,17 +40,17 @@ ManagementDirectExchange::ManagementDirectExchange(const std::string& _name, DirectExchange(_name, _durable, _args, _parent, b), managementAgent(0) {} -void ManagementDirectExchange::route(Deliverable& msg, - const string& routingKey, - const FieldTable* args) +void ManagementDirectExchange::route(Deliverable& msg) { bool routeIt = true; + const string& routingKey = msg.getMessage().getRoutingKey(); + const FieldTable* args = msg.getMessage().getApplicationHeaders(); if (managementAgent) routeIt = managementAgent->dispatchCommand(msg, routingKey, args, false, qmfVersion); if (routeIt) - DirectExchange::route(msg, routingKey, args); + DirectExchange::route(msg); } void ManagementDirectExchange::setManagmentAgent(ManagementAgent* agent, int qv) diff --git a/qpid/cpp/src/qpid/management/ManagementDirectExchange.h b/qpid/cpp/src/qpid/management/ManagementDirectExchange.h index 7507179c06..582354d723 100644 --- a/qpid/cpp/src/qpid/management/ManagementDirectExchange.h +++ b/qpid/cpp/src/qpid/management/ManagementDirectExchange.h @@ -43,9 +43,7 @@ class ManagementDirectExchange : public virtual DirectExchange virtual std::string getType() const { return typeName; } - virtual void route(Deliverable& msg, - const std::string& routingKey, - const qpid::framing::FieldTable* args); + virtual void route(Deliverable& msg); void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion); diff --git a/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp b/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp index ee8657646f..587cc660df 100644 --- a/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp +++ b/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp @@ -39,18 +39,18 @@ ManagementTopicExchange::ManagementTopicExchange(const std::string& _name, TopicExchange(_name, _durable, _args, _parent, b), managementAgent(0) {} -void ManagementTopicExchange::route(Deliverable& msg, - const string& routingKey, - const FieldTable* args) +void ManagementTopicExchange::route(Deliverable& msg) { bool routeIt = true; + const string& routingKey = msg.getMessage().getRoutingKey(); + const FieldTable* args = msg.getMessage().getApplicationHeaders(); // Intercept management agent commands if (managementAgent) routeIt = managementAgent->dispatchCommand(msg, routingKey, args, true, qmfVersion); if (routeIt) - TopicExchange::route(msg, routingKey, args); + TopicExchange::route(msg); } bool ManagementTopicExchange::bind(Queue::shared_ptr queue, diff --git a/qpid/cpp/src/qpid/management/ManagementTopicExchange.h b/qpid/cpp/src/qpid/management/ManagementTopicExchange.h index 232300265e..eff01a8552 100644 --- a/qpid/cpp/src/qpid/management/ManagementTopicExchange.h +++ b/qpid/cpp/src/qpid/management/ManagementTopicExchange.h @@ -43,9 +43,7 @@ class ManagementTopicExchange : public virtual TopicExchange virtual std::string getType() const { return typeName; } - virtual void route(Deliverable& msg, - const std::string& routingKey, - const qpid::framing::FieldTable* args); + virtual void route(Deliverable& msg); virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, diff --git a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp index 0ced4d9161..3d84a1ce3c 100644 --- a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp +++ b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp @@ -80,7 +80,7 @@ void ReplicatingEventListener::route(boost::intrusive_ptr<qpid::broker::Message> try { if (exchange) { DeliverableMessage deliverable(msg); - exchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders()); + exchange->route(deliverable); } else if (queue) { queue->deliver(msg); } else { diff --git a/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp b/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp index 89a2bf516d..dce0d750a4 100644 --- a/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp +++ b/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp @@ -50,8 +50,9 @@ ReplicationExchange::ReplicationExchange(const std::string& name, bool durable, std::string ReplicationExchange::getType() const { return typeName; } -void ReplicationExchange::route(Deliverable& msg, const std::string& /*routingKey*/, const FieldTable* args) +void ReplicationExchange::route(Deliverable& msg) { + const FieldTable* args = msg.getMessage().getApplicationHeaders(); if (mgmtExchange != 0) { mgmtExchange->inc_msgReceives(); mgmtExchange->inc_byteReceives(msg.contentSize()); diff --git a/qpid/cpp/src/qpid/replication/ReplicationExchange.h b/qpid/cpp/src/qpid/replication/ReplicationExchange.h index 4b34e0df13..ff0a98c48e 100644 --- a/qpid/cpp/src/qpid/replication/ReplicationExchange.h +++ b/qpid/cpp/src/qpid/replication/ReplicationExchange.h @@ -52,7 +52,7 @@ class ReplicationExchange : public qpid::broker::Exchange std::string getType() const; - void route(qpid::broker::Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); + void route(qpid::broker::Deliverable& msg); bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args); bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args); diff --git a/qpid/cpp/src/qpid/xml/XmlExchange.cpp b/qpid/cpp/src/qpid/xml/XmlExchange.cpp index b7ff5d211d..01770e22a6 100644 --- a/qpid/cpp/src/qpid/xml/XmlExchange.cpp +++ b/qpid/cpp/src/qpid/xml/XmlExchange.cpp @@ -283,8 +283,10 @@ bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::F // But for very large messages, if all these queries are on the first part of the data, // it could still be a big win. -void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* args) +void XmlExchange::route(Deliverable& msg) { + const string& routingKey = msg.getMessage().getRoutingKey(); + const FieldTable* args = msg.getMessage().getApplicationHeaders(); PreRoute pr(msg, this); try { XmlBinding::vector::ConstPtr p; diff --git a/qpid/cpp/src/qpid/xml/XmlExchange.h b/qpid/cpp/src/qpid/xml/XmlExchange.h index 958bad4931..9ef389d9bf 100644 --- a/qpid/cpp/src/qpid/xml/XmlExchange.h +++ b/qpid/cpp/src/qpid/xml/XmlExchange.h @@ -82,7 +82,7 @@ class XmlExchange : public virtual Exchange { virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); + virtual void route(Deliverable& msg); virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args); diff --git a/qpid/cpp/src/tests/ExchangeTest.cpp b/qpid/cpp/src/tests/ExchangeTest.cpp index fe72f42a46..2fb284741a 100644 --- a/qpid/cpp/src/tests/ExchangeTest.cpp +++ b/qpid/cpp/src/tests/ExchangeTest.cpp @@ -60,10 +60,10 @@ QPID_AUTO_TEST_CASE(testMe) queue.reset(); queue2.reset(); - intrusive_ptr<Message> msgPtr(MessageUtils::createMessage("exchange", "key", false, "id")); + intrusive_ptr<Message> msgPtr(MessageUtils::createMessage("exchange", "abc", false, "id")); DeliverableMessage msg(msgPtr); - topic.route(msg, "abc", 0); - direct.route(msg, "abc", 0); + topic.route(msg); + direct.route(msg); } @@ -187,17 +187,17 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) { DirectExchange direct("direct1", false, args); - intrusive_ptr<Message> msg1 = cmessage("e", "A"); - intrusive_ptr<Message> msg2 = cmessage("e", "B"); - intrusive_ptr<Message> msg3 = cmessage("e", "C"); + intrusive_ptr<Message> msg1 = cmessage("e", "abc"); + intrusive_ptr<Message> msg2 = cmessage("e", "abc"); + intrusive_ptr<Message> msg3 = cmessage("e", "abc"); DeliverableMessage dmsg1(msg1); DeliverableMessage dmsg2(msg2); DeliverableMessage dmsg3(msg3); - direct.route(dmsg1, "abc", 0); - direct.route(dmsg2, "abc", 0); - direct.route(dmsg3, "abc", 0); + direct.route(dmsg1); + direct.route(dmsg2); + direct.route(dmsg3); BOOST_CHECK_EQUAL(1, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); BOOST_CHECK_EQUAL(2, msg2->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); @@ -208,22 +208,24 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) TopicExchange topic ("topic1", false, args); // check other exchanges, that they preroute - intrusive_ptr<Message> msg4 = cmessage("e", "A"); - intrusive_ptr<Message> msg5 = cmessage("e", "B"); - intrusive_ptr<Message> msg6 = cmessage("e", "C"); + intrusive_ptr<Message> msg4 = cmessage("e", "abc"); + intrusive_ptr<Message> msg5 = cmessage("e", "abc"); + + // Need at least empty header for the HeadersExchange to route at all + msg5->insertCustomProperty("", ""); + intrusive_ptr<Message> msg6 = cmessage("e", "abc"); DeliverableMessage dmsg4(msg4); DeliverableMessage dmsg5(msg5); DeliverableMessage dmsg6(msg6); - fanout.route(dmsg4, "abc", 0); + fanout.route(dmsg4); BOOST_CHECK_EQUAL(1, msg4->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); - FieldTable headers; - header.route(dmsg5, "abc", &headers); + header.route(dmsg5); BOOST_CHECK_EQUAL(1, msg5->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); - topic.route(dmsg6, "abc", 0); + topic.route(dmsg6); BOOST_CHECK_EQUAL(1, msg6->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); direct.encode(buffer); } @@ -233,9 +235,9 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) buffer.reset(); DirectExchange::shared_ptr exch_dec = Exchange::decode(exchanges, buffer); - intrusive_ptr<Message> msg1 = cmessage("e", "A"); + intrusive_ptr<Message> msg1 = cmessage("e", "abc"); DeliverableMessage dmsg1(msg1); - exch_dec->route(dmsg1, "abc", 0); + exch_dec->route(dmsg1); BOOST_CHECK_EQUAL(4, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); @@ -260,10 +262,10 @@ QPID_AUTO_TEST_CASE(testIVEOption) args2.setString("x-match", "any"); args2.setString("a", "abc"); - direct.route(dmsg1, "abc", 0); - fanout.route(dmsg1, "abc", 0); - header.route(dmsg1, "abc", &args2); - topic.route(dmsg1, "abc", 0); + direct.route(dmsg1); + fanout.route(dmsg1); + header.route(dmsg1); + topic.route(dmsg1); Queue::shared_ptr queue(new Queue("queue", true)); Queue::shared_ptr queue1(new Queue("queue1", true)); Queue::shared_ptr queue2(new Queue("queue2", true)); diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index bb4f7b9f4b..0058aa5133 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -254,8 +254,8 @@ QPID_AUTO_TEST_CASE(testBound){ //ensure the remaining exchanges don't still have the queue bound to them: FailOnDeliver deliverable; - exchange1->route(deliverable, key, &args); - exchange3->route(deliverable, key, &args); + exchange1->route(deliverable); + exchange3->route(deliverable); } QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ @@ -1151,7 +1151,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ intrusive_ptr<Message> msg01 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content DeliverableMessage dmsg01(msg01); - sbtFanout1.route(dmsg01, "", 0); // Brings queue 1 to capacity limit + sbtFanout1.route(dmsg01); // Brings queue 1 to capacity limit msg01->tryReleaseContent(); BOOST_CHECK_EQUAL(msg01->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); @@ -1160,7 +1160,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ DeliverableMessage dmsg02(msg02); { ScopedSuppressLogging sl; // suppress expected error messages. - BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException); + BOOST_CHECK_THROW(sbtFanout1.route(dmsg02), ResourceLimitExceededException); } msg02->tryReleaseContent(); BOOST_CHECK_EQUAL(msg02->isContentReleased(), false); @@ -1170,7 +1170,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ DeliverableMessage dmsg03(msg03); { ScopedSuppressLogging sl; // suppress expected error messages. - BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException); + BOOST_CHECK_THROW(sbtFanout1.route(dmsg03), ResourceLimitExceededException); } msg03->tryReleaseContent(); BOOST_CHECK_EQUAL(msg03->isContentReleased(), false); @@ -1180,7 +1180,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ DeliverableMessage dmsg04(msg04); { ScopedSuppressLogging sl; // suppress expected error messages. - BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException); + BOOST_CHECK_THROW(sbtFanout1.route(dmsg04), ResourceLimitExceededException); } msg04->tryReleaseContent(); BOOST_CHECK_EQUAL(msg04->isContentReleased(), false); @@ -1190,7 +1190,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ DeliverableMessage dmsg05(msg05); { ScopedSuppressLogging sl; // suppress expected error messages. - BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException); + BOOST_CHECK_THROW(sbtFanout1.route(dmsg05), ResourceLimitExceededException); } msg05->tryReleaseContent(); BOOST_CHECK_EQUAL(msg05->isContentReleased(), false); @@ -1205,35 +1205,35 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ intrusive_ptr<Message> msg06 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content DeliverableMessage dmsg06(msg06); - sbdFanout2.route(dmsg06, "", 0); // Brings queue 2 to capacity limit + sbdFanout2.route(dmsg06); // Brings queue 2 to capacity limit msg06->tryReleaseContent(); BOOST_CHECK_EQUAL(msg06->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, dq2->getMessageCount()); intrusive_ptr<Message> msg07 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content DeliverableMessage dmsg07(msg07); - sbdFanout2.route(dmsg07, "", 0); + sbdFanout2.route(dmsg07); msg07->tryReleaseContent(); BOOST_CHECK_EQUAL(msg07->isContentReleased(), true); BOOST_CHECK_EQUAL(2u, dq2->getMessageCount()); intrusive_ptr<Message> msg08 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content DeliverableMessage dmsg08(msg08); - sbdFanout2.route(dmsg08, "", 0); + sbdFanout2.route(dmsg08); msg08->tryReleaseContent(); BOOST_CHECK_EQUAL(msg08->isContentReleased(), true); BOOST_CHECK_EQUAL(3u, dq2->getMessageCount()); intrusive_ptr<Message> msg09 = mkMsg(testStore); // transient no content DeliverableMessage dmsg09(msg09); - sbdFanout2.route(dmsg09, "", 0); + sbdFanout2.route(dmsg09); msg09->tryReleaseContent(); BOOST_CHECK_EQUAL(msg09->isContentReleased(), true); BOOST_CHECK_EQUAL(4u, dq2->getMessageCount()); intrusive_ptr<Message> msg10 = mkMsg(testStore, "", true); // durable no content DeliverableMessage dmsg10(msg10); - sbdFanout2.route(dmsg10, "", 0); + sbdFanout2.route(dmsg10); msg10->tryReleaseContent(); BOOST_CHECK_EQUAL(msg10->isContentReleased(), true); BOOST_CHECK_EQUAL(5u, dq2->getMessageCount()); @@ -1253,7 +1253,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ intrusive_ptr<Message> msg11 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content DeliverableMessage dmsg11(msg11); - mbdFanout3.route(dmsg11, "", 0); // Brings queues 3 and 4 to capacity limit + mbdFanout3.route(dmsg11); // Brings queues 3 and 4 to capacity limit msg11->tryReleaseContent(); BOOST_CHECK_EQUAL(msg11->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, dq3->getMessageCount()); @@ -1262,7 +1262,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ intrusive_ptr<Message> msg12 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content DeliverableMessage dmsg12(msg12); - mbdFanout3.route(dmsg12, "", 0); + mbdFanout3.route(dmsg12); msg12->tryReleaseContent(); BOOST_CHECK_EQUAL(msg12->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point! BOOST_CHECK_EQUAL(2u, dq3->getMessageCount()); @@ -1271,7 +1271,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ intrusive_ptr<Message> msg13 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content DeliverableMessage dmsg13(msg13); - mbdFanout3.route(dmsg13, "", 0); + mbdFanout3.route(dmsg13); msg13->tryReleaseContent(); BOOST_CHECK_EQUAL(msg13->isContentReleased(), true); BOOST_CHECK_EQUAL(3u, dq3->getMessageCount()); @@ -1280,7 +1280,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ intrusive_ptr<Message> msg14 = mkMsg(testStore); // transient no content DeliverableMessage dmsg14(msg14); - mbdFanout3.route(dmsg14, "", 0); + mbdFanout3.route(dmsg14); msg14->tryReleaseContent(); BOOST_CHECK_EQUAL(msg14->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point! BOOST_CHECK_EQUAL(4u, dq3->getMessageCount()); @@ -1289,7 +1289,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ intrusive_ptr<Message> msg15 = mkMsg(testStore, "", true); // durable no content DeliverableMessage dmsg15(msg15); - mbdFanout3.route(dmsg15, "", 0); + mbdFanout3.route(dmsg15); msg15->tryReleaseContent(); BOOST_CHECK_EQUAL(msg15->isContentReleased(), true); BOOST_CHECK_EQUAL(5u, dq3->getMessageCount()); @@ -1307,7 +1307,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ intrusive_ptr<Message> msg16 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content DeliverableMessage dmsg16(msg16); - mbdFanout3.route(dmsg16, "", 0); + mbdFanout3.route(dmsg16); msg16->tryReleaseContent(); BOOST_CHECK_EQUAL(msg16->isContentReleased(), false); BOOST_CHECK_EQUAL(6u, dq3->getMessageCount()); @@ -1316,7 +1316,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ intrusive_ptr<Message> msg17 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content DeliverableMessage dmsg17(msg17); - mbdFanout3.route(dmsg17, "", 0); + mbdFanout3.route(dmsg17); msg17->tryReleaseContent(); BOOST_CHECK_EQUAL(msg17->isContentReleased(), false); BOOST_CHECK_EQUAL(7u, dq3->getMessageCount()); @@ -1325,7 +1325,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ intrusive_ptr<Message> msg18 = mkMsg(testStore); // transient no content DeliverableMessage dmsg18(msg18); - mbdFanout3.route(dmsg18, "", 0); + mbdFanout3.route(dmsg18); msg18->tryReleaseContent(); BOOST_CHECK_EQUAL(msg18->isContentReleased(), false); BOOST_CHECK_EQUAL(8u, dq3->getMessageCount()); @@ -1334,7 +1334,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ intrusive_ptr<Message> msg19 = mkMsg(testStore, "", true); // durable no content DeliverableMessage dmsg19(msg19); - mbdFanout3.route(dmsg19, "", 0); + mbdFanout3.route(dmsg19); msg19->tryReleaseContent(); BOOST_CHECK_EQUAL(msg19->isContentReleased(), false); BOOST_CHECK_EQUAL(9u, dq3->getMessageCount()); @@ -1357,7 +1357,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ intrusive_ptr<Message> msg20 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content DeliverableMessage dmsg20(msg20); - mbmFanout4.route(dmsg20, "", 0); // Brings queue 7 to capacity limit + mbmFanout4.route(dmsg20); // Brings queue 7 to capacity limit msg20->tryReleaseContent(); BOOST_CHECK_EQUAL(msg20->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, dq7->getMessageCount()); @@ -1366,7 +1366,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ intrusive_ptr<Message> msg21 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content DeliverableMessage dmsg21(msg21); - mbmFanout4.route(dmsg21, "", 0); + mbmFanout4.route(dmsg21); msg21->tryReleaseContent(); BOOST_CHECK_EQUAL(msg21->isContentReleased(), false); BOOST_CHECK_EQUAL(2u, dq7->getMessageCount()); // over limit @@ -1375,7 +1375,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ intrusive_ptr<Message> msg22 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content DeliverableMessage dmsg22(msg22); - mbmFanout4.route(dmsg22, "", 0); + mbmFanout4.route(dmsg22); msg22->tryReleaseContent(); BOOST_CHECK_EQUAL(msg22->isContentReleased(), false); BOOST_CHECK_EQUAL(3u, dq7->getMessageCount()); // over limit @@ -1384,7 +1384,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ intrusive_ptr<Message> msg23 = mkMsg(testStore); // transient no content DeliverableMessage dmsg23(msg23); - mbmFanout4.route(dmsg23, "", 0); + mbmFanout4.route(dmsg23); msg23->tryReleaseContent(); BOOST_CHECK_EQUAL(msg23->isContentReleased(), false); BOOST_CHECK_EQUAL(4u, dq7->getMessageCount()); // over limit @@ -1393,7 +1393,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ intrusive_ptr<Message> msg24 = mkMsg(testStore, "", true); // durable no content DeliverableMessage dmsg24(msg24); - mbmFanout4.route(dmsg24, "", 0); + mbmFanout4.route(dmsg24); msg24->tryReleaseContent(); BOOST_CHECK_EQUAL(msg24->isContentReleased(), false); BOOST_CHECK_EQUAL(5u, dq7->getMessageCount()); // over limit |