summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-10-21 23:30:32 +0000
committerGordon Sim <gsim@apache.org>2008-10-21 23:30:32 +0000
commitd2925719fe1db290ff33621d448e5e3a2c2c26eb (patch)
tree859a01045df935c68821c1d7fb95c0c30d15e7ee /qpid/cpp/src/qpid/broker/SemanticState.cpp
parent573739f14409d3e2f3c42a3452712dbf30c70472 (diff)
downloadqpid-python-d2925719fe1db290ff33621d448e5e3a2c2c26eb.tar.gz
Refactored DeliveryRecord and delivery path to remove some redundant code.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@706811 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp30
1 files changed, 12 insertions, 18 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 4d735c9abc..915b7e147c 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -84,16 +84,14 @@ bool SemanticState::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
-void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut,
- Queue::shared_ptr queue, bool nolocal, bool ackRequired, bool acquire,
+void SemanticState::consume(const string& tag,
+ Queue::shared_ptr queue, bool ackRequired, bool acquire,
bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
{
- if(tagInOut.empty())
- tagInOut = tagGenerator.generate();
- ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire, exclusive, resumeId, resumeTtl, arguments));
+ ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
outputTasks.addOutputTask(c.get());
- consumers[tagInOut] = c;
+ consumers[tag] = c;
}
void SemanticState::cancel(const string& tag){
@@ -233,11 +231,9 @@ void SemanticState::record(const DeliveryRecord& delivery)
}
SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
- DeliveryToken::shared_ptr _token,
const string& _name,
Queue::shared_ptr _queue,
bool ack,
- bool _nolocal,
bool _acquire,
bool _exclusive,
const string& _resumeId,
@@ -248,11 +244,9 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
) :
Consumer(_acquire),
parent(_parent),
- token(_token),
name(_name),
queue(_queue),
ackExpected(ack),
- nolocal(_nolocal),
acquire(_acquire),
blocked(true),
windowing(true),
@@ -272,10 +266,11 @@ OwnershipToken* SemanticState::ConsumerImpl::getSession()
bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
allocateCredit(msg.payload);
- DeliveryId deliveryTag =
- parent->deliveryAdapter.deliver(msg, token);
+ DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing);
+ parent->deliver(record);
+ if (!ackExpected) record.setEnded();//allows message to be released now its been delivered
if (windowing || ackExpected || !acquire) {
- parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected, windowing));
+ parent->record(record);
}
if (acquire && !ackExpected) {
queue->dequeue(0, msg);
@@ -283,10 +278,9 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
return true;
}
-bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg)
+bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
{
- return !(nolocal &&
- &parent->getSession().getConnection() == msg->getPublisher());
+ return true;
}
bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
@@ -454,9 +448,9 @@ void SemanticState::recover(bool requeue)
}
}
-DeliveryId SemanticState::redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
+void SemanticState::deliver(DeliveryRecord& msg)
{
- return deliveryAdapter.deliver(msg, token);
+ return deliveryAdapter.deliver(msg);
}
SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)