diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2008-10-17 01:27:45 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2008-10-17 01:27:45 +0000 |
commit | b67f0fcee107e921a160ab208256c500b8230261 (patch) | |
tree | 79366ced8715409a7b2c0ab4dd33303f333a7f73 /qpid/cpp/src | |
parent | 1832f8b41b85cabdf42dc2af36c3e1b15f574325 (diff) | |
download | qpid-python-b67f0fcee107e921a160ab208256c500b8230261.tar.gz |
Feature requested by AndrewM for M4...
- provide initial value support, for late joining consumers
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@705443 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/broker/DirectExchange.cpp | 21 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Exchange.cpp | 32 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Exchange.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/FanOutExchange.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/HeadersExchange.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/TopicExchange.cpp | 25 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/xml/XmlExchange.cpp | 31 | ||||
-rw-r--r-- | qpid/cpp/src/tests/ExchangeTest.cpp | 41 |
8 files changed, 114 insertions, 42 deletions
diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp index f760bc9747..8fc8260f9e 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp +++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp @@ -43,17 +43,20 @@ DirectExchange::DirectExchange(const std::string& _name, bool _durable, } bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){ - Mutex::ScopedLock l(lock); - Binding::shared_ptr b(new Binding (routingKey, queue, this)); - if (bindings[routingKey].add_unless(b, MatchQueue(queue))) { - if (mgmtExchange != 0) { - mgmtExchange->inc_bindingCount(); - ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); + { + Mutex::ScopedLock l(lock); + Binding::shared_ptr b(new Binding (routingKey, queue, this)); + if (bindings[routingKey].add_unless(b, MatchQueue(queue))) { + if (mgmtExchange != 0) { + mgmtExchange->inc_bindingCount(); + ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); + } + } else { + return false; } - return true; - } else { - return false; } + routeIVE(); + return true; } bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index a3130d9edb..3cea904676 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -24,6 +24,7 @@ #include "qpid/agent/ManagementAgent.h" #include "qpid/log/Statement.h" #include "qpid/framing/MessageProperties.h" +#include "DeliverableMessage.h" using namespace qpid::broker; using namespace qpid::framing; @@ -38,26 +39,41 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace { const std::string qpidMsgSequence("qpid.msg_sequence"); +const std::string qpidIVE("qpid.ive"); } Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) { - if (parent && parent->sequence){ - parent->sequenceLock.lock(); - parent->sequenceNo++; - msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo); + if (parent){ + if (parent->sequence || parent->ive) parent->sequenceLock.lock(); + + if (parent->sequence){ + parent->sequenceNo++; + msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo); + } + if (parent->ive) { + parent->lastMsg = &( msg.getMessage()); + } } } Exchange::PreRoute::~PreRoute(){ - if (parent && parent->sequence){ + if (parent && (parent->sequence || parent->ive)){ parent->sequenceLock.unlock(); } } +void Exchange::routeIVE(){ + if (ive && lastMsg.get()){ + DeliverableMessage dmsg(lastMsg); + route(dmsg, lastMsg->getRoutingKey(), lastMsg->getApplicationHeaders()); + } +} + + Exchange::Exchange (const string& _name, Manageable* parent) : name(_name), durable(false), persistenceId(0), sequence(false), - sequenceNo(0), mgmtExchange(0) + sequenceNo(0), ive(false), mgmtExchange(0) { if (parent != 0) { @@ -73,7 +89,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) : Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent) : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), - sequence(false), sequenceNo(0), mgmtExchange(0) + sequence(false), sequenceNo(0), ive(false), mgmtExchange(0) { if (parent != 0) { @@ -95,6 +111,8 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel sequence = _args.get(qpidMsgSequence); if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing"); + ive = _args.get(qpidIVE); + if (ive) QPID_LOG(debug, "Configured exchange "+ _name +" with Initial Value"); } Exchange::~Exchange () diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index 6006a09ea5..154325e67f 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -51,6 +51,8 @@ namespace qpid { bool sequence; mutable qpid::sys::Mutex sequenceLock; uint64_t sequenceNo; + bool ive; + boost::intrusive_ptr<Message> lastMsg; class PreRoute{ public: @@ -60,6 +62,8 @@ namespace qpid { Exchange* parent; }; + void routeIVE(); + struct Binding : public management::Manageable { typedef boost::shared_ptr<Binding> shared_ptr; typedef std::vector<Binding::shared_ptr> vector; diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp index e92fac41dc..2628d8952f 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp @@ -49,6 +49,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const mgmtExchange->inc_bindingCount(); ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); } + routeIVE(); return true; } else { return false; diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp index b2bd4519bd..104b34da8b 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp @@ -84,6 +84,7 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co mgmtExchange->inc_bindingCount(); ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); } + routeIVE(); return true; } else { return false; diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp index d9be8b0d68..691b42a1ae 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp +++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp @@ -131,19 +131,22 @@ TopicExchange::TopicExchange(const std::string& _name, bool _durable, } bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ - RWlock::ScopedWlock l(lock); - TopicPattern routingPattern(routingKey); - if (isBound(queue, routingPattern)) { - return false; - } else { - Binding::shared_ptr binding (new Binding (routingKey, queue, this)); - bindings[routingPattern].push_back(binding); - if (mgmtExchange != 0) { - mgmtExchange->inc_bindingCount(); - ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); + { + RWlock::ScopedWlock l(lock); + TopicPattern routingPattern(routingKey); + if (isBound(queue, routingPattern)) { + return false; + } else { + Binding::shared_ptr binding (new Binding (routingKey, queue, this)); + bindings[routingPattern].push_back(binding); + if (mgmtExchange != 0) { + mgmtExchange->inc_bindingCount(); + ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); + } } - return true; } + routeIVE(); + return true; } bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ diff --git a/qpid/cpp/src/qpid/xml/XmlExchange.cpp b/qpid/cpp/src/qpid/xml/XmlExchange.cpp index 53eb0f20b8..d3269882d7 100644 --- a/qpid/cpp/src/qpid/xml/XmlExchange.cpp +++ b/qpid/cpp/src/qpid/xml/XmlExchange.cpp @@ -86,22 +86,21 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const try { RWlock::ScopedWlock l(lock); - XmlBinding::vector& bindings(bindingsMap[routingKey]); - XmlBinding::vector::ConstPtr p = bindings.snapshot(); - if (!p || std::find_if(p->begin(), p->end(), MatchQueue(queue)) == p->end()) { - Query query(xqilla.parse(X(queryText.c_str()))); - XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, query)); - bindings.add(binding); - QPID_LOG(trace, "Bound successfully with query: " << queryText ); - - if (mgmtExchange != 0) { - mgmtExchange->inc_bindingCount(); - ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); + XmlBinding::vector& bindings(bindingsMap[routingKey]); + XmlBinding::vector::ConstPtr p = bindings.snapshot(); + if (!p || std::find_if(p->begin(), p->end(), MatchQueue(queue)) == p->end()) { + Query query(xqilla.parse(X(queryText.c_str()))); + XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, query)); + bindings.add(binding); + QPID_LOG(trace, "Bound successfully with query: " << queryText ); + + if (mgmtExchange != 0) { + mgmtExchange->inc_bindingCount(); + ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); + } + } else { + return false; } - return true; - } else { - return false; - } } catch (XQException& e) { throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText)); @@ -109,6 +108,8 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const catch (...) { throw InternalErrorException(QPID_MSG("Unexpected error - Could not parse xquery:"+ queryText)); } + routeIVE(); + return true; } bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/) diff --git a/qpid/cpp/src/tests/ExchangeTest.cpp b/qpid/cpp/src/tests/ExchangeTest.cpp index 711fede950..c2f6078b33 100644 --- a/qpid/cpp/src/tests/ExchangeTest.cpp +++ b/qpid/cpp/src/tests/ExchangeTest.cpp @@ -223,4 +223,45 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) } +QPID_AUTO_TEST_CASE(testIVEOption) +{ + FieldTable args; + args.setInt("qpid.ive",1); + DirectExchange direct("direct1", false, args); + FanOutExchange fanout("fanout1", false, args); + HeadersExchange header("headers1", false, args); + TopicExchange topic ("topic1", false, args); + + intrusive_ptr<Message> msg1 = cmessage("direct1", "abc"); + msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString("a", "abc"); + DeliverableMessage dmsg1(msg1); + + FieldTable args2; + args2.setString("x-match", "any"); + args2.setString("a", "abc"); + + direct.route(dmsg1, "abc", 0); + fanout.route(dmsg1, "abc", 0); + header.route(dmsg1, "abc", &args2); + topic.route(dmsg1, "abc", 0); + Queue::shared_ptr queue(new Queue("queue", true)); + Queue::shared_ptr queue1(new Queue("queue1", true)); + Queue::shared_ptr queue2(new Queue("queue2", true)); + Queue::shared_ptr queue3(new Queue("queue3", true)); + + BOOST_CHECK(HeadersExchange::match(args2, msg1->getProperties<MessageProperties>()->getApplicationHeaders())); + + BOOST_CHECK(direct.bind(queue, "abc", 0)); + BOOST_CHECK(fanout.bind(queue1, "abc", 0)); + BOOST_CHECK(header.bind(queue2, "", &args2)); + BOOST_CHECK(topic.bind(queue3, "abc", 0)); + + BOOST_CHECK_EQUAL(1u,queue->getMessageCount()); + BOOST_CHECK_EQUAL(1u,queue1->getMessageCount()); + BOOST_CHECK_EQUAL(1u,queue2->getMessageCount()); + BOOST_CHECK_EQUAL(1u,queue3->getMessageCount()); + +} + + QPID_AUTO_TEST_SUITE_END() |