diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-29 16:37:13 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-29 16:37:13 +0000 |
commit | 45b41e212c827905f711c188457ed6cdcb97aab3 (patch) | |
tree | f9b24e181c8db1ea4c22d5b872b1fef0a3a7bddd /java/common/src | |
parent | 1f21d6b6a37c98886de34fb33f74e7519d2dabe6 (diff) | |
download | qpid-python-45b41e212c827905f711c188457ed6cdcb97aab3.tar.gz |
QPID-327 : Patch supplied by Rob Godfrey - [race condition] PoolingFilter : Possible race condition when completing a Job
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@501096 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
7 files changed, 113 insertions, 45 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java index b29c23c2a2..e7175e7973 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java @@ -51,4 +51,9 @@ public class AMQTypedValue AMQType type = AMQTypeMap.getType(buffer.get());
return new AMQTypedValue(type, buffer);
}
+
+ public String toString()
+ {
+ return "["+getType()+": "+getValue()+"]";
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index f2d1a70cdc..697a0f4249 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -164,4 +164,14 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData protocolMajor + "." + protocolMinor + " not found in protocol version list."); } } + + public String toString() + { + StringBuffer buffer = new StringBuffer(new String(header)); + buffer.append(Integer.toHexString(protocolClass)); + buffer.append(Integer.toHexString(protocolInstance)); + buffer.append(Integer.toHexString(protocolMajor)); + buffer.append(Integer.toHexString(protocolMinor)); + return buffer.toString(); + } } diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java index b9673cd48f..9b3bcfa008 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -30,13 +30,13 @@ import java.util.concurrent.atomic.AtomicInteger; * Holds events for a session that will be processed asynchronously by * the thread pool in PoolingFilter. */ -class Job implements Runnable +public class Job implements Runnable { private final int _maxEvents; private final IoSession _session; private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>(); private final AtomicBoolean _active = new AtomicBoolean(); - private final AtomicInteger _refCount = new AtomicInteger(); + //private final AtomicInteger _refCount = new AtomicInteger(); private final JobCompletionHandler _completionHandler; Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents) @@ -45,21 +45,21 @@ class Job implements Runnable _completionHandler = completionHandler; _maxEvents = maxEvents; } - - void acquire() - { - _refCount.incrementAndGet(); - } - - void release() - { - _refCount.decrementAndGet(); - } - - boolean isReferenced() - { - return _refCount.get() > 0; - } +// +// void acquire() +// { +// _refCount.incrementAndGet(); +// } +// +// void release() +// { +// _refCount.decrementAndGet(); +// } +// +// boolean isReferenced() +// { +// return _refCount.get() > 0; +// } void add(Event evt) { diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index c0026c1f36..073f1cdfa3 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -48,7 +48,7 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH void fireAsynchEvent(IoSession session, Event event) { Job job = getJobForSession(session); - job.acquire(); //prevents this job being removed from _jobs + // job.acquire(); //prevents this job being removed from _jobs job.add(event); //Additional checks on pool to check that it hasn't shutdown. @@ -60,10 +60,25 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH } + public void createNewJobForSession(IoSession session) + { + Job job = new Job(session, this, _maxEvents); + session.setAttribute(_name, job); + } + private Job getJobForSession(IoSession session) { - Job job = _jobs.get(session); - return job == null ? createJobForSession(session) : job; + return (Job) session.getAttribute(_name); + +/* if(job == null) + { + System.err.println("Error in " + _name); + Thread.dumpStack(); + } + + + job = _jobs.get(session); + return job == null ? createJobForSession(session) : job;*/ } private Job createJobForSession(IoSession session) @@ -81,15 +96,16 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH //Job.JobCompletionHandler public void completed(IoSession session, Job job) { - if (job.isComplete()) - { - job.release(); - if (!job.isReferenced()) - { - _jobs.remove(session); - } - } - else +// if (job.isComplete()) +// { +// job.release(); +// if (!job.isReferenced()) +// { +// _jobs.remove(session); +// } +// } +// else + if(!job.isComplete()) { // ritchiem : 2006-12-13 Do we need to perform the additional checks here? // Can the pool be shutdown at this point? diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java index 84b72bb0dc..c2f7f7ac48 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java +++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java @@ -26,12 +26,38 @@ import org.apache.mina.common.ThreadModel; public class ReadWriteThreadModel implements ThreadModel { + + private static final ReadWriteThreadModel _instance = new ReadWriteThreadModel(); + + private final PoolingFilter _asynchronousReadFilter; + private final PoolingFilter _asynchronousWriteFilter; + + private ReadWriteThreadModel() + { + final ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance(); + _asynchronousReadFilter = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter"); + _asynchronousWriteFilter = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter"); + } + + public PoolingFilter getAsynchronousReadFilter() + { + return _asynchronousReadFilter; + } + + public PoolingFilter getAsynchronousWriteFilter() + { + return _asynchronousWriteFilter; + } + public void buildFilterChain(IoFilterChain chain) throws Exception { - ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance(); - PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter"); - PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter"); - chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(asyncRead)); - chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(asyncWrite)); + + chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(_asynchronousReadFilter)); + chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(_asynchronousWriteFilter)); + } + + public static ReadWriteThreadModel getInstance() + { + return _instance; } } diff --git a/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java b/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java index 9a5208662b..6383d52298 100644 --- a/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java +++ b/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java @@ -36,25 +36,32 @@ public class PoolingFilterTest extends TestCase public void setUp() { + //Create Pool _executorService = ReferenceCountingExecutorService.getInstance(); _executorService.acquireExecutorService(); - _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService, + _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService, "AsynchronousWriteFilter"); } public void testRejectedExecution() throws Exception { - _pool.filterWrite(new NoOpFilter(), new TestSession(), new IoFilter.WriteRequest("Message")); + + TestSession testSession = new TestSession(); + _pool.createNewJobForSession(testSession); + _pool.filterWrite(new NoOpFilter(), testSession, new IoFilter.WriteRequest("Message")); //Shutdown the pool _executorService.getPool().shutdownNow(); try { + + testSession = new TestSession(); + _pool.createNewJobForSession(testSession); //prior to fix for QPID-172 this would throw RejectedExecutionException - _pool.filterWrite(null, new TestSession(), null); + _pool.filterWrite(null, testSession, null); } catch (RejectedExecutionException rje) { diff --git a/java/common/src/test/java/org/apache/qpid/session/TestSession.java b/java/common/src/test/java/org/apache/qpid/session/TestSession.java index f10d55e9d0..aafc91b03b 100644 --- a/java/common/src/test/java/org/apache/qpid/session/TestSession.java +++ b/java/common/src/test/java/org/apache/qpid/session/TestSession.java @@ -24,9 +24,13 @@ import org.apache.mina.common.*; import java.net.SocketAddress; import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentHashMap; public class TestSession implements IoSession { + private final ConcurrentMap attributes = new ConcurrentHashMap(); + public TestSession() { } @@ -68,42 +72,42 @@ public class TestSession implements IoSession public Object getAttachment() { - return null; //TODO + return getAttribute(""); } public Object setAttachment(Object attachment) { - return null; //TODO + return setAttribute("",attachment); } public Object getAttribute(String key) { - return null; //TODO + return attributes.get(key); } public Object setAttribute(String key, Object value) { - return null; //TODO + return attributes.put(key,value); } public Object setAttribute(String key) { - return null; //TODO + return attributes.put(key, Boolean.TRUE); } public Object removeAttribute(String key) { - return null; //TODO + return attributes.remove(key); } public boolean containsAttribute(String key) { - return false; //TODO + return attributes.containsKey(key); } public Set getAttributeKeys() { - return null; //TODO + return attributes.keySet(); } public TransportType getTransportType() |