summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp55
1 files changed, 32 insertions, 23 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 73a0a5cf7b..c8f77ba64e 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -269,9 +269,8 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
) :
- Consumer(_acquire),
+ Consumer(_name, _acquire),
parent(_parent),
- name(_name),
queue(_queue),
ackExpected(ack),
acquire(_acquire),
@@ -295,7 +294,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
if (agent != 0)
{
- mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name,
+ mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getName(),
!acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments));
agent->addObject (mgmtObject);
mgmtObject->set_creditMode("WINDOW");
@@ -327,16 +326,15 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
assertClusterSafe();
allocateCredit(msg.payload);
- DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing);
+ DeliveryRecord record(msg, queue, getName(), acquire, !ackExpected, windowing);
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
parent->deliver(record, sync);
- if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered
if (windowing || ackExpected || !acquire) {
parent->record(record);
}
- if (acquire && !ackExpected) {
- queue->dequeue(0, msg);
+ if (acquire && !ackExpected) { // auto acquire && auto accept
+ record.accept( 0 /*no ctxt*/ );
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
return true;
@@ -556,50 +554,61 @@ void SemanticState::deliver(DeliveryRecord& msg, bool sync)
return deliveryAdapter.deliver(msg, sync);
}
-SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
+const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
{
- ConsumerImplMap::iterator i = consumers.find(destination);
- if (i == consumers.end()) {
- throw NotFoundException(QPID_MSG("Unknown destination " << destination));
+ ConsumerImpl::shared_ptr consumer;
+ if (!find(destination, consumer)) {
+ throw NotFoundException(QPID_MSG("Unknown destination " << destination << " session=" << session.getSessionId()));
} else {
- return *(i->second);
+ return consumer;
+ }
+}
+
+bool SemanticState::find(const std::string& destination, ConsumerImpl::shared_ptr& consumer) const
+{
+ // @todo KAG gsim: shouldn't the consumers map be locked????
+ ConsumerImplMap::const_iterator i = consumers.find(destination);
+ if (i == consumers.end()) {
+ return false;
}
+ consumer = i->second;
+ return true;
}
void SemanticState::setWindowMode(const std::string& destination)
{
- find(destination).setWindowMode();
+ find(destination)->setWindowMode();
}
void SemanticState::setCreditMode(const std::string& destination)
{
- find(destination).setCreditMode();
+ find(destination)->setCreditMode();
}
void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
{
- ConsumerImpl& c = find(destination);
- c.addByteCredit(value);
- c.requestDispatch();
+ ConsumerImpl::shared_ptr c = find(destination);
+ c->addByteCredit(value);
+ c->requestDispatch();
}
void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
{
- ConsumerImpl& c = find(destination);
- c.addMessageCredit(value);
- c.requestDispatch();
+ ConsumerImpl::shared_ptr c = find(destination);
+ c->addMessageCredit(value);
+ c->requestDispatch();
}
void SemanticState::flush(const std::string& destination)
{
- find(destination).flush();
+ find(destination)->flush();
}
void SemanticState::stop(const std::string& destination)
{
- find(destination).stop();
+ find(destination)->stop();
}
void SemanticState::ConsumerImpl::setWindowMode()
@@ -682,7 +691,7 @@ AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
void SemanticState::acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired)
{
AckRange range = findRange(first, last);
- for_each(range.start, range.end, AcquireFunctor(acquired));
+ for_each(range.start, range.end, AcquireFunctor(this, acquired));
}
void SemanticState::release(DeliveryId first, DeliveryId last, bool setRedelivered)