summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java280
1 files changed, 248 insertions, 32 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index b960ce8608..4e5088808a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -36,36 +37,17 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
-
import javax.management.JMException;
import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.ClientProperties;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQProtocolHeaderException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
@@ -87,11 +69,15 @@ import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.transport.Sender;
@@ -139,7 +125,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
/* AMQP Version for this session */
private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
-
+ private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
private FieldTable _clientProperties;
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
@@ -173,6 +159,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
+ private volatile boolean _deferFlush;
+ private long _lastReceivedTime;
+
public ManagedObject getManagedObject()
{
return _managedObject;
@@ -240,14 +229,29 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
return _closing.get();
}
+ public synchronized void flushBatched()
+ {
+ _sender.flush();
+ }
+
+
+ public ClientDeliveryMethod createDeliveryMethod(int channelId)
+ {
+ return new WriteDeliverMethod(channelId);
+ }
+
public void received(final ByteBuffer msg)
{
- _lastIoTime = System.currentTimeMillis();
+ final long arrivalTime = System.currentTimeMillis();
+ _lastReceivedTime = arrivalTime;
+ _lastIoTime = arrivalTime;
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- for (AMQDataBlock dataBlock : dataBlocks)
+ final int len = dataBlocks.size();
+ for (int i = 0; i < len; i++)
{
+ AMQDataBlock dataBlock = dataBlocks.get(i);
try
{
dataBlockReceived(dataBlock);
@@ -347,7 +351,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
}
- private void protocolInitiationReceived(ProtocolInitiation pi)
+ private synchronized void protocolInitiationReceived(ProtocolInitiation pi)
{
// this ensures the codec never checks for a PI message again
(_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
@@ -524,12 +528,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
*/
public synchronized void writeFrame(AMQDataBlock frame)
{
- _lastSent = frame;
+
final ByteBuffer buf = asByteBuffer(frame);
- _lastIoTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
_sender.send(buf);
- _sender.flush();
+ _lastIoTime = System.currentTimeMillis();
+ if(!_deferFlush)
+ {
+ _sender.flush();
+ }
}
public AMQShortString getContextKey()
@@ -918,7 +925,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private void setProtocolVersion(ProtocolVersion pv)
{
_protocolVersion = pv;
-
+ _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
_protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this);
_dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion);
}
@@ -1023,7 +1030,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
public MethodRegistry getMethodRegistry()
{
- return MethodRegistry.getMethodRegistry(getProtocolVersion());
+ return _methodRegistry;
}
public MethodDispatcher getMethodDispatcher()
@@ -1052,7 +1059,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
// Nothing
}
- public void writerIdle()
+ public synchronized void writerIdle()
{
_sender.send(asByteBuffer(HeartbeatBody.FRAME));
}
@@ -1109,6 +1116,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
return _lastIoTime;
}
+ public long getLastReceivedTime()
+ {
+ return _lastReceivedTime;
+ }
+
public ProtocolSessionIdentifier getSessionIdentifier()
{
return _sessionIdentifier;
@@ -1395,16 +1407,220 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_statisticsEnabled = enabled;
}
- @Override
public boolean isSessionNameUnique(byte[] name)
{
// 0-8/0-9/0-9-1 sessions don't have names
return true;
}
- @Override
+ public void setDeferFlush(boolean deferFlush)
+ {
+ _deferFlush = deferFlush;
+ }
+
+
+
public String getUserName()
{
return getAuthorizedPrincipal().getName();
}
+
+ private static class ByteBufferOutputStream extends OutputStream
+ {
+
+
+ private final ByteBuffer _buf;
+
+ public ByteBufferOutputStream(ByteBuffer buf)
+ {
+ _buf = buf;
+ }
+
+ @Override
+ public void write(int b) throws IOException
+ {
+ _buf.put((byte) b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ _buf.put(b, off, len);
+ }
+ }
+
+ public final class WriteDeliverMethod
+ implements ClientDeliveryMethod
+ {
+ private final int _channelId;
+
+ public WriteDeliverMethod(int channelId)
+ {
+ _channelId = channelId;
+ }
+
+ public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ throws AMQException
+ {
+ registerMessageDelivered(entry.getMessage().getSize());
+ _protocolOutputConverter.writeDeliver(entry, _channelId, deliveryTag, ((SubscriptionImpl)sub).getConsumerTag());
+ entry.incrementDeliveryCount();
+ }
+
+ }
+
+ private static class BytesDataOutput implements DataOutput
+ {
+ int _pos = 0;
+ byte[] _buf;
+
+ public BytesDataOutput(byte[] buf)
+ {
+ _buf = buf;
+ }
+
+ public void setBuffer(byte[] buf)
+ {
+ _buf = buf;
+ _pos = 0;
+ }
+
+ public void reset()
+ {
+ _pos = 0;
+ }
+
+ public int length()
+ {
+ return _pos;
+ }
+
+ public void write(int b)
+ {
+ _buf[_pos++] = (byte) b;
+ }
+
+ public void write(byte[] b)
+ {
+ System.arraycopy(b, 0, _buf, _pos, b.length);
+ _pos+=b.length;
+ }
+
+
+ public void write(byte[] b, int off, int len)
+ {
+ System.arraycopy(b, off, _buf, _pos, len);
+ _pos+=len;
+
+ }
+
+ public void writeBoolean(boolean v)
+ {
+ _buf[_pos++] = v ? (byte) 1 : (byte) 0;
+ }
+
+ public void writeByte(int v)
+ {
+ _buf[_pos++] = (byte) v;
+ }
+
+ public void writeShort(int v)
+ {
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+
+ public void writeChar(int v)
+ {
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+
+ public void writeInt(int v)
+ {
+ _buf[_pos++] = (byte) (v >>> 24);
+ _buf[_pos++] = (byte) (v >>> 16);
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+
+ public void writeLong(long v)
+ {
+ _buf[_pos++] = (byte) (v >>> 56);
+ _buf[_pos++] = (byte) (v >>> 48);
+ _buf[_pos++] = (byte) (v >>> 40);
+ _buf[_pos++] = (byte) (v >>> 32);
+ _buf[_pos++] = (byte) (v >>> 24);
+ _buf[_pos++] = (byte) (v >>> 16);
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte)v;
+ }
+
+ public void writeFloat(float v)
+ {
+ writeInt(Float.floatToIntBits(v));
+ }
+
+ public void writeDouble(double v)
+ {
+ writeLong(Double.doubleToLongBits(v));
+ }
+
+ public void writeBytes(String s)
+ {
+ int len = s.length();
+ for (int i = 0 ; i < len ; i++)
+ {
+ _buf[_pos++] = ((byte)s.charAt(i));
+ }
+ }
+
+ public void writeChars(String s)
+ {
+ int len = s.length();
+ for (int i = 0 ; i < len ; i++)
+ {
+ int v = s.charAt(i);
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+ }
+
+ public void writeUTF(String s)
+ {
+ int strlen = s.length();
+
+ int pos = _pos;
+ _pos+=2;
+
+
+ for (int i = 0; i < strlen; i++)
+ {
+ int c = s.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F))
+ {
+ c = s.charAt(i);
+ _buf[_pos++] = (byte) c;
+
+ }
+ else if (c > 0x07FF)
+ {
+ _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+ _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+ _buf[_pos++] = (byte) (0x80 | (c & 0x3F));
+ }
+ else
+ {
+ _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+ _buf[_pos++] = (byte) (0x80 | (c & 0x3F));
+ }
+ }
+
+ int len = _pos - (pos + 2);
+
+ _buf[pos++] = (byte) (len >>> 8);
+ _buf[pos] = (byte) len;
+ }
+
+ }
}