diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java | 91 |
1 files changed, 50 insertions, 41 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index 66ed6f1e62..28d8cb2ec7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -20,17 +20,10 @@ */ package org.apache.qpid.server.transport; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.StringTokenizer; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; +import static org.apache.qpid.transport.Connection.State.CLOSE_RCVD; + import org.apache.qpid.common.ServerPropertyNames; +import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.configuration.BrokerConfig; import org.apache.qpid.server.protocol.AMQConnectionModel; @@ -41,24 +34,25 @@ import org.apache.qpid.server.security.auth.AuthenticationResult; import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.Binary; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionClose; -import org.apache.qpid.transport.ConnectionCloseCode; -import org.apache.qpid.transport.ConnectionOpen; -import org.apache.qpid.transport.ConnectionOpenOk; -import org.apache.qpid.transport.ConnectionStartOk; -import org.apache.qpid.transport.ConnectionTuneOk; -import org.apache.qpid.transport.ServerDelegate; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionAttach; -import org.apache.qpid.transport.SessionDelegate; -import org.apache.qpid.transport.SessionDetach; -import org.apache.qpid.transport.SessionDetachCode; -import org.apache.qpid.transport.SessionDetached; +import org.apache.qpid.transport.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.StringTokenizer; public class ServerConnectionDelegate extends ServerDelegate { + private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnectionDelegate.class); + private final String _localFQDN; private final IApplicationRegistry _appRegistry; private int _maxNoOfChannels; @@ -140,17 +134,20 @@ public class ServerConnectionDelegate extends ServerDelegate } } + @Override public void connectionClose(Connection conn, ConnectionClose close) { + final ServerConnection sconn = (ServerConnection) conn; try { - ((ServerConnection) conn).logClosed(); + sconn.logClosed(); } finally { - super.connectionClose(conn, close); + sconn.closeCode(close); + sconn.setState(CLOSE_RCVD); + sendConnectionCloseOkAndCloseSender(conn); } - } public void connectionOpen(Connection conn, ConnectionOpen open) @@ -177,19 +174,19 @@ public class ServerConnectionDelegate extends ServerDelegate if (!vhost.getSecurityManager().accessVirtualhost(vhostName, ((ProtocolEngine) sconn.getConfig()).getRemoteAddress())) { - sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Permission denied '"+vhostName+"'")); sconn.setState(Connection.State.CLOSING); + sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Permission denied '"+vhostName+"'")); } else { - sconn.invoke(new ConnectionOpenOk(Collections.emptyList())); - sconn.setState(Connection.State.OPEN); + sconn.setState(Connection.State.OPEN); + sconn.invoke(new ConnectionOpenOk(Collections.emptyList())); } } else { - sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'")); sconn.setState(Connection.State.CLOSING); + sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'")); } } @@ -202,9 +199,9 @@ public class ServerConnectionDelegate extends ServerDelegate if (okChannelMax > getChannelMax()) { - _logger.error("Connection '" + sconn.getConnectionId() + "' being severed, " + + LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " + "client connectionTuneOk returned a channelMax (" + okChannelMax + - ") above the servers offered limit (" + getChannelMax() +")"); + ") above the server's offered limit (" + getChannelMax() +")"); //Due to the error we must forcefully close the connection without negotiation sconn.getSender().close(); @@ -234,23 +231,26 @@ public class ServerConnectionDelegate extends ServerDelegate @Override public void sessionDetach(Connection conn, SessionDetach dtc) { - // To ensure a clean detach, we unregister any remaining subscriptions. Unregister ensures - // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the unregister + // To ensure a clean detach, we stop any remaining subscriptions. Stop ensures + // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the stop // completes. - unregisterAllSubscriptions(conn, dtc); + stopAllSubscriptions(conn, dtc); + Session ssn = conn.getSession(dtc.getChannel()); + ((ServerSession)ssn).setClose(true); super.sessionDetach(conn, dtc); } - private void unregisterAllSubscriptions(Connection conn, SessionDetach dtc) + private void stopAllSubscriptions(Connection conn, SessionDetach dtc) { final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel()); final Collection<Subscription_0_10> subs = ssn.getSubscriptions(); for (Subscription_0_10 subscription_0_10 : subs) { - ssn.unregister(subscription_0_10); + subscription_0_10.stop(); } } + @Override public void sessionAttach(final Connection conn, final SessionAttach atc) { @@ -258,8 +258,7 @@ public class ServerConnectionDelegate extends ServerDelegate if(isSessionNameUnique(atc.getName(), conn)) { - ssn = sessionAttachImpl(conn, atc); - conn.registerSession(ssn); + super.sessionAttach(conn, atc); ((ServerConnection)conn).checkForNotification(); } else @@ -299,4 +298,14 @@ public class ServerConnectionDelegate extends ServerDelegate { return _clientProperties; } + + public String getClientId() + { + return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.CLIENT_ID_0_10); + } + + public String getClientVersion() + { + return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.VERSION_0_10); + } } |