summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp161
1 files changed, 72 insertions, 89 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index e790e087f0..76775d03d5 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -62,7 +62,8 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss)
tagGenerator("sgen"),
dtxSelected(false),
accumulatedAck(0),
- flowActive(true)
+ flowActive(true),
+ outputTasks(ss)
{
outstanding.reset();
}
@@ -70,7 +71,7 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss)
SemanticState::~SemanticState() {
//cancel all consumers
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
- cancel(i->second);
+ cancel(*i);
}
if (dtxBuffer.get()) {
@@ -89,19 +90,19 @@ void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut,
{
if(tagInOut.empty())
tagInOut = tagGenerator.generate();
- ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire));
- queue->consume(c, exclusive);//may throw exception
- consumers[tagInOut] = c;
+ std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire));
+ queue->consume(*c, exclusive);//may throw exception
+ outputTasks.addOutputTask(c.get());
+ consumers.insert(tagInOut, c.release());
}
void SemanticState::cancel(const string& tag){
ConsumerImplMap::iterator i = consumers.find(tag);
if (i != consumers.end()) {
- cancel(i->second);
+ cancel(*i);
consumers.erase(i);
//should cancel all unacked messages for this consumer so that
//they are not redelivered on recovery
- Mutex::ScopedLock locker(deliveryLock);
for_each(unacked.begin(), unacked.end(), boost::bind(mem_fun_ref(&DeliveryRecord::cancel), _1, tag));
}
@@ -232,7 +233,6 @@ void SemanticState::record(const DeliveryRecord& delivery)
bool SemanticState::checkPrefetch(intrusive_ptr<Message>& msg)
{
- Mutex::ScopedLock locker(deliveryLock);
bool countOk = !prefetchCount || prefetchCount > unacked.size();
bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
return countOk && sizeOk;
@@ -254,37 +254,27 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
ackExpected(ack),
nolocal(_nolocal),
acquire(_acquire),
- blocked(false),
+ blocked(true),
windowing(true),
msgCredit(0),
byteCredit(0) {}
bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
- if (!parent->getSession().isAttached()) {
- return false;
- }
-
- if (nolocal &&
- &parent->getSession().getConnection() == msg.payload->getPublisher()) {
- return false;
- } else {
- if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) {
- blocked = true;
- } else {
- blocked = false;
- Mutex::ScopedLock locker(parent->deliveryLock);
-
- DeliveryId deliveryTag =
- parent->deliveryAdapter.deliver(msg, token);
- if (windowing || ackExpected) {
- parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
- }
- if (acquire && !ackExpected) {
- queue->dequeue(0, msg.payload);
- }
+ if (parent->getSession().isAttached() && accept(msg.payload)) {
+ allocateCredit(msg.payload);
+ DeliveryId deliveryTag =
+ parent->deliveryAdapter.deliver(msg, token);
+ if (windowing || ackExpected) {
+ parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
+ }
+ if (acquire && !ackExpected) {
+ queue->dequeue(0, msg.payload);
}
- return !blocked;
+ return true;
+ } else {
+ QPID_LOG(debug, "Failed to deliver message to '" << name << "' on " << parent);
+ return false;
}
}
@@ -294,35 +284,48 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg)
&parent->getSession().getConnection() == msg->getPublisher());
}
+bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
+{
+ //TODO: remove the now redundant checks (channel.flow & basic|message.qos removed):
+ blocked = !(filter(msg) && checkCredit(msg) && parent->flowActive && (!ackExpected || parent->checkPrefetch(msg)));
+ return !blocked;
+}
+
+void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
+{
+ uint32_t originalMsgCredit = msgCredit;
+ uint32_t originalByteCredit = byteCredit;
+ if (msgCredit != 0xFFFFFFFF) {
+ msgCredit--;
+ }
+ if (byteCredit != 0xFFFFFFFF) {
+ byteCredit -= msg->getRequiredCredit();
+ }
+ QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent
+ << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
+ << " now bytes: " << byteCredit << " msgs: " << msgCredit);
+
+}
+
bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
{
- Mutex::ScopedLock l(lock);
if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent
<< ", bytes: " << byteCredit << " msgs: " << msgCredit);
return false;
} else {
- uint32_t originalMsgCredit = msgCredit;
- uint32_t originalByteCredit = byteCredit;
-
- if (msgCredit != 0xFFFFFFFF) {
- msgCredit--;
- }
- if (byteCredit != 0xFFFFFFFF) {
- byteCredit -= msg->getRequiredCredit();
- }
QPID_LOG(debug, "Credit available for '" << name << "' on " << parent
- << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
- << " now bytes: " << byteCredit << " msgs: " << msgCredit);
+ << " bytes: " << byteCredit << " msgs: " << msgCredit);
return true;
}
}
SemanticState::ConsumerImpl::~ConsumerImpl() {}
-void SemanticState::cancel(ConsumerImpl::shared_ptr c)
+void SemanticState::cancel(ConsumerImpl& c)
{
- Queue::shared_ptr queue = c->getQueue();
+ outputTasks.removeOutputTask(&c);
+ Queue::shared_ptr queue = c.getQueue();
if(queue) {
queue->cancel(c);
if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
@@ -374,8 +377,6 @@ void SemanticState::ackRange(DeliveryId first, DeliveryId last)
void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
{
{
- Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
ack_iterator start = cumulative ? unacked.begin() :
find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
ack_iterator end = start;
@@ -417,14 +418,14 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
void SemanticState::requestDispatch()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
- requestDispatch(i->second);
+ requestDispatch(*i);
}
}
-void SemanticState::requestDispatch(ConsumerImpl::shared_ptr c)
+void SemanticState::requestDispatch(ConsumerImpl& c)
{
- if(c->isBlocked()) {
- c->getQueue()->requestDispatch(c);
+ if(c.isBlocked()) {
+ c.doOutput();
}
}
@@ -433,14 +434,13 @@ void SemanticState::acknowledged(const DeliveryRecord& delivery)
delivery.subtractFrom(outstanding);
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
- i->second->acknowledged(delivery);
+ i->acknowledged(delivery);
}
}
void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
{
if (windowing) {
- Mutex::ScopedLock l(lock);
if (msgCredit != 0xFFFFFFFF) msgCredit++;
if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
}
@@ -448,8 +448,6 @@ void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
void SemanticState::recover(bool requeue)
{
- Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
if(requeue){
outstanding.reset();
//take copy and clear unacked as requeue may result in redelivery to this session
@@ -470,7 +468,6 @@ bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue
{
QueuedMessage msg = queue->dequeue();
if(msg.payload){
- Mutex::ScopedLock locker(deliveryLock);
DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg, token);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
@@ -483,13 +480,11 @@ bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue
DeliveryId SemanticState::redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
{
- Mutex::ScopedLock locker(deliveryLock);
return deliveryAdapter.deliver(msg, token);
}
void SemanticState::flow(bool active)
{
- Mutex::ScopedLock locker(deliveryLock);
bool requestDelivery(!flowActive && active);
flowActive = active;
if (requestDelivery) {
@@ -499,50 +494,50 @@ void SemanticState::flow(bool active)
}
-SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination)
+SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
{
ConsumerImplMap::iterator i = consumers.find(destination);
if (i == consumers.end()) {
throw NotFoundException(QPID_MSG("Unknown destination " << destination));
} else {
- return i->second;
+ return *i;
}
}
void SemanticState::setWindowMode(const std::string& destination)
{
- find(destination)->setWindowMode();
+ find(destination).setWindowMode();
}
void SemanticState::setCreditMode(const std::string& destination)
{
- find(destination)->setCreditMode();
+ find(destination).setCreditMode();
}
void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
{
- ConsumerImpl::shared_ptr c = find(destination);
- c->addByteCredit(value);
+ ConsumerImpl& c = find(destination);
+ c.addByteCredit(value);
requestDispatch(c);
}
void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
{
- ConsumerImpl::shared_ptr c = find(destination);
- c->addMessageCredit(value);
+ ConsumerImpl& c = find(destination);
+ c.addMessageCredit(value);
requestDispatch(c);
}
void SemanticState::flush(const std::string& destination)
{
- find(destination)->flush();
+ find(destination).flush();
}
void SemanticState::stop(const std::string& destination)
{
- find(destination)->stop();
+ find(destination).stop();
}
void SemanticState::ConsumerImpl::setWindowMode()
@@ -557,7 +552,6 @@ void SemanticState::ConsumerImpl::setCreditMode()
void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
- Mutex::ScopedLock l(lock);
if (byteCredit != 0xFFFFFFFF) {
byteCredit += value;
}
@@ -565,7 +559,6 @@ void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
- Mutex::ScopedLock l(lock);
if (msgCredit != 0xFFFFFFFF) {
msgCredit += value;
}
@@ -573,16 +566,12 @@ void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
void SemanticState::ConsumerImpl::flush()
{
- //need to prevent delivery after requestDispatch returns but
- //before credit is reduced to zero
- FlushCompletion completion(*this);
- queue->flush(completion);
- completion.wait();
+ while(queue->dispatch(*this));
+ stop();
}
void SemanticState::ConsumerImpl::stop()
{
- Mutex::ScopedLock l(lock);
msgCredit = 0;
byteCredit = 0;
}
@@ -618,14 +607,12 @@ AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
void SemanticState::acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired)
{
- Mutex::ScopedLock locker(deliveryLock);
AckRange range = findRange(first, last);
for_each(range.start, range.end, AcquireFunctor(acquired));
}
void SemanticState::release(DeliveryId first, DeliveryId last)
{
- Mutex::ScopedLock locker(deliveryLock);
AckRange range = findRange(first, last);
//release results in the message being added to the head so want
//to release in reverse order to keep the original transfer order
@@ -636,26 +623,22 @@ void SemanticState::release(DeliveryId first, DeliveryId last)
void SemanticState::reject(DeliveryId first, DeliveryId last)
{
- Mutex::ScopedLock locker(deliveryLock);
AckRange range = findRange(first, last);
for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject));
//need to remove the delivery records as well
unacked.erase(range.start, range.end);
}
-
-void SemanticState::FlushCompletion::wait()
+bool SemanticState::ConsumerImpl::doOutput()
{
- Monitor::ScopedLock locker(lock);
- while (!complete) lock.wait();
+ //TODO: think through properly
+ return queue->dispatch(*this);
}
-void SemanticState::FlushCompletion::completed()
+void SemanticState::ConsumerImpl::notify()
{
- Monitor::ScopedLock locker(lock);
- consumer.stop();
- complete = true;
- lock.notifyAll();
+ //TODO: think through properly
+ parent->outputTasks.activateOutput();
}
}} // namespace qpid::broker