summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp35
1 files changed, 24 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index bdd5f33601..7e3090bf17 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -65,7 +65,7 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
tagGenerator("sgen"),
dtxSelected(false),
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()),
- userID(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@')))
+ userID(getSession().getConnection().getUserId())
{
acl = getSession().getBroker().getAcl();
}
@@ -302,6 +302,18 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
return !blocked;
}
+namespace {
+struct ConsumerName {
+ const SemanticState::ConsumerImpl& consumer;
+ ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {}
+};
+
+ostream& operator<<(ostream& o, const ConsumerName& pc) {
+ return o << pc.consumer.getName() << " on "
+ << pc.consumer.getParent().getSession().getSessionId();
+}
+}
+
void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
{
uint32_t originalMsgCredit = msgCredit;
@@ -312,7 +324,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
if (byteCredit != 0xFFFFFFFF) {
byteCredit -= msg->getRequiredCredit();
}
- QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent
+ QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
<< ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
<< " now bytes: " << byteCredit << " msgs: " << msgCredit);
@@ -320,15 +332,13 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
{
- if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
- QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent
- << ", bytes: " << byteCredit << " msgs: " << msgCredit);
- return false;
- } else {
- QPID_LOG(debug, "Credit available for '" << name << "' on " << parent
- << " bytes: " << byteCredit << " msgs: " << msgCredit);
- return true;
- }
+ bool enoughCredit = msgCredit > 0 &&
+ (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit());
+ QPID_LOG(debug, (enoughCredit ? "Sufficient credit for " : "Insufficient credit for ")
+ << ConsumerName(*this)
+ << ", have bytes: " << byteCredit << " msgs: " << msgCredit
+ << ", need " << msg->getRequiredCredit() << " bytes");
+ return enoughCredit;
}
SemanticState::ConsumerImpl::~ConsumerImpl() {}
@@ -356,6 +366,9 @@ void SemanticState::handle(intrusive_ptr<Message> msg) {
} else {
DeliverableMessage deliverable(msg);
route(msg, deliverable);
+ if (msg->checkContentReleasable()) {
+ msg->releaseContent();
+ }
}
}