diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-07 14:21:48 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-07 14:21:48 +0000 |
commit | 4fbbc6ecf68bd8f118f4a6165c8f5bfca2c3c8b6 (patch) | |
tree | 4a54f245efa1c2df1601d648c1fdd41fba08b802 /cpp/src/qpid/broker/SemanticState.cpp | |
parent | 92d889931fe1cea19d1e33658d5f30348bd7070e (diff) | |
download | qpid-python-4fbbc6ecf68bd8f118f4a6165c8f5bfca2c3c8b6.tar.gz |
QPID-3346: move message group feature into trunk.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1180050 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 66 |
1 files changed, 42 insertions, 24 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index b4f146e699..94d0cc87f7 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -107,11 +107,18 @@ bool SemanticState::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } +namespace { + const std::string SEPARATOR("::"); +} + void SemanticState::consume(const string& tag, Queue::shared_ptr queue, bool ackRequired, bool acquire, bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) { - ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments)); + // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination). + // Create a globally unique name so the broker can identify individual consumers + std::string name = session.getSessionId().str() + SEPARATOR + tag; + ConsumerImpl::shared_ptr c(new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); queue->consume(c, exclusive);//may throw exception consumers[tag] = c; } @@ -267,15 +274,15 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, bool ack, bool _acquire, bool _exclusive, + const string& _tag, const string& _resumeId, uint64_t _resumeTtl, const framing::FieldTable& _arguments ) : - Consumer(_acquire), + Consumer(_name, _acquire), parent(_parent), - name(_name), queue(_queue), ackExpected(ack), acquire(_acquire), @@ -284,6 +291,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, windowActive(false), exclusive(_exclusive), resumeId(_resumeId), + tag(_tag), resumeTtl(_resumeTtl), arguments(_arguments), msgCredit(0), @@ -300,7 +308,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, if (agent != 0) { - mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name, + mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(), !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)); agent->addObject (mgmtObject); mgmtObject->set_creditMode("WINDOW"); @@ -332,16 +340,15 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { assertClusterSafe(); allocateCredit(msg.payload); - DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing); + DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, windowing); bool sync = syncFrequency && ++deliveryCount >= syncFrequency; if (sync) deliveryCount = 0;//reset parent->deliver(record, sync); - if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered if (windowing || ackExpected || !acquire) { parent->record(record); } - if (acquire && !ackExpected) { - queue->dequeue(0, msg); + if (acquire && !ackExpected) { // auto acquire && auto accept + record.accept( 0 /*no ctxt*/ ); } if (mgmtObject) { mgmtObject->inc_delivered(); } return true; @@ -371,7 +378,7 @@ struct ConsumerName { }; ostream& operator<<(ostream& o, const ConsumerName& pc) { - return o << pc.consumer.getName() << " on " + return o << pc.consumer.getTag() << " on " << pc.consumer.getParent().getSession().getSessionId(); } } @@ -561,50 +568,61 @@ void SemanticState::deliver(DeliveryRecord& msg, bool sync) return deliveryAdapter.deliver(msg, sync); } -SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination) +const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const { - ConsumerImplMap::iterator i = consumers.find(destination); - if (i == consumers.end()) { - throw NotFoundException(QPID_MSG("Unknown destination " << destination)); + ConsumerImpl::shared_ptr consumer; + if (!find(destination, consumer)) { + throw NotFoundException(QPID_MSG("Unknown destination " << destination << " session=" << session.getSessionId())); } else { - return *(i->second); + return consumer; + } +} + +bool SemanticState::find(const std::string& destination, ConsumerImpl::shared_ptr& consumer) const +{ + // @todo KAG gsim: shouldn't the consumers map be locked???? + ConsumerImplMap::const_iterator i = consumers.find(destination); + if (i == consumers.end()) { + return false; } + consumer = i->second; + return true; } void SemanticState::setWindowMode(const std::string& destination) { - find(destination).setWindowMode(); + find(destination)->setWindowMode(); } void SemanticState::setCreditMode(const std::string& destination) { - find(destination).setCreditMode(); + find(destination)->setCreditMode(); } void SemanticState::addByteCredit(const std::string& destination, uint32_t value) { - ConsumerImpl& c = find(destination); - c.addByteCredit(value); - c.requestDispatch(); + ConsumerImpl::shared_ptr c = find(destination); + c->addByteCredit(value); + c->requestDispatch(); } void SemanticState::addMessageCredit(const std::string& destination, uint32_t value) { - ConsumerImpl& c = find(destination); - c.addMessageCredit(value); - c.requestDispatch(); + ConsumerImpl::shared_ptr c = find(destination); + c->addMessageCredit(value); + c->requestDispatch(); } void SemanticState::flush(const std::string& destination) { - find(destination).flush(); + find(destination)->flush(); } void SemanticState::stop(const std::string& destination) { - find(destination).stop(); + find(destination)->stop(); } void SemanticState::ConsumerImpl::setWindowMode() |