diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2011-08-14 16:21:34 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2011-08-14 16:21:34 +0000 |
commit | d84a3a50dbb794c4383de7e5eca730ca602771e7 (patch) | |
tree | 7c6177573a2eedc172de2cbd8354ce7b4ea1e8fe /qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java | |
parent | 0aba202a7e2483f04fc77bbe4faa88bb86fe5b9b (diff) | |
parent | 47551f3aa2dd46b8daeeb9683a668464203ffa06 (diff) | |
download | qpid-python-d84a3a50dbb794c4383de7e5eca730ca602771e7.tar.gz |
Create sandbox from correct revision
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1157557 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java | 274 |
1 files changed, 274 insertions, 0 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java new file mode 100644 index 0000000000..b89eed48b0 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java @@ -0,0 +1,274 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.mina; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import org.apache.mina.common.*; + +import org.apache.mina.transport.socket.nio.SocketAcceptor; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.mina.filter.ReadThrottleFilterBuilder; +import org.apache.mina.filter.WriteBufferLimitFilterBuilder; +import org.apache.mina.filter.executor.ExecutorFilter; + +import org.apache.qpid.transport.Binding; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionDelegate; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.ConnectionBinding; + +import org.apache.qpid.transport.util.Logger; + +import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.InputHandler; + +import static org.apache.qpid.transport.util.Functions.*; + +/** + * MinaHandler + * + * @author Rafael H. Schloming + */ +//RA making this public until we sort out the package issues +public class MinaHandler<E> implements IoHandler +{ + /** Default buffer size for pending messages reads */ + private static final String DEFAULT_READ_BUFFER_LIMIT = "262144"; + /** Default buffer size for pending messages writes */ + private static final String DEFAULT_WRITE_BUFFER_LIMIT = "262144"; + private static final int MAX_RCVBUF = 64*1024; + + private static final Logger log = Logger.get(MinaHandler.class); + + static + { + ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); + } + + private final Binding<E,java.nio.ByteBuffer> binding; + + private MinaHandler(Binding<E,java.nio.ByteBuffer> binding) + { + this.binding = binding; + } + + public void messageReceived(IoSession ssn, Object obj) + { + Attachment<E> attachment = (Attachment<E>) ssn.getAttachment(); + ByteBuffer buf = (ByteBuffer) obj; + try + { + attachment.receiver.received(buf.buf()); + } + catch (Throwable t) + { + log.error(t, "exception handling buffer %s", str(buf.buf())); + throw new RuntimeException(t); + } + } + + public void messageSent(IoSession ssn, Object obj) + { + // do nothing + } + + public void exceptionCaught(IoSession ssn, Throwable e) + { + Attachment<E> attachment = (Attachment<E>) ssn.getAttachment(); + attachment.receiver.exception(e); + } + + /** + * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the + * session, which filters the events handled by this handler. The filter chain consists of, handing off events + * to an optional protectio + * + * @param session The MINA session. + * @throws Exception Any underlying exceptions are allowed to fall through to MINA. + */ + public void sessionCreated(IoSession session) throws Exception + { + log.debug("Protocol session created for session " + System.identityHashCode(session)); + + if (Boolean.getBoolean("protectio")) + { + try + { + //Add IO Protection Filters + IoFilterChain chain = session.getFilterChain(); + + session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); + + ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); + readfilter.setMaximumConnectionBufferSize( + Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT))); + readfilter.attach(chain); + + WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); + writefilter.setMaximumConnectionBufferSize( + Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER_LIMIT))); + writefilter.attach(chain); + session.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); + + log.info("Using IO Read/Write Filter Protection"); + } + catch (Exception e) + { + log.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage()); + } + } + } + + public void sessionOpened(final IoSession ssn) + { + log.debug("opened: %s", this); + E endpoint = binding.endpoint(new MinaSender(ssn)); + Attachment<E> attachment = + new Attachment<E>(endpoint, binding.receiver(endpoint)); + + // We need to synchronize and notify here because the MINA + // connect future returns the session prior to the attachment + // being set. This is arguably a bug in MINA. + synchronized (ssn) + { + ssn.setAttachment(attachment); + ssn.notifyAll(); + } + } + + public void sessionClosed(IoSession ssn) + { + log.debug("closed: %s", ssn); + Attachment<E> attachment = (Attachment<E>) ssn.getAttachment(); + attachment.receiver.closed(); + ssn.setAttachment(null); + } + + public void sessionIdle(IoSession ssn, IdleStatus status) + { + // do nothing + } + + private static class Attachment<E> + { + + E endpoint; + Receiver<java.nio.ByteBuffer> receiver; + + Attachment(E endpoint, Receiver<java.nio.ByteBuffer> receiver) + { + this.endpoint = endpoint; + this.receiver = receiver; + } + } + + public static final void accept(String host, int port, + Binding<?,java.nio.ByteBuffer> binding) + throws IOException + { + accept(new InetSocketAddress(host, port), binding); + } + + public static final <E> void accept(SocketAddress address, + Binding<E,java.nio.ByteBuffer> binding) + throws IOException + { + IoAcceptor acceptor = new SocketAcceptor(); + acceptor.bind(address, new MinaHandler<E>(binding)); + } + + public static final <E> E connect(String host, int port, + Binding<E,java.nio.ByteBuffer> binding) + { + return connect(new InetSocketAddress(host, port), binding); + } + + public static final <E> E connect(SocketAddress address, + Binding<E,java.nio.ByteBuffer> binding) + { + MinaHandler<E> handler = new MinaHandler<E>(binding); + SocketConnector connector = new SocketConnector(); + IoServiceConfig acceptorConfig = connector.getDefaultConfig(); + acceptorConfig.setThreadModel(ThreadModel.MANUAL); + SocketSessionConfig scfg = (SocketSessionConfig) acceptorConfig.getSessionConfig(); + scfg.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); + Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize"); + if (sendBufferSize != null && sendBufferSize > 0) + { + scfg.setSendBufferSize(sendBufferSize); + } + Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize"); + if (receiveBufferSize != null && receiveBufferSize > 0) + { + scfg.setReceiveBufferSize(receiveBufferSize); + } + else if (scfg.getReceiveBufferSize() > MAX_RCVBUF) + { + scfg.setReceiveBufferSize(MAX_RCVBUF); + } + connector.setWorkerTimeout(0); + ConnectFuture cf = connector.connect(address, handler); + cf.join(); + IoSession ssn = cf.getSession(); + + // We need to synchronize and wait here because the MINA + // connect future returns the session prior to the attachment + // being set. This is arguably a bug in MINA. + synchronized (ssn) + { + while (ssn.getAttachment() == null) + { + try + { + ssn.wait(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + + Attachment<E> attachment = (Attachment<E>) ssn.getAttachment(); + return attachment.endpoint; + } + + public static final void accept(String host, int port, + ConnectionDelegate delegate) + throws IOException + { + accept(host, port, ConnectionBinding.get(delegate)); + } + + public static final Connection connect(String host, int port, + ConnectionDelegate delegate) + { + return connect(host, port, ConnectionBinding.get(delegate)); + } + +} |