diff options
Diffstat (limited to 'qpid/java/common/src/main')
12 files changed, 203 insertions, 104 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java index fa890d0ebb..591dbd085b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java @@ -23,6 +23,7 @@ package org.apache.qpid.codec; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolEncoder; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; /** * AMQCodecFactory is a Mina codec factory. It supplies the encoders and decoders need to read and write the bytes to @@ -50,9 +51,9 @@ public class AMQCodecFactory implements ProtocolCodecFactory * @param expectProtocolInitiation <tt>true</tt> if the first frame received is going to be a protocol initiation * frame, <tt>false</tt> if it is going to be a standard AMQ data block. */ - public AMQCodecFactory(boolean expectProtocolInitiation) + public AMQCodecFactory(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session) { - _frameDecoder = new AMQDecoder(expectProtocolInitiation); + _frameDecoder = new AMQDecoder(expectProtocolInitiation, session); } /** @@ -70,7 +71,7 @@ public class AMQCodecFactory implements ProtocolCodecFactory * * @return The AMQP decoder. */ - public ProtocolDecoder getDecoder() + public AMQDecoder getDecoder() { return _frameDecoder; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index 7eef73f337..281c0761d9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -20,14 +20,21 @@ */ package org.apache.qpid.codec; +import java.util.ArrayList; + import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoSession; import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; +import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQDataBlockDecoder; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQMethodBodyFactory; +import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; /** * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a @@ -62,14 +69,19 @@ public class AMQDecoder extends CumulativeProtocolDecoder private boolean _expectProtocolInitiation; private boolean firstDecode = true; + private AMQMethodBodyFactory _bodyFactory; + + private ByteBuffer _remainingBuf; + /** * Creates a new AMQP decoder. * * @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation. */ - public AMQDecoder(boolean expectProtocolInitiation) + public AMQDecoder(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session) { _expectProtocolInitiation = expectProtocolInitiation; + _bodyFactory = new AMQMethodBodyFactory(session); } /** @@ -120,7 +132,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { int pos = in.position(); - boolean enoughData = _dataBlockDecoder.decodable(session, in); + boolean enoughData = _dataBlockDecoder.decodable(in.buf()); in.position(pos); if (!enoughData) { @@ -149,7 +161,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder */ private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - boolean enoughData = _piDecoder.decodable(session, in); + boolean enoughData = _piDecoder.decodable(in.buf()); if (!enoughData) { // returning false means it will leave the contents in the buffer and @@ -158,7 +170,8 @@ public class AMQDecoder extends CumulativeProtocolDecoder } else { - _piDecoder.decode(session, in, out); + ProtocolInitiation pi = new ProtocolInitiation(in.buf()); + out.write(pi); return true; } @@ -177,7 +190,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder } - /** + /** * Cumulates content of <tt>in</tt> into internal buffer and forwards * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}. * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt> @@ -268,4 +281,60 @@ public class AMQDecoder extends CumulativeProtocolDecoder session.setAttribute( BUFFER, remainingBuf ); } + public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException + { + + // get prior remaining data from accumulator + ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>(); + ByteBuffer msg; + // if we have a session buffer, append data to that otherwise + // use the buffer read from the network directly + if( _remainingBuf != null ) + { + _remainingBuf.put(buf); + _remainingBuf.flip(); + msg = _remainingBuf; + } + else + { + msg = ByteBuffer.wrap(buf); + } + + if (_expectProtocolInitiation + || (firstDecode + && (msg.remaining() > 0) + && (msg.get(msg.position()) == (byte)'A'))) + { + if (_piDecoder.decodable(msg.buf())) + { + dataBlocks.add(new ProtocolInitiation(msg.buf())); + } + } + else + { + boolean enoughData = true; + while (enoughData) + { + int pos = msg.position(); + + enoughData = _dataBlockDecoder.decodable(msg); + msg.position(pos); + if (enoughData) + { + dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg)); + } + else + { + _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false); + _remainingBuf.setAutoExpand(true); + _remainingBuf.put(msg); + } + } + } + if(firstDecode && dataBlocks.size() > 0) + { + firstDecode = false; + } + return dataBlocks; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 82ffc60802..228867b2b0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -47,7 +47,7 @@ public class AMQDataBlockDecoder public AMQDataBlockDecoder() { } - public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException + public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException { final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1); // type, channel, body length and end byte @@ -56,14 +56,15 @@ public class AMQDataBlockDecoder return false; } - in.skip(1 + 2); - final long bodySize = in.getUnsignedInt(); + in.position(in.position() + 1 + 2); + // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() + final long bodySize = in.getInt() & 0xffffffffL; return (remainingAfterAttributes >= bodySize); } - protected Object createAndPopulateFrame(IoSession session, ByteBuffer in) + public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in) throws AMQFrameDecodingException, AMQProtocolVersionException { final byte type = in.get(); @@ -71,15 +72,7 @@ public class AMQDataBlockDecoder BodyFactory bodyFactory; if (type == AMQMethodBody.TYPE) { - bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY); - if (bodyFactory == null) - { - AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); - bodyFactory = new AMQMethodBodyFactory(protocolSession); - session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory); - - } - + bodyFactory = methodBodyFactory; } else { @@ -115,6 +108,24 @@ public class AMQDataBlockDecoder public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - out.write(createAndPopulateFrame(session, in)); + AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY); + if (bodyFactory == null) + { + AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); + bodyFactory = new AMQMethodBodyFactory(protocolSession); + session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory); + } + + out.write(createAndPopulateFrame(bodyFactory, in)); + } + + public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException + { + return decodable(msg.buf()); + } + + public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException + { + return createAndPopulateFrame(factory, ByteBuffer.wrap(msg)); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java index 05fd2bb480..374644b4f2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java @@ -50,7 +50,7 @@ public final class AMQDataBlockEncoder implements MessageEncoder { _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'"); } - + out.write(buffer); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index 3ac17e9204..cf8a866e47 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -20,12 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoSession; -import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.qpid.AMQException; import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock { @@ -53,13 +51,12 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData _protocolMajor = protocolMajor; _protocolMinor = protocolMinor; } - + public ProtocolInitiation(ProtocolVersion pv) { this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion()); } - public ProtocolInitiation(ByteBuffer in) { _protocolHeader = new byte[4]; @@ -71,6 +68,11 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData _protocolMinor = in.get(); } + public void writePayload(org.apache.mina.common.ByteBuffer buffer) + { + writePayload(buffer.buf()); + } + public long getSize() { return 4 + 1 + 1 + 1 + 1; @@ -127,16 +129,11 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData * @return true if we have enough data to decode the PI frame fully, false if more * data is required */ - public boolean decodable(IoSession session, ByteBuffer in) + public boolean decodable(ByteBuffer in) { return (in.remaining() >= 8); } - public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) - { - ProtocolInitiation pi = new ProtocolInitiation(in); - out.write(pi); - } } public ProtocolVersion checkVersion() throws AMQException @@ -192,4 +189,5 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData buffer.append(Integer.toHexString(_protocolMinor)); return buffer.toString(); } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java index 5996cbf89c..49bce9f2f9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java @@ -45,21 +45,31 @@ import org.apache.mina.common.IoSession; * a continuation. Job is also a continuation, as is the job completion handler. Or, as Event is totally abstract, * it is really an interface, so could just drop it and use the continuation interface instead. */ -public abstract class Event +public class Event { + private Runnable _runner; + + public Event() + { + + } + /** * Creates a continuation. */ - public Event() - { } + public Event(Runnable runner) + { + _runner = runner; + } /** - * Processes the continuation in the context of a Mina session. - * - * @param session The Mina session. + * Processes the continuation */ - public abstract void process(IoSession session); - + public void process() + { + _runner.run(); + } + /** * A continuation ({@link Event}) that takes a Mina messageReceived event, and passes it to a NextFilter. * @@ -68,22 +78,22 @@ public abstract class Event * <tr><td> Pass a Mina messageReceived event to a NextFilter. <td> {@link IoFilter.NextFilter}, {@link IoSession} * </table> */ - public static final class ReceivedEvent extends Event + public static final class MinaReceivedEvent extends Event { private final Object _data; - private final IoFilter.NextFilter _nextFilter; + private final IoSession _session; - public ReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data) + public MinaReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data, final IoSession session) { - super(); _nextFilter = nextFilter; _data = data; + _session = session; } - public void process(IoSession session) + public void process() { - _nextFilter.messageReceived(session, _data); + _nextFilter.messageReceived(_session, _data); } public IoFilter.NextFilter getNextFilter() @@ -101,21 +111,22 @@ public abstract class Event * <td> {@link IoFilter.NextFilter}, {@link IoFilter.WriteRequest}, {@link IoSession} * </table> */ - public static final class WriteEvent extends Event + public static final class MinaWriteEvent extends Event { private final IoFilter.WriteRequest _data; private final IoFilter.NextFilter _nextFilter; + private IoSession _session; - public WriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data) + public MinaWriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data, final IoSession session) { - super(); _nextFilter = nextFilter; _data = data; + _session = session; } - public void process(IoSession session) + public void process() { - _nextFilter.filterWrite(session, _data); + _nextFilter.filterWrite(_session, _data); } public IoFilter.NextFilter getNextFilter() @@ -135,16 +146,17 @@ public abstract class Event public static final class CloseEvent extends Event { private final IoFilter.NextFilter _nextFilter; + private final IoSession _session; - public CloseEvent(final IoFilter.NextFilter nextFilter) + public CloseEvent(final IoFilter.NextFilter nextFilter, final IoSession session) { - super(); _nextFilter = nextFilter; + _session = session; } - public void process(IoSession session) + public void process() { - _nextFilter.sessionClosed(session); + _nextFilter.sessionClosed(_session); } public IoFilter.NextFilter getNextFilter() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java index 00da005515..4e4192dbe3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -55,9 +55,6 @@ public class Job implements ReadWriteRunnable /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */ private final int _maxEvents; - /** The Mina session. */ - private final IoSession _session; - /** Holds the queue of events that make up the job. */ private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>(); @@ -79,7 +76,13 @@ public class Job implements ReadWriteRunnable */ Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob) { - _session = session; + _completionHandler = completionHandler; + _maxEvents = maxEvents; + _readJob = readJob; + } + + public Job(JobCompletionHandler completionHandler, int maxEvents, boolean readJob) + { _completionHandler = completionHandler; _maxEvents = maxEvents; _readJob = readJob; @@ -90,7 +93,7 @@ public class Job implements ReadWriteRunnable * * @param evt The continuation to enqueue. */ - void add(Event evt) + public void add(Event evt) { _eventQueue.add(evt); } @@ -111,7 +114,7 @@ public class Job implements ReadWriteRunnable } else { - e.process(_session); + e.process(); } } return false; @@ -153,30 +156,19 @@ public class Job implements ReadWriteRunnable if(processAll()) { deactivate(); - _completionHandler.completed(_session, this); + _completionHandler.completed(this); } else { - _completionHandler.notCompleted(_session, this); + _completionHandler.notCompleted(this); } } - public boolean isReadJob() - { - return _readJob; - } - public boolean isRead() { return _readJob; } - public boolean isWrite() - { - return !_readJob; - } - - /** * Another interface for a continuation. * @@ -185,8 +177,8 @@ public class Job implements ReadWriteRunnable */ static interface JobCompletionHandler { - public void completed(IoSession session, Job job); + public void completed(Job job); - public void notCompleted(final IoSession session, final Job job); + public void notCompleted(final Job job); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index a080cc7e04..4863611c42 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -20,19 +20,17 @@ */ package org.apache.qpid.pool; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; + import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoFilterAdapter; import org.apache.mina.common.IoSession; import org.apache.qpid.pool.Event.CloseEvent; - +import org.apache.qpid.pool.Event.MinaReceivedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ExecutorService; - /** * PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it * adds no behaviour by default to the filter chain, it is abstract. @@ -74,7 +72,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo private final String _name; /** Defines the maximum number of events that will be batched into a single job. */ - static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); private final int _maxEvents; @@ -188,7 +186,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter); session.setAttribute(_name, job); } - + /** * Retrieves this filters Job, by this filters name, from the Mina session. * @@ -208,7 +206,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo * @param session The Mina session to work in. * @param job The job that completed. */ - public void completed(IoSession session, Job job) + public void completed(Job job) { @@ -239,7 +237,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo } } - public void notCompleted(IoSession session, Job job) + public void notCompleted(Job job) { final ExecutorService pool = _poolReference.getPool(); @@ -430,7 +428,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo public void messageReceived(NextFilter nextFilter, final IoSession session, Object message) { Job job = getJobForSession(session); - fireAsynchEvent(job, new Event.ReceivedEvent(nextFilter, message)); + fireAsynchEvent(job, new MinaReceivedEvent(nextFilter, message, session)); } /** @@ -442,7 +440,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo public void sessionClosed(final NextFilter nextFilter, final IoSession session) { Job job = getJobForSession(session); - fireAsynchEvent(job, new CloseEvent(nextFilter)); + fireAsynchEvent(job, new CloseEvent(nextFilter, session)); } } @@ -473,7 +471,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest) { Job job = getJobForSession(session); - fireAsynchEvent(job, new Event.WriteEvent(nextFilter, writeRequest)); + fireAsynchEvent(job, new Event.MinaWriteEvent(nextFilter, writeRequest, session)); } /** @@ -485,7 +483,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo public void sessionClosed(final NextFilter nextFilter, final IoSession session) { Job job = getJobForSession(session); - fireAsynchEvent(job, new CloseEvent(nextFilter)); + fireAsynchEvent(job, new CloseEvent(nextFilter, session)); } } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java index ad04a923e1..140c93ca8d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java @@ -23,5 +23,4 @@ package org.apache.qpid.pool; public interface ReadWriteRunnable extends Runnable { boolean isRead(); - boolean isWrite(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java index d8c0f2c916..9df84eef90 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java @@ -20,10 +20,12 @@ */ package org.apache.qpid.protocol; +import org.apache.qpid.transport.NetworkDriver; + public interface ProtocolEngineFactory { // Returns a new instance of a ProtocolEngine - ProtocolEngine newProtocolEngine(); + ProtocolEngine newProtocolEngine(NetworkDriver networkDriver); }
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java index 18cae6bf85..c38afe5dd5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java @@ -28,17 +28,17 @@ package org.apache.qpid.transport; public interface NetworkDriverConfiguration { // Taken from Socket - boolean getKeepAlive(); - boolean getOOBInline(); - boolean getReuseAddress(); + Boolean getKeepAlive(); + Boolean getOOBInline(); + Boolean getReuseAddress(); Integer getSoLinger(); // null means off - int getSoTimeout(); - boolean getTcpNoDelay(); - int getTrafficClass(); + Integer getSoTimeout(); + Boolean getTcpNoDelay(); + Integer getTrafficClass(); // The amount of memory in bytes to allocate to the incoming buffer - int getReceiveBufferSize(); + Integer getReceiveBufferSize(); // The amount of memory in bytes to allocate to the outgoing buffer - int getSendBufferSize(); + Integer getSendBufferSize(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java index 7330a042df..477e2cd5af 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -181,6 +181,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver { return _ioSession.getLocalAddress(); } + public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, SSLEngine sslEngine) throws OpenException @@ -251,6 +252,10 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver public void close() { + if (_lastWriteFuture != null) + { + _lastWriteFuture.join(); + } if (_acceptor != null) { _acceptor.unbindAll(); @@ -359,9 +364,14 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); } - + + if (_ioSession == null) + { + _ioSession = protocolSession; + } + // Set up the protocol engine - ProtocolEngine protocolEngine = _factory.newProtocolEngine(); + ProtocolEngine protocolEngine = _factory.newProtocolEngine(this); MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession); protocolEngine.setNetworkDriver(newDriver); protocolSession.setAttachment(protocolEngine); @@ -385,4 +395,10 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver return _protocolEngine; } + public void setProtocolEngineFactory(ProtocolEngineFactory engineFactory, boolean acceptingConnections) + { + _factory = engineFactory; + _acceptingConnections = acceptingConnections; + } + } |