summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp66
1 files changed, 33 insertions, 33 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index cfc379f47c..ba1f989f7c 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -88,7 +88,7 @@ void SemanticState::closed() {
//prevent requeued messages being redelivered to consumers
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
disable(i->second);
- }
+ }
if (dtxBuffer.get()) {
dtxBuffer->fail();
}
@@ -107,7 +107,7 @@ bool SemanticState::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
-void SemanticState::consume(const string& tag,
+void SemanticState::consume(const string& tag,
Queue::shared_ptr queue, bool ackRequired, bool acquire,
bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
{
@@ -116,7 +116,8 @@ void SemanticState::consume(const string& tag,
consumers[tag] = c;
}
-void SemanticState::cancel(const string& tag){
+bool SemanticState::cancel(const string& tag)
+{
ConsumerImplMap::iterator i = consumers.find(tag);
if (i != consumers.end()) {
cancel(i->second);
@@ -124,7 +125,9 @@ void SemanticState::cancel(const string& tag){
//should cancel all unacked messages for this consumer so that
//they are not redelivered on recovery
for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag));
-
+ return true;
+ } else {
+ return false;
}
}
@@ -194,7 +197,7 @@ void SemanticState::endDtx(const std::string& xid, bool fail)
dtxBuffer->fail();
} else {
dtxBuffer->markEnded();
- }
+ }
dtxBuffer.reset();
}
@@ -254,9 +257,9 @@ void SemanticState::record(const DeliveryRecord& delivery)
const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
-SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
- const string& _name,
- Queue::shared_ptr _queue,
+SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
+ const string& _name,
+ Queue::shared_ptr _queue,
bool ack,
bool _acquire,
bool _exclusive,
@@ -265,20 +268,20 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
const framing::FieldTable& _arguments
-) :
+) :
Consumer(_acquire),
- parent(_parent),
- name(_name),
- queue(_queue),
- ackExpected(ack),
+ parent(_parent),
+ name(_name),
+ queue(_queue),
+ ackExpected(ack),
acquire(_acquire),
- blocked(true),
+ blocked(true),
windowing(true),
exclusive(_exclusive),
resumeId(_resumeId),
resumeTtl(_resumeTtl),
arguments(_arguments),
- msgCredit(0),
+ msgCredit(0),
byteCredit(0),
notifyEnabled(true),
syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
@@ -289,7 +292,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
{
ManagementAgent* agent = parent->session.getBroker().getManagementAgent();
qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session));
-
+
if (agent != 0)
{
mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name,
@@ -331,7 +334,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered
if (windowing || ackExpected || !acquire) {
parent->record(record);
- }
+ }
if (acquire && !ackExpected) {
queue->dequeue(0, msg);
}
@@ -351,7 +354,7 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
// checkCredit fails because the message is to big, we should
// remain on queue's listener list for possible smaller messages
// in future.
- //
+ //
blocked = !(filter(msg) && checkCredit(msg));
return !blocked;
}
@@ -372,7 +375,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
{
assertClusterSafe();
uint32_t originalMsgCredit = msgCredit;
- uint32_t originalByteCredit = byteCredit;
+ uint32_t originalByteCredit = byteCredit;
if (msgCredit != 0xFFFFFFFF) {
msgCredit--;
}
@@ -382,7 +385,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
<< ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
<< " now bytes: " << byteCredit << " msgs: " << msgCredit);
-
+
}
bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
@@ -396,7 +399,7 @@ bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
return enoughCredit;
}
-SemanticState::ConsumerImpl::~ConsumerImpl()
+SemanticState::ConsumerImpl::~ConsumerImpl()
{
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
@@ -414,7 +417,7 @@ void SemanticState::unsubscribe(ConsumerImpl::shared_ptr c)
Queue::shared_ptr queue = c->getQueue();
if(queue) {
queue->cancel(c);
- if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
+ if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
Queue::tryAutoDelete(session.getBroker(), queue);
}
}
@@ -457,7 +460,7 @@ const std::string nullstring;
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
-
+
std::string exchangeName = msg->getExchangeName();
if (!cacheExchange || cacheExchange->getName() != exchangeName)
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
@@ -466,7 +469,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
/* verify the userid if specified: */
std::string id =
msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring;
-
+
if (authMsg && !id.empty() && !(id == userID || (isDefaultRealm && id == userName)))
{
QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id);
@@ -484,7 +487,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
if (!strategy.delivered) {
//TODO:if discard-unroutable, just drop it
- //TODO:else if accept-mode is explicit, reject it
+ //TODO:else if accept-mode is explicit, reject it
//else route it to alternate exchange
if (cacheExchange->getAlternate()) {
cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
@@ -513,7 +516,7 @@ void SemanticState::ConsumerImpl::requestDispatch()
}
bool SemanticState::complete(DeliveryRecord& delivery)
-{
+{
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
i->second->complete(delivery);
@@ -541,7 +544,7 @@ void SemanticState::recover(bool requeue)
unacked.clear();
for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
}else{
- for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));
+ for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));
//unconfirmed messages re redelivered and therefore have their
//id adjusted, confirmed messages are not and so the ordering
//w.r.t id is lost
@@ -673,7 +676,7 @@ Queue::shared_ptr SemanticState::getQueue(const string& name) const {
}
AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
-{
+{
return DeliveryRecord::findRange(unacked, first, last);
}
@@ -764,13 +767,13 @@ void SemanticState::accepted(const SequenceSet& commands) {
//in transactional mode, don't dequeue or remove, just
//maintain set of acknowledged messages:
accumulatedAck.add(commands);
-
+
if (dtxBuffer.get()) {
//if enlisted in a dtx, copy the relevant slice from
//unacked and record it against that transaction
TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
accumulatedAck.clear();
- dtxBuffer->enlist(txAck);
+ dtxBuffer->enlist(txAck);
//mark the relevant messages as 'ended' in unacked
//if the messages are already completed, they can be
@@ -792,7 +795,6 @@ void SemanticState::accepted(const SequenceSet& commands) {
}
void SemanticState::completed(const SequenceSet& commands) {
- assertClusterSafe();
DeliveryRecords::iterator removed =
remove_if(unacked.begin(), unacked.end(),
isInSequenceSetAnd(commands,
@@ -803,7 +805,6 @@ void SemanticState::completed(const SequenceSet& commands) {
void SemanticState::attached()
{
- assertClusterSafe();
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->enableNotify();
session.getConnection().outputTasks.addOutputTask(i->second.get());
@@ -813,7 +814,6 @@ void SemanticState::attached()
void SemanticState::detached()
{
- assertClusterSafe();
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->disableNotify();
session.getConnection().outputTasks.removeOutputTask(i->second.get());