summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-18 22:07:58 +0000
committerAlan Conway <aconway@apache.org>2012-01-18 22:07:58 +0000
commit95d12bc60580be2acf85af1ec8cb13ed46ddc7a9 (patch)
tree0a9887c89748f5e83a0ff1c4b73bdc3d21200f35 /cpp/src
parent7614a68ecdac345920fc00e4f4117c5b36830de4 (diff)
downloadqpid-python-95d12bc60580be2acf85af1ec8cb13ed46ddc7a9.tar.gz
QPID-3603: Replace public broker::Consumer::position variable with get/set function pair.
Public member variables are not good sytle. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1233080 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Consumer.h13
-rw-r--r--cpp/src/qpid/broker/FifoDistributor.cpp2
-rw-r--r--cpp/src/qpid/broker/MessageGroupManager.cpp4
-rw-r--r--cpp/src/qpid/broker/Queue.cpp10
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp2
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp2
-rw-r--r--cpp/src/tests/QueueTest.cpp2
7 files changed, 21 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h
index 2af9b0c121..74052844c9 100644
--- a/cpp/src/qpid/broker/Consumer.h
+++ b/cpp/src/qpid/broker/Consumer.h
@@ -42,19 +42,26 @@ class Consumer {
public:
typedef boost::shared_ptr<Consumer> shared_ptr;
- framing::SequenceNumber position;
-
Consumer(const std::string& _name, bool preAcquires = true)
: acquires(preAcquires), inListeners(false), name(_name), position(0) {}
+ virtual ~Consumer(){}
+
bool preAcquires() const { return acquires; }
const std::string& getName() const { return name; }
+ virtual framing::SequenceNumber getPosition() const { return position; }
+ virtual void setPosition(framing::SequenceNumber pos) { position = pos; }
+
virtual bool deliver(QueuedMessage& msg) = 0;
virtual void notify() = 0;
virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
virtual OwnershipToken* getSession() = 0;
- virtual ~Consumer(){}
+
+ protected:
+ framing::SequenceNumber position;
+
+ private:
friend class QueueListeners;
};
diff --git a/cpp/src/qpid/broker/FifoDistributor.cpp b/cpp/src/qpid/broker/FifoDistributor.cpp
index cdb32d8c8c..eb1f0a402e 100644
--- a/cpp/src/qpid/broker/FifoDistributor.cpp
+++ b/cpp/src/qpid/broker/FifoDistributor.cpp
@@ -46,7 +46,7 @@ bool FifoDistributor::allocate(const std::string&, const QueuedMessage& )
bool FifoDistributor::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
- if (!messages.empty() && messages.next(c->position, next))
+ if (!messages.empty() && messages.next(c->getPosition(), next))
return true;
return false;
}
diff --git a/cpp/src/qpid/broker/MessageGroupManager.cpp b/cpp/src/qpid/broker/MessageGroupManager.cpp
index ddef66b80a..7054ef0310 100644
--- a/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -207,7 +207,7 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued
if (messages.empty())
return false;
- next.position = c->position;
+ next.position = c->getPosition();
if (!freeGroups.empty()) {
const framing::SequenceNumber& nextFree = freeGroups.begin()->first;
if (nextFree < next.position) { // a free message is older than current
@@ -249,7 +249,7 @@ bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMess
bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
// browse: allow access to any available msg, regardless of group ownership (?ok?)
- if (!messages.empty() && messages.next(c->position, next))
+ if (!messages.empty() && messages.next(c->getPosition(), next))
return true;
return false;
}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 4627b1409a..f87041390b 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -309,7 +309,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
if (msg.payload->hasExpired()) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
- c->position = msg.position;
+ c->getPosition() = msg.position;
acquire( msg.position, msg, locker);
dequeue( 0, msg );
continue;
@@ -324,7 +324,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
ok = acquire( msg.position, msg, locker);
(void) ok; assert(ok);
m = msg;
- c->position = m.position;
+ c->getPosition() = m.position;
return CONSUMED;
} else {
//message(s) are available but consumer hasn't got enough credit
@@ -334,7 +334,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
} else {
//consumer will never want this message
QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
- c->position = msg.position;
+ c->getPosition() = msg.position;
return CANT_CONSUME;
}
}
@@ -356,7 +356,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
if (c->filter(msg.payload) && !msg.payload->hasExpired()) {
if (c->accept(msg.payload)) {
//consumer wants the message
- c->position = msg.position;
+ c->setPosition(msg.position);
m = msg;
return true;
} else {
@@ -367,7 +367,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
} else {
//consumer will never want this message, continue seeking
QPID_LOG(debug, "Browser skipping message from '" << name << "'");
- c->position = msg.position;
+ c->setPosition(msg.position);
}
}
return false;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index d81fbd0494..c16ab72876 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -406,7 +406,7 @@ void Connection::consumerState(const string& name, bool blocked, bool notifyEnab
uint32_t usedMsgCredit, uint32_t usedByteCredit)
{
broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name);
- c->position = position;
+ c->setPosition(position);
c->setBlocked(blocked);
if (c->getCredit().isWindowMode()) c->getCredit().consume(usedMsgCredit, usedByteCredit);
if (notifyEnabled) c->enableNotify(); else c->disableNotify();
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 25a3a3327c..95c64ff060 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -542,7 +542,7 @@ void UpdateClient::updateConsumer(
ci->getTag(),
ci->isBlocked(),
ci->isNotifyEnabled(),
- ci->position,
+ ci->getPosition(),
ci->getCredit().used().messages,
ci->getCredit().used().bytes
);
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index aaa2721021..ccdb7810e1 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -300,7 +300,7 @@ QPID_AUTO_TEST_CASE(testSeek){
TestConsumer::shared_ptr consumer(new TestConsumer("test", false));
SequenceNumber seq(2);
- consumer->position = seq;
+ consumer->setPosition(seq);
QueuedMessage qm;
queue->dispatch(consumer);