summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp30
1 files changed, 12 insertions, 18 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 4d735c9abc..915b7e147c 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -84,16 +84,14 @@ bool SemanticState::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
-void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut,
- Queue::shared_ptr queue, bool nolocal, bool ackRequired, bool acquire,
+void SemanticState::consume(const string& tag,
+ Queue::shared_ptr queue, bool ackRequired, bool acquire,
bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
{
- if(tagInOut.empty())
- tagInOut = tagGenerator.generate();
- ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire, exclusive, resumeId, resumeTtl, arguments));
+ ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
outputTasks.addOutputTask(c.get());
- consumers[tagInOut] = c;
+ consumers[tag] = c;
}
void SemanticState::cancel(const string& tag){
@@ -233,11 +231,9 @@ void SemanticState::record(const DeliveryRecord& delivery)
}
SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
- DeliveryToken::shared_ptr _token,
const string& _name,
Queue::shared_ptr _queue,
bool ack,
- bool _nolocal,
bool _acquire,
bool _exclusive,
const string& _resumeId,
@@ -248,11 +244,9 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
) :
Consumer(_acquire),
parent(_parent),
- token(_token),
name(_name),
queue(_queue),
ackExpected(ack),
- nolocal(_nolocal),
acquire(_acquire),
blocked(true),
windowing(true),
@@ -272,10 +266,11 @@ OwnershipToken* SemanticState::ConsumerImpl::getSession()
bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
allocateCredit(msg.payload);
- DeliveryId deliveryTag =
- parent->deliveryAdapter.deliver(msg, token);
+ DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing);
+ parent->deliver(record);
+ if (!ackExpected) record.setEnded();//allows message to be released now its been delivered
if (windowing || ackExpected || !acquire) {
- parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected, windowing));
+ parent->record(record);
}
if (acquire && !ackExpected) {
queue->dequeue(0, msg);
@@ -283,10 +278,9 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
return true;
}
-bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg)
+bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
{
- return !(nolocal &&
- &parent->getSession().getConnection() == msg->getPublisher());
+ return true;
}
bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
@@ -454,9 +448,9 @@ void SemanticState::recover(bool requeue)
}
}
-DeliveryId SemanticState::redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
+void SemanticState::deliver(DeliveryRecord& msg)
{
- return deliveryAdapter.deliver(msg, token);
+ return deliveryAdapter.deliver(msg);
}
SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)