summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2008-10-17 01:27:45 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2008-10-17 01:27:45 +0000
commitb67f0fcee107e921a160ab208256c500b8230261 (patch)
tree79366ced8715409a7b2c0ab4dd33303f333a7f73 /qpid/cpp/src
parent1832f8b41b85cabdf42dc2af36c3e1b15f574325 (diff)
downloadqpid-python-b67f0fcee107e921a160ab208256c500b8230261.tar.gz
Feature requested by AndrewM for M4...
- provide initial value support, for late joining consumers git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@705443 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/DirectExchange.cpp21
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp32
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.h4
-rw-r--r--qpid/cpp/src/qpid/broker/FanOutExchange.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/HeadersExchange.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/TopicExchange.cpp25
-rw-r--r--qpid/cpp/src/qpid/xml/XmlExchange.cpp31
-rw-r--r--qpid/cpp/src/tests/ExchangeTest.cpp41
8 files changed, 114 insertions, 42 deletions
diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp
index f760bc9747..8fc8260f9e 100644
--- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp
@@ -43,17 +43,20 @@ DirectExchange::DirectExchange(const std::string& _name, bool _durable,
}
bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
- Mutex::ScopedLock l(lock);
- Binding::shared_ptr b(new Binding (routingKey, queue, this));
- if (bindings[routingKey].add_unless(b, MatchQueue(queue))) {
- if (mgmtExchange != 0) {
- mgmtExchange->inc_bindingCount();
- ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+ {
+ Mutex::ScopedLock l(lock);
+ Binding::shared_ptr b(new Binding (routingKey, queue, this));
+ if (bindings[routingKey].add_unless(b, MatchQueue(queue))) {
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_bindingCount();
+ ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+ }
+ } else {
+ return false;
}
- return true;
- } else {
- return false;
}
+ routeIVE();
+ return true;
}
bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index a3130d9edb..3cea904676 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -24,6 +24,7 @@
#include "qpid/agent/ManagementAgent.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/MessageProperties.h"
+#include "DeliverableMessage.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -38,26 +39,41 @@ namespace _qmf = qmf::org::apache::qpid::broker;
namespace
{
const std::string qpidMsgSequence("qpid.msg_sequence");
+const std::string qpidIVE("qpid.ive");
}
Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) {
- if (parent && parent->sequence){
- parent->sequenceLock.lock();
- parent->sequenceNo++;
- msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
+ if (parent){
+ if (parent->sequence || parent->ive) parent->sequenceLock.lock();
+
+ if (parent->sequence){
+ parent->sequenceNo++;
+ msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
+ }
+ if (parent->ive) {
+ parent->lastMsg = &( msg.getMessage());
+ }
}
}
Exchange::PreRoute::~PreRoute(){
- if (parent && parent->sequence){
+ if (parent && (parent->sequence || parent->ive)){
parent->sequenceLock.unlock();
}
}
+void Exchange::routeIVE(){
+ if (ive && lastMsg.get()){
+ DeliverableMessage dmsg(lastMsg);
+ route(dmsg, lastMsg->getRoutingKey(), lastMsg->getApplicationHeaders());
+ }
+}
+
+
Exchange::Exchange (const string& _name, Manageable* parent) :
name(_name), durable(false), persistenceId(0), sequence(false),
- sequenceNo(0), mgmtExchange(0)
+ sequenceNo(0), ive(false), mgmtExchange(0)
{
if (parent != 0)
{
@@ -73,7 +89,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) :
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
Manageable* parent)
: name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0),
- sequence(false), sequenceNo(0), mgmtExchange(0)
+ sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
{
if (parent != 0)
{
@@ -95,6 +111,8 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
sequence = _args.get(qpidMsgSequence);
if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing");
+ ive = _args.get(qpidIVE);
+ if (ive) QPID_LOG(debug, "Configured exchange "+ _name +" with Initial Value");
}
Exchange::~Exchange ()
diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h
index 6006a09ea5..154325e67f 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.h
+++ b/qpid/cpp/src/qpid/broker/Exchange.h
@@ -51,6 +51,8 @@ namespace qpid {
bool sequence;
mutable qpid::sys::Mutex sequenceLock;
uint64_t sequenceNo;
+ bool ive;
+ boost::intrusive_ptr<Message> lastMsg;
class PreRoute{
public:
@@ -60,6 +62,8 @@ namespace qpid {
Exchange* parent;
};
+ void routeIVE();
+
struct Binding : public management::Manageable {
typedef boost::shared_ptr<Binding> shared_ptr;
typedef std::vector<Binding::shared_ptr> vector;
diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
index e92fac41dc..2628d8952f 100644
--- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -49,6 +49,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const
mgmtExchange->inc_bindingCount();
((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
}
+ routeIVE();
return true;
} else {
return false;
diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
index b2bd4519bd..104b34da8b 100644
--- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -84,6 +84,7 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co
mgmtExchange->inc_bindingCount();
((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
}
+ routeIVE();
return true;
} else {
return false;
diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
index d9be8b0d68..691b42a1ae 100644
--- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
@@ -131,19 +131,22 @@ TopicExchange::TopicExchange(const std::string& _name, bool _durable,
}
bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
- RWlock::ScopedWlock l(lock);
- TopicPattern routingPattern(routingKey);
- if (isBound(queue, routingPattern)) {
- return false;
- } else {
- Binding::shared_ptr binding (new Binding (routingKey, queue, this));
- bindings[routingPattern].push_back(binding);
- if (mgmtExchange != 0) {
- mgmtExchange->inc_bindingCount();
- ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+ {
+ RWlock::ScopedWlock l(lock);
+ TopicPattern routingPattern(routingKey);
+ if (isBound(queue, routingPattern)) {
+ return false;
+ } else {
+ Binding::shared_ptr binding (new Binding (routingKey, queue, this));
+ bindings[routingPattern].push_back(binding);
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_bindingCount();
+ ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+ }
}
- return true;
}
+ routeIVE();
+ return true;
}
bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
diff --git a/qpid/cpp/src/qpid/xml/XmlExchange.cpp b/qpid/cpp/src/qpid/xml/XmlExchange.cpp
index 53eb0f20b8..d3269882d7 100644
--- a/qpid/cpp/src/qpid/xml/XmlExchange.cpp
+++ b/qpid/cpp/src/qpid/xml/XmlExchange.cpp
@@ -86,22 +86,21 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const
try {
RWlock::ScopedWlock l(lock);
- XmlBinding::vector& bindings(bindingsMap[routingKey]);
- XmlBinding::vector::ConstPtr p = bindings.snapshot();
- if (!p || std::find_if(p->begin(), p->end(), MatchQueue(queue)) == p->end()) {
- Query query(xqilla.parse(X(queryText.c_str())));
- XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, query));
- bindings.add(binding);
- QPID_LOG(trace, "Bound successfully with query: " << queryText );
-
- if (mgmtExchange != 0) {
- mgmtExchange->inc_bindingCount();
- ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+ XmlBinding::vector& bindings(bindingsMap[routingKey]);
+ XmlBinding::vector::ConstPtr p = bindings.snapshot();
+ if (!p || std::find_if(p->begin(), p->end(), MatchQueue(queue)) == p->end()) {
+ Query query(xqilla.parse(X(queryText.c_str())));
+ XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, query));
+ bindings.add(binding);
+ QPID_LOG(trace, "Bound successfully with query: " << queryText );
+
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_bindingCount();
+ ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+ }
+ } else {
+ return false;
}
- return true;
- } else {
- return false;
- }
}
catch (XQException& e) {
throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText));
@@ -109,6 +108,8 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const
catch (...) {
throw InternalErrorException(QPID_MSG("Unexpected error - Could not parse xquery:"+ queryText));
}
+ routeIVE();
+ return true;
}
bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/)
diff --git a/qpid/cpp/src/tests/ExchangeTest.cpp b/qpid/cpp/src/tests/ExchangeTest.cpp
index 711fede950..c2f6078b33 100644
--- a/qpid/cpp/src/tests/ExchangeTest.cpp
+++ b/qpid/cpp/src/tests/ExchangeTest.cpp
@@ -223,4 +223,45 @@ QPID_AUTO_TEST_CASE(testSequenceOptions)
}
+QPID_AUTO_TEST_CASE(testIVEOption)
+{
+ FieldTable args;
+ args.setInt("qpid.ive",1);
+ DirectExchange direct("direct1", false, args);
+ FanOutExchange fanout("fanout1", false, args);
+ HeadersExchange header("headers1", false, args);
+ TopicExchange topic ("topic1", false, args);
+
+ intrusive_ptr<Message> msg1 = cmessage("direct1", "abc");
+ msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString("a", "abc");
+ DeliverableMessage dmsg1(msg1);
+
+ FieldTable args2;
+ args2.setString("x-match", "any");
+ args2.setString("a", "abc");
+
+ direct.route(dmsg1, "abc", 0);
+ fanout.route(dmsg1, "abc", 0);
+ header.route(dmsg1, "abc", &args2);
+ topic.route(dmsg1, "abc", 0);
+ Queue::shared_ptr queue(new Queue("queue", true));
+ Queue::shared_ptr queue1(new Queue("queue1", true));
+ Queue::shared_ptr queue2(new Queue("queue2", true));
+ Queue::shared_ptr queue3(new Queue("queue3", true));
+
+ BOOST_CHECK(HeadersExchange::match(args2, msg1->getProperties<MessageProperties>()->getApplicationHeaders()));
+
+ BOOST_CHECK(direct.bind(queue, "abc", 0));
+ BOOST_CHECK(fanout.bind(queue1, "abc", 0));
+ BOOST_CHECK(header.bind(queue2, "", &args2));
+ BOOST_CHECK(topic.bind(queue3, "abc", 0));
+
+ BOOST_CHECK_EQUAL(1u,queue->getMessageCount());
+ BOOST_CHECK_EQUAL(1u,queue1->getMessageCount());
+ BOOST_CHECK_EQUAL(1u,queue2->getMessageCount());
+ BOOST_CHECK_EQUAL(1u,queue3->getMessageCount());
+
+}
+
+
QPID_AUTO_TEST_SUITE_END()