summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java79
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java39
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java20
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java58
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java34
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java27
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java20
12 files changed, 203 insertions, 104 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
index fa890d0ebb..591dbd085b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
@@ -23,6 +23,7 @@ package org.apache.qpid.codec;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
* AMQCodecFactory is a Mina codec factory. It supplies the encoders and decoders need to read and write the bytes to
@@ -50,9 +51,9 @@ public class AMQCodecFactory implements ProtocolCodecFactory
* @param expectProtocolInitiation <tt>true</tt> if the first frame received is going to be a protocol initiation
* frame, <tt>false</tt> if it is going to be a standard AMQ data block.
*/
- public AMQCodecFactory(boolean expectProtocolInitiation)
+ public AMQCodecFactory(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session)
{
- _frameDecoder = new AMQDecoder(expectProtocolInitiation);
+ _frameDecoder = new AMQDecoder(expectProtocolInitiation, session);
}
/**
@@ -70,7 +71,7 @@ public class AMQCodecFactory implements ProtocolCodecFactory
*
* @return The AMQP decoder.
*/
- public ProtocolDecoder getDecoder()
+ public AMQDecoder getDecoder()
{
return _frameDecoder;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
index 7eef73f337..281c0761d9 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
@@ -20,14 +20,21 @@
*/
package org.apache.qpid.codec;
+import java.util.ArrayList;
+
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQDataBlockDecoder;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQMethodBodyFactory;
+import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
* AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a
@@ -62,14 +69,19 @@ public class AMQDecoder extends CumulativeProtocolDecoder
private boolean _expectProtocolInitiation;
private boolean firstDecode = true;
+ private AMQMethodBodyFactory _bodyFactory;
+
+ private ByteBuffer _remainingBuf;
+
/**
* Creates a new AMQP decoder.
*
* @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation.
*/
- public AMQDecoder(boolean expectProtocolInitiation)
+ public AMQDecoder(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session)
{
_expectProtocolInitiation = expectProtocolInitiation;
+ _bodyFactory = new AMQMethodBodyFactory(session);
}
/**
@@ -120,7 +132,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder
protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
{
int pos = in.position();
- boolean enoughData = _dataBlockDecoder.decodable(session, in);
+ boolean enoughData = _dataBlockDecoder.decodable(in.buf());
in.position(pos);
if (!enoughData)
{
@@ -149,7 +161,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder
*/
private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
{
- boolean enoughData = _piDecoder.decodable(session, in);
+ boolean enoughData = _piDecoder.decodable(in.buf());
if (!enoughData)
{
// returning false means it will leave the contents in the buffer and
@@ -158,7 +170,8 @@ public class AMQDecoder extends CumulativeProtocolDecoder
}
else
{
- _piDecoder.decode(session, in, out);
+ ProtocolInitiation pi = new ProtocolInitiation(in.buf());
+ out.write(pi);
return true;
}
@@ -177,7 +190,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder
}
- /**
+ /**
* Cumulates content of <tt>in</tt> into internal buffer and forwards
* decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
* <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
@@ -268,4 +281,60 @@ public class AMQDecoder extends CumulativeProtocolDecoder
session.setAttribute( BUFFER, remainingBuf );
}
+ public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException
+ {
+
+ // get prior remaining data from accumulator
+ ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
+ ByteBuffer msg;
+ // if we have a session buffer, append data to that otherwise
+ // use the buffer read from the network directly
+ if( _remainingBuf != null )
+ {
+ _remainingBuf.put(buf);
+ _remainingBuf.flip();
+ msg = _remainingBuf;
+ }
+ else
+ {
+ msg = ByteBuffer.wrap(buf);
+ }
+
+ if (_expectProtocolInitiation
+ || (firstDecode
+ && (msg.remaining() > 0)
+ && (msg.get(msg.position()) == (byte)'A')))
+ {
+ if (_piDecoder.decodable(msg.buf()))
+ {
+ dataBlocks.add(new ProtocolInitiation(msg.buf()));
+ }
+ }
+ else
+ {
+ boolean enoughData = true;
+ while (enoughData)
+ {
+ int pos = msg.position();
+
+ enoughData = _dataBlockDecoder.decodable(msg);
+ msg.position(pos);
+ if (enoughData)
+ {
+ dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg));
+ }
+ else
+ {
+ _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false);
+ _remainingBuf.setAutoExpand(true);
+ _remainingBuf.put(msg);
+ }
+ }
+ }
+ if(firstDecode && dataBlocks.size() > 0)
+ {
+ firstDecode = false;
+ }
+ return dataBlocks;
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
index 82ffc60802..228867b2b0 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -47,7 +47,7 @@ public class AMQDataBlockDecoder
public AMQDataBlockDecoder()
{ }
- public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException
+ public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException
{
final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
// type, channel, body length and end byte
@@ -56,14 +56,15 @@ public class AMQDataBlockDecoder
return false;
}
- in.skip(1 + 2);
- final long bodySize = in.getUnsignedInt();
+ in.position(in.position() + 1 + 2);
+ // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt()
+ final long bodySize = in.getInt() & 0xffffffffL;
return (remainingAfterAttributes >= bodySize);
}
- protected Object createAndPopulateFrame(IoSession session, ByteBuffer in)
+ public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in)
throws AMQFrameDecodingException, AMQProtocolVersionException
{
final byte type = in.get();
@@ -71,15 +72,7 @@ public class AMQDataBlockDecoder
BodyFactory bodyFactory;
if (type == AMQMethodBody.TYPE)
{
- bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
- if (bodyFactory == null)
- {
- AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
- bodyFactory = new AMQMethodBodyFactory(protocolSession);
- session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
-
- }
-
+ bodyFactory = methodBodyFactory;
}
else
{
@@ -115,6 +108,24 @@ public class AMQDataBlockDecoder
public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
{
- out.write(createAndPopulateFrame(session, in));
+ AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
+ if (bodyFactory == null)
+ {
+ AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
+ bodyFactory = new AMQMethodBodyFactory(protocolSession);
+ session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
+ }
+
+ out.write(createAndPopulateFrame(bodyFactory, in));
+ }
+
+ public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException
+ {
+ return decodable(msg.buf());
+ }
+
+ public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException
+ {
+ return createAndPopulateFrame(factory, ByteBuffer.wrap(msg));
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
index 05fd2bb480..374644b4f2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
@@ -50,7 +50,7 @@ public final class AMQDataBlockEncoder implements MessageEncoder
{
_logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'");
}
-
+
out.write(buffer);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
index 3ac17e9204..cf8a866e47 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
@@ -20,12 +20,10 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.qpid.AMQException;
import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -53,13 +51,12 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
_protocolMajor = protocolMajor;
_protocolMinor = protocolMinor;
}
-
+
public ProtocolInitiation(ProtocolVersion pv)
{
this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion());
}
-
public ProtocolInitiation(ByteBuffer in)
{
_protocolHeader = new byte[4];
@@ -71,6 +68,11 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
_protocolMinor = in.get();
}
+ public void writePayload(org.apache.mina.common.ByteBuffer buffer)
+ {
+ writePayload(buffer.buf());
+ }
+
public long getSize()
{
return 4 + 1 + 1 + 1 + 1;
@@ -127,16 +129,11 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
* @return true if we have enough data to decode the PI frame fully, false if more
* data is required
*/
- public boolean decodable(IoSession session, ByteBuffer in)
+ public boolean decodable(ByteBuffer in)
{
return (in.remaining() >= 8);
}
- public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out)
- {
- ProtocolInitiation pi = new ProtocolInitiation(in);
- out.write(pi);
- }
}
public ProtocolVersion checkVersion() throws AMQException
@@ -192,4 +189,5 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
buffer.append(Integer.toHexString(_protocolMinor));
return buffer.toString();
}
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
index 5996cbf89c..49bce9f2f9 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
@@ -45,21 +45,31 @@ import org.apache.mina.common.IoSession;
* a continuation. Job is also a continuation, as is the job completion handler. Or, as Event is totally abstract,
* it is really an interface, so could just drop it and use the continuation interface instead.
*/
-public abstract class Event
+public class Event
{
+ private Runnable _runner;
+
+ public Event()
+ {
+
+ }
+
/**
* Creates a continuation.
*/
- public Event()
- { }
+ public Event(Runnable runner)
+ {
+ _runner = runner;
+ }
/**
- * Processes the continuation in the context of a Mina session.
- *
- * @param session The Mina session.
+ * Processes the continuation
*/
- public abstract void process(IoSession session);
-
+ public void process()
+ {
+ _runner.run();
+ }
+
/**
* A continuation ({@link Event}) that takes a Mina messageReceived event, and passes it to a NextFilter.
*
@@ -68,22 +78,22 @@ public abstract class Event
* <tr><td> Pass a Mina messageReceived event to a NextFilter. <td> {@link IoFilter.NextFilter}, {@link IoSession}
* </table>
*/
- public static final class ReceivedEvent extends Event
+ public static final class MinaReceivedEvent extends Event
{
private final Object _data;
-
private final IoFilter.NextFilter _nextFilter;
+ private final IoSession _session;
- public ReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data)
+ public MinaReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data, final IoSession session)
{
- super();
_nextFilter = nextFilter;
_data = data;
+ _session = session;
}
- public void process(IoSession session)
+ public void process()
{
- _nextFilter.messageReceived(session, _data);
+ _nextFilter.messageReceived(_session, _data);
}
public IoFilter.NextFilter getNextFilter()
@@ -101,21 +111,22 @@ public abstract class Event
* <td> {@link IoFilter.NextFilter}, {@link IoFilter.WriteRequest}, {@link IoSession}
* </table>
*/
- public static final class WriteEvent extends Event
+ public static final class MinaWriteEvent extends Event
{
private final IoFilter.WriteRequest _data;
private final IoFilter.NextFilter _nextFilter;
+ private IoSession _session;
- public WriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data)
+ public MinaWriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data, final IoSession session)
{
- super();
_nextFilter = nextFilter;
_data = data;
+ _session = session;
}
- public void process(IoSession session)
+ public void process()
{
- _nextFilter.filterWrite(session, _data);
+ _nextFilter.filterWrite(_session, _data);
}
public IoFilter.NextFilter getNextFilter()
@@ -135,16 +146,17 @@ public abstract class Event
public static final class CloseEvent extends Event
{
private final IoFilter.NextFilter _nextFilter;
+ private final IoSession _session;
- public CloseEvent(final IoFilter.NextFilter nextFilter)
+ public CloseEvent(final IoFilter.NextFilter nextFilter, final IoSession session)
{
- super();
_nextFilter = nextFilter;
+ _session = session;
}
- public void process(IoSession session)
+ public void process()
{
- _nextFilter.sessionClosed(session);
+ _nextFilter.sessionClosed(_session);
}
public IoFilter.NextFilter getNextFilter()
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
index 00da005515..4e4192dbe3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
@@ -55,9 +55,6 @@ public class Job implements ReadWriteRunnable
/** The maximum number of events to process per run of the job. More events than this may be queued in the job. */
private final int _maxEvents;
- /** The Mina session. */
- private final IoSession _session;
-
/** Holds the queue of events that make up the job. */
private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
@@ -79,7 +76,13 @@ public class Job implements ReadWriteRunnable
*/
Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob)
{
- _session = session;
+ _completionHandler = completionHandler;
+ _maxEvents = maxEvents;
+ _readJob = readJob;
+ }
+
+ public Job(JobCompletionHandler completionHandler, int maxEvents, boolean readJob)
+ {
_completionHandler = completionHandler;
_maxEvents = maxEvents;
_readJob = readJob;
@@ -90,7 +93,7 @@ public class Job implements ReadWriteRunnable
*
* @param evt The continuation to enqueue.
*/
- void add(Event evt)
+ public void add(Event evt)
{
_eventQueue.add(evt);
}
@@ -111,7 +114,7 @@ public class Job implements ReadWriteRunnable
}
else
{
- e.process(_session);
+ e.process();
}
}
return false;
@@ -153,30 +156,19 @@ public class Job implements ReadWriteRunnable
if(processAll())
{
deactivate();
- _completionHandler.completed(_session, this);
+ _completionHandler.completed(this);
}
else
{
- _completionHandler.notCompleted(_session, this);
+ _completionHandler.notCompleted(this);
}
}
- public boolean isReadJob()
- {
- return _readJob;
- }
-
public boolean isRead()
{
return _readJob;
}
- public boolean isWrite()
- {
- return !_readJob;
- }
-
-
/**
* Another interface for a continuation.
*
@@ -185,8 +177,8 @@ public class Job implements ReadWriteRunnable
*/
static interface JobCompletionHandler
{
- public void completed(IoSession session, Job job);
+ public void completed(Job job);
- public void notCompleted(final IoSession session, final Job job);
+ public void notCompleted(final Job job);
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
index a080cc7e04..4863611c42 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
@@ -20,19 +20,17 @@
*/
package org.apache.qpid.pool;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoSession;
import org.apache.qpid.pool.Event.CloseEvent;
-
+import org.apache.qpid.pool.Event.MinaReceivedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ExecutorService;
-
/**
* PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it
* adds no behaviour by default to the filter chain, it is abstract.
@@ -74,7 +72,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
private final String _name;
/** Defines the maximum number of events that will be batched into a single job. */
- static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+ public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
private final int _maxEvents;
@@ -188,7 +186,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter);
session.setAttribute(_name, job);
}
-
+
/**
* Retrieves this filters Job, by this filters name, from the Mina session.
*
@@ -208,7 +206,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
* @param session The Mina session to work in.
* @param job The job that completed.
*/
- public void completed(IoSession session, Job job)
+ public void completed(Job job)
{
@@ -239,7 +237,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
}
}
- public void notCompleted(IoSession session, Job job)
+ public void notCompleted(Job job)
{
final ExecutorService pool = _poolReference.getPool();
@@ -430,7 +428,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
public void messageReceived(NextFilter nextFilter, final IoSession session, Object message)
{
Job job = getJobForSession(session);
- fireAsynchEvent(job, new Event.ReceivedEvent(nextFilter, message));
+ fireAsynchEvent(job, new MinaReceivedEvent(nextFilter, message, session));
}
/**
@@ -442,7 +440,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
public void sessionClosed(final NextFilter nextFilter, final IoSession session)
{
Job job = getJobForSession(session);
- fireAsynchEvent(job, new CloseEvent(nextFilter));
+ fireAsynchEvent(job, new CloseEvent(nextFilter, session));
}
}
@@ -473,7 +471,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest)
{
Job job = getJobForSession(session);
- fireAsynchEvent(job, new Event.WriteEvent(nextFilter, writeRequest));
+ fireAsynchEvent(job, new Event.MinaWriteEvent(nextFilter, writeRequest, session));
}
/**
@@ -485,7 +483,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
public void sessionClosed(final NextFilter nextFilter, final IoSession session)
{
Job job = getJobForSession(session);
- fireAsynchEvent(job, new CloseEvent(nextFilter));
+ fireAsynchEvent(job, new CloseEvent(nextFilter, session));
}
}
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
index ad04a923e1..140c93ca8d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
@@ -23,5 +23,4 @@ package org.apache.qpid.pool;
public interface ReadWriteRunnable extends Runnable
{
boolean isRead();
- boolean isWrite();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
index d8c0f2c916..9df84eef90 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
@@ -20,10 +20,12 @@
*/
package org.apache.qpid.protocol;
+import org.apache.qpid.transport.NetworkDriver;
+
public interface ProtocolEngineFactory
{
// Returns a new instance of a ProtocolEngine
- ProtocolEngine newProtocolEngine();
+ ProtocolEngine newProtocolEngine(NetworkDriver networkDriver);
} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
index 18cae6bf85..c38afe5dd5 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
@@ -28,17 +28,17 @@ package org.apache.qpid.transport;
public interface NetworkDriverConfiguration
{
// Taken from Socket
- boolean getKeepAlive();
- boolean getOOBInline();
- boolean getReuseAddress();
+ Boolean getKeepAlive();
+ Boolean getOOBInline();
+ Boolean getReuseAddress();
Integer getSoLinger(); // null means off
- int getSoTimeout();
- boolean getTcpNoDelay();
- int getTrafficClass();
+ Integer getSoTimeout();
+ Boolean getTcpNoDelay();
+ Integer getTrafficClass();
// The amount of memory in bytes to allocate to the incoming buffer
- int getReceiveBufferSize();
+ Integer getReceiveBufferSize();
// The amount of memory in bytes to allocate to the outgoing buffer
- int getSendBufferSize();
+ Integer getSendBufferSize();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
index 7330a042df..477e2cd5af 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
@@ -181,6 +181,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
{
return _ioSession.getLocalAddress();
}
+
public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
SSLEngine sslEngine) throws OpenException
@@ -251,6 +252,10 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
public void close()
{
+ if (_lastWriteFuture != null)
+ {
+ _lastWriteFuture.join();
+ }
if (_acceptor != null)
{
_acceptor.unbindAll();
@@ -359,9 +364,14 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
}
-
+
+ if (_ioSession == null)
+ {
+ _ioSession = protocolSession;
+ }
+
// Set up the protocol engine
- ProtocolEngine protocolEngine = _factory.newProtocolEngine();
+ ProtocolEngine protocolEngine = _factory.newProtocolEngine(this);
MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession);
protocolEngine.setNetworkDriver(newDriver);
protocolSession.setAttachment(protocolEngine);
@@ -385,4 +395,10 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
return _protocolEngine;
}
+ public void setProtocolEngineFactory(ProtocolEngineFactory engineFactory, boolean acceptingConnections)
+ {
+ _factory = engineFactory;
+ _acceptingConnections = acceptingConnections;
+ }
+
}