diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/protocol')
4 files changed, 96 insertions, 201 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 8911d4ee3e..d380402da7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,18 +20,9 @@ */ package org.apache.qpid.client.protocol; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import org.apache.qpid.util.BytesDataOutput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; @@ -66,8 +57,16 @@ import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the @@ -300,7 +299,6 @@ public class AMQProtocolHandler implements ProtocolEngine { if (_failoverState == FailoverState.NOT_STARTED) { - // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException) { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); @@ -314,7 +312,7 @@ public class AMQProtocolHandler implements ProtocolEngine } // FIXME Need to correctly handle other exceptions. Things like ... - // if (cause instanceof AMQChannelClosedException) + // AMQChannelClosedException // which will cause the JMSSession to end due to a channel close and so that Session needs // to be removed from the map so we can correctly still call close without an exception when trying to close // the server closed session. See also CloseChannelMethodHandler as the sessionClose is never called on exception @@ -865,160 +863,6 @@ public class AMQProtocolHandler implements ProtocolEngine return _suggestedProtocolVersion; } - private static class BytesDataOutput implements DataOutput - { - int _pos = 0; - byte[] _buf; - - public BytesDataOutput(byte[] buf) - { - _buf = buf; - } - - public void setBuffer(byte[] buf) - { - _buf = buf; - _pos = 0; - } - - public void reset() - { - _pos = 0; - } - - public int length() - { - return _pos; - } - - public void write(int b) - { - _buf[_pos++] = (byte) b; - } - - public void write(byte[] b) - { - System.arraycopy(b, 0, _buf, _pos, b.length); - _pos+=b.length; - } - - - public void write(byte[] b, int off, int len) - { - System.arraycopy(b, off, _buf, _pos, len); - _pos+=len; - - } - - public void writeBoolean(boolean v) - { - _buf[_pos++] = v ? (byte) 1 : (byte) 0; - } - - public void writeByte(int v) - { - _buf[_pos++] = (byte) v; - } - - public void writeShort(int v) - { - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte) v; - } - - public void writeChar(int v) - { - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte) v; - } - - public void writeInt(int v) - { - _buf[_pos++] = (byte) (v >>> 24); - _buf[_pos++] = (byte) (v >>> 16); - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte) v; - } - - public void writeLong(long v) - { - _buf[_pos++] = (byte) (v >>> 56); - _buf[_pos++] = (byte) (v >>> 48); - _buf[_pos++] = (byte) (v >>> 40); - _buf[_pos++] = (byte) (v >>> 32); - _buf[_pos++] = (byte) (v >>> 24); - _buf[_pos++] = (byte) (v >>> 16); - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte)v; - } - - public void writeFloat(float v) - { - writeInt(Float.floatToIntBits(v)); - } - - public void writeDouble(double v) - { - writeLong(Double.doubleToLongBits(v)); - } - - public void writeBytes(String s) - { - int len = s.length(); - for (int i = 0 ; i < len ; i++) - { - _buf[_pos++] = ((byte)s.charAt(i)); - } - } - - public void writeChars(String s) - { - int len = s.length(); - for (int i = 0 ; i < len ; i++) - { - int v = s.charAt(i); - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte) v; - } - } - - public void writeUTF(String s) - { - int strlen = s.length(); - - int pos = _pos; - _pos+=2; - - - for (int i = 0; i < strlen; i++) - { - int c = s.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) - { - c = s.charAt(i); - _buf[_pos++] = (byte) c; - - } - else if (c > 0x07FF) - { - _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); - _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F)); - _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); - } - else - { - _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); - _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); - } - } - - int len = _pos - (pos + 2); - - _buf[pos++] = (byte) (len >>> 8); - _buf[pos] = (byte) len; - } - - } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index b7253e6e9c..af57fd98fc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -20,11 +20,8 @@ */ package org.apache.qpid.client.protocol; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import javax.jms.JMSException; -import javax.security.sasl.SaslClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; @@ -48,8 +45,11 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import javax.security.sasl.SaslClient; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol @@ -73,16 +73,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected static final String SASL_CLIENT = "SASLClient"; - /** - * The handler from which this session was created and which is used to handle protocol events. We send failover - * events to the handler. - */ - protected final AMQProtocolHandler _protocolHandler; + private final AMQProtocolHandler _protocolHandler; - /** Maps from the channel id to the AMQSession that it represents. */ - protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>(); + private ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>(); - protected ConcurrentMap _closingChannels = new ConcurrentHashMap(); + private ConcurrentMap _closingChannels = new ConcurrentHashMap(); /** * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives @@ -91,20 +86,17 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer, UnprocessedMessage>(); private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16]; - /** Counter to ensure unique queue names */ - protected int _queueId = 1; - protected final Object _queueIdLock = new Object(); + private int _queueId = 1; + private final Object _queueIdLock = new Object(); private ProtocolVersion _protocolVersion; -// private VersionSpecificRegistry _registry = -// MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion()); private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion()); private MethodDispatcher _methodDispatcher; - protected final AMQConnection _connection; + private final AMQConnection _connection; private ConnectionTuneParameters _connectionTuneParameters; @@ -116,7 +108,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _protocolHandler = protocolHandler; _protocolVersion = connection.getProtocolVersion(); - _logger.info("Using ProtocolVersion for Session:" + _protocolVersion); + if (_logger.isDebugEnabled()) + { + _logger.debug("Using ProtocolVersion for Session:" + _protocolVersion); + } _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), this); _connection = connection; @@ -223,7 +218,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } msg.setContentHeader(contentHeader); - if (contentHeader.bodySize == 0) + if (contentHeader.getBodySize() == 0) { deliverMessageToAMQSession(channelId, msg); } @@ -310,7 +305,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ public void closeSession(AMQSession session) { - _logger.debug("closeSession called on protocol session for session " + session.getChannelId()); + if (_logger.isDebugEnabled()) + { + _logger.debug("closeSession called on protocol session for session " + session.getChannelId()); + } final int channelId = session.getChannelId(); if (channelId <= 0) { @@ -401,7 +399,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void setProtocolVersion(final ProtocolVersion pv) { - _logger.info("Setting ProtocolVersion to :" + pv); + if (_logger.isDebugEnabled()) + { + _logger.debug("Setting ProtocolVersion to :" + pv); + } _protocolVersion = pv; _methodRegistry = MethodRegistry.getMethodRegistry(pv); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this); @@ -470,4 +471,55 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { return "AMQProtocolSession[" + _connection + ']'; } + + /** + * The handler from which this session was created and which is used to handle protocol events. We send failover + * events to the handler. + */ + protected AMQProtocolHandler getProtocolHandler() + { + return _protocolHandler; + } + + /** Maps from the channel id to the AMQSession that it represents. */ + protected ConcurrentMap<Integer, AMQSession> getChannelId2SessionMap() + { + return _channelId2SessionMap; + } + + protected void setChannelId2SessionMap(ConcurrentMap<Integer, AMQSession> channelId2SessionMap) + { + _channelId2SessionMap = channelId2SessionMap; + } + + protected ConcurrentMap getClosingChannels() + { + return _closingChannels; + } + + protected void setClosingChannels(ConcurrentMap closingChannels) + { + _closingChannels = closingChannels; + } + + /** Counter to ensure unique queue names */ + protected int getQueueId() + { + return _queueId; + } + + protected void setQueueId(int queueId) + { + _queueId = queueId; + } + + protected Object getQueueIdLock() + { + return _queueIdLock; + } + + protected AMQConnection getConnection() + { + return _connection; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 2bc609ebf2..b865c51cb7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -20,12 +20,7 @@ */ package org.apache.qpid.client.protocol; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - import org.apache.qpid.AMQException; -import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.util.BlockingWaiter; import org.apache.qpid.framing.AMQMethodBody; @@ -68,7 +63,7 @@ public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMeth { /** Holds the channel id for the channel upon which this listener is waiting for a response. */ - protected int _channelId; + private int _channelId; /** * Creates a new method listener, that filters incoming method to just those that match the specified channel id. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java index d44faeab04..d387a8ba93 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java @@ -24,6 +24,10 @@ class HeartbeatDiagnostics { private static final Diagnostics _impl = init(); + private HeartbeatDiagnostics() + { + } + private static Diagnostics init() { return Boolean.getBoolean("amqj.heartbeat.diagnostics") ? new On() : new Off(); |