summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp58
1 files changed, 30 insertions, 28 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 8f918ff40f..40c9bf296e 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -61,7 +61,6 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
deliveryAdapter(da),
tagGenerator("sgen"),
dtxSelected(false),
- outputTasks(ss),
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()),
userID(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@')))
{
@@ -90,7 +89,6 @@ void SemanticState::consume(const string& tag,
{
ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
- outputTasks.addOutputTask(c.get());
consumers[tag] = c;
}
@@ -98,7 +96,7 @@ void SemanticState::cancel(const string& tag){
ConsumerImplMap::iterator i = consumers.find(tag);
if (i != consumers.end()) {
cancel(i->second);
- consumers.erase(i);
+ consumers.erase(i);
//should cancel all unacked messages for this consumer so that
//they are not redelivered on recovery
for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag));
@@ -257,9 +255,9 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
msgCredit(0),
byteCredit(0),
notifyEnabled(true),
- queueHasMessages(1),
syncFrequency(_arguments.getAsInt("qpid.sync_frequency")),
- deliveryCount(0) {}
+ deliveryCount(0)
+{}
OwnershipToken* SemanticState::ConsumerImpl::getSession()
{
@@ -290,6 +288,11 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
{
+ // FIXME aconway 2009-06-08: if we have byte & message credit but
+ // checkCredit fails because the message is to big, we should
+ // remain on queue's listener list for possible smaller messages
+ // in future.
+ //
blocked = !(filter(msg) && checkCredit(msg));
return !blocked;
}
@@ -328,7 +331,8 @@ SemanticState::ConsumerImpl::~ConsumerImpl() {}
void SemanticState::cancel(ConsumerImpl::shared_ptr c)
{
c->disableNotify();
- outputTasks.removeOutputTask(c.get());
+ if (session.isAttached())
+ session.getConnection().outputTasks.removeOutputTask(c.get());
Queue::shared_ptr queue = c->getQueue();
if(queue) {
queue->cancel(c);
@@ -397,16 +401,18 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
}
void SemanticState::requestDispatch()
-{
- for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
- requestDispatch(*(i->second));
- }
+{
+ for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++)
+ i->second->requestDispatch();
}
-void SemanticState::requestDispatch(ConsumerImpl& c)
-{
- if(c.isBlocked())
- outputTasks.activateOutput();
+void SemanticState::ConsumerImpl::requestDispatch()
+{
+ if (blocked) {
+ parent->session.getConnection().outputTasks.addOutputTask(this);
+ parent->session.getConnection().outputTasks.activateOutput();
+ blocked = false;
+ }
}
bool SemanticState::complete(DeliveryRecord& delivery)
@@ -475,7 +481,7 @@ void SemanticState::addByteCredit(const std::string& destination, uint32_t value
{
ConsumerImpl& c = find(destination);
c.addByteCredit(value);
- requestDispatch(c);
+ c.requestDispatch();
}
@@ -483,7 +489,7 @@ void SemanticState::addMessageCredit(const std::string& destination, uint32_t va
{
ConsumerImpl& c = find(destination);
c.addMessageCredit(value);
- requestDispatch(c);
+ c.requestDispatch();
}
void SemanticState::flush(const std::string& destination)
@@ -593,11 +599,7 @@ bool SemanticState::ConsumerImpl::hasOutput() {
bool SemanticState::ConsumerImpl::doOutput()
{
- if (!haveCredit() || !queueHasMessages.boolCompareAndSwap(1, 0))
- return false;
- if (queue->dispatch(shared_from_this()))
- queueHasMessages.boolCompareAndSwap(0, 1);
- return queueHasMessages.get();
+ return haveCredit() && queue->dispatch(shared_from_this());
}
void SemanticState::ConsumerImpl::enableNotify()
@@ -619,14 +621,11 @@ bool SemanticState::ConsumerImpl::isNotifyEnabled() const {
void SemanticState::ConsumerImpl::notify()
{
- queueHasMessages.boolCompareAndSwap(0, 1);
-
- //TODO: alter this, don't want to hold locks across external
- //calls; for now its is required to protect the notify() from
- //having part of the object chain of the invocation being
- //concurrently deleted
Mutex::ScopedLock l(lock);
- if (notifyEnabled) parent->outputTasks.activateOutput();
+ if (notifyEnabled) {
+ parent->session.getConnection().outputTasks.addOutputTask(this);
+ parent->session.getConnection().outputTasks.activateOutput();
+ }
}
@@ -670,13 +669,16 @@ void SemanticState::attached()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->enableNotify();
+ session.getConnection().outputTasks.addOutputTask(i->second.get());
}
+ session.getConnection().outputTasks.activateOutput();
}
void SemanticState::detached()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->disableNotify();
+ session.getConnection().outputTasks.removeOutputTask(i->second.get());
}
}