diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 38 |
1 files changed, 28 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 2b9fd247f5..e7d2259c80 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -106,15 +106,25 @@ bool SemanticState::exists(const string& consumerTag){ namespace { const std::string SEPARATOR("::"); } - + void SemanticState::consume(const string& tag, Queue::shared_ptr queue, bool ackRequired, bool acquire, - bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) + bool exclusive, const string& resumeId, uint64_t resumeTtl, + const FieldTable& arguments) { // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination). // Create a globally unique name so the broker can identify individual consumers std::string name = session.getSessionId().str() + SEPARATOR + tag; - ConsumerImpl::shared_ptr c(new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); + const ConsumerFactories::Factories& cf( + session.getBroker().getConsumerFactories().get()); + ConsumerImpl::shared_ptr c; + for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i != cf.end() && !c; ++i) + c = (*i)->create(this, name, queue, ackRequired, acquire, exclusive, tag, + resumeId, resumeTtl, arguments); + if (!c) // Create plain consumer + c = ConsumerImpl::shared_ptr( + new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, + resumeId, resumeTtl, arguments)); queue->consume(c, exclusive);//may throw exception consumers[tag] = c; } @@ -275,7 +285,6 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, uint64_t _resumeTtl, const framing::FieldTable& _arguments - ) : Consumer(_name, _acquire), parent(_parent), @@ -332,7 +341,8 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { assertClusterSafe(); allocateCredit(msg.payload); - DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, credit.isWindowMode()); + DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), + shared_from_this(), acquire, !ackExpected, credit.isWindowMode(), 0); bool sync = syncFrequency && ++deliveryCount >= syncFrequency; if (sync) deliveryCount = 0;//reset parent->deliver(record, sync); @@ -340,7 +350,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) parent->record(record); } if (acquire && !ackExpected) { // auto acquire && auto accept - queue->dequeue(0 /*ctxt*/, msg); + msg.queue->dequeue(0, msg); record.setEnded(); } if (mgmtObject) { mgmtObject->inc_delivered(); } @@ -355,7 +365,7 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>) bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) { assertClusterSafe(); - // FIXME aconway 2009-06-08: if we have byte & message credit but + // TODO aconway 2009-06-08: if we have byte & message credit but // checkCredit fails because the message is to big, we should // remain on queue's listener list for possible smaller messages // in future. @@ -455,8 +465,11 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { msg->computeExpiration(getSession().getBroker().getExpiryPolicy()); std::string exchangeName = msg->getExchangeName(); - if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) + if (!cacheExchange || cacheExchange->getName() != exchangeName + || cacheExchange->isDestroyed()) + { cacheExchange = session.getBroker().getExchanges().get(exchangeName); + } cacheExchange->setProperties(msg); /* verify the userid if specified: */ @@ -646,9 +659,14 @@ bool SemanticState::ConsumerImpl::haveCredit() } } +bool SemanticState::ConsumerImpl::doDispatch() +{ + return queue->dispatch(shared_from_this()); +} + void SemanticState::ConsumerImpl::flush() { - while(haveCredit() && queue->dispatch(shared_from_this())) + while(haveCredit() && doDispatch()) ; credit.cancel(); } @@ -710,7 +728,7 @@ void SemanticState::reject(DeliveryId first, DeliveryId last) bool SemanticState::ConsumerImpl::doOutput() { try { - return haveCredit() && queue->dispatch(shared_from_this()); + return haveCredit() && doDispatch(); } catch (const SessionException& e) { throw SessionOutputException(e, parent->session.getChannel()); } |