summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp38
1 files changed, 28 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 2b9fd247f5..e7d2259c80 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -106,15 +106,25 @@ bool SemanticState::exists(const string& consumerTag){
namespace {
const std::string SEPARATOR("::");
}
-
+
void SemanticState::consume(const string& tag,
Queue::shared_ptr queue, bool ackRequired, bool acquire,
- bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
+ bool exclusive, const string& resumeId, uint64_t resumeTtl,
+ const FieldTable& arguments)
{
// "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination).
// Create a globally unique name so the broker can identify individual consumers
std::string name = session.getSessionId().str() + SEPARATOR + tag;
- ConsumerImpl::shared_ptr c(new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
+ const ConsumerFactories::Factories& cf(
+ session.getBroker().getConsumerFactories().get());
+ ConsumerImpl::shared_ptr c;
+ for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i != cf.end() && !c; ++i)
+ c = (*i)->create(this, name, queue, ackRequired, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments);
+ if (!c) // Create plain consumer
+ c = ConsumerImpl::shared_ptr(
+ new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
consumers[tag] = c;
}
@@ -275,7 +285,6 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
uint64_t _resumeTtl,
const framing::FieldTable& _arguments
-
) :
Consumer(_name, _acquire),
parent(_parent),
@@ -332,7 +341,8 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
assertClusterSafe();
allocateCredit(msg.payload);
- DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, credit.isWindowMode());
+ DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(),
+ shared_from_this(), acquire, !ackExpected, credit.isWindowMode(), 0);
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
parent->deliver(record, sync);
@@ -340,7 +350,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
parent->record(record);
}
if (acquire && !ackExpected) { // auto acquire && auto accept
- queue->dequeue(0 /*ctxt*/, msg);
+ msg.queue->dequeue(0, msg);
record.setEnded();
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
@@ -355,7 +365,7 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
{
assertClusterSafe();
- // FIXME aconway 2009-06-08: if we have byte & message credit but
+ // TODO aconway 2009-06-08: if we have byte & message credit but
// checkCredit fails because the message is to big, we should
// remain on queue's listener list for possible smaller messages
// in future.
@@ -455,8 +465,11 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
std::string exchangeName = msg->getExchangeName();
- if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
+ if (!cacheExchange || cacheExchange->getName() != exchangeName
+ || cacheExchange->isDestroyed())
+ {
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
+ }
cacheExchange->setProperties(msg);
/* verify the userid if specified: */
@@ -646,9 +659,14 @@ bool SemanticState::ConsumerImpl::haveCredit()
}
}
+bool SemanticState::ConsumerImpl::doDispatch()
+{
+ return queue->dispatch(shared_from_this());
+}
+
void SemanticState::ConsumerImpl::flush()
{
- while(haveCredit() && queue->dispatch(shared_from_this()))
+ while(haveCredit() && doDispatch())
;
credit.cancel();
}
@@ -710,7 +728,7 @@ void SemanticState::reject(DeliveryId first, DeliveryId last)
bool SemanticState::ConsumerImpl::doOutput()
{
try {
- return haveCredit() && queue->dispatch(shared_from_this());
+ return haveCredit() && doDispatch();
} catch (const SessionException& e) {
throw SessionOutputException(e, parent->session.getChannel());
}