diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 30 |
1 files changed, 12 insertions, 18 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 4d735c9abc..915b7e147c 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -84,16 +84,14 @@ bool SemanticState::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut, - Queue::shared_ptr queue, bool nolocal, bool ackRequired, bool acquire, +void SemanticState::consume(const string& tag, + Queue::shared_ptr queue, bool ackRequired, bool acquire, bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) { - if(tagInOut.empty()) - tagInOut = tagGenerator.generate(); - ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire, exclusive, resumeId, resumeTtl, arguments)); + ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments)); queue->consume(c, exclusive);//may throw exception outputTasks.addOutputTask(c.get()); - consumers[tagInOut] = c; + consumers[tag] = c; } void SemanticState::cancel(const string& tag){ @@ -233,11 +231,9 @@ void SemanticState::record(const DeliveryRecord& delivery) } SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, - DeliveryToken::shared_ptr _token, const string& _name, Queue::shared_ptr _queue, bool ack, - bool _nolocal, bool _acquire, bool _exclusive, const string& _resumeId, @@ -248,11 +244,9 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, ) : Consumer(_acquire), parent(_parent), - token(_token), name(_name), queue(_queue), ackExpected(ack), - nolocal(_nolocal), acquire(_acquire), blocked(true), windowing(true), @@ -272,10 +266,11 @@ OwnershipToken* SemanticState::ConsumerImpl::getSession() bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { allocateCredit(msg.payload); - DeliveryId deliveryTag = - parent->deliveryAdapter.deliver(msg, token); + DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing); + parent->deliver(record); + if (!ackExpected) record.setEnded();//allows message to be released now its been delivered if (windowing || ackExpected || !acquire) { - parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected, windowing)); + parent->record(record); } if (acquire && !ackExpected) { queue->dequeue(0, msg); @@ -283,10 +278,9 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) return true; } -bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg) +bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>) { - return !(nolocal && - &parent->getSession().getConnection() == msg->getPublisher()); + return true; } bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) @@ -454,9 +448,9 @@ void SemanticState::recover(bool requeue) } } -DeliveryId SemanticState::redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) +void SemanticState::deliver(DeliveryRecord& msg) { - return deliveryAdapter.deliver(msg, token); + return deliveryAdapter.deliver(msg); } SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination) |