summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java53
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java23
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java6
4 files changed, 35 insertions, 61 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 99366101d1..be75fc150f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -20,20 +20,22 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.mina.filter.ReadThrottleFilterBuilder;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+
import org.apache.mina.filter.codec.ProtocolCodecException;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.SSLConfiguration;
-import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverHandler;
import org.apache.qpid.client.failover.FailoverState;
@@ -42,32 +44,29 @@ import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.Event;
import org.apache.qpid.pool.Job;
-import org.apache.qpid.pool.PoolingFilter;
-import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-
/**
* AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
* network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
@@ -107,9 +106,6 @@ import java.util.concurrent.CountDownLatch;
*
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Create the filter chain to filter this handlers events.
- * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
- *
* <tr><td> Maintain fail-over state.
* <tr><td>
* </table>
@@ -191,9 +187,8 @@ public class AMQProtocolHandler implements ProtocolEngine
_protocolSession = new AMQProtocolSession(this, _connection);
_stateManager = new AMQStateManager(_protocolSession);
_codecFactory = new AMQCodecFactory(false, _protocolSession);
- ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
- _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true);
- _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false);
+ _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
+ _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
_poolReference.acquireExecutorService();
_failoverHandler = new FailoverHandler(this);
}
@@ -436,7 +431,7 @@ public class AMQProtocolHandler implements ProtocolEngine
_readBytes += msg.remaining();
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable()
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
{
@Override
public void run()
@@ -495,7 +490,7 @@ public class AMQProtocolHandler implements ProtocolEngine
}
}
}
- }));
+ });
}
catch (Exception e)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
index 77c9c40e82..1ac8f62e32 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
@@ -20,37 +20,20 @@
*/
package org.apache.qpid.client.transport;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
-import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-
import org.apache.qpid.client.SSLConfiguration;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.network.mina.MINANetworkDriver;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import sun.net.InetAddressCachePolicy;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.security.GeneralSecurityException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.net.ssl.SSLEngine;
-
public class SocketTransportConnection implements ITransportConnection
{
private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 45194750dc..a4f8bb0166 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -20,6 +20,12 @@
*/
package org.apache.qpid.client.transport;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoServiceConfig;
@@ -30,20 +36,12 @@ import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
-import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.thread.QpidThreadExecutor;
import org.apache.qpid.transport.network.mina.MINANetworkDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.net.Socket;
-
/**
* The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
* connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
index 3de6f9b9ea..504d475740 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
@@ -20,20 +20,18 @@
*/
package org.apache.qpid.client.transport;
+import java.io.IOException;
+
import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.vmpipe.QpidVmPipeConnector;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.mina.transport.vmpipe.VmPipeConnector;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.transport.network.mina.MINANetworkDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
public class VmPipeTransportConnection implements ITransportConnection
{
private static final Logger _logger = LoggerFactory.getLogger(VmPipeTransportConnection.class);