diff options
author | Gordon Sim <gsim@apache.org> | 2008-10-21 23:30:32 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-10-21 23:30:32 +0000 |
commit | d2925719fe1db290ff33621d448e5e3a2c2c26eb (patch) | |
tree | 859a01045df935c68821c1d7fb95c0c30d15e7ee /qpid/cpp/src/qpid/broker/SemanticState.cpp | |
parent | 573739f14409d3e2f3c42a3452712dbf30c70472 (diff) | |
download | qpid-python-d2925719fe1db290ff33621d448e5e3a2c2c26eb.tar.gz |
Refactored DeliveryRecord and delivery path to remove some redundant code.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@706811 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 30 |
1 files changed, 12 insertions, 18 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 4d735c9abc..915b7e147c 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -84,16 +84,14 @@ bool SemanticState::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut, - Queue::shared_ptr queue, bool nolocal, bool ackRequired, bool acquire, +void SemanticState::consume(const string& tag, + Queue::shared_ptr queue, bool ackRequired, bool acquire, bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) { - if(tagInOut.empty()) - tagInOut = tagGenerator.generate(); - ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire, exclusive, resumeId, resumeTtl, arguments)); + ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments)); queue->consume(c, exclusive);//may throw exception outputTasks.addOutputTask(c.get()); - consumers[tagInOut] = c; + consumers[tag] = c; } void SemanticState::cancel(const string& tag){ @@ -233,11 +231,9 @@ void SemanticState::record(const DeliveryRecord& delivery) } SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, - DeliveryToken::shared_ptr _token, const string& _name, Queue::shared_ptr _queue, bool ack, - bool _nolocal, bool _acquire, bool _exclusive, const string& _resumeId, @@ -248,11 +244,9 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, ) : Consumer(_acquire), parent(_parent), - token(_token), name(_name), queue(_queue), ackExpected(ack), - nolocal(_nolocal), acquire(_acquire), blocked(true), windowing(true), @@ -272,10 +266,11 @@ OwnershipToken* SemanticState::ConsumerImpl::getSession() bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { allocateCredit(msg.payload); - DeliveryId deliveryTag = - parent->deliveryAdapter.deliver(msg, token); + DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing); + parent->deliver(record); + if (!ackExpected) record.setEnded();//allows message to be released now its been delivered if (windowing || ackExpected || !acquire) { - parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected, windowing)); + parent->record(record); } if (acquire && !ackExpected) { queue->dequeue(0, msg); @@ -283,10 +278,9 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) return true; } -bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg) +bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>) { - return !(nolocal && - &parent->getSession().getConnection() == msg->getPublisher()); + return true; } bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) @@ -454,9 +448,9 @@ void SemanticState::recover(bool requeue) } } -DeliveryId SemanticState::redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) +void SemanticState::deliver(DeliveryRecord& msg) { - return deliveryAdapter.deliver(msg, token); + return deliveryAdapter.deliver(msg); } SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination) |