/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.mina.filter; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.DefaultIoFilterChainBuilder; import org.apache.mina.common.IoFilterAdapter; import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoSession; import org.apache.mina.filter.executor.ExecutorFilter; import java.util.Iterator; import java.util.List;
/**
 * This filter turns the asynchronous filterWrite method into a blocking send when more than
 * the prescribed number of messages are awaiting filterWrite. It should be used in conjunction with the
 * {@link ReadThrottleFilterBuilder} on a server, as the blocking writes will otherwise allow the read thread to
 * cause an Out of Memory exception due to a backlog of unprocessed messages.
 *
 * This should only be viewed as a temporary workaround for DIRMINA-302.
 *
 * A true solution should not be implemented as a filter, as this issue will always occur on a machine
 * where the network is slower than the local producer.
 *
 * A suggested improvement is to allow the implementation of policies on what to do when the buffer is full.
 *
 * They could be:
 * Block - As this does
 * Wait on a given Future - to drain more of the queue; in essence this filter with high/low watermarks
 * Throw Exception - through the client filterWrite() method to allow them to get immediate feedback on buffer state
 *
 * <p>
 * Usage:
 * <p>
 * <pre>
 * DefaultIoFilterChainBuilder builder = ...
 * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder();
 * filter.attach( builder );
 * </pre>
 * <p>
 * or
 * <p>
 * <pre>
 * IoFilterChain chain = ...
 * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder();
 * filter.attach( chain );
 * </pre>
*
* @author The Apache Directory Project (mina-dev@directory.apache.org)
* @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
*/
public class WriteBufferLimitFilterBuilder {

    /** Session attribute key under which the number of bytes pending write is stored (as a {@code Long}). */
    public static final String PENDING_SIZE = WriteBufferLimitFilterBuilder.class.getName() + ".pendingSize";

    /** Message-count limit applied by the no-argument constructor. */
    private static final int DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT = 5000;

    /**
     * When {@code true}, a write that is not currently allowed raises
     * {@link WriteBufferFullExeception} instead of blocking. Nothing in this class sets it.
     */
    private volatile boolean throwNotBlock = false;

    /** Maximum number of scheduled write requests per session; 0 disables this limit. */
    private volatile int maximumConnectionBufferCount;

    /** Maximum number of pending write bytes per session; 0 disables this limit. */
    private volatile long maximumConnectionBufferSize;

    /** Monitor shared by every {@link SendLimit} created by this builder; blocked writers wait on it. */
    private final Object _blockLock = new Object();

    /** Number of threads currently waiting on {@link #_blockLock}; guarded by that lock. */
    private int _blockWaiters = 0;

    /**
     * Creates a builder that limits each session to the default number of scheduled
     * write requests (5000).
     */
    public WriteBufferLimitFilterBuilder() {
        this(DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT);
    }

    /**
     * Creates a builder that limits each session to the given number of scheduled write requests.
     *
     * @param maxWriteBufferSize maximum number of scheduled write requests; must not be negative,
     *                           0 disables the limit
     * @throws IllegalArgumentException if {@code maxWriteBufferSize} is negative
     */
    public WriteBufferLimitFilterBuilder(int maxWriteBufferSize) {
        setMaximumConnectionBufferCount(maxWriteBufferSize);
    }

    /**
     * Sets the maximum number of pending items in the write queue for a given session and
     * clears any byte-size limit. The new value is picked up the next time a write is checked,
     * including on existing connections. Default value is 5000 messages.
     *
     * @param maximumConnectionBufferCount new limit; must not be negative, 0 disables the limit
     * @throws IllegalArgumentException if the value is negative (a negative limit could never be
     *                                  satisfied and would block every writer)
     */
    public void setMaximumConnectionBufferCount(int maximumConnectionBufferCount) {
        if (maximumConnectionBufferCount < 0) {
            throw new IllegalArgumentException(
                    "maximumConnectionBufferCount must not be negative: " + maximumConnectionBufferCount);
        }
        this.maximumConnectionBufferCount = maximumConnectionBufferCount;
        this.maximumConnectionBufferSize = 0;
    }

    /**
     * Sets the maximum number of pending write bytes for a given session and clears any
     * message-count limit.
     *
     * @param maximumConnectionBufferSize new limit in bytes; must not be negative, 0 disables the limit
     * @throws IllegalArgumentException if the value is negative (a negative limit could never be
     *                                  satisfied and would block every writer)
     */
    public void setMaximumConnectionBufferSize(long maximumConnectionBufferSize) {
        if (maximumConnectionBufferSize < 0) {
            throw new IllegalArgumentException(
                    "maximumConnectionBufferSize must not be negative: " + maximumConnectionBufferSize);
        }
        this.maximumConnectionBufferSize = maximumConnectionBufferSize;
        this.maximumConnectionBufferCount = 0;
    }

    /**
     * Attaches this filter to the specified filter chain. It searches the chain for an
     * {@link ExecutorFilter} and inserts a new {@link SendLimit} immediately before it.
     *
     * @param chain {@link IoFilterChain} to attach self to.
     * @throws IllegalStateException if the chain contains no {@link ExecutorFilter}
     */
    public void attach(IoFilterChain chain) {
        String name = getThreadPoolFilterEntryName(chain.getAll());
        chain.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit());
    }

    /**
     * Attaches this filter to the specified builder. It searches the builder for an
     * {@link ExecutorFilter} and inserts a new {@link SendLimit} immediately before it.
     *
     * @param builder {@link DefaultIoFilterChainBuilder} to attach self to.
     * @throws IllegalStateException if the builder contains no {@link ExecutorFilter}
     */
    public void attach(DefaultIoFilterChainBuilder builder) {
        String name = getThreadPoolFilterEntryName(builder.getAll());
        builder.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit());
    }

    /**
     * Returns the name of the first entry whose filter is an {@link ExecutorFilter}
     * (or a subclass of it).
     *
     * @param entries chain entries, each expected to be an {@link IoFilterChain.Entry}
     * @return the name of the matching entry
     * @throws IllegalStateException if no entry holds an {@link ExecutorFilter}
     */
    private String getThreadPoolFilterEntryName(List<?> entries) {
        for (Object element : entries) {
            IoFilterChain.Entry entry = (IoFilterChain.Entry) element;
            // instanceof accepts ExecutorFilter and its subclasses; the previous
            // filterClass.isAssignableFrom(ExecutorFilter.class) test was inverted and
            // matched superclasses of ExecutorFilter instead.
            if (entry.getFilter() instanceof ExecutorFilter) {
                return entry.getName();
            }
        }
        throw new IllegalStateException("Chain does not contain a ExecutorFilter");
    }

    /**
     * Filter that holds back {@code filterWrite} calls while the session is over the configured
     * limit and tracks the number of pending {@link ByteBuffer} bytes in the session attribute
     * {@link WriteBufferLimitFilterBuilder#PENDING_SIZE}.
     */
    public class SendLimit extends IoFilterAdapter {

        /**
         * Waits until the session is allowed to send, records the size of a {@link ByteBuffer}
         * message as pending, then forwards the write request.
         */
        public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest)
                throws Exception {
            try {
                waitTillSendAllowed(session);
            } catch (WriteBufferFullExeception wbfe) {
                // NOTE(review): the request is still forwarded below after the exception has
                // been reported; confirm whether it should be dropped instead.
                nextFilter.exceptionCaught(session, wbfe);
            }

            Object message = writeRequest.getMessage();
            if (message instanceof ByteBuffer) {
                increasePendingWriteSize(session, (ByteBuffer) message);
            }

            nextFilter.filterWrite(session, writeRequest);
        }

        /** Adds the remaining bytes of {@code message} to the session's pending byte count. */
        private void increasePendingWriteSize(IoSession session, ByteBuffer message) {
            synchronized (session) {
                long pendingSize = getScheduledWriteBytes(session) + message.remaining();
                session.setAttribute(PENDING_SIZE, Long.valueOf(pendingSize));
            }
        }

        /**
         * Tells whether a write may proceed: always for a closing session, otherwise only while
         * the session is below every enabled (non-zero) limit.
         */
        private boolean sendAllowed(IoSession session) {
            if (session.isClosing()) {
                return true;
            }

            // Read each volatile limit once so both comparisons use a stable value.
            int countLimit = maximumConnectionBufferCount;
            long sizeLimit = maximumConnectionBufferSize;

            boolean belowCountLimit = countLimit == 0 || session.getScheduledWriteRequests() < countLimit;
            return belowCountLimit && (sizeLimit == 0 || getScheduledWriteBytes(session) < sizeLimit);
        }

        /** Returns the pending byte count stored on the session, or 0 when none is stored. */
        private long getScheduledWriteBytes(IoSession session) {
            synchronized (session) {
                Long pending = (Long) session.getAttribute(PENDING_SIZE);
                return pending == null ? 0L : pending.longValue();
            }
        }

        /**
         * Blocks the calling thread until {@link #sendAllowed(IoSession)} holds for the session.
         * If the thread is interrupted while waiting it keeps waiting, and its interrupt status
         * is restored before this method returns.
         *
         * @throws WriteBufferFullExeception if the send is not allowed and {@code throwNotBlock} is set
         */
        private void waitTillSendAllowed(IoSession session) {
            boolean interrupted = false;
            try {
                synchronized (_blockLock) {
                    if (sendAllowed(session)) {
                        return;
                    }
                    if (throwNotBlock) {
                        throw new WriteBufferFullExeception();
                    }

                    _blockWaiters++;
                    try {
                        do {
                            try {
                                _blockLock.wait();
                            } catch (InterruptedException e) {
                                // Keep waiting for buffer space; the interrupt is re-asserted below.
                                interrupted = true;
                            }
                        } while (!sendAllowed(session));
                    } finally {
                        // Keep the waiter count accurate even if the check above throws.
                        _blockWaiters--;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Subtracts a sent {@link ByteBuffer}'s remaining bytes from the pending count, wakes any
         * blocked writers, then forwards the event.
         */
        public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception {
            if (message instanceof ByteBuffer) {
                decrementPendingWriteSize(session, (ByteBuffer) message);
            }
            notifyWaitingWriters();
            nextFilter.messageSent(session, message);
        }

        /** Subtracts the remaining bytes of {@code message} from the session's pending byte count. */
        private void decrementPendingWriteSize(IoSession session, ByteBuffer message) {
            synchronized (session) {
                long pendingSize = getScheduledWriteBytes(session) - message.remaining();
                session.setAttribute(PENDING_SIZE, Long.valueOf(pendingSize));
            }
        }

        /** Wakes every thread waiting on the shared lock, if there are any. */
        private void notifyWaitingWriters() {
            synchronized (_blockLock) {
                if (_blockWaiters != 0) {
                    _blockLock.notifyAll();
                }
            }
        }
    }
}