diff options
author | Gordon Sim <gsim@apache.org> | 2008-10-23 18:57:38 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-10-23 18:57:38 +0000 |
commit | 74128983b961a029e6ad206d9ecc6a1299d67ec2 (patch) | |
tree | f38b3840fc1c913468ee502aa2e16b5138031c55 | |
parent | 6431dd50333cce065260e19d4a47a335c775ea1f (diff) | |
download | qpid-python-74128983b961a029e6ad206d9ecc6a1299d67ec2.tar.gz |
Some fixes to the LVQ (primarily a patch from cctrieloff@redhat.com)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@707446 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 66 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 19 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 43 |
5 files changed, 106 insertions, 43 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 0302bc1dbd..4a63962ecf 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -324,3 +324,17 @@ bool Message::hasExpired() const { return expiration < FAR_FUTURE && expiration < AbsTime::now(); } + +boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const +{ + Replacement::iterator i = replacement.find(qfor); + if (i != replacement.end()){ + return i->second; + } + return empty; +} + +void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor) +{ + replacement[qfor] = msg; +} diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index f6eec361bb..f7f49f1857 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -138,6 +138,9 @@ public: void addTraceId(const std::string& id); void forcePersistent(); + + boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const; + void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor); private: mutable sys::Mutex lock; @@ -155,6 +158,10 @@ public: static TransferAdapter TRANSFER; MessageAdapter& getAdapter() const; + typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement; + + mutable Replacement replacement; + mutable boost::intrusive_ptr<Message> empty; }; }} diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 52404c826c..968720050d 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -198,17 +198,23 @@ void Queue::requeue(const QueuedMessage& msg){ for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); } +void Queue::clearLVQIndex(const QueuedMessage& msg){ + if (lastValueQueue){ + const framing::FieldTable* ft = msg.payload->getApplicationHeaders(); + string key = ft->getAsString(qpidVQMatchProperty); + lvq.erase(key); + } +} + bool Queue::acquire(const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); QPID_LOG(debug, "attempting to acquire " << msg.position); for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { if ((i->position == msg.position && !lastValueQueue) // note that in some cases payload not be set - || (lastValueQueue && i->position == msg.position && i->payload.get() == msg.payload.get())) { - if (lastValueQueue){ - const framing::FieldTable* ft = msg.payload->getApplicationHeaders(); - string key = ft->getAsString(qpidVQMatchProperty); - lvq.erase(key); - } + || (lastValueQueue && (i->position == msg.position) && + msg.payload.get() == checkLvqReplace(*i).payload.get()) ) { + + clearLVQIndex(msg); messages.erase(i); QPID_LOG(debug, "Match found, acquire succeeded: " << i->position << " == " << msg.position); return true; @@ -238,7 +244,7 @@ bool Queue::checkForMessages(Consumer::shared_ptr c) addListener(c); return false; } else { - QueuedMessage msg = messages.front(); + QueuedMessage msg = getFront(); if (store && !msg.payload->isEnqueueComplete()) { //though a message is on the queue, it has not yet been //enqueued and so is not available for consumption yet, @@ -264,7 +270,7 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) addListener(c); return false; } else { - QueuedMessage msg = messages.front(); + QueuedMessage msg = getFront(); if (msg.payload->hasExpired()) { QPID_LOG(debug, "Message expired from queue '" << name << "'"); popAndDequeue(); @@ -306,6 +312,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) //consumer wants the message c->position = msg.position; m = msg; + clearLVQIndex(msg); return true; } else { //browser hasn't got enough credit for the message @@ -348,8 +355,8 @@ bool Queue::dispatch(Consumer::shared_ptr c) bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { Mutex::ScopedLock locker(messageLock); if (!messages.empty() && messages.back().position > c->position) { - if (c->position < messages.front().position) { - msg = messages.front(); + if (c->position < getFront().position) { + msg = getFront(); return true; } else { //TODO: can improve performance of this search, for now just searching linearly from end @@ -416,7 +423,7 @@ QueuedMessage Queue::get(){ QueuedMessage msg(this); if(!messages.empty()){ - msg = messages.front(); + msg = getFront(); popMsg(msg); } return msg; @@ -432,6 +439,7 @@ void Queue::purgeExpired() { Mutex::ScopedLock locker(messageLock); for (Messages::iterator i = messages.begin(); i != messages.end();) { + if (lastValueQueue) checkLvqReplace(*i); if (i->payload->hasExpired()) { expired.push_back(*i); i = messages.erase(i); @@ -471,7 +479,7 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { uint32_t count = 0; // count how many were moved for returning while((!qty || move_count--) && !messages.empty()) { - QueuedMessage qmsg = messages.front(); + QueuedMessage qmsg = getFront(); boost::intrusive_ptr<Message> msg = qmsg.payload; destq->deliver(msg); // deliver message to the destination queue popMsg(qmsg); @@ -509,12 +517,11 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){ if (i == lvq.end()){ messages.push_back(qm); listeners.swap(copy); - lvq[key] = &messages.back(); + lvq[key] = msg; }else { - i->second->payload = msg; + i->second->setReplacementMessage(msg,this); } }else { - messages.push_back(qm); listeners.swap(copy); } @@ -522,13 +529,33 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){ for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); } +QueuedMessage Queue::getFront() +{ + QueuedMessage msg = messages.front(); + if (lastValueQueue) { + boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); + if (replacement.get()) msg.payload = replacement; + } + return msg; +} + +QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) const +{ + boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); + if (replacement.get()) msg.payload = replacement; + return msg; +} + /** function only provided for unit tests, or code not in critical message path */ uint32_t Queue::getMessageCount() const { Mutex::ScopedLock locker(messageLock); - uint32_t count =0; + uint32_t count = 0; for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { + //NOTE: don't need to use checkLvqReplace() here as it + //is only relevant for LVQ which does not support persistence + //so the enqueueComplete check has no effect if ( i->payload->isEnqueueComplete() ) count ++; } @@ -556,7 +583,8 @@ void Queue::setLastNodeFailure() { if (persistLastNode){ Mutex::ScopedLock locker(messageLock); - for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { + for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) { + if (lastValueQueue) checkLvqReplace(*i); i->payload->forcePersistent(); if (i->payload->getPersistenceId() == 0){ enqueue(0, i->payload); @@ -609,7 +637,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) */ void Queue::popAndDequeue() { - QueuedMessage msg = messages.front(); + QueuedMessage msg = getFront(); popMsg(msg); dequeue(0, msg); } @@ -667,7 +695,7 @@ void Queue::destroy() if (alternateExchange.get()) { Mutex::ScopedLock locker(messageLock); while(!messages.empty()){ - DeliverableMessage msg(messages.front().payload); + DeliverableMessage msg(getFront().payload); alternateExchange->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders()); popAndDequeue(); diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 6becb77ff5..bca01f7ef5 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -66,7 +66,7 @@ namespace qpid { typedef std::list<Consumer::shared_ptr> Listeners; typedef std::deque<QueuedMessage> Messages; - typedef std::map<string,QueuedMessage*> LVQ; + typedef std::map<string,boost::intrusive_ptr<Message> > LVQ; const string name; const bool autodelete; @@ -111,7 +111,10 @@ RateTracker dequeueTracker; void dequeued(const QueuedMessage& msg); void popAndDequeue(); - + QueuedMessage getFront(); + QueuedMessage& checkLvqReplace(QueuedMessage& msg) const; + void clearLVQIndex(const QueuedMessage& msg); + inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg) { if (mgmtObject != 0) { @@ -193,8 +196,8 @@ RateTracker dequeueTracker; uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages void purgeExpired(); - //move qty # of messages to destination Queue destq - uint32_t move(const Queue::shared_ptr destq, uint32_t qty); + //move qty # of messages to destination Queue destq + uint32_t move(const Queue::shared_ptr destq, uint32_t qty); uint32_t getMessageCount() const; uint32_t getConsumerCount() const; @@ -211,10 +214,10 @@ RateTracker dequeueTracker; const QueueBindings& getBindings() const { return bindings; } /** - * used to take messages from in memory and flush down to disk. - */ - void setLastNodeFailure(); - void clearLastNodeFailure(); + * used to take messages from in memory and flush down to disk. + */ + void setLastNodeFailure(); + void clearLastNodeFailure(); bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg); /** diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index ef8aa69dd6..ab9f146fd7 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -438,27 +438,38 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ } +QPID_AUTO_TEST_CASE(testLVQMultiQueue){ -QPID_AUTO_TEST_CASE(testLVQSaftyCheck){ - -// This test is to check std::deque memory copy does not change out under us -// if this test fails, then lvq would no longer be safe. + client::QueueOptions args; + // set queue mode + args.setOrdering(client::LVQ); - std::deque<string> deq; + Queue::shared_ptr queue1(new Queue("my-queue", true )); + Queue::shared_ptr queue2(new Queue("my-queue", true )); + intrusive_ptr<Message> received; + queue1->configure(args); + queue2->configure(args); - string a; - string b; + intrusive_ptr<Message> msg1 = message("e", "A"); + intrusive_ptr<Message> msg2 = message("e", "A"); + + string key; + args.getLVQKey(key); + BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); + + msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); - deq.push_back(a); - deq.push_back(b); - string* tmp = &deq.back(); - for (int a =0; a<=100000; a++){ - string z; - deq.push_back(z); - } - deq.pop_front(); - BOOST_CHECK_EQUAL(&deq.front(),tmp); + queue1->deliver(msg1); + queue2->deliver(msg1); + queue1->deliver(msg2); + + received = queue1->get().payload; + BOOST_CHECK_EQUAL(msg2.get(), received.get()); + received = queue2->get().payload; + BOOST_CHECK_EQUAL(msg1.get(), received.get()); + } void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0) |