diff options
Diffstat (limited to 'cpp/src/qpid/broker/Exchange.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 43 |
1 files changed, 43 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 90d81b81c6..9b5796bde3 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -76,6 +76,49 @@ Exchange::PreRoute::~PreRoute(){ } } +void Exchange::blockContentReleaseCheck(Deliverable& msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr p) +{ + bool allQueuesPersistent = true; + for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); allQueuesPersistent && i!=p->end(); i++) { + allQueuesPersistent = (*i)->queue->getPersistenceId() > 0; + } + if (msg.getMessage().contentSize() && (!allQueuesPersistent || (p->size() > 1 && !msg.getMessage().isPersistent()))) { + msg.getMessage().blockRelease(); + } +} + +void Exchange::doRoute(Deliverable& msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr p) +{ + int count = 0; + + if (p.get()) { + blockContentReleaseCheck(msg, p); + + for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) { + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched(); + } + } + + if (mgmtExchange != 0) + { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + if (count == 0) + { + //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found"); + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + else + { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } +} + void Exchange::routeIVE(){ if (ive && lastMsg.get()){ DeliverableMessage dmsg(lastMsg); |