summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java91
1 files changed, 50 insertions, 41 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
index 66ed6f1e62..28d8cb2ec7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
@@ -20,17 +20,10 @@
*/
package org.apache.qpid.server.transport;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
+import static org.apache.qpid.transport.Connection.State.CLOSE_RCVD;
+
import org.apache.qpid.common.ServerPropertyNames;
+import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.configuration.BrokerConfig;
import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -41,24 +34,25 @@ import org.apache.qpid.server.security.auth.AuthenticationResult;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.Binary;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionClose;
-import org.apache.qpid.transport.ConnectionCloseCode;
-import org.apache.qpid.transport.ConnectionOpen;
-import org.apache.qpid.transport.ConnectionOpenOk;
-import org.apache.qpid.transport.ConnectionStartOk;
-import org.apache.qpid.transport.ConnectionTuneOk;
-import org.apache.qpid.transport.ServerDelegate;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionAttach;
-import org.apache.qpid.transport.SessionDelegate;
-import org.apache.qpid.transport.SessionDetach;
-import org.apache.qpid.transport.SessionDetachCode;
-import org.apache.qpid.transport.SessionDetached;
+import org.apache.qpid.transport.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
public class ServerConnectionDelegate extends ServerDelegate
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnectionDelegate.class);
+
private final String _localFQDN;
private final IApplicationRegistry _appRegistry;
private int _maxNoOfChannels;
@@ -140,17 +134,20 @@ public class ServerConnectionDelegate extends ServerDelegate
}
}
+ @Override
public void connectionClose(Connection conn, ConnectionClose close)
{
+ final ServerConnection sconn = (ServerConnection) conn;
try
{
- ((ServerConnection) conn).logClosed();
+ sconn.logClosed();
}
finally
{
- super.connectionClose(conn, close);
+ sconn.closeCode(close);
+ sconn.setState(CLOSE_RCVD);
+ sendConnectionCloseOkAndCloseSender(conn);
}
-
}
public void connectionOpen(Connection conn, ConnectionOpen open)
@@ -177,19 +174,19 @@ public class ServerConnectionDelegate extends ServerDelegate
if (!vhost.getSecurityManager().accessVirtualhost(vhostName, ((ProtocolEngine) sconn.getConfig()).getRemoteAddress()))
{
- sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Permission denied '"+vhostName+"'"));
sconn.setState(Connection.State.CLOSING);
+ sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Permission denied '"+vhostName+"'"));
}
else
{
- sconn.invoke(new ConnectionOpenOk(Collections.emptyList()));
- sconn.setState(Connection.State.OPEN);
+ sconn.setState(Connection.State.OPEN);
+ sconn.invoke(new ConnectionOpenOk(Collections.emptyList()));
}
}
else
{
- sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'"));
sconn.setState(Connection.State.CLOSING);
+ sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'"));
}
}
@@ -202,9 +199,9 @@ public class ServerConnectionDelegate extends ServerDelegate
if (okChannelMax > getChannelMax())
{
- _logger.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
+ LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
"client connectionTuneOk returned a channelMax (" + okChannelMax +
- ") above the servers offered limit (" + getChannelMax() +")");
+ ") above the server's offered limit (" + getChannelMax() +")");
//Due to the error we must forcefully close the connection without negotiation
sconn.getSender().close();
@@ -234,23 +231,26 @@ public class ServerConnectionDelegate extends ServerDelegate
@Override public void sessionDetach(Connection conn, SessionDetach dtc)
{
- // To ensure a clean detach, we unregister any remaining subscriptions. Unregister ensures
- // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the unregister
+ // To ensure a clean detach, we stop any remaining subscriptions. Stop ensures
+ // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the stop
// completes.
- unregisterAllSubscriptions(conn, dtc);
+ stopAllSubscriptions(conn, dtc);
+ Session ssn = conn.getSession(dtc.getChannel());
+ ((ServerSession)ssn).setClose(true);
super.sessionDetach(conn, dtc);
}
- private void unregisterAllSubscriptions(Connection conn, SessionDetach dtc)
+ private void stopAllSubscriptions(Connection conn, SessionDetach dtc)
{
final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel());
final Collection<Subscription_0_10> subs = ssn.getSubscriptions();
for (Subscription_0_10 subscription_0_10 : subs)
{
- ssn.unregister(subscription_0_10);
+ subscription_0_10.stop();
}
}
+
@Override
public void sessionAttach(final Connection conn, final SessionAttach atc)
{
@@ -258,8 +258,7 @@ public class ServerConnectionDelegate extends ServerDelegate
if(isSessionNameUnique(atc.getName(), conn))
{
- ssn = sessionAttachImpl(conn, atc);
- conn.registerSession(ssn);
+ super.sessionAttach(conn, atc);
((ServerConnection)conn).checkForNotification();
}
else
@@ -299,4 +298,14 @@ public class ServerConnectionDelegate extends ServerDelegate
{
return _clientProperties;
}
+
+ public String getClientId()
+ {
+ return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.CLIENT_ID_0_10);
+ }
+
+ public String getClientVersion()
+ {
+ return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.VERSION_0_10);
+ }
}