diff options
Diffstat (limited to 'cpp/src/qpid/broker/amqp/Connection.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/amqp/Connection.cpp | 25 |
1 files changed, 22 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/amqp/Connection.cpp b/cpp/src/qpid/broker/amqp/Connection.cpp index 514add7e44..c95a2d3537 100644 --- a/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/cpp/src/qpid/broker/amqp/Connection.cpp @@ -70,7 +70,7 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker : BrokerContext(b), ManagedConnection(getBroker(), i), connection(pn_connection()), transport(pn_transport()), - out(o), id(i), haveOutput(true), closeInitiated(false) + out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false) { if (pn_transport_bind(transport, connection)) { //error @@ -169,9 +169,22 @@ size_t Connection::encode(char* buffer, size_t size) bool Connection::canEncode() { if (!closeInitiated) { + if (closeRequested) { + close(); + return true; + } try { - for (Sessions::iterator i = sessions.begin();i != sessions.end(); ++i) { - if (i->second->dispatch()) haveOutput = true; + for (Sessions::iterator i = sessions.begin();i != sessions.end();) { + if (i->second->endedByManagement()) { + pn_session_close(i->first); + i->second->close(); + sessions.erase(i++); + haveOutput = true; + QPID_LOG_CAT(debug, model, id << " session ended by management"); + } else { + if (i->second->dispatch()) haveOutput = true; + ++i; + } } process(); } catch (const Exception& e) { @@ -372,4 +385,10 @@ void Connection::setUserId(const std::string& user) throw Exception(qpid::amqp::error_conditions::RESOURCE_LIMIT_EXCEEDED, "User connection denied by configured limit"); } } + +void Connection::closedByManagement() +{ + closeRequested = true; + out.activateOutput(); +} }}} // namespace qpid::broker::amqp |
