diff options
author | Alan Conway <aconway@apache.org> | 2012-01-18 22:07:58 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-01-18 22:07:58 +0000 |
commit | 95d12bc60580be2acf85af1ec8cb13ed46ddc7a9 (patch) | |
tree | 0a9887c89748f5e83a0ff1c4b73bdc3d21200f35 /cpp/src | |
parent | 7614a68ecdac345920fc00e4f4117c5b36830de4 (diff) | |
download | qpid-python-95d12bc60580be2acf85af1ec8cb13ed46ddc7a9.tar.gz |
QPID-3603: Replace public broker::Consumer::position variable with get/set function pair.
Public member variables are not good sytle.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1233080 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Consumer.h | 13 | ||||
-rw-r--r-- | cpp/src/qpid/broker/FifoDistributor.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageGroupManager.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 2 |
7 files changed, 21 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h index 2af9b0c121..74052844c9 100644 --- a/cpp/src/qpid/broker/Consumer.h +++ b/cpp/src/qpid/broker/Consumer.h @@ -42,19 +42,26 @@ class Consumer { public: typedef boost::shared_ptr<Consumer> shared_ptr; - framing::SequenceNumber position; - Consumer(const std::string& _name, bool preAcquires = true) : acquires(preAcquires), inListeners(false), name(_name), position(0) {} + virtual ~Consumer(){} + bool preAcquires() const { return acquires; } const std::string& getName() const { return name; } + virtual framing::SequenceNumber getPosition() const { return position; } + virtual void setPosition(framing::SequenceNumber pos) { position = pos; } + virtual bool deliver(QueuedMessage& msg) = 0; virtual void notify() = 0; virtual bool filter(boost::intrusive_ptr<Message>) { return true; } virtual bool accept(boost::intrusive_ptr<Message>) { return true; } virtual OwnershipToken* getSession() = 0; - virtual ~Consumer(){} + + protected: + framing::SequenceNumber position; + + private: friend class QueueListeners; }; diff --git a/cpp/src/qpid/broker/FifoDistributor.cpp b/cpp/src/qpid/broker/FifoDistributor.cpp index cdb32d8c8c..eb1f0a402e 100644 --- a/cpp/src/qpid/broker/FifoDistributor.cpp +++ b/cpp/src/qpid/broker/FifoDistributor.cpp @@ -46,7 +46,7 @@ bool FifoDistributor::allocate(const std::string&, const QueuedMessage& ) bool FifoDistributor::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { - if (!messages.empty() && messages.next(c->position, next)) + if (!messages.empty() && messages.next(c->getPosition(), next)) return true; return false; } diff --git a/cpp/src/qpid/broker/MessageGroupManager.cpp b/cpp/src/qpid/broker/MessageGroupManager.cpp index ddef66b80a..7054ef0310 100644 --- a/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -207,7 +207,7 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued if (messages.empty()) return false; - next.position = c->position; + next.position = c->getPosition(); if (!freeGroups.empty()) { const framing::SequenceNumber& nextFree = freeGroups.begin()->first; if (nextFree < next.position) { // a free message is older than current @@ -249,7 +249,7 @@ bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMess bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { // browse: allow access to any available msg, regardless of group ownership (?ok?) - if (!messages.empty() && messages.next(c->position, next)) + if (!messages.empty() && messages.next(c->getPosition(), next)) return true; return false; } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 4627b1409a..f87041390b 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -309,7 +309,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ if (msg.payload->hasExpired()) { QPID_LOG(debug, "Message expired from queue '" << name << "'"); - c->position = msg.position; + c->getPosition() = msg.position; acquire( msg.position, msg, locker); dequeue( 0, msg ); continue; @@ -324,7 +324,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ ok = acquire( msg.position, msg, locker); (void) ok; assert(ok); m = msg; - c->position = m.position; + c->getPosition() = m.position; return CONSUMED; } else { //message(s) are available but consumer hasn't got enough credit @@ -334,7 +334,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ } else { //consumer will never want this message QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); - c->position = msg.position; + c->getPosition() = msg.position; return CANT_CONSUME; } } @@ -356,7 +356,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) if (c->filter(msg.payload) && !msg.payload->hasExpired()) { if (c->accept(msg.payload)) { //consumer wants the message - c->position = msg.position; + c->setPosition(msg.position); m = msg; return true; } else { @@ -367,7 +367,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) } else { //consumer will never want this message, continue seeking QPID_LOG(debug, "Browser skipping message from '" << name << "'"); - c->position = msg.position; + c->setPosition(msg.position); } } return false; diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index d81fbd0494..c16ab72876 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -406,7 +406,7 @@ void Connection::consumerState(const string& name, bool blocked, bool notifyEnab uint32_t usedMsgCredit, uint32_t usedByteCredit) { broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name); - c->position = position; + c->setPosition(position); c->setBlocked(blocked); if (c->getCredit().isWindowMode()) c->getCredit().consume(usedMsgCredit, usedByteCredit); if (notifyEnabled) c->enableNotify(); else c->disableNotify(); diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 25a3a3327c..95c64ff060 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -542,7 +542,7 @@ void UpdateClient::updateConsumer( ci->getTag(), ci->isBlocked(), ci->isNotifyEnabled(), - ci->position, + ci->getPosition(), ci->getCredit().used().messages, ci->getCredit().used().bytes ); diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index aaa2721021..ccdb7810e1 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -300,7 +300,7 @@ QPID_AUTO_TEST_CASE(testSeek){ TestConsumer::shared_ptr consumer(new TestConsumer("test", false)); SequenceNumber seq(2); - consumer->position = seq; + consumer->setPosition(seq); QueuedMessage qm; queue->dispatch(consumer); |