diff options
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 230 |
1 files changed, 178 insertions, 52 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index ab18d1f035..17de83e033 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,14 +18,17 @@ * under the License. * */ -#include "Connection.h" -#include "SessionState.h" -#include "Bridge.h" +#include "qpid/broker/Connection.h" +#include "qpid/broker/SessionState.h" +#include "qpid/broker/Bridge.h" +#include "qpid/broker/Broker.h" #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" #include "qpid/framing/AMQP_ClientProxy.h" -#include "qpid/agent/ManagementAgent.h" +#include "qpid/framing/enum.h" +#include "qmf/org/apache/qpid/broker/EventClientConnect.h" +#include "qmf/org/apache/qpid/broker/EventClientDisconnect.h" #include <boost/bind.hpp> #include <boost/ptr_container/ptr_vector.hpp> @@ -34,30 +37,53 @@ #include <iostream> #include <assert.h> -using namespace boost; using namespace qpid::sys; using namespace qpid::framing; -using namespace qpid::sys; using qpid::ptr_map_ptr; using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; +namespace _qmf = qmf::org::apache::qpid::broker; namespace qpid { namespace broker { -Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) : +struct ConnectionTimeoutTask : public sys::TimerTask { + sys::Timer& timer; + Connection& connection; + + ConnectionTimeoutTask(uint16_t hb, sys::Timer& t, Connection& c) : + TimerTask(Duration(hb*2*TIME_SEC)), + timer(t), + connection(c) + {} + + void touch() { + restart(); + } + + void fire() { + // If we get here then we've not received any traffic in the timeout period + // Schedule closing the connection for the io thread + QPID_LOG(error, "Connection timed out: closing"); + connection.abort(); + } +}; + +Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_, uint64_t objectId) : ConnectionState(out_, broker_), - receivedFn(boost::bind(&Connection::receivedImpl, this, _1)), - closedFn(boost::bind(&Connection::closedImpl, this)), - doOutputFn(boost::bind(&Connection::doOutputImpl, this)), + ssf(ssf), adapter(*this, isLink_), isLink(isLink_), mgmtClosing(false), mgmtId(mgmtId_), mgmtObject(0), - links(broker_.getLinks()) + links(broker_.getLinks()), + agent(0), + timer(broker_.getTimer()), + errorListener(0), + shadow(false) { Manageable* parent = broker.GetVhostObject(); @@ -66,33 +92,48 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std if (parent != 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + agent = broker_.getManagementAgent(); - if (agent != 0) - mgmtObject = new management::Connection(agent, this, parent, mgmtId, !isLink); - agent->addObject(mgmtObject); - } - Plugin::initializeAll(*this); // Let plug-ins update extension points. + // TODO set last bool true if system connection + if (agent != 0) { + mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false); + agent->addObject(mgmtObject, objectId, true); + } + ConnectionState::setUrl(mgmtId); + } + if (!isShadow()) broker.getConnectionCounter().inc_connectionCount(); } void Connection::requestIOProcessing(boost::function0<void> callback) { - ioCallback = callback; - out->activateOutput(); + ScopedLock<Mutex> l(ioCallbackLock); + ioCallbacks.push(callback); + out.activateOutput(); } Connection::~Connection() { - if (mgmtObject != 0) + if (mgmtObject != 0) { mgmtObject->resourceDestroy(); + if (!isLink) + agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId())); + } if (isLink) links.notifyClosed(mgmtId); + + if (heartbeatTimer) + heartbeatTimer->cancel(); + if (timeoutTimer) + timeoutTimer->cancel(); + + if (!isShadow()) broker.getConnectionCounter().dec_connectionCount(); } -void Connection::received(framing::AMQFrame& frame) { receivedFn(frame); } +void Connection::received(framing::AMQFrame& frame) { + // Received frame on connection so delay timeout + restartTimeout(); -void Connection::receivedImpl(framing::AMQFrame& frame){ if (frame.getChannel() == 0 && frame.getMethod()) { adapter.handle(frame); } else { @@ -110,7 +151,7 @@ void Connection::recordFromServer(framing::AMQFrame& frame) if (mgmtObject != 0) { mgmtObject->inc_framesToClient(); - mgmtObject->inc_bytesToClient(frame.size()); + mgmtObject->inc_bytesToClient(frame.encodedSize()); } } @@ -119,7 +160,7 @@ void Connection::recordFromClient(framing::AMQFrame& frame) if (mgmtObject != 0) { mgmtObject->inc_framesFromClient(); - mgmtObject->inc_bytesFromClient(frame.size()); + mgmtObject->inc_bytesFromClient(frame.encodedSize()); } } @@ -156,29 +197,55 @@ void Connection::notifyConnectionForced(const string& text) void Connection::setUserId(const string& userId) { ConnectionState::setUserId(userId); - if (mgmtObject != 0) + if (mgmtObject != 0) { mgmtObject->set_authIdentity(userId); + agent->raiseEvent(_qmf::EventClientConnect(mgmtId, userId)); + } } -void Connection::close( - ReplyCode code, const string& text, ClassId classId, MethodId methodId) +void Connection::setFederationLink(bool b) { - adapter.close(code, text, classId, methodId); + ConnectionState::setFederationLink(b); + if (mgmtObject != 0) + mgmtObject->set_federationLink(b); +} + +void Connection::close(connection::CloseCode code, const string& text) +{ + QPID_LOG_IF(error, code != connection::CLOSE_CODE_NORMAL, "Connection " << mgmtId << " closed by error: " << text << "(" << code << ")"); + if (heartbeatTimer) + heartbeatTimer->cancel(); + if (timeoutTimer) + timeoutTimer->cancel(); + adapter.close(code, text); + //make sure we delete dangling pointers from outputTasks before deleting sessions + outputTasks.removeAll(); channels.clear(); getOutput().close(); } +// Send a close to the client but keep the channels. Used by cluster. +void Connection::sendClose() { + if (heartbeatTimer) + heartbeatTimer->cancel(); + if (timeoutTimer) + timeoutTimer->cancel(); + adapter.close(connection::CLOSE_CODE_NORMAL, "OK"); + getOutput().close(); +} + void Connection::idleOut(){} void Connection::idleIn(){} -void Connection::closed() { closedFn(); } - -void Connection::closedImpl(){ // Physically closed, suspend open sessions. +void Connection::closed(){ // Physically closed, suspend open sessions. + if (heartbeatTimer) + heartbeatTimer->cancel(); + if (timeoutTimer) + timeoutTimer->cancel(); try { - while (!channels.empty()) + while (!channels.empty()) ptr_map_ptr(channels.begin())->handleDetach(); - // FIXME aconway 2008-07-15: exclusive is per-session not per-connection in 0-10. while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); @@ -195,27 +262,36 @@ void Connection::closedImpl(){ // Physically closed, suspend open sessions. bool Connection::hasOutput() { return outputTasks.hasOutput(); } -bool Connection::doOutput() { return doOutputFn(); } - -bool Connection::doOutputImpl() { - try{ - if (ioCallback) - ioCallback(); // Lend the IO thread for management processing - ioCallback = 0; - - if (mgmtClosing) - close(403, "Closed by Management Request", 0, 0); - else +bool Connection::doOutput() { + try { + { + ScopedLock<Mutex> l(ioCallbackLock); + while (!ioCallbacks.empty()) { + boost::function0<void> cb = ioCallbacks.front(); + ioCallbacks.pop(); + ScopedUnlock<Mutex> ul(ioCallbackLock); + cb(); // Lend the IO thread for management processing + } + } + if (mgmtClosing) { + closed(); + close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request"); + } else { //then do other output as needed: return outputTasks.doOutput(); + } }catch(ConnectionException& e){ - close(e.code, e.getMessage(), 0, 0); + close(e.code, e.getMessage()); }catch(std::exception& e){ - close(541/*internal error*/, e.what(), 0, 0); + close(connection::CLOSE_CODE_CONNECTION_FORCED, e.what()); } return false; } +void Connection::sendHeartbeat() { + adapter.heartbeat(); +} + void Connection::closeChannel(uint16_t id) { ChannelMap::iterator i = channels.find(id); if (i != channels.end()) channels.erase(i); @@ -234,7 +310,7 @@ ManagementObject* Connection::GetManagementObject(void) const return (ManagementObject*) mgmtObject; } -Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&) +Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&, string&) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; @@ -242,10 +318,10 @@ Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&) switch (methodId) { - case management::Connection::METHOD_CLOSE : + case _qmf::Connection::METHOD_CLOSE : mgmtClosing = true; if (mgmtObject != 0) mgmtObject->set_closing(1); - out->activateOutput(); + out.activateOutput(); status = Manageable::STATUS_OK; break; } @@ -253,5 +329,55 @@ Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&) return status; } -}} +void Connection::setSecureConnection(SecureConnection* s) +{ + adapter.setSecureConnection(s); +} + +struct ConnectionHeartbeatTask : public sys::TimerTask { + sys::Timer& timer; + Connection& connection; + ConnectionHeartbeatTask(uint16_t hb, sys::Timer& t, Connection& c) : + TimerTask(Duration(hb*TIME_SEC)), + timer(t), + connection(c) + {} + + void fire() { + // Setup next firing + setupNextFire(); + timer.add(this); + + // Send Heartbeat + connection.sendHeartbeat(); + } +}; +void Connection::abort() +{ + // Make sure that we don't try to send a heartbeat as we're + // aborting the connection + if (heartbeatTimer) + heartbeatTimer->cancel(); + + out.abort(); +} + +void Connection::setHeartbeatInterval(uint16_t heartbeat) +{ + setHeartbeat(heartbeat); + if (heartbeat > 0 && !isShadow()) { + heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this); + timer.add(heartbeatTimer); + timeoutTimer = new ConnectionTimeoutTask(heartbeat, timer, *this); + timer.add(timeoutTimer); + } +} + +void Connection::restartTimeout() +{ + if (timeoutTimer) + timeoutTimer->touch(); +} + +}} |