diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java | 1026 |
1 files changed, 0 insertions, 1026 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java deleted file mode 100644 index c23ad8686f..0000000000 --- a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java +++ /dev/null @@ -1,1026 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.mina.transport.socket.nio; - -import edu.emory.mathcs.backport.java.util.concurrent.Executor; -import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.ExceptionMonitor; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoFilter.WriteRequest; -import org.apache.mina.common.WriteTimeoutException; -import org.apache.mina.util.IdentityHashSet; -import org.apache.mina.util.NamePreservingRunnable; -import org.apache.mina.util.Queue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -/** - * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally. - * - * @author The Apache Directory Project (mina-dev@directory.apache.org) - * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $, - */ -class MultiThreadSocketIoProcessor extends SocketIoProcessor -{ - Logger _logger = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class); - Logger _loggerRead = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Reader"); - Logger _loggerWrite = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Writer"); - - private static final long SELECTOR_TIMEOUT = 1000L; - - private int MAX_READ_BYTES_PER_SESSION = 524288; //512K - private int MAX_FLUSH_BYTES_PER_SESSION = 524288; //512K - - private final Object readLock = new Object(); - private final Object writeLock = new Object(); - - private final String threadName; - private final Executor executor; - - private ReentrantLock trafficMaskUpdateLock = new ReentrantLock(); - - /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */ - private volatile Selector selector, writeSelector; - - private final Queue newSessions = new Queue(); - private final Queue removingSessions = new Queue(); - private final BlockingQueue flushingSessions = new LinkedBlockingQueue(); - private final IdentityHashSet flushingSessionsSet = new IdentityHashSet(); - - private final Queue trafficControllingSessions = new Queue(); - - private ReadWorker readWorker; - private WriteWorker writeWorker; - private long lastIdleReadCheckTime = System.currentTimeMillis(); - private long lastIdleWriteCheckTime = System.currentTimeMillis(); - - MultiThreadSocketIoProcessor(String threadName, Executor executor) - { - super(threadName, executor); - this.threadName = threadName; - this.executor = executor; - } - - void addNew(SocketSessionImpl session) throws IOException - { - synchronized (newSessions) - { - newSessions.push(session); - } - - startupWorker(); - - selector.wakeup(); - writeSelector.wakeup(); - } - - void remove(SocketSessionImpl session) throws IOException - { - scheduleRemove(session); - startupWorker(); - selector.wakeup(); - } - - private void startupWorker() throws IOException - { - synchronized (readLock) - { - if (readWorker == null) - { - selector = Selector.open(); - readWorker = new ReadWorker(); - executor.execute(new NamePreservingRunnable(readWorker)); - } - } - - synchronized (writeLock) - { - if (writeWorker == null) - { - writeSelector = Selector.open(); - writeWorker = new WriteWorker(); - executor.execute(new NamePreservingRunnable(writeWorker)); - } - } - - } - - void flush(SocketSessionImpl session) - { - scheduleFlush(session); - Selector selector = this.writeSelector; - - if (selector != null) - { - selector.wakeup(); - } - } - - void updateTrafficMask(SocketSessionImpl session) - { - scheduleTrafficControl(session); - Selector selector = this.selector; - if (selector != null) - { - selector.wakeup(); - } - } - - private void scheduleRemove(SocketSessionImpl session) - { - synchronized (removingSessions) - { - removingSessions.push(session); - } - } - - private void scheduleFlush(SocketSessionImpl session) - { - synchronized (flushingSessionsSet) - { - //if flushingSessions grows to contain Integer.MAX_VALUE sessions - // then this will fail. - if (flushingSessionsSet.add(session)) - { - flushingSessions.offer(session); - } - } - } - - private void scheduleTrafficControl(SocketSessionImpl session) - { - synchronized (trafficControllingSessions) - { - trafficControllingSessions.push(session); - } - } - - private void doAddNewReader() throws InterruptedException - { - if (newSessions.isEmpty()) - { - return; - } - - for (; ;) - { - MultiThreadSocketSessionImpl session; - - synchronized (newSessions) - { - session = (MultiThreadSocketSessionImpl) newSessions.peek(); - } - - if (session == null) - { - break; - } - - SocketChannel ch = session.getChannel(); - - - try - { - - ch.configureBlocking(false); - session.setSelectionKey(ch.register(selector, - SelectionKey.OP_READ, - session)); - - //System.out.println("ReadDebug:"+"Awaiting Registration"); - session.awaitRegistration(); - sessionCreated(session); - } - catch (IOException e) - { - // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute - // and call ConnectFuture.setException(). - session.getFilterChain().fireExceptionCaught(session, e); - } - } - } - - - private void doAddNewWrite() throws InterruptedException - { - if (newSessions.isEmpty()) - { - return; - } - - for (; ;) - { - MultiThreadSocketSessionImpl session; - - synchronized (newSessions) - { - session = (MultiThreadSocketSessionImpl) newSessions.peek(); - } - - if (session == null) - { - break; - } - - SocketChannel ch = session.getChannel(); - - try - { - ch.configureBlocking(false); - synchronized (flushingSessionsSet) - { - flushingSessionsSet.add(session); - } - - session.setWriteSelectionKey(ch.register(writeSelector, - SelectionKey.OP_WRITE, - session)); - - //System.out.println("WriteDebug:"+"Awaiting Registration"); - session.awaitRegistration(); - sessionCreated(session); - } - catch (IOException e) - { - - // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute - // and call ConnectFuture.setException(). - session.getFilterChain().fireExceptionCaught(session, e); - } - } - } - - - private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException - { - MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; - synchronized (newSessions) - { - if (!session.created()) - { - _logger.debug("Popping new session"); - newSessions.pop(); - - // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here - // in AbstractIoFilterChain.fireSessionOpened(). - session.getServiceListeners().fireSessionCreated(session); - - session.doneCreation(); - } - } - } - - private void doRemove() - { - if (removingSessions.isEmpty()) - { - return; - } - - for (; ;) - { - MultiThreadSocketSessionImpl session; - - synchronized (removingSessions) - { - session = (MultiThreadSocketSessionImpl) removingSessions.pop(); - } - - if (session == null) - { - break; - } - - SocketChannel ch = session.getChannel(); - SelectionKey key = session.getReadSelectionKey(); - SelectionKey writeKey = session.getWriteSelectionKey(); - - // Retry later if session is not yet fully initialized. - // (In case that Session.close() is called before addSession() is processed) - if (key == null || writeKey == null) - { - scheduleRemove(session); - break; - } - // skip if channel is already closed - if (!key.isValid() || !writeKey.isValid()) - { - continue; - } - - try - { - //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session)); - synchronized (readLock) - { - key.cancel(); - } - synchronized (writeLock) - { - writeKey.cancel(); - } - ch.close(); - } - catch (IOException e) - { - session.getFilterChain().fireExceptionCaught(session, e); - } - finally - { - releaseWriteBuffers(session); - session.getServiceListeners().fireSessionDestroyed(session); - } - } - } - - private void processRead(Set selectedKeys) - { - Iterator it = selectedKeys.iterator(); - - while (it.hasNext()) - { - SelectionKey key = (SelectionKey) it.next(); - MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment(); - - synchronized (readLock) - { - if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable()) - { - read(session); - } - } - - } - - selectedKeys.clear(); - } - - private void processWrite(Set selectedKeys) - { - Iterator it = selectedKeys.iterator(); - - while (it.hasNext()) - { - SelectionKey key = (SelectionKey) it.next(); - SocketSessionImpl session = (SocketSessionImpl) key.attachment(); - - synchronized (writeLock) - { - if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable()) - { - - // Clear OP_WRITE - key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); - - synchronized (flushingSessionsSet) - { - flushingSessions.offer(session); - } - } - } - } - - selectedKeys.clear(); - } - - private void read(SocketSessionImpl session) - { - - //if (_loggerWrite.isDebugEnabled()) - { - //System.out.println("WriteDebug:"+"Starting read for Session:" + System.identityHashCode(session)); - } - - int totalReadBytes = 0; - - while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION) - { - ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize()); - SocketChannel ch = session.getChannel(); - - try - { - buf.clear(); - - int readBytes = 0; - int ret; - - try - { - while ((ret = ch.read(buf.buf())) > 0) - { - readBytes += ret; - totalReadBytes += ret; - } - } - finally - { - buf.flip(); - } - - - if (readBytes > 0) - { - session.increaseReadBytes(readBytes); - - session.getFilterChain().fireMessageReceived(session, buf); - buf = null; - } - - if (ret <= 0) - { - if (ret == 0) - { - if (readBytes == session.getReadBufferSize()) - { - continue; - } - } - else - { - scheduleRemove(session); - } - - break; - } - } - catch (Throwable e) - { - if (e instanceof IOException) - { - scheduleRemove(session); - } - session.getFilterChain().fireExceptionCaught(session, e); - - //Stop Reading this session. - return; - } - finally - { - if (buf != null) - { - buf.release(); - } - } - }//for - - // if (_loggerWrite.isDebugEnabled()) - { - //System.out.println("WriteDebug:"+"Read for Session:" + System.identityHashCode(session) + " got: " + totalReadBytes); - } - } - - - private void notifyReadIdleness() - { - // process idle sessions - long currentTime = System.currentTimeMillis(); - if ((currentTime - lastIdleReadCheckTime) >= 1000) - { - lastIdleReadCheckTime = currentTime; - Set keys = selector.keys(); - if (keys != null) - { - for (Iterator it = keys.iterator(); it.hasNext();) - { - SelectionKey key = (SelectionKey) it.next(); - SocketSessionImpl session = (SocketSessionImpl) key.attachment(); - notifyReadIdleness(session, currentTime); - } - } - } - } - - private void notifyWriteIdleness() - { - // process idle sessions - long currentTime = System.currentTimeMillis(); - if ((currentTime - lastIdleWriteCheckTime) >= 1000) - { - lastIdleWriteCheckTime = currentTime; - Set keys = writeSelector.keys(); - if (keys != null) - { - for (Iterator it = keys.iterator(); it.hasNext();) - { - SelectionKey key = (SelectionKey) it.next(); - SocketSessionImpl session = (SocketSessionImpl) key.attachment(); - notifyWriteIdleness(session, currentTime); - } - } - } - } - - private void notifyReadIdleness(SocketSessionImpl session, long currentTime) - { - notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), - IdleStatus.BOTH_IDLE, - Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); - notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis(IdleStatus.READER_IDLE), - IdleStatus.READER_IDLE, - Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE))); - - notifyWriteTimeout(session, currentTime, session - .getWriteTimeoutInMillis(), session.getLastWriteTime()); - } - - private void notifyWriteIdleness(SocketSessionImpl session, long currentTime) - { - notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), - IdleStatus.BOTH_IDLE, - Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); - notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE), - IdleStatus.WRITER_IDLE, - Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE))); - - notifyWriteTimeout(session, currentTime, session - .getWriteTimeoutInMillis(), session.getLastWriteTime()); - } - - private void notifyIdleness0(SocketSessionImpl session, long currentTime, - long idleTime, IdleStatus status, - long lastIoTime) - { - if (idleTime > 0 && lastIoTime != 0 - && (currentTime - lastIoTime) >= idleTime) - { - session.increaseIdleCount(status); - session.getFilterChain().fireSessionIdle(session, status); - } - } - - private void notifyWriteTimeout(SocketSessionImpl session, - long currentTime, - long writeTimeout, long lastIoTime) - { - - MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session; - SelectionKey key = sesh.getWriteSelectionKey(); - - synchronized (writeLock) - { - if (writeTimeout > 0 - && (currentTime - lastIoTime) >= writeTimeout - && key != null && key.isValid() - && (key.interestOps() & SelectionKey.OP_WRITE) != 0) - { - session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException()); - } - } - } - - private SocketSessionImpl getNextFlushingSession() - { - return (SocketSessionImpl) flushingSessions.poll(); - } - - private void releaseSession(SocketSessionImpl session) - { - synchronized (session.getWriteRequestQueue()) - { - synchronized (flushingSessionsSet) - { - if (session.getScheduledWriteRequests() > 0) - { - if (_loggerWrite.isDebugEnabled()) - { - //System.out.println("WriteDebug:"+"Reflush" + System.identityHashCode(session)); - } - flushingSessions.offer(session); - } - else - { - if (_loggerWrite.isDebugEnabled()) - { - //System.out.println("WriteDebug:"+"Releasing session " + System.identityHashCode(session)); - } - flushingSessionsSet.remove(session); - } - } - } - } - - private void releaseWriteBuffers(SocketSessionImpl session) - { - Queue writeRequestQueue = session.getWriteRequestQueue(); - WriteRequest req; - - //Should this be synchronized? - synchronized (writeRequestQueue) - { - while ((req = (WriteRequest) writeRequestQueue.pop()) != null) - { - try - { - ((ByteBuffer) req.getMessage()).release(); - } - catch (IllegalStateException e) - { - session.getFilterChain().fireExceptionCaught(session, e); - } - finally - { - req.getFuture().setWritten(false); - } - } - } - } - - private void doFlush() - { - MultiThreadSocketSessionImpl session; - - while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null) - { - if (!session.isConnected()) - { - releaseWriteBuffers(session); - releaseSession(session); - continue; - } - - SelectionKey key = session.getWriteSelectionKey(); - // Retry later if session is not yet fully initialized. - // (In case that Session.write() is called before addSession() is processed) - if (key == null) - { - scheduleFlush(session); - releaseSession(session); - continue; - } - // skip if channel is already closed - if (!key.isValid()) - { - releaseSession(session); - continue; - } - - try - { - if (doFlush(session)) - { - releaseSession(session); - } - } - catch (IOException e) - { - releaseSession(session); - scheduleRemove(session); - session.getFilterChain().fireExceptionCaught(session, e); - } - - } - - } - - private boolean doFlush(SocketSessionImpl sessionParam) throws IOException - { - MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; - // Clear OP_WRITE - SelectionKey key = session.getWriteSelectionKey(); - synchronized (writeLock) - { - key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); - } - SocketChannel ch = session.getChannel(); - Queue writeRequestQueue = session.getWriteRequestQueue(); - - long totalFlushedBytes = 0; - while (true) - { - WriteRequest req; - - synchronized (writeRequestQueue) - { - req = (WriteRequest) writeRequestQueue.first(); - } - - if (req == null) - { - break; - } - - ByteBuffer buf = (ByteBuffer) req.getMessage(); - if (buf.remaining() == 0) - { - synchronized (writeRequestQueue) - { - writeRequestQueue.pop(); - } - - session.increaseWrittenMessages(); - - buf.reset(); - session.getFilterChain().fireMessageSent(session, req); - continue; - } - - - int writtenBytes = 0; - - // Reported as DIRMINA-362 - //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it. - if (key.isWritable()) - { - writtenBytes = ch.write(buf.buf()); - totalFlushedBytes += writtenBytes; - } - - if (writtenBytes > 0) - { - session.increaseWrittenBytes(writtenBytes); - } - - if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION)) - { - // Kernel buffer is full - synchronized (writeLock) - { - key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); - } - if (_loggerWrite.isDebugEnabled()) - { - //System.out.println("WriteDebug:"+"Written BF: " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes"); - } - return false; - } - } - - if (_loggerWrite.isDebugEnabled()) - { - //System.out.println("WriteDebug:"+"Written : " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes"); - } - return true; - } - - private void doUpdateTrafficMask() - { - if (trafficControllingSessions.isEmpty() || trafficMaskUpdateLock.isLocked()) - { - return; - } - - // Synchronize over entire operation as this method should be called - // from both read and write thread and we don't want the order of the - // updates to get changed. - trafficMaskUpdateLock.lock(); - try - { - for (; ;) - { - MultiThreadSocketSessionImpl session; - - session = (MultiThreadSocketSessionImpl) trafficControllingSessions.pop(); - - if (session == null) - { - break; - } - - SelectionKey key = session.getReadSelectionKey(); - // Retry later if session is not yet fully initialized. - // (In case that Session.suspend??() or session.resume??() is - // called before addSession() is processed) - if (key == null) - { - scheduleTrafficControl(session); - break; - } - // skip if channel is already closed - if (!key.isValid()) - { - continue; - } - - // The normal is OP_READ and, if there are write requests in the - // session's write queue, set OP_WRITE to trigger flushing. - - //Sset to Read and Write if there is nothing then the cost - // is one loop through the flusher. - int ops = SelectionKey.OP_READ; - - // Now mask the preferred ops with the mask of the current session - int mask = session.getTrafficMask().getInterestOps(); - synchronized (readLock) - { - key.interestOps(ops & mask); - } - //Change key to the WriteSelection Key - key = session.getWriteSelectionKey(); - if (key != null && key.isValid()) - { - Queue writeRequestQueue = session.getWriteRequestQueue(); - synchronized (writeRequestQueue) - { - if (!writeRequestQueue.isEmpty()) - { - ops = SelectionKey.OP_WRITE; - synchronized (writeLock) - { - key.interestOps(ops & mask); - } - } - } - } - } - } - finally - { - trafficMaskUpdateLock.unlock(); - } - - } - - private class WriteWorker implements Runnable - { - - public void run() - { - Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Writer"); - - //System.out.println("WriteDebug:"+"Startup"); - for (; ;) - { - try - { - int nKeys = writeSelector.select(SELECTOR_TIMEOUT); - - doAddNewWrite(); - doUpdateTrafficMask(); - - if (nKeys > 0) - { - //System.out.println("WriteDebug:"+nKeys + " keys from writeselector"); - processWrite(writeSelector.selectedKeys()); - } - else - { - //System.out.println("WriteDebug:"+"No keys from writeselector"); - } - - doRemove(); - notifyWriteIdleness(); - - if (flushingSessionsSet.size() > 0) - { - doFlush(); - } - - if (writeSelector.keys().isEmpty()) - { - synchronized (writeLock) - { - - if (writeSelector.keys().isEmpty() && newSessions.isEmpty()) - { - writeWorker = null; - try - { - writeSelector.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - finally - { - writeSelector = null; - } - - break; - } - } - } - - } - catch (Throwable t) - { - ExceptionMonitor.getInstance().exceptionCaught(t); - - try - { - Thread.sleep(1000); - } - catch (InterruptedException e1) - { - ExceptionMonitor.getInstance().exceptionCaught(e1); - } - } - } - //System.out.println("WriteDebug:"+"Shutdown"); - } - - } - - private class ReadWorker implements Runnable - { - - public void run() - { - Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader"); - - //System.out.println("ReadDebug:"+"Startup"); - for (; ;) - { - try - { - int nKeys = selector.select(SELECTOR_TIMEOUT); - - doAddNewReader(); - doUpdateTrafficMask(); - - if (nKeys > 0) - { - //System.out.println("ReadDebug:"+nKeys + " keys from selector"); - - processRead(selector.selectedKeys()); - } - else - { - //System.out.println("ReadDebug:"+"No keys from selector"); - } - - - doRemove(); - notifyReadIdleness(); - - if (selector.keys().isEmpty()) - { - - synchronized (readLock) - { - if (selector.keys().isEmpty() && newSessions.isEmpty()) - { - readWorker = null; - try - { - selector.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - finally - { - selector = null; - } - - break; - } - } - } - } - catch (Throwable t) - { - ExceptionMonitor.getInstance().exceptionCaught(t); - - try - { - Thread.sleep(1000); - } - catch (InterruptedException e1) - { - ExceptionMonitor.getInstance().exceptionCaught(e1); - } - } - } - //System.out.println("ReadDebug:"+"Shutdown"); - } - - } -} |