diff options
Diffstat (limited to 'java')
5 files changed, 119 insertions, 112 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java index 1fc43f3496..a2189ba248 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java @@ -49,10 +49,10 @@ public class VmPipeTransportConnection implements ITransportConnection final VmPipeConnector ioConnector = new VmPipeConnector(); final IoServiceConfig cfg = ioConnector.getDefaultConfig(); ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance(); - PoolingFilter asyncRead = new PoolingFilter(executorService, PoolingFilter.READ_EVENTS, + PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executorService, "AsynchronousReadFilter"); cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead); - PoolingFilter asyncWrite = new PoolingFilter(executorService, PoolingFilter.WRITE_EVENTS, + PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executorService, "AsynchronousWriteFilter"); cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite); diff --git a/java/common/src/main/java/org/apache/qpid/pool/Event.java b/java/common/src/main/java/org/apache/qpid/pool/Event.java index 7364b9293a..43ff8f6a19 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/Event.java +++ b/java/common/src/main/java/org/apache/qpid/pool/Event.java @@ -25,90 +25,66 @@ import org.apache.mina.common.IoFilter; import org.apache.mina.common.IoSession; import org.apache.mina.common.IdleStatus; -/** - * Represents an operation on IoFilter. - */ -enum EventType -{ - OPENED, CLOSED, READ, WRITE, WRITTEN, RECEIVED, SENT, IDLE, EXCEPTION -} -class Event +abstract public class Event { - private static final Logger _log = Logger.getLogger(Event.class); - - private final EventType type; - private final IoFilter.NextFilter nextFilter; - private final Object data; - - public Event(IoFilter.NextFilter nextFilter, EventType type, Object data) - { - this.type = type; - this.nextFilter = nextFilter; - this.data = data; - if (type == EventType.EXCEPTION) - { - _log.error("Exception event constructed: " + data, (Throwable) data); - } - } - public Object getData() + public Event() { - return data; } - public IoFilter.NextFilter getNextFilter() - { - return nextFilter; - } + abstract public void process(IoSession session); - public EventType getType() + public static final class ReceivedEvent extends Event { - return type; - } + private final Object _data; - void process(IoSession session) - { - if (_log.isDebugEnabled()) - { - _log.debug("Processing " + this); - } - if (type == EventType.RECEIVED) - { - nextFilter.messageReceived(session, data); - //ByteBufferUtil.releaseIfPossible( data ); - } - else if (type == EventType.SENT) + private final IoFilter.NextFilter _nextFilter; + + public ReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data) { - nextFilter.messageSent(session, data); - //ByteBufferUtil.releaseIfPossible( data ); + super(); + _nextFilter = nextFilter; + _data = data; } - else if (type == EventType.EXCEPTION) + + public void process(IoSession session) { - nextFilter.exceptionCaught(session, (Throwable) data); + _nextFilter.messageReceived(session, _data); } - else if (type == EventType.IDLE) + + public IoFilter.NextFilter getNextFilter() { - nextFilter.sessionIdle(session, (IdleStatus) data); + return _nextFilter; } - else if (type == EventType.OPENED) + } + + + public static final class WriteEvent extends Event + { + private final IoFilter.WriteRequest _data; + private final IoFilter.NextFilter _nextFilter; + + public WriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data) { - nextFilter.sessionOpened(session); + super(); + _nextFilter = nextFilter; + _data = data; } - else if (type == EventType.WRITE) + + + public void process(IoSession session) { - nextFilter.filterWrite(session, (IoFilter.WriteRequest) data); + _nextFilter.filterWrite(session, _data); } - else if (type == EventType.CLOSED) + + public IoFilter.NextFilter getNextFilter() { - nextFilter.sessionClosed(session); + return _nextFilter; } } - public String toString() - { - return "Event: type " + type + ", data: " + data; - } + } diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index 38cfa68c78..c0026c1f36 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -25,51 +25,39 @@ import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoFilterAdapter; import org.apache.mina.common.IoSession; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; +import java.util.EnumSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler { private static final Logger _logger = Logger.getLogger(PoolingFilter.class); - public static final Set<EventType> READ_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.RECEIVED)); - public static final Set<EventType> WRITE_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.WRITE)); private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>(); private final ReferenceCountingExecutorService _poolReference; - private final Set<EventType> _asyncTypes; private final String _name; private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); - public PoolingFilter(ReferenceCountingExecutorService refCountingPool, Set<EventType> asyncTypes, String name) + public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { _poolReference = refCountingPool; - _asyncTypes = asyncTypes; _name = name; } - private void fireEvent(IoSession session, Event event) + void fireAsynchEvent(IoSession session, Event event) { - if (_asyncTypes.contains(event.getType())) - { - Job job = getJobForSession(session); - job.acquire(); //prevents this job being removed from _jobs - job.add(event); + Job job = getJobForSession(session); + job.acquire(); //prevents this job being removed from _jobs + job.add(event); - //Additional checks on pool to check that it hasn't shutdown. - // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool - if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown()) - { - _poolReference.getPool().execute(job); - } - } - else + //Additional checks on pool to check that it hasn't shutdown. + // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool + if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown()) { - event.process(session); + _poolReference.getPool().execute(job); } + } private Job getJobForSession(IoSession session) @@ -114,45 +102,44 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH //IoFilter methods that are processed by threads on the pool - public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception + public void sessionOpened(final NextFilter nextFilter, final IoSession session) throws Exception { - fireEvent(session, new Event(nextFilter, EventType.OPENED, null)); + nextFilter.sessionOpened(session); } - public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception + public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception { - fireEvent(session, new Event(nextFilter, EventType.CLOSED, null)); + nextFilter.sessionClosed(session); } - public void sessionIdle(NextFilter nextFilter, IoSession session, - IdleStatus status) throws Exception + public void sessionIdle(final NextFilter nextFilter, final IoSession session, + final IdleStatus status) throws Exception { - fireEvent(session, new Event(nextFilter, EventType.IDLE, status)); + nextFilter.sessionIdle(session, status); } - public void exceptionCaught(NextFilter nextFilter, IoSession session, - Throwable cause) throws Exception + public void exceptionCaught(final NextFilter nextFilter, final IoSession session, + final Throwable cause) throws Exception { - fireEvent(session, new Event(nextFilter, EventType.EXCEPTION, cause)); + nextFilter.exceptionCaught(session,cause); } - public void messageReceived(NextFilter nextFilter, IoSession session, - Object message) throws Exception + public void messageReceived(final NextFilter nextFilter, final IoSession session, + final Object message) throws Exception { - //ByteBufferUtil.acquireIfPossible( message ); - fireEvent(session, new Event(nextFilter, EventType.RECEIVED, message)); + nextFilter.messageReceived(session,message); } - public void messageSent(NextFilter nextFilter, IoSession session, - Object message) throws Exception + public void messageSent(final NextFilter nextFilter, final IoSession session, + final Object message) throws Exception { - //ByteBufferUtil.acquireIfPossible( message ); - fireEvent(session, new Event(nextFilter, EventType.SENT, message)); + nextFilter.messageSent(session, message); } - public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception + public void filterWrite(final NextFilter nextFilter, final IoSession session, + final WriteRequest writeRequest) throws Exception { - fireEvent(session, new Event(nextFilter, EventType.WRITE, writeRequest)); + nextFilter.filterWrite(session, writeRequest); } //IoFilter methods that are processed on current thread (NOT on pooled thread) @@ -188,5 +175,52 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH // when the reference count gets to zero we release the executor service _poolReference.releaseExecutorService(); } + + public static class AsynchReadPoolingFilter extends PoolingFilter + { + + public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) + { + super(refCountingPool, name); + } + + public void messageReceived(final NextFilter nextFilter, final IoSession session, + final Object message) throws Exception + { + + fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message)); + } + + + } + + public static class AsynchWritePoolingFilter extends PoolingFilter + { + + public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) + { + super(refCountingPool, name); + } + + + public void filterWrite(final NextFilter nextFilter, final IoSession session, + final WriteRequest writeRequest) throws Exception + { + fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest)); + } + + } + + public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool,String name) + { + return new AsynchReadPoolingFilter(refCountingPool,name); + } + + + public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool,String name) + { + return new AsynchWritePoolingFilter(refCountingPool,name); + } + } diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java index d4dbf1309a..84b72bb0dc 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java +++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java @@ -29,11 +29,8 @@ public class ReadWriteThreadModel implements ThreadModel public void buildFilterChain(IoFilterChain chain) throws Exception { ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance(); - PoolingFilter asyncRead = new PoolingFilter(executor, PoolingFilter.READ_EVENTS, - "AsynchronousReadFilter"); - PoolingFilter asyncWrite = new PoolingFilter(executor, PoolingFilter.WRITE_EVENTS, - "AsynchronousWriteFilter"); - + PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter"); + PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter"); chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(asyncRead)); chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(asyncWrite)); } diff --git a/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java b/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java index 972a935257..9a5208662b 100644 --- a/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java +++ b/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java @@ -39,7 +39,7 @@ public class PoolingFilterTest extends TestCase //Create Pool _executorService = ReferenceCountingExecutorService.getInstance(); _executorService.acquireExecutorService(); - _pool = new PoolingFilter(_executorService, PoolingFilter.WRITE_EVENTS, + _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService, "AsynchronousWriteFilter"); } |