summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-10-23 18:57:38 +0000
committerGordon Sim <gsim@apache.org>2008-10-23 18:57:38 +0000
commit74128983b961a029e6ad206d9ecc6a1299d67ec2 (patch)
treef38b3840fc1c913468ee502aa2e16b5138031c55
parent6431dd50333cce065260e19d4a47a335c775ea1f (diff)
downloadqpid-python-74128983b961a029e6ad206d9ecc6a1299d67ec2.tar.gz
Some fixes to the LVQ (primarily a patch from cctrieloff@redhat.com)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@707446 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h7
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp66
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h19
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp43
5 files changed, 106 insertions, 43 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 0302bc1dbd..4a63962ecf 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -324,3 +324,17 @@ bool Message::hasExpired() const
{
return expiration < FAR_FUTURE && expiration < AbsTime::now();
}
+
+boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const
+{
+ Replacement::iterator i = replacement.find(qfor);
+ if (i != replacement.end()){
+ return i->second;
+ }
+ return empty;
+}
+
+void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor)
+{
+ replacement[qfor] = msg;
+}
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index f6eec361bb..f7f49f1857 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -138,6 +138,9 @@ public:
void addTraceId(const std::string& id);
void forcePersistent();
+
+ boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const;
+ void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor);
private:
mutable sys::Mutex lock;
@@ -155,6 +158,10 @@ public:
static TransferAdapter TRANSFER;
MessageAdapter& getAdapter() const;
+ typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;
+
+ mutable Replacement replacement;
+ mutable boost::intrusive_ptr<Message> empty;
};
}}
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 52404c826c..968720050d 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -198,17 +198,23 @@ void Queue::requeue(const QueuedMessage& msg){
for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
}
+void Queue::clearLVQIndex(const QueuedMessage& msg){
+ if (lastValueQueue){
+ const framing::FieldTable* ft = msg.payload->getApplicationHeaders();
+ string key = ft->getAsString(qpidVQMatchProperty);
+ lvq.erase(key);
+ }
+}
+
bool Queue::acquire(const QueuedMessage& msg) {
Mutex::ScopedLock locker(messageLock);
QPID_LOG(debug, "attempting to acquire " << msg.position);
for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
if ((i->position == msg.position && !lastValueQueue) // note that in some cases payload not be set
- || (lastValueQueue && i->position == msg.position && i->payload.get() == msg.payload.get())) {
- if (lastValueQueue){
- const framing::FieldTable* ft = msg.payload->getApplicationHeaders();
- string key = ft->getAsString(qpidVQMatchProperty);
- lvq.erase(key);
- }
+ || (lastValueQueue && (i->position == msg.position) &&
+ msg.payload.get() == checkLvqReplace(*i).payload.get()) ) {
+
+ clearLVQIndex(msg);
messages.erase(i);
QPID_LOG(debug, "Match found, acquire succeeded: " << i->position << " == " << msg.position);
return true;
@@ -238,7 +244,7 @@ bool Queue::checkForMessages(Consumer::shared_ptr c)
addListener(c);
return false;
} else {
- QueuedMessage msg = messages.front();
+ QueuedMessage msg = getFront();
if (store && !msg.payload->isEnqueueComplete()) {
//though a message is on the queue, it has not yet been
//enqueued and so is not available for consumption yet,
@@ -264,7 +270,7 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
addListener(c);
return false;
} else {
- QueuedMessage msg = messages.front();
+ QueuedMessage msg = getFront();
if (msg.payload->hasExpired()) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
popAndDequeue();
@@ -306,6 +312,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
//consumer wants the message
c->position = msg.position;
m = msg;
+ clearLVQIndex(msg);
return true;
} else {
//browser hasn't got enough credit for the message
@@ -348,8 +355,8 @@ bool Queue::dispatch(Consumer::shared_ptr c)
bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
Mutex::ScopedLock locker(messageLock);
if (!messages.empty() && messages.back().position > c->position) {
- if (c->position < messages.front().position) {
- msg = messages.front();
+ if (c->position < getFront().position) {
+ msg = getFront();
return true;
} else {
//TODO: can improve performance of this search, for now just searching linearly from end
@@ -416,7 +423,7 @@ QueuedMessage Queue::get(){
QueuedMessage msg(this);
if(!messages.empty()){
- msg = messages.front();
+ msg = getFront();
popMsg(msg);
}
return msg;
@@ -432,6 +439,7 @@ void Queue::purgeExpired()
{
Mutex::ScopedLock locker(messageLock);
for (Messages::iterator i = messages.begin(); i != messages.end();) {
+ if (lastValueQueue) checkLvqReplace(*i);
if (i->payload->hasExpired()) {
expired.push_back(*i);
i = messages.erase(i);
@@ -471,7 +479,7 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
uint32_t count = 0; // count how many were moved for returning
while((!qty || move_count--) && !messages.empty()) {
- QueuedMessage qmsg = messages.front();
+ QueuedMessage qmsg = getFront();
boost::intrusive_ptr<Message> msg = qmsg.payload;
destq->deliver(msg); // deliver message to the destination queue
popMsg(qmsg);
@@ -509,12 +517,11 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){
if (i == lvq.end()){
messages.push_back(qm);
listeners.swap(copy);
- lvq[key] = &messages.back();
+ lvq[key] = msg;
}else {
- i->second->payload = msg;
+ i->second->setReplacementMessage(msg,this);
}
}else {
-
messages.push_back(qm);
listeners.swap(copy);
}
@@ -522,13 +529,33 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){
for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
}
+QueuedMessage Queue::getFront()
+{
+ QueuedMessage msg = messages.front();
+ if (lastValueQueue) {
+ boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
+ if (replacement.get()) msg.payload = replacement;
+ }
+ return msg;
+}
+
+QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) const
+{
+ boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
+ if (replacement.get()) msg.payload = replacement;
+ return msg;
+}
+
/** function only provided for unit tests, or code not in critical message path */
uint32_t Queue::getMessageCount() const
{
Mutex::ScopedLock locker(messageLock);
- uint32_t count =0;
+ uint32_t count = 0;
for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
+ //NOTE: don't need to use checkLvqReplace() here as it
+ //is only relevant for LVQ which does not support persistence
+ //so the enqueueComplete check has no effect
if ( i->payload->isEnqueueComplete() ) count ++;
}
@@ -556,7 +583,8 @@ void Queue::setLastNodeFailure()
{
if (persistLastNode){
Mutex::ScopedLock locker(messageLock);
- for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
+ for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
+ if (lastValueQueue) checkLvqReplace(*i);
i->payload->forcePersistent();
if (i->payload->getPersistenceId() == 0){
enqueue(0, i->payload);
@@ -609,7 +637,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
*/
void Queue::popAndDequeue()
{
- QueuedMessage msg = messages.front();
+ QueuedMessage msg = getFront();
popMsg(msg);
dequeue(0, msg);
}
@@ -667,7 +695,7 @@ void Queue::destroy()
if (alternateExchange.get()) {
Mutex::ScopedLock locker(messageLock);
while(!messages.empty()){
- DeliverableMessage msg(messages.front().payload);
+ DeliverableMessage msg(getFront().payload);
alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
msg.getMessage().getApplicationHeaders());
popAndDequeue();
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 6becb77ff5..bca01f7ef5 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -66,7 +66,7 @@ namespace qpid {
typedef std::list<Consumer::shared_ptr> Listeners;
typedef std::deque<QueuedMessage> Messages;
- typedef std::map<string,QueuedMessage*> LVQ;
+ typedef std::map<string,boost::intrusive_ptr<Message> > LVQ;
const string name;
const bool autodelete;
@@ -111,7 +111,10 @@ RateTracker dequeueTracker;
void dequeued(const QueuedMessage& msg);
void popAndDequeue();
-
+ QueuedMessage getFront();
+ QueuedMessage& checkLvqReplace(QueuedMessage& msg) const;
+ void clearLVQIndex(const QueuedMessage& msg);
+
inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
{
if (mgmtObject != 0) {
@@ -193,8 +196,8 @@ RateTracker dequeueTracker;
uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages
void purgeExpired();
- //move qty # of messages to destination Queue destq
- uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
+ //move qty # of messages to destination Queue destq
+ uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
uint32_t getMessageCount() const;
uint32_t getConsumerCount() const;
@@ -211,10 +214,10 @@ RateTracker dequeueTracker;
const QueueBindings& getBindings() const { return bindings; }
/**
- * used to take messages from in memory and flush down to disk.
- */
- void setLastNodeFailure();
- void clearLastNodeFailure();
+ * used to take messages from in memory and flush down to disk.
+ */
+ void setLastNodeFailure();
+ void clearLastNodeFailure();
bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg);
/**
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index ef8aa69dd6..ab9f146fd7 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -438,27 +438,38 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
}
+QPID_AUTO_TEST_CASE(testLVQMultiQueue){
-QPID_AUTO_TEST_CASE(testLVQSaftyCheck){
-
-// This test is to check std::deque memory copy does not change out under us
-// if this test fails, then lvq would no longer be safe.
+ client::QueueOptions args;
+ // set queue mode
+ args.setOrdering(client::LVQ);
- std::deque<string> deq;
+ Queue::shared_ptr queue1(new Queue("my-queue", true ));
+ Queue::shared_ptr queue2(new Queue("my-queue", true ));
+ intrusive_ptr<Message> received;
+ queue1->configure(args);
+ queue2->configure(args);
- string a;
- string b;
+ intrusive_ptr<Message> msg1 = message("e", "A");
+ intrusive_ptr<Message> msg2 = message("e", "A");
+
+ string key;
+ args.getLVQKey(key);
+ BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
+
+ msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
- deq.push_back(a);
- deq.push_back(b);
- string* tmp = &deq.back();
- for (int a =0; a<=100000; a++){
- string z;
- deq.push_back(z);
- }
- deq.pop_front();
- BOOST_CHECK_EQUAL(&deq.front(),tmp);
+ queue1->deliver(msg1);
+ queue2->deliver(msg1);
+ queue1->deliver(msg2);
+
+ received = queue1->get().payload;
+ BOOST_CHECK_EQUAL(msg2.get(), received.get());
+ received = queue2->get().payload;
+ BOOST_CHECK_EQUAL(msg1.get(), received.get());
+
}
void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)