diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 184 |
1 files changed, 14 insertions, 170 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 8911d4ee3e..d380402da7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,18 +20,9 @@ */ package org.apache.qpid.client.protocol; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import org.apache.qpid.util.BytesDataOutput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; @@ -66,8 +57,16 @@ import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the @@ -300,7 +299,6 @@ public class AMQProtocolHandler implements ProtocolEngine { if (_failoverState == FailoverState.NOT_STARTED) { - // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException) { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); @@ -314,7 +312,7 @@ public class AMQProtocolHandler implements ProtocolEngine } // FIXME Need to correctly handle other exceptions. Things like ... - // if (cause instanceof AMQChannelClosedException) + // AMQChannelClosedException // which will cause the JMSSession to end due to a channel close and so that Session needs // to be removed from the map so we can correctly still call close without an exception when trying to close // the server closed session. See also CloseChannelMethodHandler as the sessionClose is never called on exception @@ -865,160 +863,6 @@ public class AMQProtocolHandler implements ProtocolEngine return _suggestedProtocolVersion; } - private static class BytesDataOutput implements DataOutput - { - int _pos = 0; - byte[] _buf; - - public BytesDataOutput(byte[] buf) - { - _buf = buf; - } - - public void setBuffer(byte[] buf) - { - _buf = buf; - _pos = 0; - } - - public void reset() - { - _pos = 0; - } - - public int length() - { - return _pos; - } - - public void write(int b) - { - _buf[_pos++] = (byte) b; - } - - public void write(byte[] b) - { - System.arraycopy(b, 0, _buf, _pos, b.length); - _pos+=b.length; - } - - - public void write(byte[] b, int off, int len) - { - System.arraycopy(b, off, _buf, _pos, len); - _pos+=len; - - } - - public void writeBoolean(boolean v) - { - _buf[_pos++] = v ? (byte) 1 : (byte) 0; - } - - public void writeByte(int v) - { - _buf[_pos++] = (byte) v; - } - - public void writeShort(int v) - { - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte) v; - } - - public void writeChar(int v) - { - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte) v; - } - - public void writeInt(int v) - { - _buf[_pos++] = (byte) (v >>> 24); - _buf[_pos++] = (byte) (v >>> 16); - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte) v; - } - - public void writeLong(long v) - { - _buf[_pos++] = (byte) (v >>> 56); - _buf[_pos++] = (byte) (v >>> 48); - _buf[_pos++] = (byte) (v >>> 40); - _buf[_pos++] = (byte) (v >>> 32); - _buf[_pos++] = (byte) (v >>> 24); - _buf[_pos++] = (byte) (v >>> 16); - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte)v; - } - - public void writeFloat(float v) - { - writeInt(Float.floatToIntBits(v)); - } - - public void writeDouble(double v) - { - writeLong(Double.doubleToLongBits(v)); - } - - public void writeBytes(String s) - { - int len = s.length(); - for (int i = 0 ; i < len ; i++) - { - _buf[_pos++] = ((byte)s.charAt(i)); - } - } - - public void writeChars(String s) - { - int len = s.length(); - for (int i = 0 ; i < len ; i++) - { - int v = s.charAt(i); - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte) v; - } - } - - public void writeUTF(String s) - { - int strlen = s.length(); - - int pos = _pos; - _pos+=2; - - - for (int i = 0; i < strlen; i++) - { - int c = s.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) - { - c = s.charAt(i); - _buf[_pos++] = (byte) c; - - } - else if (c > 0x07FF) - { - _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); - _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F)); - _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); - } - else - { - _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); - _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); - } - } - - int len = _pos - (pos + 2); - - _buf[pos++] = (byte) (len >>> 8); - _buf[pos] = (byte) len; - } - - } } |