diff options
author | Aidan Skinner <aidan@apache.org> | 2009-09-17 15:19:54 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2009-09-17 15:19:54 +0000 |
commit | 31bbc100ac6b3a31eb25d29f407d60ff23334d1f (patch) | |
tree | f21589bdba604c46c555d48fe7862defcf6e781f | |
parent | 93fa7d17feecb3d27cead67e11b250af1fcc595e (diff) | |
download | qpid-python-31bbc100ac6b3a31eb25d29f407d60ff23334d1f.tar.gz |
QPID-2024 QPID-2105: Remove now unnecessary classes like Event, PoolingFilter,
ReadWriteThreadModel. Move the couple of necessary methods to Job.
Fix imports.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@816232 13f79535-47bb-0310-9956-ffa450edef68
11 files changed, 113 insertions, 1003 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index b776c6ae82..3bcd102858 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -62,10 +62,7 @@ import org.apache.qpid.framing.MethodDispatcher; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.pool.Event; import org.apache.qpid.pool.Job; -import org.apache.qpid.pool.PoolingFilter; -import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -172,14 +169,13 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol _networkDriver = driver; _codecFactory = new AMQCodecFactory(true, this); - - ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); - _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true); - _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false); + _poolReference.acquireExecutorService(); + _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); + _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); _actor.message(ConnectionMessages.CON_1001(null, null, false, false)); - _poolReference.acquireExecutorService(); + } private AMQProtocolSessionMBean createMBean() throws AMQException @@ -212,7 +208,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol try { final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable() + Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable() { @Override public void run() @@ -232,7 +228,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol } } } - })); + }); } catch (Exception e) { @@ -459,14 +455,14 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol final ByteBuffer buf = frame.toNioByteBuffer(); _lastIoTime = System.currentTimeMillis(); _writtenBytes += buf.remaining(); - Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Event(new Runnable() + Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable() { @Override public void run() { _networkDriver.send(buf); } - })); + }); } public AMQShortString getContextKey() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 99366101d1..be75fc150f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,20 +20,22 @@ */ package org.apache.qpid.client.protocol; -import org.apache.mina.filter.ReadThrottleFilterBuilder; -import org.apache.mina.filter.SSLFilter; -import org.apache.mina.filter.WriteBufferLimitFilterBuilder; +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; + import org.apache.mina.filter.codec.ProtocolCodecException; -import org.apache.mina.filter.codec.ProtocolCodecFilter; -import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.SSLConfiguration; -import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverHandler; import org.apache.qpid.client.failover.FailoverState; @@ -42,32 +44,29 @@ import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.Event; import org.apache.qpid.pool.Job; -import org.apache.qpid.pool.PoolingFilter; -import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; - /** * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the @@ -107,9 +106,6 @@ import java.util.concurrent.CountDownLatch; * * <p/><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Create the filter chain to filter this handlers events. - * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}. - * * <tr><td> Maintain fail-over state. * <tr><td> * </table> @@ -191,9 +187,8 @@ public class AMQProtocolHandler implements ProtocolEngine _protocolSession = new AMQProtocolSession(this, _connection); _stateManager = new AMQStateManager(_protocolSession); _codecFactory = new AMQCodecFactory(false, _protocolSession); - ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); - _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true); - _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false); + _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); + _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); _poolReference.acquireExecutorService(); _failoverHandler = new FailoverHandler(this); } @@ -436,7 +431,7 @@ public class AMQProtocolHandler implements ProtocolEngine _readBytes += msg.remaining(); final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable() + Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable() { @Override public void run() @@ -495,7 +490,7 @@ public class AMQProtocolHandler implements ProtocolEngine } } } - })); + }); } catch (Exception e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index 77c9c40e82..1ac8f62e32 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -20,37 +20,20 @@ */ package org.apache.qpid.client.transport; +import java.io.IOException; +import java.net.InetSocketAddress; + import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoConnector; import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.filter.SSLFilter; -import org.apache.mina.transport.socket.nio.ExistingSocketConnector; -import org.apache.mina.transport.socket.nio.SocketConnectorConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; - import org.apache.qpid.client.SSLConfiguration; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.network.mina.MINANetworkDriver; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import sun.net.InetAddressCachePolicy; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.security.GeneralSecurityException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import javax.net.ssl.SSLEngine; - public class SocketTransportConnection implements ITransportConnection { private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 45194750dc..a4f8bb0166 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -20,6 +20,12 @@ */ package org.apache.qpid.client.transport; +import java.io.IOException; +import java.net.Socket; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoServiceConfig; @@ -30,20 +36,12 @@ import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.ReadWriteThreadModel; -import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.thread.QpidThreadExecutor; import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.net.Socket; - /** * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying * connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java index 3de6f9b9ea..504d475740 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java @@ -20,20 +20,18 @@ */ package org.apache.qpid.client.transport; +import java.io.IOException; + import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.IoServiceConfig; import org.apache.mina.transport.vmpipe.QpidVmPipeConnector; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.mina.transport.vmpipe.VmPipeConnector; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - public class VmPipeTransportConnection implements ITransportConnection { private static final Logger _logger = LoggerFactory.getLogger(VmPipeTransportConnection.class); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java deleted file mode 100644 index 49bce9f2f9..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.pool; - -import org.apache.mina.common.IoFilter; -import org.apache.mina.common.IoSession; - -/** - * An Event is a continuation, which is used to break a Mina filter chain and save the current point in the chain - * for later processing. It is an abstract class, with different implementations for continuations of different kinds - * of Mina events. - * - * <p/>These continuations are typically batched by {@link Job} for processing by a worker thread pool. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Process a continuation in the context of a Mina session. - * </table> - * - * @todo Pull up _nextFilter and getNextFilter into Event, as all events use it. Inner classes need to be non-static - * to use instance variables in the parent. Consequently they need to be non-inner to be instantiable outside of - * the context of the outer Event class. The inner class construction used here is preventing common code re-use - * (though not by a huge amount), but makes for an inelegent way of handling inheritance and doesn't seem like - * a justifiable use of inner classes. Move the inner classes out into their own files. - * - * @todo Could make Event implement Runnable, FutureTask, or a custom Continuation interface, to clarify its status as - * a continuation. Job is also a continuation, as is the job completion handler. Or, as Event is totally abstract, - * it is really an interface, so could just drop it and use the continuation interface instead. - */ -public class Event -{ - private Runnable _runner; - - public Event() - { - - } - - /** - * Creates a continuation. - */ - public Event(Runnable runner) - { - _runner = runner; - } - - /** - * Processes the continuation - */ - public void process() - { - _runner.run(); - } - - /** - * A continuation ({@link Event}) that takes a Mina messageReceived event, and passes it to a NextFilter. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Pass a Mina messageReceived event to a NextFilter. <td> {@link IoFilter.NextFilter}, {@link IoSession} - * </table> - */ - public static final class MinaReceivedEvent extends Event - { - private final Object _data; - private final IoFilter.NextFilter _nextFilter; - private final IoSession _session; - - public MinaReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data, final IoSession session) - { - _nextFilter = nextFilter; - _data = data; - _session = session; - } - - public void process() - { - _nextFilter.messageReceived(_session, _data); - } - - public IoFilter.NextFilter getNextFilter() - { - return _nextFilter; - } - } - - /** - * A continuation ({@link Event}) that takes a Mina filterWrite event, and passes it to a NextFilter. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Pass a Mina filterWrite event to a NextFilter. - * <td> {@link IoFilter.NextFilter}, {@link IoFilter.WriteRequest}, {@link IoSession} - * </table> - */ - public static final class MinaWriteEvent extends Event - { - private final IoFilter.WriteRequest _data; - private final IoFilter.NextFilter _nextFilter; - private IoSession _session; - - public MinaWriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data, final IoSession session) - { - _nextFilter = nextFilter; - _data = data; - _session = session; - } - - public void process() - { - _nextFilter.filterWrite(_session, _data); - } - - public IoFilter.NextFilter getNextFilter() - { - return _nextFilter; - } - } - - /** - * A continuation ({@link Event}) that takes a Mina sessionClosed event, and passes it to a NextFilter. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Pass a Mina sessionClosed event to a NextFilter. <td> {@link IoFilter.NextFilter}, {@link IoSession} - * </table> - */ - public static final class CloseEvent extends Event - { - private final IoFilter.NextFilter _nextFilter; - private final IoSession _session; - - public CloseEvent(final IoFilter.NextFilter nextFilter, final IoSession session) - { - _nextFilter = nextFilter; - _session = session; - } - - public void process() - { - _nextFilter.sessionClosed(_session); - } - - public IoFilter.NextFilter getNextFilter() - { - return _nextFilter; - } - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java index 15d1c20ff1..82b600de88 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -25,7 +25,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.mina.common.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,40 +55,28 @@ import org.slf4j.LoggerFactory; */ public class Job implements ReadWriteRunnable { + + /** Defines the maximum number of events that will be batched into a single job. */ + public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */ private final int _maxEvents; /** Holds the queue of events that make up the job. */ - private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>(); + private final java.util.Queue<Runnable> _eventQueue = new ConcurrentLinkedQueue<Runnable>(); /** Holds a status flag, that indicates when the job is actively running. */ private final AtomicBoolean _active = new AtomicBoolean(); - /** Holds the completion continuation, called upon completion of a run of the job. */ - private final JobCompletionHandler _completionHandler; - private final boolean _readJob; + private ReferenceCountingExecutorService _poolReference; + private final static Logger _logger = LoggerFactory.getLogger(Job.class); - /** - * Creates a new job that aggregates many continuations together. - * - * @param session The Mina session. - * @param completionHandler The per job run, terminal continuation. - * @param maxEvents The maximum number of aggregated continuations to process per run of the job. - * @param readJob - */ - Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob) - { - _completionHandler = completionHandler; - _maxEvents = maxEvents; - _readJob = readJob; - } - - public Job(JobCompletionHandler completionHandler, int maxEvents, boolean readJob) + public Job(ReferenceCountingExecutorService poolReference, int maxEvents, boolean readJob) { - _completionHandler = completionHandler; + _poolReference = poolReference; _maxEvents = maxEvents; _readJob = readJob; } @@ -99,7 +86,7 @@ public class Job implements ReadWriteRunnable * * @param evt The continuation to enqueue. */ - public void add(Event evt) + public void add(Runnable evt) { _eventQueue.add(evt); } @@ -113,14 +100,14 @@ public class Job implements ReadWriteRunnable int i = _maxEvents; while( --i != 0 ) { - Event e = _eventQueue.poll(); + Runnable e = _eventQueue.poll(); if (e == null) { return true; } else { - e.process(); + e.run(); } } return false; @@ -162,11 +149,11 @@ public class Job implements ReadWriteRunnable if(processAll()) { deactivate(); - _completionHandler.completed(this); + completed(); } else { - _completionHandler.notCompleted(this); + notCompleted(); } } @@ -174,19 +161,6 @@ public class Job implements ReadWriteRunnable { return _readJob; } - - /** - * Another interface for a continuation. - * - * @todo Get rid of this interface as there are other interfaces that could be used instead, such as FutureTask, - * Runnable or a custom Continuation interface. - */ - static interface JobCompletionHandler - { - public void completed(Job job); - - public void notCompleted(final Job job); - } /** * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. @@ -194,7 +168,7 @@ public class Job implements ReadWriteRunnable * @param job The job. * @param event The event to hand off asynchronously. */ - public static void fireAsynchEvent(ExecutorService pool, Job job, Event event) + public static void fireAsynchEvent(ExecutorService pool, Job job, Runnable event) { job.add(event); @@ -221,4 +195,59 @@ public class Job implements ReadWriteRunnable } + /** + * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing + * of a batch of events this is called. This method simply re-activates the job, if it has more events to process. + * + * @param session The Mina session to work in. + * @param job The job that completed. + */ + public void completed() + { + if (!isComplete()) + { + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } + + + // ritchiem : 2006-12-13 Do we need to perform the additional checks here? + // Can the pool be shutdown at this point? + if (activate()) + { + try + { + pool.execute(this); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + + } + } + } + + public void notCompleted() + { + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } + + try + { + pool.execute(this); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java deleted file mode 100644 index 4e02ac3a55..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ /dev/null @@ -1,487 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.pool; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; - -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoFilterAdapter; -import org.apache.mina.common.IoSession; -import org.apache.qpid.pool.Event.CloseEvent; -import org.apache.qpid.pool.Event.MinaReceivedEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it - * adds no behaviour by default to the filter chain, it is abstract. - * - * <p/>PoolingFilter provides a capability, available to sub-classes, to handle events in the chain asynchronously, by - * adding them to a job. If a job is not active, adding an event to it activates it. If it is active, the event is - * added to the job, which will run to completion and eventually process the event. The queue on the job itself acts as - * a buffer between stages of the pipeline. - * - * <p/>There are two convenience methods, {@link #createAynschReadPoolingFilter} and - * {@link #createAynschWritePoolingFilter}, for obtaining pooling filters that handle 'messageReceived' and - * 'filterWrite' events, making it possible to process these event streams seperately. - * - * <p/>Pooling filters have a name, in order to distinguish different filter types. They set up a {@link Job} on the - * Mina session they are working with, and store it in the session against their identifying name. This allows different - * filters with different names to be set up on the same filter chain, on the same Mina session, that batch their - * workloads in different jobs. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Implement default, pass through filter. - * <tr><td> Create pooling filters and a specific thread pool. <td> {@link ReferenceCountingExecutorService} - * <tr><td> Provide the ability to batch Mina events for asynchronous processing. <td> {@link Job}, {@link Event} - * <tr><td> Provide a terminal continuation to keep jobs running till empty. - * <td> {@link Job}, {@link Job.JobCompletionHandler} - * </table> - * - * @todo The static helper methods are pointless. Could just call new. - */ -public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler -{ - /** Used for debugging purposes. */ - private static final Logger _logger = LoggerFactory.getLogger(PoolingFilter.class); - - /** Holds the managed reference to obtain the executor for the batched jobs. */ - private final ReferenceCountingExecutorService _poolReference; - - /** Used to hold a name for identifying differeny pooling filter types. */ - private final String _name; - - /** Defines the maximum number of events that will be batched into a single job. */ - public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); - - private final int _maxEvents; - - private final boolean _readFilter; - - /** - * Creates a named pooling filter, on the specified shared thread pool. - * - * @param refCountingPool The thread pool reference. - * @param name The identifying name of the filter type. - */ - public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents, boolean readFilter) - { - _poolReference = refCountingPool; - _name = name; - _maxEvents = maxEvents; - _readFilter = readFilter; - } - - /** - * Helper method to get an instance of a pooling filter that handles read events asynchronously. - * - * @param refCountingPool A managed reference to the thread pool. - * @param name The filter types identifying name. - * - * @return A pooling filter for asynchronous read events. - */ - public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) - { - return new AsynchReadPoolingFilter(refCountingPool, name); - } - - /** - * Helper method to get an instance of a pooling filter that handles write events asynchronously. - * - * @param refCountingPool A managed reference to the thread pool. - * @param name The filter types identifying name. - * - * @return A pooling filter for asynchronous write events. - */ - public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) - { - return new AsynchWritePoolingFilter(refCountingPool, name); - } - - /** - * Called by Mina to initialize this filter. Takes a reference to the thread pool. - */ - public void init() - { - _logger.debug("Init called on PoolingFilter " + toString()); - - // Called when the filter is initialised in the chain. If the reference count is - // zero this acquire will initialise the pool. - _poolReference.acquireExecutorService(); - } - - /** - * Called by Mina to clean up this filter. Releases the reference to the thread pool. - */ - public void destroy() - { - _logger.debug("Destroy called on PoolingFilter " + toString()); - } - - /** - * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. - * - * @param job The job. - * @param event The event to hand off asynchronously. - */ - void fireAsynchEvent(Job job, Event event) - { - - job.add(event); - - final ExecutorService pool = _poolReference.getPool(); - - if(pool == null) - { - return; - } - - // rather than perform additional checks on pool to check that it hasn't shutdown. - // catch the RejectedExecutionException that will result from executing on a shutdown pool - if (job.activate()) - { - try - { - pool.execute(job); - } - catch(RejectedExecutionException e) - { - _logger.warn("Thread pool shutdown while tasks still outstanding"); - } - } - - } - - /** - * Creates a Job on the Mina session, identified by this filters name, in which this filter places asynchronously - * handled events. - * - * @param session The Mina session. - */ - public void createNewJobForSession(IoSession session) - { - Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter); - session.setAttribute(_name, job); - } - - /** - * Retrieves this filters Job, by this filters name, from the Mina session. - * - * @param session The Mina session. - * - * @return The Job for this filter to place asynchronous events into. - */ - public Job getJobForSession(IoSession session) - { - return (Job) session.getAttribute(_name); - } - - /** - * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing - * of a batch of events this is called. This method simply re-activates the job, if it has more events to process. - * - * @param session The Mina session to work in. - * @param job The job that completed. - */ - public void completed(Job job) - { - - - if (!job.isComplete()) - { - final ExecutorService pool = _poolReference.getPool(); - - if(pool == null) - { - return; - } - - - // ritchiem : 2006-12-13 Do we need to perform the additional checks here? - // Can the pool be shutdown at this point? - if (job.activate()) - { - try - { - pool.execute(job); - } - catch(RejectedExecutionException e) - { - _logger.warn("Thread pool shutdown while tasks still outstanding"); - } - - } - } - } - - public void notCompleted(Job job) - { - final ExecutorService pool = _poolReference.getPool(); - - if(pool == null) - { - return; - } - - try - { - pool.execute(job); - } - catch(RejectedExecutionException e) - { - _logger.warn("Thread pool shutdown while tasks still outstanding"); - } - - } - - - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void sessionOpened(final NextFilter nextFilter, final IoSession session) throws Exception - { - nextFilter.sessionOpened(session); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception - { - nextFilter.sessionClosed(session); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param status The session idle status. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void sessionIdle(final NextFilter nextFilter, final IoSession session, final IdleStatus status) throws Exception - { - nextFilter.sessionIdle(session, status); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param cause The underlying exception. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void exceptionCaught(final NextFilter nextFilter, final IoSession session, final Throwable cause) throws Exception - { - nextFilter.exceptionCaught(session, cause); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param message The message received. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void messageReceived(final NextFilter nextFilter, final IoSession session, final Object message) throws Exception - { - nextFilter.messageReceived(session, message); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param message The message sent. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void messageSent(final NextFilter nextFilter, final IoSession session, final Object message) throws Exception - { - nextFilter.messageSent(session, message); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param writeRequest The write request event. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest) - throws Exception - { - nextFilter.filterWrite(session, writeRequest); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void filterClose(NextFilter nextFilter, IoSession session) throws Exception - { - nextFilter.filterClose(session); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception - { - nextFilter.sessionCreated(session); - } - - /** - * Prints the filter types identifying name to a string, mainly for debugging purposes. - * - * @return The filter types identifying name. - */ - public String toString() - { - return _name; - } - - /** - * AsynchReadPoolingFilter is a pooling filter that handles 'messageReceived' and 'sessionClosed' events - * asynchronously. - */ - public static class AsynchReadPoolingFilter extends PoolingFilter - { - /** - * Creates a pooling filter that handles read events asynchronously. - * - * @param refCountingPool A managed reference to the thread pool. - * @param name The filter types identifying name. - */ - public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) - { - super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS),true); - } - - /** - * Hands off this event for asynchronous execution. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param message The message received. - */ - public void messageReceived(NextFilter nextFilter, final IoSession session, Object message) - { - Job job = getJobForSession(session); - fireAsynchEvent(job, new MinaReceivedEvent(nextFilter, message, session)); - } - - /** - * Hands off this event for asynchronous execution. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - */ - public void sessionClosed(final NextFilter nextFilter, final IoSession session) - { - Job job = getJobForSession(session); - fireAsynchEvent(job, new CloseEvent(nextFilter, session)); - } - } - - /** - * AsynchWritePoolingFilter is a pooling filter that handles 'filterWrite' and 'sessionClosed' events - * asynchronously. - */ - public static class AsynchWritePoolingFilter extends PoolingFilter - { - /** - * Creates a pooling filter that handles write events asynchronously. - * - * @param refCountingPool A managed reference to the thread pool. - * @param name The filter types identifying name. - */ - public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) - { - super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS),false); - } - - /** - * Hands off this event for asynchronous execution. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param writeRequest The write request event. - */ - public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest) - { - Job job = getJobForSession(session); - fireAsynchEvent(job, new Event.MinaWriteEvent(nextFilter, writeRequest, session)); - } - - /** - * Hands off this event for asynchronous execution. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - */ - public void sessionClosed(final NextFilter nextFilter, final IoSession session) - { - Job job = getJobForSession(session); - fireAsynchEvent(job, new CloseEvent(nextFilter, session)); - } - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java deleted file mode 100644 index 8cea70e597..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.pool; - -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.ThreadModel; -import org.apache.mina.filter.ReferenceCountingIoFilter; - -/** - * ReadWriteThreadModel is a Mina i/o filter chain factory, which creates a filter chain with seperate filters to - * handle read and write events. The seperate filters are {@link PoolingFilter}s, which have thread pools to handle - * these events. The effect of this is that reading and writing may happen concurrently. - * - * <p/>Socket i/o will only happen with concurrent reads and writes if Mina has seperate selector threads for each. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Create a filter chain with seperate read and write thread pools for read/write Mina events. - * <td> {@link PoolingFilter} - * </table> - */ -public class ReadWriteThreadModel implements ThreadModel -{ - /** Holds the singleton instance of this factory. */ - private static final ReadWriteThreadModel _instance = new ReadWriteThreadModel(); - - /** Holds the thread pooling filter for reads. */ - private final PoolingFilter _asynchronousReadFilter; - - /** Holds the thread pooloing filter for writes. */ - private final PoolingFilter _asynchronousWriteFilter; - - /** - * Creates a new factory for concurrent i/o, thread pooling filter chain construction. This is private, so that - * only a singleton instance of the factory is ever created. - */ - private ReadWriteThreadModel() - { - final ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance(); - _asynchronousReadFilter = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter"); - _asynchronousWriteFilter = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter"); - } - - /** - * Gets the singleton instance of this filter chain factory. - * - * @return The singleton instance of this filter chain factory. - */ - public static ReadWriteThreadModel getInstance() - { - return _instance; - } - - /** - * Gets the read filter. - * - * @return The read filter. - */ - public PoolingFilter getAsynchronousReadFilter() - { - return _asynchronousReadFilter; - } - - /** - * Gets the write filter. - * - * @return The write filter. - */ - public PoolingFilter getAsynchronousWriteFilter() - { - return _asynchronousWriteFilter; - } - - /** - * Adds the concurrent read and write filters to a filter chain. - * - * @param chain The Mina filter chain to add to. - */ - public void buildFilterChain(IoFilterChain chain) - { - chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(_asynchronousReadFilter)); - chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(_asynchronousWriteFilter)); - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java index 7cc5f8e442..38ea9307b7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -28,8 +28,6 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; -import javax.net.ssl.SSLEngine; - import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoAcceptor; @@ -50,7 +48,6 @@ import org.apache.mina.transport.socket.nio.SocketConnectorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.util.NewThreadExecutor; import org.apache.mina.util.SessionUtil; -import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.ssl.SSLContextFactory; @@ -58,7 +55,6 @@ import org.apache.qpid.thread.QpidThreadExecutor; import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.NetworkDriverConfiguration; import org.apache.qpid.transport.OpenException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,14 +144,6 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver sc.setTcpNoDelay(config.getTcpNoDelay()); } - // if we do not use the executor pool threading model we get the default - // leader follower - // implementation provided by MINA - if (_executorPool) - { - sconfig.setThreadModel(ReadWriteThreadModel.getInstance()); - } - if (sslFactory != null) { _sslFactory = sslFactory; @@ -227,14 +215,6 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver } SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig(); - - // if we do not use our own thread model we get the MINA default which is to use - // its own leader-follower model - boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool"); - if (readWriteThreading) - { - cfg.setThreadModel(ReadWriteThreadModel.getInstance()); - } SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true); @@ -258,8 +238,6 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver throw new OpenException("Could not open connection", _lastException); } _ioSession = future.getSession(); - ReadWriteThreadModel.getInstance().getAsynchronousReadFilter().createNewJobForSession(_ioSession); - ReadWriteThreadModel.getInstance().getAsynchronousWriteFilter().createNewJobForSession(_ioSession); _ioSession.setAttachment(engine); engine.setNetworkDriver(this); _protocolEngine = engine; diff --git a/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java b/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java deleted file mode 100644 index 6383d52298..0000000000 --- a/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.pool; - -import junit.framework.TestCase; -import junit.framework.Assert; -import org.apache.qpid.session.TestSession; -import org.apache.mina.common.IoFilter; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.IdleStatus; - -import java.util.concurrent.RejectedExecutionException; - -public class PoolingFilterTest extends TestCase -{ - private PoolingFilter _pool; - ReferenceCountingExecutorService _executorService; - - public void setUp() - { - - //Create Pool - _executorService = ReferenceCountingExecutorService.getInstance(); - _executorService.acquireExecutorService(); - _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService, - "AsynchronousWriteFilter"); - - } - - public void testRejectedExecution() throws Exception - { - - TestSession testSession = new TestSession(); - _pool.createNewJobForSession(testSession); - _pool.filterWrite(new NoOpFilter(), testSession, new IoFilter.WriteRequest("Message")); - - //Shutdown the pool - _executorService.getPool().shutdownNow(); - - try - { - - testSession = new TestSession(); - _pool.createNewJobForSession(testSession); - //prior to fix for QPID-172 this would throw RejectedExecutionException - _pool.filterWrite(null, testSession, null); - } - catch (RejectedExecutionException rje) - { - Assert.fail("RejectedExecutionException should not occur after pool has shutdown:" + rje); - } - } - - private static class NoOpFilter implements IoFilter.NextFilter - { - - public void sessionOpened(IoSession session) - { - } - - public void sessionClosed(IoSession session) - { - } - - public void sessionIdle(IoSession session, IdleStatus status) - { - } - - public void exceptionCaught(IoSession session, Throwable cause) - { - } - - public void messageReceived(IoSession session, Object message) - { - } - - public void messageSent(IoSession session, Object message) - { - } - - public void filterWrite(IoSession session, IoFilter.WriteRequest writeRequest) - { - } - - public void filterClose(IoSession session) - { - } - - public void sessionCreated(IoSession session) - { - } - } -} |