summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Exchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Exchange.cpp')
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp43
1 files changed, 43 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 90d81b81c6..9b5796bde3 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -76,6 +76,49 @@ Exchange::PreRoute::~PreRoute(){
}
}
+void Exchange::blockContentReleaseCheck(Deliverable& msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr p)
+{
+ bool allQueuesPersistent = true;
+ for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); allQueuesPersistent && i!=p->end(); i++) {
+ allQueuesPersistent = (*i)->queue->getPersistenceId() > 0;
+ }
+ if (msg.getMessage().contentSize() && (!allQueuesPersistent || (p->size() > 1 && !msg.getMessage().isPersistent()))) {
+ msg.getMessage().blockRelease();
+ }
+}
+
+void Exchange::doRoute(Deliverable& msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr p)
+{
+ int count = 0;
+
+ if (p.get()) {
+ blockContentReleaseCheck(msg, p);
+
+ for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched();
+ }
+ }
+
+ if (mgmtExchange != 0)
+ {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ if (count == 0)
+ {
+ //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found");
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ else
+ {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
+ }
+}
+
void Exchange::routeIVE(){
if (ive && lastMsg.get()){
DeliverableMessage dmsg(lastMsg);