summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java276
1 files changed, 43 insertions, 233 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 547f2440db..e9ad4ea8e0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -20,34 +20,15 @@
*/
package org.apache.qpid.server.protocol;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.management.JMException;
-import javax.security.auth.Subject;
-import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.*;
+import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
@@ -83,13 +64,30 @@ import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.util.BytesDataOutput;
+
+import javax.management.JMException;
+import javax.security.auth.Subject;
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
- private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
-
// to save boxing the channelId and looking up in a map... cache in an array the low numbered
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
@@ -97,7 +95,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private AMQShortString _contextKey;
- private AMQShortString _clientVersion = null;
+ private String _clientVersion = null;
private VirtualHost _virtualHost;
@@ -119,7 +117,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private Object _lastSent;
- protected volatile boolean _closed;
+ private volatile boolean _closed;
// maximum number of channels this session should have
private long _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
@@ -134,9 +132,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private ProtocolOutputConverter _protocolOutputConverter;
private Subject _authorizedSubject;
private MethodDispatcher _dispatcher;
- private ProtocolSessionIdentifier _sessionIdentifier;
- private final long _sessionID;
+ private final long _connectionID;
+ private Object _reference = new Object();
private AMQPConnectionActor _actor;
private LogSubject _logSubject;
@@ -174,7 +172,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_codecFactory = new AMQCodecFactory(true, this);
setNetworkConnection(network);
- _sessionID = connectionId;
+ _connectionID = connectionId;
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
@@ -183,7 +181,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_configStore = virtualHostRegistry.getConfigStore();
_id = _configStore.createId();
- _actor.message(ConnectionMessages.OPEN(null, null, false, false));
+ _actor.message(ConnectionMessages.OPEN(null, null, null, false, false, false));
_registry = virtualHostRegistry.getApplicationRegistry();
initialiseStatistics();
@@ -207,7 +205,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
public long getSessionID()
{
- return _sessionID;
+ return _connectionID;
}
public LogActor getLogActor()
@@ -369,7 +367,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
try
{
// Log incomming protocol negotiation request
- _actor.message(ConnectionMessages.OPEN(null, pi._protocolMajor + "-" + pi._protocolMinor, false, true));
+ _actor.message(ConnectionMessages.OPEN(null, pi.getProtocolMajor() + "-" + pi.getProtocolMinor(), null, false, true, false));
ProtocolVersion pv = pi.checkVersion(); // Fails if not correct
@@ -721,7 +719,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
// However, due to the poor exception handling on the client. The client-user will be notified of the
// InvalidArgument and if they then decide to close the session/connection then the there will be time
// for that to occur i.e. a new close method be sent before the exeption handling can mark the session closed.
- //removeChannel(channelId);
+
_closingChannelsList.remove(channelId);
}
@@ -922,31 +920,22 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_saslServer = saslServer;
}
- public FieldTable getClientProperties()
- {
- return _clientProperties;
- }
-
public void setClientProperties(FieldTable clientProperties)
{
_clientProperties = clientProperties;
if (_clientProperties != null)
{
- if (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null)
+ _clientVersion = _clientProperties.getString(ConnectionStartProperties.VERSION_0_8);
+
+ if (_clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8) != null)
{
- String clientID = _clientProperties.getString(CLIENT_PROPERTIES_INSTANCE);
+ String clientID = _clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8);
setContextKey(new AMQShortString(clientID));
// Log the Opening of the connection for this client
- _actor.message(ConnectionMessages.OPEN(clientID, _protocolVersion.toString(), true, true));
- }
-
- if (_clientProperties.getString(ClientProperties.version.toString()) != null)
- {
- _clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString()));
+ _actor.message(ConnectionMessages.OPEN(clientID, _protocolVersion.toString(), _clientVersion, true, true, true));
}
}
- _sessionIdentifier = new ProtocolSessionIdentifier(this);
}
private void setProtocolVersion(ProtocolVersion pv)
@@ -982,11 +971,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
return getMethodRegistry();
}
- public Object getClientIdentifier()
- {
- return _network.getRemoteAddress();
- }
-
public VirtualHost getVirtualHost()
{
return _virtualHost;
@@ -1155,14 +1139,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
return _lastReceivedTime;
}
- public ProtocolSessionIdentifier getSessionIdentifier()
- {
- return _sessionIdentifier;
- }
-
public String getClientVersion()
{
- return (_clientVersion == null) ? null : _clientVersion.toString();
+ return _clientVersion;
}
public Boolean isIncoming()
@@ -1357,6 +1336,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
(Throwable) null));
}
+ public boolean isClosed()
+ {
+ return _closed;
+ }
+
public List<AMQSessionModel> getSessionModels()
{
List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
@@ -1457,30 +1441,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
return getAuthorizedPrincipal().getName();
}
- private static class ByteBufferOutputStream extends OutputStream
- {
-
-
- private final ByteBuffer _buf;
-
- public ByteBufferOutputStream(ByteBuffer buf)
- {
- _buf = buf;
- }
-
- @Override
- public void write(int b) throws IOException
- {
- _buf.put((byte) b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException
- {
- _buf.put(b, off, len);
- }
- }
-
public final class WriteDeliverMethod
implements ClientDeliveryMethod
{
@@ -1501,158 +1461,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
- private static class BytesDataOutput implements DataOutput
+ public Object getReference()
{
- int _pos = 0;
- byte[] _buf;
-
- public BytesDataOutput(byte[] buf)
- {
- _buf = buf;
- }
-
- public void setBuffer(byte[] buf)
- {
- _buf = buf;
- _pos = 0;
- }
-
- public void reset()
- {
- _pos = 0;
- }
-
- public int length()
- {
- return _pos;
- }
-
- public void write(int b)
- {
- _buf[_pos++] = (byte) b;
- }
-
- public void write(byte[] b)
- {
- System.arraycopy(b, 0, _buf, _pos, b.length);
- _pos+=b.length;
- }
-
-
- public void write(byte[] b, int off, int len)
- {
- System.arraycopy(b, off, _buf, _pos, len);
- _pos+=len;
-
- }
-
- public void writeBoolean(boolean v)
- {
- _buf[_pos++] = v ? (byte) 1 : (byte) 0;
- }
-
- public void writeByte(int v)
- {
- _buf[_pos++] = (byte) v;
- }
-
- public void writeShort(int v)
- {
- _buf[_pos++] = (byte) (v >>> 8);
- _buf[_pos++] = (byte) v;
- }
-
- public void writeChar(int v)
- {
- _buf[_pos++] = (byte) (v >>> 8);
- _buf[_pos++] = (byte) v;
- }
-
- public void writeInt(int v)
- {
- _buf[_pos++] = (byte) (v >>> 24);
- _buf[_pos++] = (byte) (v >>> 16);
- _buf[_pos++] = (byte) (v >>> 8);
- _buf[_pos++] = (byte) v;
- }
-
- public void writeLong(long v)
- {
- _buf[_pos++] = (byte) (v >>> 56);
- _buf[_pos++] = (byte) (v >>> 48);
- _buf[_pos++] = (byte) (v >>> 40);
- _buf[_pos++] = (byte) (v >>> 32);
- _buf[_pos++] = (byte) (v >>> 24);
- _buf[_pos++] = (byte) (v >>> 16);
- _buf[_pos++] = (byte) (v >>> 8);
- _buf[_pos++] = (byte)v;
- }
-
- public void writeFloat(float v)
- {
- writeInt(Float.floatToIntBits(v));
- }
-
- public void writeDouble(double v)
- {
- writeLong(Double.doubleToLongBits(v));
- }
-
- public void writeBytes(String s)
- {
- int len = s.length();
- for (int i = 0 ; i < len ; i++)
- {
- _buf[_pos++] = ((byte)s.charAt(i));
- }
- }
-
- public void writeChars(String s)
- {
- int len = s.length();
- for (int i = 0 ; i < len ; i++)
- {
- int v = s.charAt(i);
- _buf[_pos++] = (byte) (v >>> 8);
- _buf[_pos++] = (byte) v;
- }
- }
-
- public void writeUTF(String s)
- {
- int strlen = s.length();
-
- int pos = _pos;
- _pos+=2;
-
-
- for (int i = 0; i < strlen; i++)
- {
- int c = s.charAt(i);
- if ((c >= 0x0001) && (c <= 0x007F))
- {
- c = s.charAt(i);
- _buf[_pos++] = (byte) c;
-
- }
- else if (c > 0x07FF)
- {
- _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
- _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F));
- _buf[_pos++] = (byte) (0x80 | (c & 0x3F));
- }
- else
- {
- _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
- _buf[_pos++] = (byte) (0x80 | (c & 0x3F));
- }
- }
-
- int len = _pos - (pos + 2);
-
- _buf[pos++] = (byte) (len >>> 8);
- _buf[pos] = (byte) len;
- }
-
+ return _reference;
}
}