summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java27
1 files changed, 13 insertions, 14 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
index a080cc7e04..4863611c42 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
@@ -20,19 +20,17 @@
*/
package org.apache.qpid.pool;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoSession;
import org.apache.qpid.pool.Event.CloseEvent;
-
+import org.apache.qpid.pool.Event.MinaReceivedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ExecutorService;
-
/**
* PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it
* adds no behaviour by default to the filter chain, it is abstract.
@@ -74,7 +72,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
private final String _name;
/** Defines the maximum number of events that will be batched into a single job. */
- static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+ public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
private final int _maxEvents;
@@ -188,7 +186,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter);
session.setAttribute(_name, job);
}
-
+
/**
* Retrieves this filters Job, by this filters name, from the Mina session.
*
@@ -208,7 +206,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
* @param session The Mina session to work in.
* @param job The job that completed.
*/
- public void completed(IoSession session, Job job)
+ public void completed(Job job)
{
@@ -239,7 +237,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
}
}
- public void notCompleted(IoSession session, Job job)
+ public void notCompleted(Job job)
{
final ExecutorService pool = _poolReference.getPool();
@@ -430,7 +428,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
public void messageReceived(NextFilter nextFilter, final IoSession session, Object message)
{
Job job = getJobForSession(session);
- fireAsynchEvent(job, new Event.ReceivedEvent(nextFilter, message));
+ fireAsynchEvent(job, new MinaReceivedEvent(nextFilter, message, session));
}
/**
@@ -442,7 +440,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
public void sessionClosed(final NextFilter nextFilter, final IoSession session)
{
Job job = getJobForSession(session);
- fireAsynchEvent(job, new CloseEvent(nextFilter));
+ fireAsynchEvent(job, new CloseEvent(nextFilter, session));
}
}
@@ -473,7 +471,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest)
{
Job job = getJobForSession(session);
- fireAsynchEvent(job, new Event.WriteEvent(nextFilter, writeRequest));
+ fireAsynchEvent(job, new Event.MinaWriteEvent(nextFilter, writeRequest, session));
}
/**
@@ -485,7 +483,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
public void sessionClosed(final NextFilter nextFilter, final IoSession session)
{
Job job = getJobForSession(session);
- fireAsynchEvent(job, new CloseEvent(nextFilter));
+ fireAsynchEvent(job, new CloseEvent(nextFilter, session));
}
}
+
}