summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp95
1 files changed, 58 insertions, 37 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 11b2682575..a3a7336f35 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -247,18 +247,18 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess
{
Mutex::ScopedLock locker(messageLock);
QPID_LOG(debug, "Attempting to acquire message at " << position);
- for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
- if (i->position == position) {
- message = *i;
- if (lastValueQueue) {
- clearLVQIndex(*i);
- }
- QPID_LOG(debug,
- "Acquired message at " << i->position << " from " << name);
- messages.erase(i);
- return true;
+
+ Messages::iterator i = findAt(position);
+ if (i != messages.end() ) {
+ message = *i;
+ if (lastValueQueue) {
+ clearLVQIndex(*i);
}
- }
+ QPID_LOG(debug,
+ "Acquired message at " << i->position << " from " << name);
+ messages.erase(i);
+ return true;
+ }
QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
return false;
}
@@ -266,21 +266,21 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess
bool Queue::acquire(const QueuedMessage& msg) {
Mutex::ScopedLock locker(messageLock);
QPID_LOG(debug, "attempting to acquire " << msg.position);
- for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
- if ((i->position == msg.position && !lastValueQueue) // note that in some cases payload not be set
- || (lastValueQueue && (i->position == msg.position) &&
- msg.payload.get() == checkLvqReplace(*i).payload.get()) ) {
-
- clearLVQIndex(msg);
- QPID_LOG(debug,
- "Match found, acquire succeeded: " <<
- i->position << " == " << msg.position);
- messages.erase(i);
- return true;
- } else {
- QPID_LOG(debug, "No match: " << i->position << " != " << msg.position);
- }
+ Messages::iterator i = findAt(msg.position);
+ if ((i != messages.end() && !lastValueQueue) // note that in some cases payload not be set
+ || (lastValueQueue && (i->position == msg.position) &&
+ msg.payload.get() == checkLvqReplace(*i).payload.get()) ) {
+
+ clearLVQIndex(msg);
+ QPID_LOG(debug,
+ "Match found, acquire succeeded: " <<
+ i->position << " == " << msg.position);
+ messages.erase(i);
+ return true;
+ } else {
+ QPID_LOG(debug, "No match: " << i->position << " != " << msg.position);
}
+
QPID_LOG(debug, "Acquire failed for " << msg.position);
return false;
}
@@ -449,19 +449,35 @@ bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
return false;
}
-namespace {
-struct PositionEquals {
- SequenceNumber pos;
- PositionEquals(SequenceNumber p) : pos(p) {}
- bool operator()(const QueuedMessage& msg) const { return msg.position == pos; }
-};
-}// namespace
+Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {
+
+ if(!messages.empty()){
+ QueuedMessage compM;
+ compM.position = pos;
+ unsigned long diff = pos.getValue() - messages.front().position.getValue();
+ long maxEnd = diff < messages.size()? diff : messages.size();
+
+ Messages::iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
+ if (i!= messages.end() && i->position == pos)
+ return i;
+ }
+ return messages.end(); // no match found.
+}
+
QueuedMessage Queue::find(SequenceNumber pos) const {
+
Mutex::ScopedLock locker(messageLock);
- Messages::const_iterator i = std::find_if(messages.begin(), messages.end(), PositionEquals(pos));
- if (i != messages.end())
- return *i;
+ if(!messages.empty()){
+ QueuedMessage compM;
+ compM.position = pos;
+ unsigned long diff = pos.getValue() - messages.front().position.getValue();
+ long maxEnd = diff < messages.size()? diff : messages.size();
+
+ Messages::const_iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
+ if (i != messages.end())
+ return *i;
+ }
return QueuedMessage();
}
@@ -646,10 +662,9 @@ QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg)
}
/** function only provided for unit tests, or code not in critical message path */
-uint32_t Queue::getMessageCount() const
+uint32_t Queue::getEnqueueCompleteMessageCount() const
{
Mutex::ScopedLock locker(messageLock);
-
uint32_t count = 0;
for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
//NOTE: don't need to use checkLvqReplace() here as it
@@ -661,6 +676,12 @@ uint32_t Queue::getMessageCount() const
return count;
}
+uint32_t Queue::getMessageCount() const
+{
+ Mutex::ScopedLock locker(messageLock);
+ return messages.size();
+}
+
uint32_t Queue::getConsumerCount() const
{
Mutex::ScopedLock locker(consumerLock);