summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/Event.java98
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java120
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java7
-rw-r--r--java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java2
5 files changed, 119 insertions, 112 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
index 1fc43f3496..a2189ba248 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
@@ -49,10 +49,10 @@ public class VmPipeTransportConnection implements ITransportConnection
final VmPipeConnector ioConnector = new VmPipeConnector();
final IoServiceConfig cfg = ioConnector.getDefaultConfig();
ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance();
- PoolingFilter asyncRead = new PoolingFilter(executorService, PoolingFilter.READ_EVENTS,
+ PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executorService,
"AsynchronousReadFilter");
cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead);
- PoolingFilter asyncWrite = new PoolingFilter(executorService, PoolingFilter.WRITE_EVENTS,
+ PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executorService,
"AsynchronousWriteFilter");
cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite);
diff --git a/java/common/src/main/java/org/apache/qpid/pool/Event.java b/java/common/src/main/java/org/apache/qpid/pool/Event.java
index 7364b9293a..43ff8f6a19 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/Event.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/Event.java
@@ -25,90 +25,66 @@ import org.apache.mina.common.IoFilter;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IdleStatus;
-/**
- * Represents an operation on IoFilter.
- */
-enum EventType
-{
- OPENED, CLOSED, READ, WRITE, WRITTEN, RECEIVED, SENT, IDLE, EXCEPTION
-}
-class Event
+abstract public class Event
{
- private static final Logger _log = Logger.getLogger(Event.class);
-
- private final EventType type;
- private final IoFilter.NextFilter nextFilter;
- private final Object data;
-
- public Event(IoFilter.NextFilter nextFilter, EventType type, Object data)
- {
- this.type = type;
- this.nextFilter = nextFilter;
- this.data = data;
- if (type == EventType.EXCEPTION)
- {
- _log.error("Exception event constructed: " + data, (Throwable) data);
- }
- }
- public Object getData()
+ public Event()
{
- return data;
}
- public IoFilter.NextFilter getNextFilter()
- {
- return nextFilter;
- }
+ abstract public void process(IoSession session);
- public EventType getType()
+ public static final class ReceivedEvent extends Event
{
- return type;
- }
+ private final Object _data;
- void process(IoSession session)
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Processing " + this);
- }
- if (type == EventType.RECEIVED)
- {
- nextFilter.messageReceived(session, data);
- //ByteBufferUtil.releaseIfPossible( data );
- }
- else if (type == EventType.SENT)
+ private final IoFilter.NextFilter _nextFilter;
+
+ public ReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data)
{
- nextFilter.messageSent(session, data);
- //ByteBufferUtil.releaseIfPossible( data );
+ super();
+ _nextFilter = nextFilter;
+ _data = data;
}
- else if (type == EventType.EXCEPTION)
+
+ public void process(IoSession session)
{
- nextFilter.exceptionCaught(session, (Throwable) data);
+ _nextFilter.messageReceived(session, _data);
}
- else if (type == EventType.IDLE)
+
+ public IoFilter.NextFilter getNextFilter()
{
- nextFilter.sessionIdle(session, (IdleStatus) data);
+ return _nextFilter;
}
- else if (type == EventType.OPENED)
+ }
+
+
+ public static final class WriteEvent extends Event
+ {
+ private final IoFilter.WriteRequest _data;
+ private final IoFilter.NextFilter _nextFilter;
+
+ public WriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data)
{
- nextFilter.sessionOpened(session);
+ super();
+ _nextFilter = nextFilter;
+ _data = data;
}
- else if (type == EventType.WRITE)
+
+
+ public void process(IoSession session)
{
- nextFilter.filterWrite(session, (IoFilter.WriteRequest) data);
+ _nextFilter.filterWrite(session, _data);
}
- else if (type == EventType.CLOSED)
+
+ public IoFilter.NextFilter getNextFilter()
{
- nextFilter.sessionClosed(session);
+ return _nextFilter;
}
}
- public String toString()
- {
- return "Event: type " + type + ", data: " + data;
- }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
index 38cfa68c78..c0026c1f36 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
@@ -25,51 +25,39 @@ import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoSession;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.EnumSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
{
private static final Logger _logger = Logger.getLogger(PoolingFilter.class);
- public static final Set<EventType> READ_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.RECEIVED));
- public static final Set<EventType> WRITE_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.WRITE));
private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>();
private final ReferenceCountingExecutorService _poolReference;
- private final Set<EventType> _asyncTypes;
private final String _name;
private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
- public PoolingFilter(ReferenceCountingExecutorService refCountingPool, Set<EventType> asyncTypes, String name)
+ public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
_poolReference = refCountingPool;
- _asyncTypes = asyncTypes;
_name = name;
}
- private void fireEvent(IoSession session, Event event)
+ void fireAsynchEvent(IoSession session, Event event)
{
- if (_asyncTypes.contains(event.getType()))
- {
- Job job = getJobForSession(session);
- job.acquire(); //prevents this job being removed from _jobs
- job.add(event);
+ Job job = getJobForSession(session);
+ job.acquire(); //prevents this job being removed from _jobs
+ job.add(event);
- //Additional checks on pool to check that it hasn't shutdown.
- // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool
- if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown())
- {
- _poolReference.getPool().execute(job);
- }
- }
- else
+ //Additional checks on pool to check that it hasn't shutdown.
+ // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool
+ if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown())
{
- event.process(session);
+ _poolReference.getPool().execute(job);
}
+
}
private Job getJobForSession(IoSession session)
@@ -114,45 +102,44 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH
//IoFilter methods that are processed by threads on the pool
- public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception
+ public void sessionOpened(final NextFilter nextFilter, final IoSession session) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.OPENED, null));
+ nextFilter.sessionOpened(session);
}
- public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception
+ public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.CLOSED, null));
+ nextFilter.sessionClosed(session);
}
- public void sessionIdle(NextFilter nextFilter, IoSession session,
- IdleStatus status) throws Exception
+ public void sessionIdle(final NextFilter nextFilter, final IoSession session,
+ final IdleStatus status) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.IDLE, status));
+ nextFilter.sessionIdle(session, status);
}
- public void exceptionCaught(NextFilter nextFilter, IoSession session,
- Throwable cause) throws Exception
+ public void exceptionCaught(final NextFilter nextFilter, final IoSession session,
+ final Throwable cause) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.EXCEPTION, cause));
+ nextFilter.exceptionCaught(session,cause);
}
- public void messageReceived(NextFilter nextFilter, IoSession session,
- Object message) throws Exception
+ public void messageReceived(final NextFilter nextFilter, final IoSession session,
+ final Object message) throws Exception
{
- //ByteBufferUtil.acquireIfPossible( message );
- fireEvent(session, new Event(nextFilter, EventType.RECEIVED, message));
+ nextFilter.messageReceived(session,message);
}
- public void messageSent(NextFilter nextFilter, IoSession session,
- Object message) throws Exception
+ public void messageSent(final NextFilter nextFilter, final IoSession session,
+ final Object message) throws Exception
{
- //ByteBufferUtil.acquireIfPossible( message );
- fireEvent(session, new Event(nextFilter, EventType.SENT, message));
+ nextFilter.messageSent(session, message);
}
- public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception
+ public void filterWrite(final NextFilter nextFilter, final IoSession session,
+ final WriteRequest writeRequest) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.WRITE, writeRequest));
+ nextFilter.filterWrite(session, writeRequest);
}
//IoFilter methods that are processed on current thread (NOT on pooled thread)
@@ -188,5 +175,52 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH
// when the reference count gets to zero we release the executor service
_poolReference.releaseExecutorService();
}
+
+ public static class AsynchReadPoolingFilter extends PoolingFilter
+ {
+
+ public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
+ {
+ super(refCountingPool, name);
+ }
+
+ public void messageReceived(final NextFilter nextFilter, final IoSession session,
+ final Object message) throws Exception
+ {
+
+ fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message));
+ }
+
+
+ }
+
+ public static class AsynchWritePoolingFilter extends PoolingFilter
+ {
+
+ public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
+ {
+ super(refCountingPool, name);
+ }
+
+
+ public void filterWrite(final NextFilter nextFilter, final IoSession session,
+ final WriteRequest writeRequest) throws Exception
+ {
+ fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest));
+ }
+
+ }
+
+ public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool,String name)
+ {
+ return new AsynchReadPoolingFilter(refCountingPool,name);
+ }
+
+
+ public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool,String name)
+ {
+ return new AsynchWritePoolingFilter(refCountingPool,name);
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
index d4dbf1309a..84b72bb0dc 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
@@ -29,11 +29,8 @@ public class ReadWriteThreadModel implements ThreadModel
public void buildFilterChain(IoFilterChain chain) throws Exception
{
ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
- PoolingFilter asyncRead = new PoolingFilter(executor, PoolingFilter.READ_EVENTS,
- "AsynchronousReadFilter");
- PoolingFilter asyncWrite = new PoolingFilter(executor, PoolingFilter.WRITE_EVENTS,
- "AsynchronousWriteFilter");
-
+ PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter");
+ PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter");
chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(asyncRead));
chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(asyncWrite));
}
diff --git a/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java b/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
index 972a935257..9a5208662b 100644
--- a/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
+++ b/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
@@ -39,7 +39,7 @@ public class PoolingFilterTest extends TestCase
//Create Pool
_executorService = ReferenceCountingExecutorService.getInstance();
_executorService.acquireExecutorService();
- _pool = new PoolingFilter(_executorService, PoolingFilter.WRITE_EVENTS,
+ _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService,
"AsynchronousWriteFilter");
}