diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java | 280 |
1 files changed, 248 insertions, 32 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index b960ce8608..4e5088808a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -36,36 +37,17 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; - import javax.management.JMException; import javax.security.auth.Subject; import javax.security.sasl.SaslServer; - import org.apache.log4j.Logger; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.common.ClientProperties; -import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQProtocolHeaderException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.MethodDispatcher; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; @@ -87,11 +69,15 @@ import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.subscription.ClientDeliveryMethod; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionImpl; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.transport.Sender; @@ -139,7 +125,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); - + private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion); private FieldTable _clientProperties; private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); @@ -173,6 +159,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr private NetworkConnection _network; private Sender<ByteBuffer> _sender; + private volatile boolean _deferFlush; + private long _lastReceivedTime; + public ManagedObject getManagedObject() { return _managedObject; @@ -240,14 +229,29 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr return _closing.get(); } + public synchronized void flushBatched() + { + _sender.flush(); + } + + + public ClientDeliveryMethod createDeliveryMethod(int channelId) + { + return new WriteDeliverMethod(channelId); + } + public void received(final ByteBuffer msg) { - _lastIoTime = System.currentTimeMillis(); + final long arrivalTime = System.currentTimeMillis(); + _lastReceivedTime = arrivalTime; + _lastIoTime = arrivalTime; try { final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - for (AMQDataBlock dataBlock : dataBlocks) + final int len = dataBlocks.size(); + for (int i = 0; i < len; i++) { + AMQDataBlock dataBlock = dataBlocks.get(i); try { dataBlockReceived(dataBlock); @@ -347,7 +351,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } } - private void protocolInitiationReceived(ProtocolInitiation pi) + private synchronized void protocolInitiationReceived(ProtocolInitiation pi) { // this ensures the codec never checks for a PI message again (_codecFactory.getDecoder()).setExpectProtocolInitiation(false); @@ -524,12 +528,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr */ public synchronized void writeFrame(AMQDataBlock frame) { - _lastSent = frame; + final ByteBuffer buf = asByteBuffer(frame); - _lastIoTime = System.currentTimeMillis(); _writtenBytes += buf.remaining(); _sender.send(buf); - _sender.flush(); + _lastIoTime = System.currentTimeMillis(); + if(!_deferFlush) + { + _sender.flush(); + } } public AMQShortString getContextKey() @@ -918,7 +925,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr private void setProtocolVersion(ProtocolVersion pv) { _protocolVersion = pv; - + _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion); _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this); _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion); } @@ -1023,7 +1030,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public MethodRegistry getMethodRegistry() { - return MethodRegistry.getMethodRegistry(getProtocolVersion()); + return _methodRegistry; } public MethodDispatcher getMethodDispatcher() @@ -1052,7 +1059,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr // Nothing } - public void writerIdle() + public synchronized void writerIdle() { _sender.send(asByteBuffer(HeartbeatBody.FRAME)); } @@ -1109,6 +1116,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr return _lastIoTime; } + public long getLastReceivedTime() + { + return _lastReceivedTime; + } + public ProtocolSessionIdentifier getSessionIdentifier() { return _sessionIdentifier; @@ -1395,16 +1407,220 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _statisticsEnabled = enabled; } - @Override public boolean isSessionNameUnique(byte[] name) { // 0-8/0-9/0-9-1 sessions don't have names return true; } - @Override + public void setDeferFlush(boolean deferFlush) + { + _deferFlush = deferFlush; + } + + + public String getUserName() { return getAuthorizedPrincipal().getName(); } + + private static class ByteBufferOutputStream extends OutputStream + { + + + private final ByteBuffer _buf; + + public ByteBufferOutputStream(ByteBuffer buf) + { + _buf = buf; + } + + @Override + public void write(int b) throws IOException + { + _buf.put((byte) b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException + { + _buf.put(b, off, len); + } + } + + public final class WriteDeliverMethod + implements ClientDeliveryMethod + { + private final int _channelId; + + public WriteDeliverMethod(int channelId) + { + _channelId = channelId; + } + + public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) + throws AMQException + { + registerMessageDelivered(entry.getMessage().getSize()); + _protocolOutputConverter.writeDeliver(entry, _channelId, deliveryTag, ((SubscriptionImpl)sub).getConsumerTag()); + entry.incrementDeliveryCount(); + } + + } + + private static class BytesDataOutput implements DataOutput + { + int _pos = 0; + byte[] _buf; + + public BytesDataOutput(byte[] buf) + { + _buf = buf; + } + + public void setBuffer(byte[] buf) + { + _buf = buf; + _pos = 0; + } + + public void reset() + { + _pos = 0; + } + + public int length() + { + return _pos; + } + + public void write(int b) + { + _buf[_pos++] = (byte) b; + } + + public void write(byte[] b) + { + System.arraycopy(b, 0, _buf, _pos, b.length); + _pos+=b.length; + } + + + public void write(byte[] b, int off, int len) + { + System.arraycopy(b, off, _buf, _pos, len); + _pos+=len; + + } + + public void writeBoolean(boolean v) + { + _buf[_pos++] = v ? (byte) 1 : (byte) 0; + } + + public void writeByte(int v) + { + _buf[_pos++] = (byte) v; + } + + public void writeShort(int v) + { + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + + public void writeChar(int v) + { + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + + public void writeInt(int v) + { + _buf[_pos++] = (byte) (v >>> 24); + _buf[_pos++] = (byte) (v >>> 16); + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + + public void writeLong(long v) + { + _buf[_pos++] = (byte) (v >>> 56); + _buf[_pos++] = (byte) (v >>> 48); + _buf[_pos++] = (byte) (v >>> 40); + _buf[_pos++] = (byte) (v >>> 32); + _buf[_pos++] = (byte) (v >>> 24); + _buf[_pos++] = (byte) (v >>> 16); + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte)v; + } + + public void writeFloat(float v) + { + writeInt(Float.floatToIntBits(v)); + } + + public void writeDouble(double v) + { + writeLong(Double.doubleToLongBits(v)); + } + + public void writeBytes(String s) + { + int len = s.length(); + for (int i = 0 ; i < len ; i++) + { + _buf[_pos++] = ((byte)s.charAt(i)); + } + } + + public void writeChars(String s) + { + int len = s.length(); + for (int i = 0 ; i < len ; i++) + { + int v = s.charAt(i); + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + } + + public void writeUTF(String s) + { + int strlen = s.length(); + + int pos = _pos; + _pos+=2; + + + for (int i = 0; i < strlen; i++) + { + int c = s.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) + { + c = s.charAt(i); + _buf[_pos++] = (byte) c; + + } + else if (c > 0x07FF) + { + _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); + _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F)); + _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); + } + else + { + _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); + _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); + } + } + + int len = _pos - (pos + 2); + + _buf[pos++] = (byte) (len >>> 8); + _buf[pos] = (byte) len; + } + + } } |