diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 58 |
1 files changed, 29 insertions, 29 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 680488d66a..f97d37e893 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -285,7 +285,7 @@ void SemanticState::record(const DeliveryRecord& delivery) const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); -SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, +SemanticStateConsumerImpl::SemanticStateConsumerImpl(SemanticState* _parent, const string& _name, Queue::shared_ptr _queue, bool ack, @@ -328,12 +328,12 @@ Consumer(_name, type), } } -ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObject (void) const +ManagementObject::shared_ptr SemanticStateConsumerImpl::GetManagementObject (void) const { return mgmtObject; } -Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&) +Manageable::status_t SemanticStateConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; @@ -343,16 +343,16 @@ Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t met } -OwnershipToken* SemanticState::ConsumerImpl::getSession() +OwnershipToken* SemanticStateConsumerImpl::getSession() { return &(parent->session); } -bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg) +bool SemanticStateConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg) { return deliver(cursor, msg, shared_from_this()); } -bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer) +bool SemanticStateConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer) { allocateCredit(msg); boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg); @@ -377,12 +377,12 @@ bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Messa return true; } -bool SemanticState::ConsumerImpl::filter(const Message&) +bool SemanticStateConsumerImpl::filter(const Message&) { return true; } -bool SemanticState::ConsumerImpl::accept(const Message& msg) +bool SemanticStateConsumerImpl::accept(const Message& msg) { // TODO aconway 2009-06-08: if we have byte & message credit but // checkCredit fails because the message is to big, we should @@ -395,8 +395,8 @@ bool SemanticState::ConsumerImpl::accept(const Message& msg) namespace { struct ConsumerName { - const SemanticState::ConsumerImpl& consumer; - ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {} + const SemanticStateConsumerImpl& consumer; + ConsumerName(const SemanticStateConsumerImpl& ci) : consumer(ci) {} }; ostream& operator<<(ostream& o, const ConsumerName& pc) { @@ -405,7 +405,7 @@ ostream& operator<<(ostream& o, const ConsumerName& pc) { } } -void SemanticState::ConsumerImpl::allocateCredit(const Message& msg) +void SemanticStateConsumerImpl::allocateCredit(const Message& msg) { Credit original = credit; boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg); @@ -415,7 +415,7 @@ void SemanticState::ConsumerImpl::allocateCredit(const Message& msg) } -bool SemanticState::ConsumerImpl::checkCredit(const Message& msg) +bool SemanticStateConsumerImpl::checkCredit(const Message& msg) { boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg); bool enoughCredit = credit.check(1, transfer->getRequiredCredit()); @@ -425,7 +425,7 @@ bool SemanticState::ConsumerImpl::checkCredit(const Message& msg) return enoughCredit; } -SemanticState::ConsumerImpl::~ConsumerImpl() +SemanticStateConsumerImpl::~SemanticStateConsumerImpl() { if (mgmtObject != 0) mgmtObject->resourceDestroy (); @@ -498,7 +498,7 @@ void SemanticState::requestDispatch() i->second->requestDispatch(); } -void SemanticState::ConsumerImpl::requestDispatch() +void SemanticStateConsumerImpl::requestDispatch() { if (blocked) { parent->session.getConnection().outputTasks.addOutputTask(this); @@ -516,7 +516,7 @@ bool SemanticState::complete(DeliveryRecord& delivery) return delivery.isRedundant(); } -void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery) +void SemanticStateConsumerImpl::complete(DeliveryRecord& delivery) { if (!delivery.isComplete()) { delivery.complete(); @@ -541,7 +541,7 @@ SessionContext& SemanticState::getSession() { return session; } const SessionContext& SemanticState::getSession() const { return session; } -const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const +const SemanticStateConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const { ConsumerImpl::shared_ptr consumer; if (!find(destination, consumer)) { @@ -598,7 +598,7 @@ void SemanticState::stop(const std::string& destination) find(destination)->stop(); } -void SemanticState::ConsumerImpl::setWindowMode() +void SemanticStateConsumerImpl::setWindowMode() { credit.setWindowMode(true); if (mgmtObject){ @@ -606,7 +606,7 @@ void SemanticState::ConsumerImpl::setWindowMode() } } -void SemanticState::ConsumerImpl::setCreditMode() +void SemanticStateConsumerImpl::setCreditMode() { credit.setWindowMode(false); if (mgmtObject){ @@ -614,17 +614,17 @@ void SemanticState::ConsumerImpl::setCreditMode() } } -void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) +void SemanticStateConsumerImpl::addByteCredit(uint32_t value) { credit.addByteCredit(value); } -void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) +void SemanticStateConsumerImpl::addMessageCredit(uint32_t value) { credit.addMessageCredit(value); } -bool SemanticState::ConsumerImpl::haveCredit() +bool SemanticStateConsumerImpl::haveCredit() { if (credit) { return true; @@ -634,19 +634,19 @@ bool SemanticState::ConsumerImpl::haveCredit() } } -bool SemanticState::ConsumerImpl::doDispatch() +bool SemanticStateConsumerImpl::doDispatch() { return queue->dispatch(shared_from_this()); } -void SemanticState::ConsumerImpl::flush() +void SemanticStateConsumerImpl::flush() { while(haveCredit() && doDispatch()) ; credit.cancel(); } -void SemanticState::ConsumerImpl::stop() +void SemanticStateConsumerImpl::stop() { credit.cancel(); } @@ -701,7 +701,7 @@ void SemanticState::reject(DeliveryId first, DeliveryId last) getSession().setUnackedCount(unacked.size()); } -bool SemanticState::ConsumerImpl::doOutput() +bool SemanticStateConsumerImpl::doOutput() { try { return haveCredit() && doDispatch(); @@ -710,24 +710,24 @@ bool SemanticState::ConsumerImpl::doOutput() } } -void SemanticState::ConsumerImpl::enableNotify() +void SemanticStateConsumerImpl::enableNotify() { Mutex::ScopedLock l(lock); notifyEnabled = true; } -void SemanticState::ConsumerImpl::disableNotify() +void SemanticStateConsumerImpl::disableNotify() { Mutex::ScopedLock l(lock); notifyEnabled = false; } -bool SemanticState::ConsumerImpl::isNotifyEnabled() const { +bool SemanticStateConsumerImpl::isNotifyEnabled() const { Mutex::ScopedLock l(lock); return notifyEnabled; } -void SemanticState::ConsumerImpl::notify() +void SemanticStateConsumerImpl::notify() { Mutex::ScopedLock l(lock); if (notifyEnabled) { |
