summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-10-07 14:21:48 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-10-07 14:21:48 +0000
commit4fbbc6ecf68bd8f118f4a6165c8f5bfca2c3c8b6 (patch)
tree4a54f245efa1c2df1601d648c1fdd41fba08b802 /cpp/src/qpid/broker/SemanticState.cpp
parent92d889931fe1cea19d1e33658d5f30348bd7070e (diff)
downloadqpid-python-4fbbc6ecf68bd8f118f4a6165c8f5bfca2c3c8b6.tar.gz
QPID-3346: move message group feature into trunk.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1180050 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp66
1 files changed, 42 insertions, 24 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index b4f146e699..94d0cc87f7 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -107,11 +107,18 @@ bool SemanticState::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
+namespace {
+ const std::string SEPARATOR("::");
+}
+
void SemanticState::consume(const string& tag,
Queue::shared_ptr queue, bool ackRequired, bool acquire,
bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
{
- ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments));
+ // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination).
+ // Create a globally unique name so the broker can identify individual consumers
+ std::string name = session.getSessionId().str() + SEPARATOR + tag;
+ ConsumerImpl::shared_ptr c(new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
consumers[tag] = c;
}
@@ -267,15 +274,15 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
bool ack,
bool _acquire,
bool _exclusive,
+ const string& _tag,
const string& _resumeId,
uint64_t _resumeTtl,
const framing::FieldTable& _arguments
) :
- Consumer(_acquire),
+ Consumer(_name, _acquire),
parent(_parent),
- name(_name),
queue(_queue),
ackExpected(ack),
acquire(_acquire),
@@ -284,6 +291,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
windowActive(false),
exclusive(_exclusive),
resumeId(_resumeId),
+ tag(_tag),
resumeTtl(_resumeTtl),
arguments(_arguments),
msgCredit(0),
@@ -300,7 +308,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
if (agent != 0)
{
- mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name,
+ mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(),
!acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments));
agent->addObject (mgmtObject);
mgmtObject->set_creditMode("WINDOW");
@@ -332,16 +340,15 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
assertClusterSafe();
allocateCredit(msg.payload);
- DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing);
+ DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, windowing);
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
parent->deliver(record, sync);
- if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered
if (windowing || ackExpected || !acquire) {
parent->record(record);
}
- if (acquire && !ackExpected) {
- queue->dequeue(0, msg);
+ if (acquire && !ackExpected) { // auto acquire && auto accept
+ record.accept( 0 /*no ctxt*/ );
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
return true;
@@ -371,7 +378,7 @@ struct ConsumerName {
};
ostream& operator<<(ostream& o, const ConsumerName& pc) {
- return o << pc.consumer.getName() << " on "
+ return o << pc.consumer.getTag() << " on "
<< pc.consumer.getParent().getSession().getSessionId();
}
}
@@ -561,50 +568,61 @@ void SemanticState::deliver(DeliveryRecord& msg, bool sync)
return deliveryAdapter.deliver(msg, sync);
}
-SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
+const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
{
- ConsumerImplMap::iterator i = consumers.find(destination);
- if (i == consumers.end()) {
- throw NotFoundException(QPID_MSG("Unknown destination " << destination));
+ ConsumerImpl::shared_ptr consumer;
+ if (!find(destination, consumer)) {
+ throw NotFoundException(QPID_MSG("Unknown destination " << destination << " session=" << session.getSessionId()));
} else {
- return *(i->second);
+ return consumer;
+ }
+}
+
+bool SemanticState::find(const std::string& destination, ConsumerImpl::shared_ptr& consumer) const
+{
+ // @todo KAG gsim: shouldn't the consumers map be locked????
+ ConsumerImplMap::const_iterator i = consumers.find(destination);
+ if (i == consumers.end()) {
+ return false;
}
+ consumer = i->second;
+ return true;
}
void SemanticState::setWindowMode(const std::string& destination)
{
- find(destination).setWindowMode();
+ find(destination)->setWindowMode();
}
void SemanticState::setCreditMode(const std::string& destination)
{
- find(destination).setCreditMode();
+ find(destination)->setCreditMode();
}
void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
{
- ConsumerImpl& c = find(destination);
- c.addByteCredit(value);
- c.requestDispatch();
+ ConsumerImpl::shared_ptr c = find(destination);
+ c->addByteCredit(value);
+ c->requestDispatch();
}
void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
{
- ConsumerImpl& c = find(destination);
- c.addMessageCredit(value);
- c.requestDispatch();
+ ConsumerImpl::shared_ptr c = find(destination);
+ c->addMessageCredit(value);
+ c->requestDispatch();
}
void SemanticState::flush(const std::string& destination)
{
- find(destination).flush();
+ find(destination)->flush();
}
void SemanticState::stop(const std::string& destination)
{
- find(destination).stop();
+ find(destination)->stop();
}
void SemanticState::ConsumerImpl::setWindowMode()