summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java1026
1 files changed, 1026 insertions, 0 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
new file mode 100644
index 0000000000..c23ad8686f
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
@@ -0,0 +1,1026 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.WriteTimeoutException;
+import org.apache.mina.util.IdentityHashSet;
+import org.apache.mina.util.NamePreservingRunnable;
+import org.apache.mina.util.Queue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $,
+ */
+class MultiThreadSocketIoProcessor extends SocketIoProcessor
+{
+ Logger _logger = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class);
+ Logger _loggerRead = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Reader");
+ Logger _loggerWrite = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Writer");
+
+ private static final long SELECTOR_TIMEOUT = 1000L;
+
+ private int MAX_READ_BYTES_PER_SESSION = 524288; //512K
+ private int MAX_FLUSH_BYTES_PER_SESSION = 524288; //512K
+
+ private final Object readLock = new Object();
+ private final Object writeLock = new Object();
+
+ private final String threadName;
+ private final Executor executor;
+
+ private ReentrantLock trafficMaskUpdateLock = new ReentrantLock();
+
+ /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
+ private volatile Selector selector, writeSelector;
+
+ private final Queue newSessions = new Queue();
+ private final Queue removingSessions = new Queue();
+ private final BlockingQueue flushingSessions = new LinkedBlockingQueue();
+ private final IdentityHashSet flushingSessionsSet = new IdentityHashSet();
+
+ private final Queue trafficControllingSessions = new Queue();
+
+ private ReadWorker readWorker;
+ private WriteWorker writeWorker;
+ private long lastIdleReadCheckTime = System.currentTimeMillis();
+ private long lastIdleWriteCheckTime = System.currentTimeMillis();
+
+ MultiThreadSocketIoProcessor(String threadName, Executor executor)
+ {
+ super(threadName, executor);
+ this.threadName = threadName;
+ this.executor = executor;
+ }
+
+ void addNew(SocketSessionImpl session) throws IOException
+ {
+ synchronized (newSessions)
+ {
+ newSessions.push(session);
+ }
+
+ startupWorker();
+
+ selector.wakeup();
+ writeSelector.wakeup();
+ }
+
+ void remove(SocketSessionImpl session) throws IOException
+ {
+ scheduleRemove(session);
+ startupWorker();
+ selector.wakeup();
+ }
+
+ private void startupWorker() throws IOException
+ {
+ synchronized (readLock)
+ {
+ if (readWorker == null)
+ {
+ selector = Selector.open();
+ readWorker = new ReadWorker();
+ executor.execute(new NamePreservingRunnable(readWorker));
+ }
+ }
+
+ synchronized (writeLock)
+ {
+ if (writeWorker == null)
+ {
+ writeSelector = Selector.open();
+ writeWorker = new WriteWorker();
+ executor.execute(new NamePreservingRunnable(writeWorker));
+ }
+ }
+
+ }
+
+ void flush(SocketSessionImpl session)
+ {
+ scheduleFlush(session);
+ Selector selector = this.writeSelector;
+
+ if (selector != null)
+ {
+ selector.wakeup();
+ }
+ }
+
+ void updateTrafficMask(SocketSessionImpl session)
+ {
+ scheduleTrafficControl(session);
+ Selector selector = this.selector;
+ if (selector != null)
+ {
+ selector.wakeup();
+ }
+ }
+
+ private void scheduleRemove(SocketSessionImpl session)
+ {
+ synchronized (removingSessions)
+ {
+ removingSessions.push(session);
+ }
+ }
+
+ private void scheduleFlush(SocketSessionImpl session)
+ {
+ synchronized (flushingSessionsSet)
+ {
+ //if flushingSessions grows to contain Integer.MAX_VALUE sessions
+ // then this will fail.
+ if (flushingSessionsSet.add(session))
+ {
+ flushingSessions.offer(session);
+ }
+ }
+ }
+
+ private void scheduleTrafficControl(SocketSessionImpl session)
+ {
+ synchronized (trafficControllingSessions)
+ {
+ trafficControllingSessions.push(session);
+ }
+ }
+
+ private void doAddNewReader() throws InterruptedException
+ {
+ if (newSessions.isEmpty())
+ {
+ return;
+ }
+
+ for (; ;)
+ {
+ MultiThreadSocketSessionImpl session;
+
+ synchronized (newSessions)
+ {
+ session = (MultiThreadSocketSessionImpl) newSessions.peek();
+ }
+
+ if (session == null)
+ {
+ break;
+ }
+
+ SocketChannel ch = session.getChannel();
+
+
+ try
+ {
+
+ ch.configureBlocking(false);
+ session.setSelectionKey(ch.register(selector,
+ SelectionKey.OP_READ,
+ session));
+
+ //System.out.println("ReadDebug:"+"Awaiting Registration");
+ session.awaitRegistration();
+ sessionCreated(session);
+ }
+ catch (IOException e)
+ {
+ // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
+ // and call ConnectFuture.setException().
+ session.getFilterChain().fireExceptionCaught(session, e);
+ }
+ }
+ }
+
+
+ private void doAddNewWrite() throws InterruptedException
+ {
+ if (newSessions.isEmpty())
+ {
+ return;
+ }
+
+ for (; ;)
+ {
+ MultiThreadSocketSessionImpl session;
+
+ synchronized (newSessions)
+ {
+ session = (MultiThreadSocketSessionImpl) newSessions.peek();
+ }
+
+ if (session == null)
+ {
+ break;
+ }
+
+ SocketChannel ch = session.getChannel();
+
+ try
+ {
+ ch.configureBlocking(false);
+ synchronized (flushingSessionsSet)
+ {
+ flushingSessionsSet.add(session);
+ }
+
+ session.setWriteSelectionKey(ch.register(writeSelector,
+ SelectionKey.OP_WRITE,
+ session));
+
+ //System.out.println("WriteDebug:"+"Awaiting Registration");
+ session.awaitRegistration();
+ sessionCreated(session);
+ }
+ catch (IOException e)
+ {
+
+ // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
+ // and call ConnectFuture.setException().
+ session.getFilterChain().fireExceptionCaught(session, e);
+ }
+ }
+ }
+
+
+ private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException
+ {
+ MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
+ synchronized (newSessions)
+ {
+ if (!session.created())
+ {
+ _logger.debug("Popping new session");
+ newSessions.pop();
+
+ // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
+ // in AbstractIoFilterChain.fireSessionOpened().
+ session.getServiceListeners().fireSessionCreated(session);
+
+ session.doneCreation();
+ }
+ }
+ }
+
+ private void doRemove()
+ {
+ if (removingSessions.isEmpty())
+ {
+ return;
+ }
+
+ for (; ;)
+ {
+ MultiThreadSocketSessionImpl session;
+
+ synchronized (removingSessions)
+ {
+ session = (MultiThreadSocketSessionImpl) removingSessions.pop();
+ }
+
+ if (session == null)
+ {
+ break;
+ }
+
+ SocketChannel ch = session.getChannel();
+ SelectionKey key = session.getReadSelectionKey();
+ SelectionKey writeKey = session.getWriteSelectionKey();
+
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.close() is called before addSession() is processed)
+ if (key == null || writeKey == null)
+ {
+ scheduleRemove(session);
+ break;
+ }
+ // skip if channel is already closed
+ if (!key.isValid() || !writeKey.isValid())
+ {
+ continue;
+ }
+
+ try
+ {
+ //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session));
+ synchronized (readLock)
+ {
+ key.cancel();
+ }
+ synchronized (writeLock)
+ {
+ writeKey.cancel();
+ }
+ ch.close();
+ }
+ catch (IOException e)
+ {
+ session.getFilterChain().fireExceptionCaught(session, e);
+ }
+ finally
+ {
+ releaseWriteBuffers(session);
+ session.getServiceListeners().fireSessionDestroyed(session);
+ }
+ }
+ }
+
+ private void processRead(Set selectedKeys)
+ {
+ Iterator it = selectedKeys.iterator();
+
+ while (it.hasNext())
+ {
+ SelectionKey key = (SelectionKey) it.next();
+ MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment();
+
+ synchronized (readLock)
+ {
+ if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable())
+ {
+ read(session);
+ }
+ }
+
+ }
+
+ selectedKeys.clear();
+ }
+
+ private void processWrite(Set selectedKeys)
+ {
+ Iterator it = selectedKeys.iterator();
+
+ while (it.hasNext())
+ {
+ SelectionKey key = (SelectionKey) it.next();
+ SocketSessionImpl session = (SocketSessionImpl) key.attachment();
+
+ synchronized (writeLock)
+ {
+ if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable())
+ {
+
+ // Clear OP_WRITE
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+
+ synchronized (flushingSessionsSet)
+ {
+ flushingSessions.offer(session);
+ }
+ }
+ }
+ }
+
+ selectedKeys.clear();
+ }
+
+ private void read(SocketSessionImpl session)
+ {
+
+ //if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Starting read for Session:" + System.identityHashCode(session));
+ }
+
+ int totalReadBytes = 0;
+
+ while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
+ SocketChannel ch = session.getChannel();
+
+ try
+ {
+ buf.clear();
+
+ int readBytes = 0;
+ int ret;
+
+ try
+ {
+ while ((ret = ch.read(buf.buf())) > 0)
+ {
+ readBytes += ret;
+ totalReadBytes += ret;
+ }
+ }
+ finally
+ {
+ buf.flip();
+ }
+
+
+ if (readBytes > 0)
+ {
+ session.increaseReadBytes(readBytes);
+
+ session.getFilterChain().fireMessageReceived(session, buf);
+ buf = null;
+ }
+
+ if (ret <= 0)
+ {
+ if (ret == 0)
+ {
+ if (readBytes == session.getReadBufferSize())
+ {
+ continue;
+ }
+ }
+ else
+ {
+ scheduleRemove(session);
+ }
+
+ break;
+ }
+ }
+ catch (Throwable e)
+ {
+ if (e instanceof IOException)
+ {
+ scheduleRemove(session);
+ }
+ session.getFilterChain().fireExceptionCaught(session, e);
+
+ //Stop Reading this session.
+ return;
+ }
+ finally
+ {
+ if (buf != null)
+ {
+ buf.release();
+ }
+ }
+ }//for
+
+ // if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Read for Session:" + System.identityHashCode(session) + " got: " + totalReadBytes);
+ }
+ }
+
+
+ private void notifyReadIdleness()
+ {
+ // process idle sessions
+ long currentTime = System.currentTimeMillis();
+ if ((currentTime - lastIdleReadCheckTime) >= 1000)
+ {
+ lastIdleReadCheckTime = currentTime;
+ Set keys = selector.keys();
+ if (keys != null)
+ {
+ for (Iterator it = keys.iterator(); it.hasNext();)
+ {
+ SelectionKey key = (SelectionKey) it.next();
+ SocketSessionImpl session = (SocketSessionImpl) key.attachment();
+ notifyReadIdleness(session, currentTime);
+ }
+ }
+ }
+ }
+
+ private void notifyWriteIdleness()
+ {
+ // process idle sessions
+ long currentTime = System.currentTimeMillis();
+ if ((currentTime - lastIdleWriteCheckTime) >= 1000)
+ {
+ lastIdleWriteCheckTime = currentTime;
+ Set keys = writeSelector.keys();
+ if (keys != null)
+ {
+ for (Iterator it = keys.iterator(); it.hasNext();)
+ {
+ SelectionKey key = (SelectionKey) it.next();
+ SocketSessionImpl session = (SocketSessionImpl) key.attachment();
+ notifyWriteIdleness(session, currentTime);
+ }
+ }
+ }
+ }
+
+ private void notifyReadIdleness(SocketSessionImpl session, long currentTime)
+ {
+ notifyIdleness0(
+ session, currentTime,
+ session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
+ IdleStatus.BOTH_IDLE,
+ Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
+ notifyIdleness0(
+ session, currentTime,
+ session.getIdleTimeInMillis(IdleStatus.READER_IDLE),
+ IdleStatus.READER_IDLE,
+ Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
+
+ notifyWriteTimeout(session, currentTime, session
+ .getWriteTimeoutInMillis(), session.getLastWriteTime());
+ }
+
+ private void notifyWriteIdleness(SocketSessionImpl session, long currentTime)
+ {
+ notifyIdleness0(
+ session, currentTime,
+ session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
+ IdleStatus.BOTH_IDLE,
+ Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
+ notifyIdleness0(
+ session, currentTime,
+ session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
+ IdleStatus.WRITER_IDLE,
+ Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
+
+ notifyWriteTimeout(session, currentTime, session
+ .getWriteTimeoutInMillis(), session.getLastWriteTime());
+ }
+
+ private void notifyIdleness0(SocketSessionImpl session, long currentTime,
+ long idleTime, IdleStatus status,
+ long lastIoTime)
+ {
+ if (idleTime > 0 && lastIoTime != 0
+ && (currentTime - lastIoTime) >= idleTime)
+ {
+ session.increaseIdleCount(status);
+ session.getFilterChain().fireSessionIdle(session, status);
+ }
+ }
+
+ private void notifyWriteTimeout(SocketSessionImpl session,
+ long currentTime,
+ long writeTimeout, long lastIoTime)
+ {
+
+ MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session;
+ SelectionKey key = sesh.getWriteSelectionKey();
+
+ synchronized (writeLock)
+ {
+ if (writeTimeout > 0
+ && (currentTime - lastIoTime) >= writeTimeout
+ && key != null && key.isValid()
+ && (key.interestOps() & SelectionKey.OP_WRITE) != 0)
+ {
+ session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException());
+ }
+ }
+ }
+
+ private SocketSessionImpl getNextFlushingSession()
+ {
+ return (SocketSessionImpl) flushingSessions.poll();
+ }
+
+ private void releaseSession(SocketSessionImpl session)
+ {
+ synchronized (session.getWriteRequestQueue())
+ {
+ synchronized (flushingSessionsSet)
+ {
+ if (session.getScheduledWriteRequests() > 0)
+ {
+ if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Reflush" + System.identityHashCode(session));
+ }
+ flushingSessions.offer(session);
+ }
+ else
+ {
+ if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Releasing session " + System.identityHashCode(session));
+ }
+ flushingSessionsSet.remove(session);
+ }
+ }
+ }
+ }
+
+ private void releaseWriteBuffers(SocketSessionImpl session)
+ {
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+ WriteRequest req;
+
+ //Should this be synchronized?
+ synchronized (writeRequestQueue)
+ {
+ while ((req = (WriteRequest) writeRequestQueue.pop()) != null)
+ {
+ try
+ {
+ ((ByteBuffer) req.getMessage()).release();
+ }
+ catch (IllegalStateException e)
+ {
+ session.getFilterChain().fireExceptionCaught(session, e);
+ }
+ finally
+ {
+ req.getFuture().setWritten(false);
+ }
+ }
+ }
+ }
+
+ private void doFlush()
+ {
+ MultiThreadSocketSessionImpl session;
+
+ while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null)
+ {
+ if (!session.isConnected())
+ {
+ releaseWriteBuffers(session);
+ releaseSession(session);
+ continue;
+ }
+
+ SelectionKey key = session.getWriteSelectionKey();
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.write() is called before addSession() is processed)
+ if (key == null)
+ {
+ scheduleFlush(session);
+ releaseSession(session);
+ continue;
+ }
+ // skip if channel is already closed
+ if (!key.isValid())
+ {
+ releaseSession(session);
+ continue;
+ }
+
+ try
+ {
+ if (doFlush(session))
+ {
+ releaseSession(session);
+ }
+ }
+ catch (IOException e)
+ {
+ releaseSession(session);
+ scheduleRemove(session);
+ session.getFilterChain().fireExceptionCaught(session, e);
+ }
+
+ }
+
+ }
+
+ private boolean doFlush(SocketSessionImpl sessionParam) throws IOException
+ {
+ MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
+ // Clear OP_WRITE
+ SelectionKey key = session.getWriteSelectionKey();
+ synchronized (writeLock)
+ {
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ }
+ SocketChannel ch = session.getChannel();
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+
+ long totalFlushedBytes = 0;
+ while (true)
+ {
+ WriteRequest req;
+
+ synchronized (writeRequestQueue)
+ {
+ req = (WriteRequest) writeRequestQueue.first();
+ }
+
+ if (req == null)
+ {
+ break;
+ }
+
+ ByteBuffer buf = (ByteBuffer) req.getMessage();
+ if (buf.remaining() == 0)
+ {
+ synchronized (writeRequestQueue)
+ {
+ writeRequestQueue.pop();
+ }
+
+ session.increaseWrittenMessages();
+
+ buf.reset();
+ session.getFilterChain().fireMessageSent(session, req);
+ continue;
+ }
+
+
+ int writtenBytes = 0;
+
+ // Reported as DIRMINA-362
+ //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it.
+ if (key.isWritable())
+ {
+ writtenBytes = ch.write(buf.buf());
+ totalFlushedBytes += writtenBytes;
+ }
+
+ if (writtenBytes > 0)
+ {
+ session.increaseWrittenBytes(writtenBytes);
+ }
+
+ if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION))
+ {
+ // Kernel buffer is full
+ synchronized (writeLock)
+ {
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ }
+ if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Written BF: " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes");
+ }
+ return false;
+ }
+ }
+
+ if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Written : " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes");
+ }
+ return true;
+ }
+
+ private void doUpdateTrafficMask()
+ {
+ if (trafficControllingSessions.isEmpty() || trafficMaskUpdateLock.isLocked())
+ {
+ return;
+ }
+
+ // Synchronize over entire operation as this method should be called
+ // from both read and write thread and we don't want the order of the
+ // updates to get changed.
+ trafficMaskUpdateLock.lock();
+ try
+ {
+ for (; ;)
+ {
+ MultiThreadSocketSessionImpl session;
+
+ session = (MultiThreadSocketSessionImpl) trafficControllingSessions.pop();
+
+ if (session == null)
+ {
+ break;
+ }
+
+ SelectionKey key = session.getReadSelectionKey();
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.suspend??() or session.resume??() is
+ // called before addSession() is processed)
+ if (key == null)
+ {
+ scheduleTrafficControl(session);
+ break;
+ }
+ // skip if channel is already closed
+ if (!key.isValid())
+ {
+ continue;
+ }
+
+ // The normal is OP_READ and, if there are write requests in the
+ // session's write queue, set OP_WRITE to trigger flushing.
+
+ //Sset to Read and Write if there is nothing then the cost
+ // is one loop through the flusher.
+ int ops = SelectionKey.OP_READ;
+
+ // Now mask the preferred ops with the mask of the current session
+ int mask = session.getTrafficMask().getInterestOps();
+ synchronized (readLock)
+ {
+ key.interestOps(ops & mask);
+ }
+ //Change key to the WriteSelection Key
+ key = session.getWriteSelectionKey();
+ if (key != null && key.isValid())
+ {
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+ synchronized (writeRequestQueue)
+ {
+ if (!writeRequestQueue.isEmpty())
+ {
+ ops = SelectionKey.OP_WRITE;
+ synchronized (writeLock)
+ {
+ key.interestOps(ops & mask);
+ }
+ }
+ }
+ }
+ }
+ }
+ finally
+ {
+ trafficMaskUpdateLock.unlock();
+ }
+
+ }
+
+ private class WriteWorker implements Runnable
+ {
+
+ public void run()
+ {
+ Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Writer");
+
+ //System.out.println("WriteDebug:"+"Startup");
+ for (; ;)
+ {
+ try
+ {
+ int nKeys = writeSelector.select(SELECTOR_TIMEOUT);
+
+ doAddNewWrite();
+ doUpdateTrafficMask();
+
+ if (nKeys > 0)
+ {
+ //System.out.println("WriteDebug:"+nKeys + " keys from writeselector");
+ processWrite(writeSelector.selectedKeys());
+ }
+ else
+ {
+ //System.out.println("WriteDebug:"+"No keys from writeselector");
+ }
+
+ doRemove();
+ notifyWriteIdleness();
+
+ if (flushingSessionsSet.size() > 0)
+ {
+ doFlush();
+ }
+
+ if (writeSelector.keys().isEmpty())
+ {
+ synchronized (writeLock)
+ {
+
+ if (writeSelector.keys().isEmpty() && newSessions.isEmpty())
+ {
+ writeWorker = null;
+ try
+ {
+ writeSelector.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ finally
+ {
+ writeSelector = null;
+ }
+
+ break;
+ }
+ }
+ }
+
+ }
+ catch (Throwable t)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(t);
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e1)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e1);
+ }
+ }
+ }
+ //System.out.println("WriteDebug:"+"Shutdown");
+ }
+
+ }
+
+ private class ReadWorker implements Runnable
+ {
+
+ public void run()
+ {
+ Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader");
+
+ //System.out.println("ReadDebug:"+"Startup");
+ for (; ;)
+ {
+ try
+ {
+ int nKeys = selector.select(SELECTOR_TIMEOUT);
+
+ doAddNewReader();
+ doUpdateTrafficMask();
+
+ if (nKeys > 0)
+ {
+ //System.out.println("ReadDebug:"+nKeys + " keys from selector");
+
+ processRead(selector.selectedKeys());
+ }
+ else
+ {
+ //System.out.println("ReadDebug:"+"No keys from selector");
+ }
+
+
+ doRemove();
+ notifyReadIdleness();
+
+ if (selector.keys().isEmpty())
+ {
+
+ synchronized (readLock)
+ {
+ if (selector.keys().isEmpty() && newSessions.isEmpty())
+ {
+ readWorker = null;
+ try
+ {
+ selector.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ finally
+ {
+ selector = null;
+ }
+
+ break;
+ }
+ }
+ }
+ }
+ catch (Throwable t)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(t);
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e1)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e1);
+ }
+ }
+ }
+ //System.out.println("ReadDebug:"+"Shutdown");
+ }
+
+ }
+}