summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp230
1 files changed, 178 insertions, 52 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index ab18d1f035..17de83e033 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,14 +18,17 @@
* under the License.
*
*/
-#include "Connection.h"
-#include "SessionState.h"
-#include "Bridge.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/broker/SessionState.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Broker.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
#include "qpid/framing/AMQP_ClientProxy.h"
-#include "qpid/agent/ManagementAgent.h"
+#include "qpid/framing/enum.h"
+#include "qmf/org/apache/qpid/broker/EventClientConnect.h"
+#include "qmf/org/apache/qpid/broker/EventClientDisconnect.h"
#include <boost/bind.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
@@ -34,30 +37,53 @@
#include <iostream>
#include <assert.h>
-using namespace boost;
using namespace qpid::sys;
using namespace qpid::framing;
-using namespace qpid::sys;
using qpid::ptr_map_ptr;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
+namespace _qmf = qmf::org::apache::qpid::broker;
namespace qpid {
namespace broker {
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) :
+struct ConnectionTimeoutTask : public sys::TimerTask {
+ sys::Timer& timer;
+ Connection& connection;
+
+ ConnectionTimeoutTask(uint16_t hb, sys::Timer& t, Connection& c) :
+ TimerTask(Duration(hb*2*TIME_SEC)),
+ timer(t),
+ connection(c)
+ {}
+
+ void touch() {
+ restart();
+ }
+
+ void fire() {
+ // If we get here then we've not received any traffic in the timeout period
+ // Schedule closing the connection for the io thread
+ QPID_LOG(error, "Connection timed out: closing");
+ connection.abort();
+ }
+};
+
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_, uint64_t objectId) :
ConnectionState(out_, broker_),
- receivedFn(boost::bind(&Connection::receivedImpl, this, _1)),
- closedFn(boost::bind(&Connection::closedImpl, this)),
- doOutputFn(boost::bind(&Connection::doOutputImpl, this)),
+ ssf(ssf),
adapter(*this, isLink_),
isLink(isLink_),
mgmtClosing(false),
mgmtId(mgmtId_),
mgmtObject(0),
- links(broker_.getLinks())
+ links(broker_.getLinks()),
+ agent(0),
+ timer(broker_.getTimer()),
+ errorListener(0),
+ shadow(false)
{
Manageable* parent = broker.GetVhostObject();
@@ -66,33 +92,48 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
if (parent != 0)
{
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ agent = broker_.getManagementAgent();
- if (agent != 0)
- mgmtObject = new management::Connection(agent, this, parent, mgmtId, !isLink);
- agent->addObject(mgmtObject);
- }
- Plugin::initializeAll(*this); // Let plug-ins update extension points.
+ // TODO set last bool true if system connection
+ if (agent != 0) {
+ mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false);
+ agent->addObject(mgmtObject, objectId, true);
+ }
+ ConnectionState::setUrl(mgmtId);
+ }
+ if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
}
void Connection::requestIOProcessing(boost::function0<void> callback)
{
- ioCallback = callback;
- out->activateOutput();
+ ScopedLock<Mutex> l(ioCallbackLock);
+ ioCallbacks.push(callback);
+ out.activateOutput();
}
Connection::~Connection()
{
- if (mgmtObject != 0)
+ if (mgmtObject != 0) {
mgmtObject->resourceDestroy();
+ if (!isLink)
+ agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId()));
+ }
if (isLink)
links.notifyClosed(mgmtId);
+
+ if (heartbeatTimer)
+ heartbeatTimer->cancel();
+ if (timeoutTimer)
+ timeoutTimer->cancel();
+
+ if (!isShadow()) broker.getConnectionCounter().dec_connectionCount();
}
-void Connection::received(framing::AMQFrame& frame) { receivedFn(frame); }
+void Connection::received(framing::AMQFrame& frame) {
+ // Received frame on connection so delay timeout
+ restartTimeout();
-void Connection::receivedImpl(framing::AMQFrame& frame){
if (frame.getChannel() == 0 && frame.getMethod()) {
adapter.handle(frame);
} else {
@@ -110,7 +151,7 @@ void Connection::recordFromServer(framing::AMQFrame& frame)
if (mgmtObject != 0)
{
mgmtObject->inc_framesToClient();
- mgmtObject->inc_bytesToClient(frame.size());
+ mgmtObject->inc_bytesToClient(frame.encodedSize());
}
}
@@ -119,7 +160,7 @@ void Connection::recordFromClient(framing::AMQFrame& frame)
if (mgmtObject != 0)
{
mgmtObject->inc_framesFromClient();
- mgmtObject->inc_bytesFromClient(frame.size());
+ mgmtObject->inc_bytesFromClient(frame.encodedSize());
}
}
@@ -156,29 +197,55 @@ void Connection::notifyConnectionForced(const string& text)
void Connection::setUserId(const string& userId)
{
ConnectionState::setUserId(userId);
- if (mgmtObject != 0)
+ if (mgmtObject != 0) {
mgmtObject->set_authIdentity(userId);
+ agent->raiseEvent(_qmf::EventClientConnect(mgmtId, userId));
+ }
}
-void Connection::close(
- ReplyCode code, const string& text, ClassId classId, MethodId methodId)
+void Connection::setFederationLink(bool b)
{
- adapter.close(code, text, classId, methodId);
+ ConnectionState::setFederationLink(b);
+ if (mgmtObject != 0)
+ mgmtObject->set_federationLink(b);
+}
+
+void Connection::close(connection::CloseCode code, const string& text)
+{
+ QPID_LOG_IF(error, code != connection::CLOSE_CODE_NORMAL, "Connection " << mgmtId << " closed by error: " << text << "(" << code << ")");
+ if (heartbeatTimer)
+ heartbeatTimer->cancel();
+ if (timeoutTimer)
+ timeoutTimer->cancel();
+ adapter.close(code, text);
+ //make sure we delete dangling pointers from outputTasks before deleting sessions
+ outputTasks.removeAll();
channels.clear();
getOutput().close();
}
+// Send a close to the client but keep the channels. Used by cluster.
+void Connection::sendClose() {
+ if (heartbeatTimer)
+ heartbeatTimer->cancel();
+ if (timeoutTimer)
+ timeoutTimer->cancel();
+ adapter.close(connection::CLOSE_CODE_NORMAL, "OK");
+ getOutput().close();
+}
+
void Connection::idleOut(){}
void Connection::idleIn(){}
-void Connection::closed() { closedFn(); }
-
-void Connection::closedImpl(){ // Physically closed, suspend open sessions.
+void Connection::closed(){ // Physically closed, suspend open sessions.
+ if (heartbeatTimer)
+ heartbeatTimer->cancel();
+ if (timeoutTimer)
+ timeoutTimer->cancel();
try {
- while (!channels.empty())
+ while (!channels.empty())
ptr_map_ptr(channels.begin())->handleDetach();
- // FIXME aconway 2008-07-15: exclusive is per-session not per-connection in 0-10.
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
@@ -195,27 +262,36 @@ void Connection::closedImpl(){ // Physically closed, suspend open sessions.
bool Connection::hasOutput() { return outputTasks.hasOutput(); }
-bool Connection::doOutput() { return doOutputFn(); }
-
-bool Connection::doOutputImpl() {
- try{
- if (ioCallback)
- ioCallback(); // Lend the IO thread for management processing
- ioCallback = 0;
-
- if (mgmtClosing)
- close(403, "Closed by Management Request", 0, 0);
- else
+bool Connection::doOutput() {
+ try {
+ {
+ ScopedLock<Mutex> l(ioCallbackLock);
+ while (!ioCallbacks.empty()) {
+ boost::function0<void> cb = ioCallbacks.front();
+ ioCallbacks.pop();
+ ScopedUnlock<Mutex> ul(ioCallbackLock);
+ cb(); // Lend the IO thread for management processing
+ }
+ }
+ if (mgmtClosing) {
+ closed();
+ close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request");
+ } else {
//then do other output as needed:
return outputTasks.doOutput();
+ }
}catch(ConnectionException& e){
- close(e.code, e.getMessage(), 0, 0);
+ close(e.code, e.getMessage());
}catch(std::exception& e){
- close(541/*internal error*/, e.what(), 0, 0);
+ close(connection::CLOSE_CODE_CONNECTION_FORCED, e.what());
}
return false;
}
+void Connection::sendHeartbeat() {
+ adapter.heartbeat();
+}
+
void Connection::closeChannel(uint16_t id) {
ChannelMap::iterator i = channels.find(id);
if (i != channels.end()) channels.erase(i);
@@ -234,7 +310,7 @@ ManagementObject* Connection::GetManagementObject(void) const
return (ManagementObject*) mgmtObject;
}
-Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&)
+Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&, string&)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
@@ -242,10 +318,10 @@ Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&)
switch (methodId)
{
- case management::Connection::METHOD_CLOSE :
+ case _qmf::Connection::METHOD_CLOSE :
mgmtClosing = true;
if (mgmtObject != 0) mgmtObject->set_closing(1);
- out->activateOutput();
+ out.activateOutput();
status = Manageable::STATUS_OK;
break;
}
@@ -253,5 +329,55 @@ Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&)
return status;
}
-}}
+void Connection::setSecureConnection(SecureConnection* s)
+{
+ adapter.setSecureConnection(s);
+}
+
+struct ConnectionHeartbeatTask : public sys::TimerTask {
+ sys::Timer& timer;
+ Connection& connection;
+ ConnectionHeartbeatTask(uint16_t hb, sys::Timer& t, Connection& c) :
+ TimerTask(Duration(hb*TIME_SEC)),
+ timer(t),
+ connection(c)
+ {}
+
+ void fire() {
+ // Setup next firing
+ setupNextFire();
+ timer.add(this);
+
+ // Send Heartbeat
+ connection.sendHeartbeat();
+ }
+};
+void Connection::abort()
+{
+ // Make sure that we don't try to send a heartbeat as we're
+ // aborting the connection
+ if (heartbeatTimer)
+ heartbeatTimer->cancel();
+
+ out.abort();
+}
+
+void Connection::setHeartbeatInterval(uint16_t heartbeat)
+{
+ setHeartbeat(heartbeat);
+ if (heartbeat > 0 && !isShadow()) {
+ heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this);
+ timer.add(heartbeatTimer);
+ timeoutTimer = new ConnectionTimeoutTask(heartbeat, timer, *this);
+ timer.add(timeoutTimer);
+ }
+}
+
+void Connection::restartTimeout()
+{
+ if (timeoutTimer)
+ timeoutTimer->touch();
+}
+
+}}