summaryrefslogtreecommitdiff
path: root/java/common/src
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-29 16:37:13 +0000
committerRobert Greig <rgreig@apache.org>2007-01-29 16:37:13 +0000
commit45b41e212c827905f711c188457ed6cdcb97aab3 (patch)
treef9b24e181c8db1ea4c22d5b872b1fef0a3a7bddd /java/common/src
parent1f21d6b6a37c98886de34fb33f74e7519d2dabe6 (diff)
downloadqpid-python-45b41e212c827905f711c188457ed6cdcb97aab3.tar.gz
QPID-327 : Patch supplied by Rob Godfrey - [race condition] PoolingFilter : Possible race condition when completing a Job
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@501096 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/Job.java34
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java40
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java36
-rw-r--r--java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java13
-rw-r--r--java/common/src/test/java/org/apache/qpid/session/TestSession.java20
7 files changed, 113 insertions, 45 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
index b29c23c2a2..e7175e7973 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
@@ -51,4 +51,9 @@ public class AMQTypedValue
AMQType type = AMQTypeMap.getType(buffer.get());
return new AMQTypedValue(type, buffer);
}
+
+ public String toString()
+ {
+ return "["+getType()+": "+getValue()+"]";
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
index f2d1a70cdc..697a0f4249 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
@@ -164,4 +164,14 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
protocolMajor + "." + protocolMinor + " not found in protocol version list.");
}
}
+
+ public String toString()
+ {
+ StringBuffer buffer = new StringBuffer(new String(header));
+ buffer.append(Integer.toHexString(protocolClass));
+ buffer.append(Integer.toHexString(protocolInstance));
+ buffer.append(Integer.toHexString(protocolMajor));
+ buffer.append(Integer.toHexString(protocolMinor));
+ return buffer.toString();
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java
index b9673cd48f..9b3bcfa008 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/Job.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java
@@ -30,13 +30,13 @@ import java.util.concurrent.atomic.AtomicInteger;
* Holds events for a session that will be processed asynchronously by
* the thread pool in PoolingFilter.
*/
-class Job implements Runnable
+public class Job implements Runnable
{
private final int _maxEvents;
private final IoSession _session;
private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
private final AtomicBoolean _active = new AtomicBoolean();
- private final AtomicInteger _refCount = new AtomicInteger();
+ //private final AtomicInteger _refCount = new AtomicInteger();
private final JobCompletionHandler _completionHandler;
Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents)
@@ -45,21 +45,21 @@ class Job implements Runnable
_completionHandler = completionHandler;
_maxEvents = maxEvents;
}
-
- void acquire()
- {
- _refCount.incrementAndGet();
- }
-
- void release()
- {
- _refCount.decrementAndGet();
- }
-
- boolean isReferenced()
- {
- return _refCount.get() > 0;
- }
+//
+// void acquire()
+// {
+// _refCount.incrementAndGet();
+// }
+//
+// void release()
+// {
+// _refCount.decrementAndGet();
+// }
+//
+// boolean isReferenced()
+// {
+// return _refCount.get() > 0;
+// }
void add(Event evt)
{
diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
index c0026c1f36..073f1cdfa3 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
@@ -48,7 +48,7 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH
void fireAsynchEvent(IoSession session, Event event)
{
Job job = getJobForSession(session);
- job.acquire(); //prevents this job being removed from _jobs
+ // job.acquire(); //prevents this job being removed from _jobs
job.add(event);
//Additional checks on pool to check that it hasn't shutdown.
@@ -60,10 +60,25 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH
}
+ public void createNewJobForSession(IoSession session)
+ {
+ Job job = new Job(session, this, _maxEvents);
+ session.setAttribute(_name, job);
+ }
+
private Job getJobForSession(IoSession session)
{
- Job job = _jobs.get(session);
- return job == null ? createJobForSession(session) : job;
+ return (Job) session.getAttribute(_name);
+
+/* if(job == null)
+ {
+ System.err.println("Error in " + _name);
+ Thread.dumpStack();
+ }
+
+
+ job = _jobs.get(session);
+ return job == null ? createJobForSession(session) : job;*/
}
private Job createJobForSession(IoSession session)
@@ -81,15 +96,16 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH
//Job.JobCompletionHandler
public void completed(IoSession session, Job job)
{
- if (job.isComplete())
- {
- job.release();
- if (!job.isReferenced())
- {
- _jobs.remove(session);
- }
- }
- else
+// if (job.isComplete())
+// {
+// job.release();
+// if (!job.isReferenced())
+// {
+// _jobs.remove(session);
+// }
+// }
+// else
+ if(!job.isComplete())
{
// ritchiem : 2006-12-13 Do we need to perform the additional checks here?
// Can the pool be shutdown at this point?
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
index 84b72bb0dc..c2f7f7ac48 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
@@ -26,12 +26,38 @@ import org.apache.mina.common.ThreadModel;
public class ReadWriteThreadModel implements ThreadModel
{
+
+ private static final ReadWriteThreadModel _instance = new ReadWriteThreadModel();
+
+ private final PoolingFilter _asynchronousReadFilter;
+ private final PoolingFilter _asynchronousWriteFilter;
+
+ private ReadWriteThreadModel()
+ {
+ final ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
+ _asynchronousReadFilter = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter");
+ _asynchronousWriteFilter = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter");
+ }
+
+ public PoolingFilter getAsynchronousReadFilter()
+ {
+ return _asynchronousReadFilter;
+ }
+
+ public PoolingFilter getAsynchronousWriteFilter()
+ {
+ return _asynchronousWriteFilter;
+ }
+
public void buildFilterChain(IoFilterChain chain) throws Exception
{
- ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
- PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter");
- PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter");
- chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(asyncRead));
- chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(asyncWrite));
+
+ chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(_asynchronousReadFilter));
+ chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(_asynchronousWriteFilter));
+ }
+
+ public static ReadWriteThreadModel getInstance()
+ {
+ return _instance;
}
}
diff --git a/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java b/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
index 9a5208662b..6383d52298 100644
--- a/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
+++ b/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
@@ -36,25 +36,32 @@ public class PoolingFilterTest extends TestCase
public void setUp()
{
+
//Create Pool
_executorService = ReferenceCountingExecutorService.getInstance();
_executorService.acquireExecutorService();
- _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService,
+ _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService,
"AsynchronousWriteFilter");
}
public void testRejectedExecution() throws Exception
{
- _pool.filterWrite(new NoOpFilter(), new TestSession(), new IoFilter.WriteRequest("Message"));
+
+ TestSession testSession = new TestSession();
+ _pool.createNewJobForSession(testSession);
+ _pool.filterWrite(new NoOpFilter(), testSession, new IoFilter.WriteRequest("Message"));
//Shutdown the pool
_executorService.getPool().shutdownNow();
try
{
+
+ testSession = new TestSession();
+ _pool.createNewJobForSession(testSession);
//prior to fix for QPID-172 this would throw RejectedExecutionException
- _pool.filterWrite(null, new TestSession(), null);
+ _pool.filterWrite(null, testSession, null);
}
catch (RejectedExecutionException rje)
{
diff --git a/java/common/src/test/java/org/apache/qpid/session/TestSession.java b/java/common/src/test/java/org/apache/qpid/session/TestSession.java
index f10d55e9d0..aafc91b03b 100644
--- a/java/common/src/test/java/org/apache/qpid/session/TestSession.java
+++ b/java/common/src/test/java/org/apache/qpid/session/TestSession.java
@@ -24,9 +24,13 @@ import org.apache.mina.common.*;
import java.net.SocketAddress;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
public class TestSession implements IoSession
{
+ private final ConcurrentMap attributes = new ConcurrentHashMap();
+
public TestSession()
{
}
@@ -68,42 +72,42 @@ public class TestSession implements IoSession
public Object getAttachment()
{
- return null; //TODO
+ return getAttribute("");
}
public Object setAttachment(Object attachment)
{
- return null; //TODO
+ return setAttribute("",attachment);
}
public Object getAttribute(String key)
{
- return null; //TODO
+ return attributes.get(key);
}
public Object setAttribute(String key, Object value)
{
- return null; //TODO
+ return attributes.put(key,value);
}
public Object setAttribute(String key)
{
- return null; //TODO
+ return attributes.put(key, Boolean.TRUE);
}
public Object removeAttribute(String key)
{
- return null; //TODO
+ return attributes.remove(key);
}
public boolean containsAttribute(String key)
{
- return false; //TODO
+ return attributes.containsKey(key);
}
public Set getAttributeKeys()
{
- return null; //TODO
+ return attributes.keySet();
}
public TransportType getTransportType()