diff options
author | Aidan Skinner <aidan@apache.org> | 2009-09-01 16:27:52 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2009-09-01 16:27:52 +0000 |
commit | a7be8fc7337b5cc093f593cc1becb9fe7b4dc0fb (patch) | |
tree | 1bb4d963df5afb0293fea0fb60c3282bb46fed1c /qpid/java/common | |
parent | f0051104b5b99601507c578bd0a7b819a76aef55 (diff) | |
download | qpid-python-a7be8fc7337b5cc093f593cc1becb9fe7b4dc0fb.tar.gz |
QPID-2025: Add a AMQProtocolEngine from the de-MINAfied AMQMinaProtocolSession. Remove various now-unused classes and update references. Add tests for AMQDecoder. Net -1500 lines, +25% performance on transient messaging. Nice.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@810110 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
15 files changed, 431 insertions, 106 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java index fa890d0ebb..591dbd085b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java @@ -23,6 +23,7 @@ package org.apache.qpid.codec; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolEncoder; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; /** * AMQCodecFactory is a Mina codec factory. It supplies the encoders and decoders need to read and write the bytes to @@ -50,9 +51,9 @@ public class AMQCodecFactory implements ProtocolCodecFactory * @param expectProtocolInitiation <tt>true</tt> if the first frame received is going to be a protocol initiation * frame, <tt>false</tt> if it is going to be a standard AMQ data block. */ - public AMQCodecFactory(boolean expectProtocolInitiation) + public AMQCodecFactory(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session) { - _frameDecoder = new AMQDecoder(expectProtocolInitiation); + _frameDecoder = new AMQDecoder(expectProtocolInitiation, session); } /** @@ -70,7 +71,7 @@ public class AMQCodecFactory implements ProtocolCodecFactory * * @return The AMQP decoder. */ - public ProtocolDecoder getDecoder() + public AMQDecoder getDecoder() { return _frameDecoder; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index 7eef73f337..281c0761d9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -20,14 +20,21 @@ */ package org.apache.qpid.codec; +import java.util.ArrayList; + import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoSession; import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; +import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQDataBlockDecoder; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQMethodBodyFactory; +import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; /** * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a @@ -62,14 +69,19 @@ public class AMQDecoder extends CumulativeProtocolDecoder private boolean _expectProtocolInitiation; private boolean firstDecode = true; + private AMQMethodBodyFactory _bodyFactory; + + private ByteBuffer _remainingBuf; + /** * Creates a new AMQP decoder. * * @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation. */ - public AMQDecoder(boolean expectProtocolInitiation) + public AMQDecoder(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session) { _expectProtocolInitiation = expectProtocolInitiation; + _bodyFactory = new AMQMethodBodyFactory(session); } /** @@ -120,7 +132,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { int pos = in.position(); - boolean enoughData = _dataBlockDecoder.decodable(session, in); + boolean enoughData = _dataBlockDecoder.decodable(in.buf()); in.position(pos); if (!enoughData) { @@ -149,7 +161,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder */ private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - boolean enoughData = _piDecoder.decodable(session, in); + boolean enoughData = _piDecoder.decodable(in.buf()); if (!enoughData) { // returning false means it will leave the contents in the buffer and @@ -158,7 +170,8 @@ public class AMQDecoder extends CumulativeProtocolDecoder } else { - _piDecoder.decode(session, in, out); + ProtocolInitiation pi = new ProtocolInitiation(in.buf()); + out.write(pi); return true; } @@ -177,7 +190,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder } - /** + /** * Cumulates content of <tt>in</tt> into internal buffer and forwards * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}. * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt> @@ -268,4 +281,60 @@ public class AMQDecoder extends CumulativeProtocolDecoder session.setAttribute( BUFFER, remainingBuf ); } + public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException + { + + // get prior remaining data from accumulator + ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>(); + ByteBuffer msg; + // if we have a session buffer, append data to that otherwise + // use the buffer read from the network directly + if( _remainingBuf != null ) + { + _remainingBuf.put(buf); + _remainingBuf.flip(); + msg = _remainingBuf; + } + else + { + msg = ByteBuffer.wrap(buf); + } + + if (_expectProtocolInitiation + || (firstDecode + && (msg.remaining() > 0) + && (msg.get(msg.position()) == (byte)'A'))) + { + if (_piDecoder.decodable(msg.buf())) + { + dataBlocks.add(new ProtocolInitiation(msg.buf())); + } + } + else + { + boolean enoughData = true; + while (enoughData) + { + int pos = msg.position(); + + enoughData = _dataBlockDecoder.decodable(msg); + msg.position(pos); + if (enoughData) + { + dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg)); + } + else + { + _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false); + _remainingBuf.setAutoExpand(true); + _remainingBuf.put(msg); + } + } + } + if(firstDecode && dataBlocks.size() > 0) + { + firstDecode = false; + } + return dataBlocks; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 82ffc60802..228867b2b0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -47,7 +47,7 @@ public class AMQDataBlockDecoder public AMQDataBlockDecoder() { } - public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException + public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException { final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1); // type, channel, body length and end byte @@ -56,14 +56,15 @@ public class AMQDataBlockDecoder return false; } - in.skip(1 + 2); - final long bodySize = in.getUnsignedInt(); + in.position(in.position() + 1 + 2); + // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() + final long bodySize = in.getInt() & 0xffffffffL; return (remainingAfterAttributes >= bodySize); } - protected Object createAndPopulateFrame(IoSession session, ByteBuffer in) + public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in) throws AMQFrameDecodingException, AMQProtocolVersionException { final byte type = in.get(); @@ -71,15 +72,7 @@ public class AMQDataBlockDecoder BodyFactory bodyFactory; if (type == AMQMethodBody.TYPE) { - bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY); - if (bodyFactory == null) - { - AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); - bodyFactory = new AMQMethodBodyFactory(protocolSession); - session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory); - - } - + bodyFactory = methodBodyFactory; } else { @@ -115,6 +108,24 @@ public class AMQDataBlockDecoder public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - out.write(createAndPopulateFrame(session, in)); + AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY); + if (bodyFactory == null) + { + AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); + bodyFactory = new AMQMethodBodyFactory(protocolSession); + session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory); + } + + out.write(createAndPopulateFrame(bodyFactory, in)); + } + + public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException + { + return decodable(msg.buf()); + } + + public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException + { + return createAndPopulateFrame(factory, ByteBuffer.wrap(msg)); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java index 05fd2bb480..374644b4f2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java @@ -50,7 +50,7 @@ public final class AMQDataBlockEncoder implements MessageEncoder { _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'"); } - + out.write(buffer); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index 3ac17e9204..cf8a866e47 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -20,12 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoSession; -import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.qpid.AMQException; import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock { @@ -53,13 +51,12 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData _protocolMajor = protocolMajor; _protocolMinor = protocolMinor; } - + public ProtocolInitiation(ProtocolVersion pv) { this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion()); } - public ProtocolInitiation(ByteBuffer in) { _protocolHeader = new byte[4]; @@ -71,6 +68,11 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData _protocolMinor = in.get(); } + public void writePayload(org.apache.mina.common.ByteBuffer buffer) + { + writePayload(buffer.buf()); + } + public long getSize() { return 4 + 1 + 1 + 1 + 1; @@ -127,16 +129,11 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData * @return true if we have enough data to decode the PI frame fully, false if more * data is required */ - public boolean decodable(IoSession session, ByteBuffer in) + public boolean decodable(ByteBuffer in) { return (in.remaining() >= 8); } - public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) - { - ProtocolInitiation pi = new ProtocolInitiation(in); - out.write(pi); - } } public ProtocolVersion checkVersion() throws AMQException @@ -192,4 +189,5 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData buffer.append(Integer.toHexString(_protocolMinor)); return buffer.toString(); } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java index 5996cbf89c..49bce9f2f9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java @@ -45,21 +45,31 @@ import org.apache.mina.common.IoSession; * a continuation. Job is also a continuation, as is the job completion handler. Or, as Event is totally abstract, * it is really an interface, so could just drop it and use the continuation interface instead. */ -public abstract class Event +public class Event { + private Runnable _runner; + + public Event() + { + + } + /** * Creates a continuation. */ - public Event() - { } + public Event(Runnable runner) + { + _runner = runner; + } /** - * Processes the continuation in the context of a Mina session. - * - * @param session The Mina session. + * Processes the continuation */ - public abstract void process(IoSession session); - + public void process() + { + _runner.run(); + } + /** * A continuation ({@link Event}) that takes a Mina messageReceived event, and passes it to a NextFilter. * @@ -68,22 +78,22 @@ public abstract class Event * <tr><td> Pass a Mina messageReceived event to a NextFilter. <td> {@link IoFilter.NextFilter}, {@link IoSession} * </table> */ - public static final class ReceivedEvent extends Event + public static final class MinaReceivedEvent extends Event { private final Object _data; - private final IoFilter.NextFilter _nextFilter; + private final IoSession _session; - public ReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data) + public MinaReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data, final IoSession session) { - super(); _nextFilter = nextFilter; _data = data; + _session = session; } - public void process(IoSession session) + public void process() { - _nextFilter.messageReceived(session, _data); + _nextFilter.messageReceived(_session, _data); } public IoFilter.NextFilter getNextFilter() @@ -101,21 +111,22 @@ public abstract class Event * <td> {@link IoFilter.NextFilter}, {@link IoFilter.WriteRequest}, {@link IoSession} * </table> */ - public static final class WriteEvent extends Event + public static final class MinaWriteEvent extends Event { private final IoFilter.WriteRequest _data; private final IoFilter.NextFilter _nextFilter; + private IoSession _session; - public WriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data) + public MinaWriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data, final IoSession session) { - super(); _nextFilter = nextFilter; _data = data; + _session = session; } - public void process(IoSession session) + public void process() { - _nextFilter.filterWrite(session, _data); + _nextFilter.filterWrite(_session, _data); } public IoFilter.NextFilter getNextFilter() @@ -135,16 +146,17 @@ public abstract class Event public static final class CloseEvent extends Event { private final IoFilter.NextFilter _nextFilter; + private final IoSession _session; - public CloseEvent(final IoFilter.NextFilter nextFilter) + public CloseEvent(final IoFilter.NextFilter nextFilter, final IoSession session) { - super(); _nextFilter = nextFilter; + _session = session; } - public void process(IoSession session) + public void process() { - _nextFilter.sessionClosed(session); + _nextFilter.sessionClosed(_session); } public IoFilter.NextFilter getNextFilter() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java index 00da005515..4e4192dbe3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -55,9 +55,6 @@ public class Job implements ReadWriteRunnable /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */ private final int _maxEvents; - /** The Mina session. */ - private final IoSession _session; - /** Holds the queue of events that make up the job. */ private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>(); @@ -79,7 +76,13 @@ public class Job implements ReadWriteRunnable */ Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob) { - _session = session; + _completionHandler = completionHandler; + _maxEvents = maxEvents; + _readJob = readJob; + } + + public Job(JobCompletionHandler completionHandler, int maxEvents, boolean readJob) + { _completionHandler = completionHandler; _maxEvents = maxEvents; _readJob = readJob; @@ -90,7 +93,7 @@ public class Job implements ReadWriteRunnable * * @param evt The continuation to enqueue. */ - void add(Event evt) + public void add(Event evt) { _eventQueue.add(evt); } @@ -111,7 +114,7 @@ public class Job implements ReadWriteRunnable } else { - e.process(_session); + e.process(); } } return false; @@ -153,30 +156,19 @@ public class Job implements ReadWriteRunnable if(processAll()) { deactivate(); - _completionHandler.completed(_session, this); + _completionHandler.completed(this); } else { - _completionHandler.notCompleted(_session, this); + _completionHandler.notCompleted(this); } } - public boolean isReadJob() - { - return _readJob; - } - public boolean isRead() { return _readJob; } - public boolean isWrite() - { - return !_readJob; - } - - /** * Another interface for a continuation. * @@ -185,8 +177,8 @@ public class Job implements ReadWriteRunnable */ static interface JobCompletionHandler { - public void completed(IoSession session, Job job); + public void completed(Job job); - public void notCompleted(final IoSession session, final Job job); + public void notCompleted(final Job job); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index a080cc7e04..4863611c42 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -20,19 +20,17 @@ */ package org.apache.qpid.pool; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; + import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoFilterAdapter; import org.apache.mina.common.IoSession; import org.apache.qpid.pool.Event.CloseEvent; - +import org.apache.qpid.pool.Event.MinaReceivedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ExecutorService; - /** * PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it * adds no behaviour by default to the filter chain, it is abstract. @@ -74,7 +72,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo private final String _name; /** Defines the maximum number of events that will be batched into a single job. */ - static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); private final int _maxEvents; @@ -188,7 +186,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter); session.setAttribute(_name, job); } - + /** * Retrieves this filters Job, by this filters name, from the Mina session. * @@ -208,7 +206,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo * @param session The Mina session to work in. * @param job The job that completed. */ - public void completed(IoSession session, Job job) + public void completed(Job job) { @@ -239,7 +237,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo } } - public void notCompleted(IoSession session, Job job) + public void notCompleted(Job job) { final ExecutorService pool = _poolReference.getPool(); @@ -430,7 +428,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo public void messageReceived(NextFilter nextFilter, final IoSession session, Object message) { Job job = getJobForSession(session); - fireAsynchEvent(job, new Event.ReceivedEvent(nextFilter, message)); + fireAsynchEvent(job, new MinaReceivedEvent(nextFilter, message, session)); } /** @@ -442,7 +440,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo public void sessionClosed(final NextFilter nextFilter, final IoSession session) { Job job = getJobForSession(session); - fireAsynchEvent(job, new CloseEvent(nextFilter)); + fireAsynchEvent(job, new CloseEvent(nextFilter, session)); } } @@ -473,7 +471,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest) { Job job = getJobForSession(session); - fireAsynchEvent(job, new Event.WriteEvent(nextFilter, writeRequest)); + fireAsynchEvent(job, new Event.MinaWriteEvent(nextFilter, writeRequest, session)); } /** @@ -485,7 +483,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo public void sessionClosed(final NextFilter nextFilter, final IoSession session) { Job job = getJobForSession(session); - fireAsynchEvent(job, new CloseEvent(nextFilter)); + fireAsynchEvent(job, new CloseEvent(nextFilter, session)); } } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java index ad04a923e1..140c93ca8d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java @@ -23,5 +23,4 @@ package org.apache.qpid.pool; public interface ReadWriteRunnable extends Runnable { boolean isRead(); - boolean isWrite(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java index d8c0f2c916..9df84eef90 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java @@ -20,10 +20,12 @@ */ package org.apache.qpid.protocol; +import org.apache.qpid.transport.NetworkDriver; + public interface ProtocolEngineFactory { // Returns a new instance of a ProtocolEngine - ProtocolEngine newProtocolEngine(); + ProtocolEngine newProtocolEngine(NetworkDriver networkDriver); }
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java index 18cae6bf85..c38afe5dd5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java @@ -28,17 +28,17 @@ package org.apache.qpid.transport; public interface NetworkDriverConfiguration { // Taken from Socket - boolean getKeepAlive(); - boolean getOOBInline(); - boolean getReuseAddress(); + Boolean getKeepAlive(); + Boolean getOOBInline(); + Boolean getReuseAddress(); Integer getSoLinger(); // null means off - int getSoTimeout(); - boolean getTcpNoDelay(); - int getTrafficClass(); + Integer getSoTimeout(); + Boolean getTcpNoDelay(); + Integer getTrafficClass(); // The amount of memory in bytes to allocate to the incoming buffer - int getReceiveBufferSize(); + Integer getReceiveBufferSize(); // The amount of memory in bytes to allocate to the outgoing buffer - int getSendBufferSize(); + Integer getSendBufferSize(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java index 7330a042df..477e2cd5af 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -181,6 +181,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver { return _ioSession.getLocalAddress(); } + public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, SSLEngine sslEngine) throws OpenException @@ -251,6 +252,10 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver public void close() { + if (_lastWriteFuture != null) + { + _lastWriteFuture.join(); + } if (_acceptor != null) { _acceptor.unbindAll(); @@ -359,9 +364,14 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); } - + + if (_ioSession == null) + { + _ioSession = protocolSession; + } + // Set up the protocol engine - ProtocolEngine protocolEngine = _factory.newProtocolEngine(); + ProtocolEngine protocolEngine = _factory.newProtocolEngine(this); MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession); protocolEngine.setNetworkDriver(newDriver); protocolSession.setAttachment(protocolEngine); @@ -385,4 +395,10 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver return _protocolEngine; } + public void setProtocolEngineFactory(ProtocolEngineFactory engineFactory, boolean acceptingConnections) + { + _factory = engineFactory; + _acceptingConnections = acceptingConnections; + } + } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java b/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java new file mode 100644 index 0000000000..46c812e265 --- /dev/null +++ b/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java @@ -0,0 +1,130 @@ +package org.apache.qpid.codec; + +import java.nio.ByteBuffer; +import java.util.ArrayList; + +import junit.framework.TestCase; + +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQProtocolVersionException; +import org.apache.qpid.framing.HeartbeatBody; + +public class AMQDecoderTest extends TestCase +{ + + private AMQCodecFactory _factory; + private AMQDecoder _decoder; + + + public void setUp() + { + _factory = new AMQCodecFactory(false, null); + _decoder = _factory.getDecoder(); + } + + + public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + { + ByteBuffer msg = HeartbeatBody.FRAME.toNioByteBuffer(); + ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg); + if (frames.get(0) instanceof AMQFrame) + { + assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType()); + } + else + { + fail("decode was not a frame"); + } + } + + public void testPartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + { + ByteBuffer msg = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msgA = msg.slice(); + int msgbPos = msg.remaining() / 2; + int msgaLimit = msg.remaining() - msgbPos; + msgA.limit(msgaLimit); + msg.position(msgbPos); + ByteBuffer msgB = msg.slice(); + ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msgA); + assertEquals(0, frames.size()); + frames = _decoder.decodeBuffer(msgB); + assertEquals(1, frames.size()); + if (frames.get(0) instanceof AMQFrame) + { + assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType()); + } + else + { + fail("decode was not a frame"); + } + } + + public void testMultipleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + { + ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msgB = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msg = ByteBuffer.allocate(msgA.remaining() + msgB.remaining()); + msg.put(msgA); + msg.put(msgB); + msg.flip(); + ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg); + assertEquals(2, frames.size()); + for (AMQDataBlock frame : frames) + { + if (frame instanceof AMQFrame) + { + assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frame).getBodyFrame().getFrameType()); + } + else + { + fail("decode was not a frame"); + } + } + } + + public void testMultiplePartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + { + ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msgB = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msgC = HeartbeatBody.FRAME.toNioByteBuffer(); + + ByteBuffer sliceA = ByteBuffer.allocate(msgA.remaining() + msgB.remaining() / 2); + sliceA.put(msgA); + int limit = msgB.limit(); + int pos = msgB.remaining() / 2; + msgB.limit(pos); + sliceA.put(msgB); + sliceA.flip(); + msgB.limit(limit); + msgB.position(pos); + + ByteBuffer sliceB = ByteBuffer.allocate(msgB.remaining() + pos); + sliceB.put(msgB); + msgC.limit(pos); + sliceB.put(msgC); + sliceB.flip(); + msgC.limit(limit); + + ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(sliceA); + assertEquals(1, frames.size()); + frames = _decoder.decodeBuffer(sliceB); + assertEquals(1, frames.size()); + frames = _decoder.decodeBuffer(msgC); + assertEquals(1, frames.size()); + for (AMQDataBlock frame : frames) + { + if (frame instanceof AMQFrame) + { + assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frame).getBodyFrame().getFrameType()); + } + else + { + fail("decode was not a frame"); + } + } + } + +} diff --git a/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java b/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java new file mode 100644 index 0000000000..bd7fb68d93 --- /dev/null +++ b/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java @@ -0,0 +1,95 @@ +package org.apache.qpid.codec; + +import java.nio.ByteBuffer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.Sender; + +public class MockAMQVersionAwareProtocolSession implements AMQVersionAwareProtocolSession +{ + + @Override + public void contentBodyReceived(int channelId, ContentBody body) throws AMQException + { + // TODO Auto-generated method stub + + } + + @Override + public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException + { + // TODO Auto-generated method stub + + } + + @Override + public MethodRegistry getMethodRegistry() + { + return MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); + } + + @Override + public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException + { + // TODO Auto-generated method stub + + } + + @Override + public void init() + { + // TODO Auto-generated method stub + + } + + @Override + public void methodFrameReceived(int channelId, AMQMethodBody body) throws AMQException + { + // TODO Auto-generated method stub + + } + + @Override + public void setSender(Sender<ByteBuffer> sender) + { + // TODO Auto-generated method stub + + } + + @Override + public void writeFrame(AMQDataBlock frame) + { + // TODO Auto-generated method stub + + } + + @Override + public byte getProtocolMajorVersion() + { + // TODO Auto-generated method stub + return 0; + } + + @Override + public byte getProtocolMinorVersion() + { + // TODO Auto-generated method stub + return 0; + } + + @Override + public ProtocolVersion getProtocolVersion() + { + // TODO Auto-generated method stub + return null; + } + +} diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java index 7901f6a99d..6024875cf5 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java @@ -299,7 +299,7 @@ public class MINANetworkDriverTest extends TestCase _countingEngine.setNewLatch(TEST_DATA.getBytes().length); _client.send(ByteBuffer.wrap(TEST_DATA.getBytes())); _countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS); - assertEquals("Exception should not been thrown", 0, + assertEquals("Exception should have been thrown", 0, _countingEngine.getExceptionLatch().getCount()); } @@ -321,11 +321,12 @@ public class MINANetworkDriverTest extends TestCase { EchoProtocolEngine _engine = null; - public ProtocolEngine newProtocolEngine() + public ProtocolEngine newProtocolEngine(NetworkDriver driver) { if (_engine == null) { _engine = new EchoProtocolEngine(); + _engine.setNetworkDriver(driver); } return getEngine(); } |