/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.mina.transport.socket.nio; import edu.emory.mathcs.backport.java.util.concurrent.Executor; import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.ExceptionMonitor; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoFilter.WriteRequest; import org.apache.mina.common.WriteTimeoutException; import org.apache.mina.util.IdentityHashSet; import org.apache.mina.util.NamePreservingRunnable; import org.apache.mina.util.Queue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; /** * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally. * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $, */ class MultiThreadSocketIoProcessor extends SocketIoProcessor { Logger _logger = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class); Logger _loggerRead = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Reader"); Logger _loggerWrite = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Writer"); private static final long SELECTOR_TIMEOUT = 1000L; private int MAX_READ_BYTES_PER_SESSION = 524288; //512K private int MAX_FLUSH_BYTES_PER_SESSION = 524288; //512K private final Object readLock = new Object(); private final Object writeLock = new Object(); private final String threadName; private final Executor executor; private ReentrantLock trafficMaskUpdateLock = new ReentrantLock(); /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */ private volatile Selector selector, writeSelector; private final Queue newSessions = new Queue(); private final Queue removingSessions = new Queue(); private final BlockingQueue flushingSessions = new LinkedBlockingQueue(); private final IdentityHashSet flushingSessionsSet = new IdentityHashSet(); private final Queue trafficControllingSessions = new Queue(); private ReadWorker readWorker; private WriteWorker writeWorker; private long lastIdleReadCheckTime = System.currentTimeMillis(); private long lastIdleWriteCheckTime = System.currentTimeMillis(); MultiThreadSocketIoProcessor(String threadName, Executor executor) { super(threadName, executor); this.threadName = threadName; this.executor = executor; } void addNew(SocketSessionImpl session) throws IOException { synchronized (newSessions) { newSessions.push(session); } startupWorker(); selector.wakeup(); writeSelector.wakeup(); } void remove(SocketSessionImpl session) throws IOException { scheduleRemove(session); startupWorker(); selector.wakeup(); } private void startupWorker() throws IOException { synchronized (readLock) { if (readWorker == null) { selector = Selector.open(); readWorker = new ReadWorker(); executor.execute(new NamePreservingRunnable(readWorker)); } } synchronized (writeLock) { if (writeWorker == null) { writeSelector = Selector.open(); writeWorker = new WriteWorker(); executor.execute(new NamePreservingRunnable(writeWorker)); } } } void flush(SocketSessionImpl session) { scheduleFlush(session); Selector selector = this.writeSelector; if (selector != null) { selector.wakeup(); } } void updateTrafficMask(SocketSessionImpl session) { scheduleTrafficControl(session); Selector selector = this.selector; if (selector != null) { selector.wakeup(); } } private void scheduleRemove(SocketSessionImpl session) { synchronized (removingSessions) { removingSessions.push(session); } } private void scheduleFlush(SocketSessionImpl session) { synchronized (flushingSessionsSet) { //if flushingSessions grows to contain Integer.MAX_VALUE sessions // then this will fail. if (flushingSessionsSet.add(session)) { flushingSessions.offer(session); } } } private void scheduleTrafficControl(SocketSessionImpl session) { synchronized (trafficControllingSessions) { trafficControllingSessions.push(session); } } private void doAddNewReader() throws InterruptedException { if (newSessions.isEmpty()) { return; } for (; ;) { MultiThreadSocketSessionImpl session; synchronized (newSessions) { session = (MultiThreadSocketSessionImpl) newSessions.peek(); } if (session == null) { break; } SocketChannel ch = session.getChannel(); try { ch.configureBlocking(false); session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session)); //System.out.println("ReadDebug:"+"Awaiting Registration"); session.awaitRegistration(); sessionCreated(session); } catch (IOException e) { // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute // and call ConnectFuture.setException(). session.getFilterChain().fireExceptionCaught(session, e); } } } private void doAddNewWrite() throws InterruptedException { if (newSessions.isEmpty()) { return; } for (; ;) { MultiThreadSocketSessionImpl session; synchronized (newSessions) { session = (MultiThreadSocketSessionImpl) newSessions.peek(); } if (session == null) { break; } SocketChannel ch = session.getChannel(); try { ch.configureBlocking(false); synchronized (flushingSessionsSet) { flushingSessionsSet.add(session); } session.setWriteSelectionKey(ch.register(writeSelector, SelectionKey.OP_WRITE, session)); //System.out.println("WriteDebug:"+"Awaiting Registration"); session.awaitRegistration(); sessionCreated(session); } catch (IOException e) { // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute // and call ConnectFuture.setException(). session.getFilterChain().fireExceptionCaught(session, e); } } } private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException { MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; synchronized (newSessions) { if (!session.created()) { _logger.debug("Popping new session"); newSessions.pop(); // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here // in AbstractIoFilterChain.fireSessionOpened(). session.getServiceListeners().fireSessionCreated(session); session.doneCreation(); } } } private void doRemove() { if (removingSessions.isEmpty()) { return; } for (; ;) { MultiThreadSocketSessionImpl session; synchronized (removingSessions) { session = (MultiThreadSocketSessionImpl) removingSessions.pop(); } if (session == null) { break; } SocketChannel ch = session.getChannel(); SelectionKey key = session.getReadSelectionKey(); SelectionKey writeKey = session.getWriteSelectionKey(); // Retry later if session is not yet fully initialized. // (In case that Session.close() is called before addSession() is processed) if (key == null || writeKey == null) { scheduleRemove(session); break; } // skip if channel is already closed if (!key.isValid() || !writeKey.isValid()) { continue; } try { //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session)); synchronized (readLock) { key.cancel(); } synchronized (writeLock) { writeKey.cancel(); } ch.close(); } catch (IOException e) { session.getFilterChain().fireExceptionCaught(session, e); } finally { releaseWriteBuffers(session); session.getServiceListeners().fireSessionDestroyed(session); } } } private void processRead(Set selectedKeys) { Iterator it = selectedKeys.iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment(); synchronized (readLock) { if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable()) { read(session); } } } selectedKeys.clear(); } private void processWrite(Set selectedKeys) { Iterator it = selectedKeys.iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); SocketSessionImpl session = (SocketSessionImpl) key.attachment(); synchronized (writeLock) { if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable()) { // Clear OP_WRITE key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); synchronized (flushingSessionsSet) { flushingSessions.offer(session); } } } } selectedKeys.clear(); } private void read(SocketSessionImpl session) { //if (_loggerWrite.isDebugEnabled()) { //System.out.println("WriteDebug:"+"Starting read for Session:" + System.identityHashCode(session)); } int totalReadBytes = 0; while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION) { ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize()); SocketChannel ch = session.getChannel(); try { buf.clear(); int readBytes = 0; int ret; try { while ((ret = ch.read(buf.buf())) > 0) { readBytes += ret; totalReadBytes += ret; } } finally { buf.flip(); } if (readBytes > 0) { session.increaseReadBytes(readBytes); session.getFilterChain().fireMessageReceived(session, buf); buf = null; } if (ret <= 0) { if (ret == 0) { if (readBytes == session.getReadBufferSize()) { continue; } } else { scheduleRemove(session); } break; } } catch (Throwable e) { if (e instanceof IOException) { scheduleRemove(session); } session.getFilterChain().fireExceptionCaught(session, e); //Stop Reading this session. return; } finally { if (buf != null) { buf.release(); } } }//for // if (_loggerWrite.isDebugEnabled()) { //System.out.println("WriteDebug:"+"Read for Session:" + System.identityHashCode(session) + " got: " + totalReadBytes); } } private void notifyReadIdleness() { // process idle sessions long currentTime = System.currentTimeMillis(); if ((currentTime - lastIdleReadCheckTime) >= 1000) { lastIdleReadCheckTime = currentTime; Set keys = selector.keys(); if (keys != null) { for (Iterator it = keys.iterator(); it.hasNext();) { SelectionKey key = (SelectionKey) it.next(); SocketSessionImpl session = (SocketSessionImpl) key.attachment(); notifyReadIdleness(session, currentTime); } } } } private void notifyWriteIdleness() { // process idle sessions long currentTime = System.currentTimeMillis(); if ((currentTime - lastIdleWriteCheckTime) >= 1000) { lastIdleWriteCheckTime = currentTime; Set keys = writeSelector.keys(); if (keys != null) { for (Iterator it = keys.iterator(); it.hasNext();) { SelectionKey key = (SelectionKey) it.next(); SocketSessionImpl session = (SocketSessionImpl) key.attachment(); notifyWriteIdleness(session, currentTime); } } } } private void notifyReadIdleness(SocketSessionImpl session, long currentTime) { notifyIdleness0( session, currentTime, session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); notifyIdleness0( session, currentTime, session.getIdleTimeInMillis(IdleStatus.READER_IDLE), IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE))); notifyWriteTimeout(session, currentTime, session .getWriteTimeoutInMillis(), session.getLastWriteTime()); } private void notifyWriteIdleness(SocketSessionImpl session, long currentTime) { notifyIdleness0( session, currentTime, session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); notifyIdleness0( session, currentTime, session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE), IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE))); notifyWriteTimeout(session, currentTime, session .getWriteTimeoutInMillis(), session.getLastWriteTime()); } private void notifyIdleness0(SocketSessionImpl session, long currentTime, long idleTime, IdleStatus status, long lastIoTime) { if (idleTime > 0 && lastIoTime != 0 && (currentTime - lastIoTime) >= idleTime) { session.increaseIdleCount(status); session.getFilterChain().fireSessionIdle(session, status); } } private void notifyWriteTimeout(SocketSessionImpl session, long currentTime, long writeTimeout, long lastIoTime) { MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session; SelectionKey key = sesh.getWriteSelectionKey(); synchronized (writeLock) { if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout && key != null && key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0) { session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException()); } } } private SocketSessionImpl getNextFlushingSession() { return (SocketSessionImpl) flushingSessions.poll(); } private void releaseSession(SocketSessionImpl session) { synchronized (session.getWriteRequestQueue()) { synchronized (flushingSessionsSet) { if (session.getScheduledWriteRequests() > 0) { if (_loggerWrite.isDebugEnabled()) { //System.out.println("WriteDebug:"+"Reflush" + System.identityHashCode(session)); } flushingSessions.offer(session); } else { if (_loggerWrite.isDebugEnabled()) { //System.out.println("WriteDebug:"+"Releasing session " + System.identityHashCode(session)); } flushingSessionsSet.remove(session); } } } } private void releaseWriteBuffers(SocketSessionImpl session) { Queue writeRequestQueue = session.getWriteRequestQueue(); WriteRequest req; //Should this be synchronized? synchronized (writeRequestQueue) { while ((req = (WriteRequest) writeRequestQueue.pop()) != null) { try { ((ByteBuffer) req.getMessage()).release(); } catch (IllegalStateException e) { session.getFilterChain().fireExceptionCaught(session, e); } finally { req.getFuture().setWritten(false); } } } } private void doFlush() { MultiThreadSocketSessionImpl session; while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null) { if (!session.isConnected()) { releaseWriteBuffers(session); releaseSession(session); continue; } SelectionKey key = session.getWriteSelectionKey(); // Retry later if session is not yet fully initialized. // (In case that Session.write() is called before addSession() is processed) if (key == null) { scheduleFlush(session); releaseSession(session); continue; } // skip if channel is already closed if (!key.isValid()) { releaseSession(session); continue; } try { if (doFlush(session)) { releaseSession(session); } } catch (IOException e) { releaseSession(session); scheduleRemove(session); session.getFilterChain().fireExceptionCaught(session, e); } } } private boolean doFlush(SocketSessionImpl sessionParam) throws IOException { MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; // Clear OP_WRITE SelectionKey key = session.getWriteSelectionKey(); synchronized (writeLock) { key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); } SocketChannel ch = session.getChannel(); Queue writeRequestQueue = session.getWriteRequestQueue(); long totalFlushedBytes = 0; while (true) { WriteRequest req; synchronized (writeRequestQueue) { req = (WriteRequest) writeRequestQueue.first(); } if (req == null) { break; } ByteBuffer buf = (ByteBuffer) req.getMessage(); if (buf.remaining() == 0) { synchronized (writeRequestQueue) { writeRequestQueue.pop(); } session.increaseWrittenMessages(); buf.reset(); session.getFilterChain().fireMessageSent(session, req); continue; } int writtenBytes = 0; // Reported as DIRMINA-362 //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it. if (key.isWritable()) { writtenBytes = ch.write(buf.buf()); totalFlushedBytes += writtenBytes; } if (writtenBytes > 0) { session.increaseWrittenBytes(writtenBytes); } if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION)) { // Kernel buffer is full synchronized (writeLock) { key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } if (_loggerWrite.isDebugEnabled()) { //System.out.println("WriteDebug:"+"Written BF: " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes"); } return false; } } if (_loggerWrite.isDebugEnabled()) { //System.out.println("WriteDebug:"+"Written : " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes"); } return true; } private void doUpdateTrafficMask() { if (trafficControllingSessions.isEmpty() || trafficMaskUpdateLock.isLocked()) { return; } // Synchronize over entire operation as this method should be called // from both read and write thread and we don't want the order of the // updates to get changed. trafficMaskUpdateLock.lock(); try { for (; ;) { MultiThreadSocketSessionImpl session; session = (MultiThreadSocketSessionImpl) trafficControllingSessions.pop(); if (session == null) { break; } SelectionKey key = session.getReadSelectionKey(); // Retry later if session is not yet fully initialized. // (In case that Session.suspend??() or session.resume??() is // called before addSession() is processed) if (key == null) { scheduleTrafficControl(session); break; } // skip if channel is already closed if (!key.isValid()) { continue; } // The normal is OP_READ and, if there are write requests in the // session's write queue, set OP_WRITE to trigger flushing. //Sset to Read and Write if there is nothing then the cost // is one loop through the flusher. int ops = SelectionKey.OP_READ; // Now mask the preferred ops with the mask of the current session int mask = session.getTrafficMask().getInterestOps(); synchronized (readLock) { key.interestOps(ops & mask); } //Change key to the WriteSelection Key key = session.getWriteSelectionKey(); if (key != null && key.isValid()) { Queue writeRequestQueue = session.getWriteRequestQueue(); synchronized (writeRequestQueue) { if (!writeRequestQueue.isEmpty()) { ops = SelectionKey.OP_WRITE; synchronized (writeLock) { key.interestOps(ops & mask); } } } } } } finally { trafficMaskUpdateLock.unlock(); } } private class WriteWorker implements Runnable { public void run() { Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Writer"); //System.out.println("WriteDebug:"+"Startup"); for (; ;) { try { int nKeys = writeSelector.select(SELECTOR_TIMEOUT); doAddNewWrite(); doUpdateTrafficMask(); if (nKeys > 0) { //System.out.println("WriteDebug:"+nKeys + " keys from writeselector"); processWrite(writeSelector.selectedKeys()); } else { //System.out.println("WriteDebug:"+"No keys from writeselector"); } doRemove(); notifyWriteIdleness(); if (flushingSessionsSet.size() > 0) { doFlush(); } if (writeSelector.keys().isEmpty()) { synchronized (writeLock) { if (writeSelector.keys().isEmpty() && newSessions.isEmpty()) { writeWorker = null; try { writeSelector.close(); } catch (IOException e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { writeSelector = null; } break; } } } } catch (Throwable t) { ExceptionMonitor.getInstance().exceptionCaught(t); try { Thread.sleep(1000); } catch (InterruptedException e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } } } //System.out.println("WriteDebug:"+"Shutdown"); } } private class ReadWorker implements Runnable { public void run() { Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader"); //System.out.println("ReadDebug:"+"Startup"); for (; ;) { try { int nKeys = selector.select(SELECTOR_TIMEOUT); doAddNewReader(); doUpdateTrafficMask(); if (nKeys > 0) { //System.out.println("ReadDebug:"+nKeys + " keys from selector"); processRead(selector.selectedKeys()); } else { //System.out.println("ReadDebug:"+"No keys from selector"); } doRemove(); notifyReadIdleness(); if (selector.keys().isEmpty()) { synchronized (readLock) { if (selector.keys().isEmpty() && newSessions.isEmpty()) { readWorker = null; try { selector.close(); } catch (IOException e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { selector = null; } break; } } } } catch (Throwable t) { ExceptionMonitor.getInstance().exceptionCaught(t); try { Thread.sleep(1000); } catch (InterruptedException e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } } } //System.out.println("ReadDebug:"+"Shutdown"); } } }