summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/protocol')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java184
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java102
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java7
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java4
4 files changed, 96 insertions, 201 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 8911d4ee3e..d380402da7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -20,18 +20,9 @@
*/
package org.apache.qpid.client.protocol;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import org.apache.qpid.util.BytesDataOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
@@ -66,8 +57,16 @@ import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
* AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
@@ -300,7 +299,6 @@ public class AMQProtocolHandler implements ProtocolEngine
{
if (_failoverState == FailoverState.NOT_STARTED)
{
- // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException)
{
_logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
@@ -314,7 +312,7 @@ public class AMQProtocolHandler implements ProtocolEngine
}
// FIXME Need to correctly handle other exceptions. Things like ...
- // if (cause instanceof AMQChannelClosedException)
+ // AMQChannelClosedException
// which will cause the JMSSession to end due to a channel close and so that Session needs
// to be removed from the map so we can correctly still call close without an exception when trying to close
// the server closed session. See also CloseChannelMethodHandler as the sessionClose is never called on exception
@@ -865,160 +863,6 @@ public class AMQProtocolHandler implements ProtocolEngine
return _suggestedProtocolVersion;
}
- private static class BytesDataOutput implements DataOutput
- {
- int _pos = 0;
- byte[] _buf;
-
- public BytesDataOutput(byte[] buf)
- {
- _buf = buf;
- }
-
- public void setBuffer(byte[] buf)
- {
- _buf = buf;
- _pos = 0;
- }
-
- public void reset()
- {
- _pos = 0;
- }
-
- public int length()
- {
- return _pos;
- }
-
- public void write(int b)
- {
- _buf[_pos++] = (byte) b;
- }
-
- public void write(byte[] b)
- {
- System.arraycopy(b, 0, _buf, _pos, b.length);
- _pos+=b.length;
- }
-
-
- public void write(byte[] b, int off, int len)
- {
- System.arraycopy(b, off, _buf, _pos, len);
- _pos+=len;
-
- }
-
- public void writeBoolean(boolean v)
- {
- _buf[_pos++] = v ? (byte) 1 : (byte) 0;
- }
-
- public void writeByte(int v)
- {
- _buf[_pos++] = (byte) v;
- }
-
- public void writeShort(int v)
- {
- _buf[_pos++] = (byte) (v >>> 8);
- _buf[_pos++] = (byte) v;
- }
-
- public void writeChar(int v)
- {
- _buf[_pos++] = (byte) (v >>> 8);
- _buf[_pos++] = (byte) v;
- }
-
- public void writeInt(int v)
- {
- _buf[_pos++] = (byte) (v >>> 24);
- _buf[_pos++] = (byte) (v >>> 16);
- _buf[_pos++] = (byte) (v >>> 8);
- _buf[_pos++] = (byte) v;
- }
-
- public void writeLong(long v)
- {
- _buf[_pos++] = (byte) (v >>> 56);
- _buf[_pos++] = (byte) (v >>> 48);
- _buf[_pos++] = (byte) (v >>> 40);
- _buf[_pos++] = (byte) (v >>> 32);
- _buf[_pos++] = (byte) (v >>> 24);
- _buf[_pos++] = (byte) (v >>> 16);
- _buf[_pos++] = (byte) (v >>> 8);
- _buf[_pos++] = (byte)v;
- }
-
- public void writeFloat(float v)
- {
- writeInt(Float.floatToIntBits(v));
- }
-
- public void writeDouble(double v)
- {
- writeLong(Double.doubleToLongBits(v));
- }
-
- public void writeBytes(String s)
- {
- int len = s.length();
- for (int i = 0 ; i < len ; i++)
- {
- _buf[_pos++] = ((byte)s.charAt(i));
- }
- }
-
- public void writeChars(String s)
- {
- int len = s.length();
- for (int i = 0 ; i < len ; i++)
- {
- int v = s.charAt(i);
- _buf[_pos++] = (byte) (v >>> 8);
- _buf[_pos++] = (byte) v;
- }
- }
-
- public void writeUTF(String s)
- {
- int strlen = s.length();
-
- int pos = _pos;
- _pos+=2;
-
-
- for (int i = 0; i < strlen; i++)
- {
- int c = s.charAt(i);
- if ((c >= 0x0001) && (c <= 0x007F))
- {
- c = s.charAt(i);
- _buf[_pos++] = (byte) c;
-
- }
- else if (c > 0x07FF)
- {
- _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
- _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F));
- _buf[_pos++] = (byte) (0x80 | (c & 0x3F));
- }
- else
- {
- _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
- _buf[_pos++] = (byte) (0x80 | (c & 0x3F));
- }
- }
-
- int len = _pos - (pos + 2);
-
- _buf[pos++] = (byte) (len >>> 8);
- _buf[pos] = (byte) len;
- }
-
- }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index b7253e6e9c..af57fd98fc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -20,11 +20,8 @@
*/
package org.apache.qpid.client.protocol;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.jms.JMSException;
-import javax.security.sasl.SaslClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
@@ -48,8 +45,11 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.security.sasl.SaslClient;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
/**
* Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
@@ -73,16 +73,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected static final String SASL_CLIENT = "SASLClient";
- /**
- * The handler from which this session was created and which is used to handle protocol events. We send failover
- * events to the handler.
- */
- protected final AMQProtocolHandler _protocolHandler;
+ private final AMQProtocolHandler _protocolHandler;
- /** Maps from the channel id to the AMQSession that it represents. */
- protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
+ private ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
- protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
+ private ConcurrentMap _closingChannels = new ConcurrentHashMap();
/**
* Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
@@ -91,20 +86,17 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer, UnprocessedMessage>();
private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
- /** Counter to ensure unique queue names */
- protected int _queueId = 1;
- protected final Object _queueIdLock = new Object();
+ private int _queueId = 1;
+ private final Object _queueIdLock = new Object();
private ProtocolVersion _protocolVersion;
-// private VersionSpecificRegistry _registry =
-// MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
private MethodRegistry _methodRegistry =
MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion());
private MethodDispatcher _methodDispatcher;
- protected final AMQConnection _connection;
+ private final AMQConnection _connection;
private ConnectionTuneParameters _connectionTuneParameters;
@@ -116,7 +108,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
_protocolHandler = protocolHandler;
_protocolVersion = connection.getProtocolVersion();
- _logger.info("Using ProtocolVersion for Session:" + _protocolVersion);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Using ProtocolVersion for Session:" + _protocolVersion);
+ }
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
this);
_connection = connection;
@@ -223,7 +218,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
msg.setContentHeader(contentHeader);
- if (contentHeader.bodySize == 0)
+ if (contentHeader.getBodySize() == 0)
{
deliverMessageToAMQSession(channelId, msg);
}
@@ -310,7 +305,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
public void closeSession(AMQSession session)
{
- _logger.debug("closeSession called on protocol session for session " + session.getChannelId());
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("closeSession called on protocol session for session " + session.getChannelId());
+ }
final int channelId = session.getChannelId();
if (channelId <= 0)
{
@@ -401,7 +399,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public void setProtocolVersion(final ProtocolVersion pv)
{
- _logger.info("Setting ProtocolVersion to :" + pv);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Setting ProtocolVersion to :" + pv);
+ }
_protocolVersion = pv;
_methodRegistry = MethodRegistry.getMethodRegistry(pv);
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this);
@@ -470,4 +471,55 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
return "AMQProtocolSession[" + _connection + ']';
}
+
+ /**
+ * The handler from which this session was created and which is used to handle protocol events. We send failover
+ * events to the handler.
+ */
+ protected AMQProtocolHandler getProtocolHandler()
+ {
+ return _protocolHandler;
+ }
+
+ /** Maps from the channel id to the AMQSession that it represents. */
+ protected ConcurrentMap<Integer, AMQSession> getChannelId2SessionMap()
+ {
+ return _channelId2SessionMap;
+ }
+
+ protected void setChannelId2SessionMap(ConcurrentMap<Integer, AMQSession> channelId2SessionMap)
+ {
+ _channelId2SessionMap = channelId2SessionMap;
+ }
+
+ protected ConcurrentMap getClosingChannels()
+ {
+ return _closingChannels;
+ }
+
+ protected void setClosingChannels(ConcurrentMap closingChannels)
+ {
+ _closingChannels = closingChannels;
+ }
+
+ /** Counter to ensure unique queue names */
+ protected int getQueueId()
+ {
+ return _queueId;
+ }
+
+ protected void setQueueId(int queueId)
+ {
+ _queueId = queueId;
+ }
+
+ protected Object getQueueIdLock()
+ {
+ return _queueIdLock;
+ }
+
+ protected AMQConnection getConnection()
+ {
+ return _connection;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 2bc609ebf2..b865c51cb7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -20,12 +20,7 @@
*/
package org.apache.qpid.client.protocol;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.util.BlockingWaiter;
import org.apache.qpid.framing.AMQMethodBody;
@@ -68,7 +63,7 @@ public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMeth
{
/** Holds the channel id for the channel upon which this listener is waiting for a response. */
- protected int _channelId;
+ private int _channelId;
/**
* Creates a new method listener, that filters incoming method to just those that match the specified channel id.
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java
index d44faeab04..d387a8ba93 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java
@@ -24,6 +24,10 @@ class HeartbeatDiagnostics
{
private static final Diagnostics _impl = init();
+ private HeartbeatDiagnostics()
+ {
+ }
+
private static Diagnostics init()
{
return Boolean.getBoolean("amqj.heartbeat.diagnostics") ? new On() : new Off();