diff options
5 files changed, 36 insertions, 27 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml index 78d96e52fd..80ee039ee5 100644 --- a/java/broker/etc/config.xml +++ b/java/broker/etc/config.xml @@ -33,7 +33,9 @@ <keystorePassword>keystorepass</keystorePassword> </ssl>--> <qpidnio>false</qpidnio> - <protectio>false</protectio> + <protectio> + <enabled>false</enabled> + </protectio> <transport>nio</transport> <port>5672</port> <sslport>8672</sslport> diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index db5d882f51..d8dbf97e49 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -29,10 +29,8 @@ import org.apache.mina.common.IoSession; import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; import org.apache.mina.filter.WriteBufferLimitFilterBuilder; -import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.QpidProtocolCodecFilter; import org.apache.mina.filter.executor.ExecutorFilter; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.util.SessionUtil; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; @@ -58,6 +56,11 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter private final IApplicationRegistry _applicationRegistry; + private static String DEFAULT_BUFFER_READ_LIMIT_SIZE = "262144"; + private static String DEFAULT_BUFFER_WRITE_LIMIT_SIZE = "262144"; + + private final int BUFFER_READ_LIMIT_SIZE; + private final int BUFFER_WRITE_LIMIT_SIZE; public AMQPFastProtocolHandler(Integer applicationRegistryInstance) { @@ -67,6 +70,11 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry) { _applicationRegistry = applicationRegistry; + + // Read the configuration from the application registry + BUFFER_READ_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.readBufferLimitSize", DEFAULT_BUFFER_READ_LIMIT_SIZE)); + BUFFER_WRITE_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.writeBufferLimitSize", DEFAULT_BUFFER_WRITE_LIMIT_SIZE)); + _logger.debug("AMQPFastProtocolHandler created"); } @@ -115,27 +123,22 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter } - if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio", false)) + if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio.enabled", false)) { try { // //Add IO Protection Filters IoFilterChain chain = protocolSession.getFilterChain(); - int buf_size = 32768; - if (protocolSession.getConfig() instanceof SocketSessionConfig) - { - buf_size = ((SocketSessionConfig) protocolSession.getConfig()).getReceiveBufferSize(); - } protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(buf_size); + readfilter.setMaximumConnectionBufferSize(BUFFER_READ_LIMIT_SIZE); readfilter.attach(chain); WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - writefilter.setMaximumConnectionBufferSize(buf_size * 2); + writefilter.setMaximumConnectionBufferSize(BUFFER_WRITE_LIMIT_SIZE); writefilter.attach(chain); protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 89982a1af0..7909663f24 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -30,7 +30,6 @@ import org.apache.mina.filter.WriteBufferLimitFilterBuilder; import org.apache.mina.filter.codec.ProtocolCodecException; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.executor.ExecutorFilter; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; @@ -156,6 +155,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** Defines the default timeout to use for synchronous protocol commands. */ private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; + /** Default buffer size for pending messages reads */ + private static final String DEFAULT_READ_BUFFER_LIMIT = "262144"; + + /** Default buffer size for pending messages writes */ + private static final String DEFAULT_WRITE_BUFFER_LIMIT = "262144"; + /** * Creates a new protocol handler, associated with the specified client connection instance. * @@ -219,19 +224,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter //Add IO Protection Filters IoFilterChain chain = session.getFilterChain(); - int buf_size = 32768; - if (session.getConfig() instanceof SocketSessionConfig) - { - buf_size = ((SocketSessionConfig) session.getConfig()).getReceiveBufferSize(); - } session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(buf_size); + readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT))); readfilter.attach(chain); WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - writefilter.setMaximumConnectionBufferSize(buf_size * 2); + writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER_LIMIT))); writefilter.attach(chain); session.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); diff --git a/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java b/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java index 0b6ed81d18..b93dc46741 100644 --- a/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java +++ b/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java @@ -131,19 +131,20 @@ public class IOWriterClient implements Runnable private int _receivedCount = 0; private int _sentCount = 0; + private static final String DEFAULT_READ_BUFFER = "262144"; + private static final String DEFAULT_WRITE_BUFFER = "262144"; public void sessionCreated(IoSession session) throws Exception { IoFilterChain chain = session.getFilterChain(); - int buf_size = ((SocketSessionConfig) session.getConfig()).getSendBufferSize(); ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(buf_size); + readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER))); readfilter.attach(chain); WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); -// writefilter.setMaximumConnectionBufferCount(1000); - writefilter.setMaximumConnectionBufferSize(buf_size * 2); + + writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER))); writefilter.attach(chain); } diff --git a/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java b/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java index 82ef3d57cc..423e98c67b 100644 --- a/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java +++ b/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java @@ -39,6 +39,9 @@ public class IOWriterServer static public int _PORT = 9999; + private static final String DEFAULT_READ_BUFFER = "262144"; + private static final String DEFAULT_WRITE_BUFFER = "262144"; + private static class TestHandler extends IoHandlerAdapter { @@ -52,14 +55,14 @@ public class IOWriterServer { IoFilterChain chain = ioSession.getFilterChain(); - int buf_size = ((SocketSessionConfig) ioSession.getConfig()).getReceiveBufferSize(); - ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(buf_size); + readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER))); readfilter.attach(chain); WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - writefilter.setMaximumConnectionBufferSize(buf_size * 2); + + writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER))); + writefilter.attach(chain); } |