summaryrefslogtreecommitdiff
path: root/java/common/src/main/java/org
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-06-19 09:01:59 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-06-19 09:01:59 +0000
commit913d1a55b290f9a8295d5e8396c696d3cee73bc0 (patch)
treea4dde827f8b825e6535197cc12df347bd8d064db /java/common/src/main/java/org
parentf3fc904893b8c345b1aa358816d118fd0aad7d8b (diff)
downloadqpid-python-913d1a55b290f9a8295d5e8396c696d3cee73bc0.tar.gz
QPID-950 : Broker refactoring, copied / merged from branch
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@669431 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src/main/java/org')
-rw-r--r--java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java63
-rw-r--r--java/common/src/main/java/org/apache/qpid/common/ClientProperties.java23
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java56
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java20
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/FieldTable.java105
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/Job.java24
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java30
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java432
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java27
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java24
10 files changed, 705 insertions, 99 deletions
diff --git a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
index bed80d5954..0c311b6645 100644
--- a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
+++ b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
@@ -62,7 +62,6 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator
private static final class FixedSizeByteBuffer extends ByteBuffer
{
private java.nio.ByteBuffer buf;
- private int refCount = 1;
private int mark = -1;
@@ -70,36 +69,14 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator
{
this.buf = buf;
buf.order( ByteOrder.BIG_ENDIAN );
- refCount = 1;
}
public synchronized void acquire()
{
- if( refCount <= 0 )
- {
- throw new IllegalStateException( "Already released buffer." );
- }
-
- refCount ++;
}
public void release()
{
- synchronized( this )
- {
- if( refCount <= 0 )
- {
- refCount = 0;
- throw new IllegalStateException(
- "Already released buffer. You released the buffer too many times." );
- }
-
- refCount --;
- if( refCount > 0)
- {
- return;
- }
- }
}
public java.nio.ByteBuffer buf()
@@ -157,50 +134,12 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator
{
if( newCapacity > capacity() )
{
- // Allocate a new buffer and transfer all settings to it.
- int pos = position();
- int limit = limit();
- ByteOrder bo = order();
-
- capacity0( newCapacity );
- buf.limit( limit );
- if( mark >= 0 )
- {
- buf.position( mark );
- buf.mark();
- }
- buf.position( pos );
- buf.order( bo );
+ throw new IllegalArgumentException();
}
return this;
}
- protected void capacity0( int requestedCapacity )
- {
- int newCapacity = MINIMUM_CAPACITY;
- while( newCapacity < requestedCapacity )
- {
- newCapacity <<= 1;
- }
-
- java.nio.ByteBuffer oldBuf = this.buf;
- java.nio.ByteBuffer newBuf;
- if( isDirect() )
- {
- newBuf = java.nio.ByteBuffer.allocateDirect( newCapacity );
- }
- else
- {
- newBuf = java.nio.ByteBuffer.allocate( newCapacity );
- }
-
- newBuf.clear();
- oldBuf.clear();
- newBuf.put( oldBuf );
- this.buf = newBuf;
- }
-
public boolean isAutoExpand()
diff --git a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
index 67f16e6a87..7371c12519 100644
--- a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.common;
+import org.apache.qpid.framing.AMQShortString;
+
/**
* Specifies the available client property types that different clients can use to identify themselves with.
*
@@ -30,8 +32,21 @@ package org.apache.qpid.common;
*/
public enum ClientProperties
{
- instance,
- product,
- version,
- platform
+ instance("instance"),
+ product("product"),
+ version("version"),
+ platform("platform");
+
+ private final AMQShortString _amqShortString;
+
+ private ClientProperties(String name)
+ {
+ _amqShortString = new AMQShortString(name);
+ }
+
+
+ public AMQShortString toAMQShortString()
+ {
+ return _amqShortString;
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
index a747aaeda7..2a248bf703 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
@@ -224,7 +224,6 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
}
}
-
/**
* Get the length of the short string
* @return length of the underlying byte array
@@ -464,13 +463,49 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
return false;
}
- if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
+ final int hashCode = _hashCode;
+
+ final int otherHashCode = otherString._hashCode;
+
+ if ((hashCode != 0) && (otherHashCode != 0) && (hashCode != otherHashCode))
+ {
+ return false;
+ }
+
+ final int length = _length;
+
+ if(length != otherString._length)
{
return false;
}
- return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data))
- || Arrays.equals(getBytes(),otherString.getBytes());
+
+ final byte[] data = _data;
+
+ final byte[] otherData = otherString._data;
+
+ final int offset = _offset;
+
+ final int otherOffset = otherString._offset;
+
+ if(offset == 0 && otherOffset == 0 && length == data.length && length == otherData.length)
+ {
+ return Arrays.equals(data, otherData);
+ }
+ else
+ {
+ int thisIdx = offset;
+ int otherIdx = otherOffset;
+ for(int i = length; i-- != 0; )
+ {
+ if(!(data[thisIdx++] == otherData[otherIdx++]))
+ {
+ return false;
+ }
+ }
+ }
+
+ return true;
}
@@ -718,4 +753,17 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
return false; //To change body of created methods use File | Settings | File Templates.
}
+
+ public static void main(String args[])
+ {
+ AMQShortString s = new AMQShortString("a.b.c.d.e.f.g.h.i.j.k");
+ AMQShortString s2 = s.substring(2, 7);
+
+ AMQShortStringTokenizer t = s2.tokenize((byte) '.');
+ while(t.hasMoreTokens())
+ {
+ System.err.println(t.nextToken());
+ }
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
index d6359baa0f..1ff39ca790 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
@@ -93,4 +93,24 @@ public class AMQTypedValue
{
return "[" + getType() + ": " + getValue() + "]";
}
+
+
+ public boolean equals(Object o)
+ {
+ if(o instanceof AMQTypedValue)
+ {
+ AMQTypedValue other = (AMQTypedValue) o;
+ return _type == other._type && (_value == null ? other._value == null : _value.equals(other._value));
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public int hashCode()
+ {
+ return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode());
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
index 9ba9b53b13..ed01c91804 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -74,7 +74,7 @@ public class FieldTable
buffer.skip((int) length);
}
- private AMQTypedValue getProperty(AMQShortString string)
+ public AMQTypedValue getProperty(AMQShortString string)
{
checkPropertyName(string);
@@ -891,6 +891,20 @@ public class FieldTable
return keys;
}
+ public Iterator<Map.Entry<AMQShortString, AMQTypedValue>> iterator()
+ {
+ if(_encodedForm != null)
+ {
+ return new FieldTableIterator(_encodedForm.duplicate().rewind(),(int)_encodedSize);
+ }
+ else
+ {
+ initMapIfNecessary();
+ return _properties.entrySet().iterator();
+ }
+ }
+
+
public Object get(AMQShortString key)
{
@@ -1050,6 +1064,95 @@ public class FieldTable
}
}
+ private static final class FieldTableEntry implements Map.Entry<AMQShortString, AMQTypedValue>
+ {
+ private final AMQTypedValue _value;
+ private final AMQShortString _key;
+
+ public FieldTableEntry(final AMQShortString key, final AMQTypedValue value)
+ {
+ _key = key;
+ _value = value;
+ }
+
+ public AMQShortString getKey()
+ {
+ return _key;
+ }
+
+ public AMQTypedValue getValue()
+ {
+ return _value;
+ }
+
+ public AMQTypedValue setValue(final AMQTypedValue value)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean equals(Object o)
+ {
+ if(o instanceof FieldTableEntry)
+ {
+ FieldTableEntry other = (FieldTableEntry) o;
+ return (_key == null ? other._key == null : _key.equals(other._key))
+ && (_value == null ? other._value == null : _value.equals(other._value));
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public int hashCode()
+ {
+ return (getKey()==null ? 0 : getKey().hashCode())
+ ^ (getValue()==null ? 0 : getValue().hashCode());
+ }
+
+ }
+
+
+ private static final class FieldTableIterator implements Iterator<Map.Entry<AMQShortString, AMQTypedValue>>
+ {
+
+ private final ByteBuffer _buffer;
+ private int _expectedRemaining;
+
+ public FieldTableIterator(ByteBuffer buffer, int length)
+ {
+ _buffer = buffer;
+ _expectedRemaining = buffer.remaining() - length;
+ }
+
+ public boolean hasNext()
+ {
+ return (_buffer.remaining() > _expectedRemaining);
+ }
+
+ public Map.Entry<AMQShortString, AMQTypedValue> next()
+ {
+ if(hasNext())
+ {
+ final AMQShortString key = EncodingUtils.readAMQShortString(_buffer);
+ AMQTypedValue value = AMQTypedValue.readFromBuffer(_buffer);
+ return new FieldTableEntry(key, value);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+
+
+
public int hashCode()
{
initMapIfNecessary();
diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java
index b2a09ac592..00da005515 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/Job.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java
@@ -50,7 +50,7 @@ import org.apache.mina.common.IoSession;
*
* @todo For better re-usability could make the completion handler optional. Only run it when one is set.
*/
-public class Job implements Runnable
+public class Job implements ReadWriteRunnable
{
/** The maximum number of events to process per run of the job. More events than this may be queued in the job. */
private final int _maxEvents;
@@ -67,18 +67,22 @@ public class Job implements Runnable
/** Holds the completion continuation, called upon completion of a run of the job. */
private final JobCompletionHandler _completionHandler;
+ private final boolean _readJob;
+
/**
* Creates a new job that aggregates many continuations together.
*
* @param session The Mina session.
* @param completionHandler The per job run, terminal continuation.
* @param maxEvents The maximum number of aggregated continuations to process per run of the job.
+ * @param readJob
*/
- Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents)
+ Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob)
{
_session = session;
_completionHandler = completionHandler;
_maxEvents = maxEvents;
+ _readJob = readJob;
}
/**
@@ -157,6 +161,22 @@ public class Job implements Runnable
}
}
+ public boolean isReadJob()
+ {
+ return _readJob;
+ }
+
+ public boolean isRead()
+ {
+ return _readJob;
+ }
+
+ public boolean isWrite()
+ {
+ return !_readJob;
+ }
+
+
/**
* Another interface for a continuation.
*
diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
index 2912e54662..a080cc7e04 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
@@ -60,24 +60,6 @@ import java.util.concurrent.ExecutorService;
* <td> {@link Job}, {@link Job.JobCompletionHandler}
* </table>
*
- * @todo This seems a bit bizarre. ReadWriteThreadModel creates seperate pooling filters for read and write events.
- * The pooling filters themselves batch read and write events into jobs, but hand these jobs to a common thread
- * pool for execution. So the same thread pool ends up handling read and write events, albeit with many threads
- * so there is concurrency. But why go to the trouble of seperating out the read and write events in that case?
- * Why not just batch them into jobs together? Perhaps its so that seperate thread pools could be used for these
- * stages.
- *
- * @todo Why set an event limit of 10 on the Job? This also seems bizarre, as the job can have more than 10 events in
- * it. Its just that it runs them 10 at a time, but the completion hander here checks if there are more to run
- * and trips off another batch of 10 until they are all done. Why not just have a straight forward
- * consumer/producer queue scenario without the batches of 10? So instead of having many jobs with batches of 10
- * in them, just have one queue of events and worker threads taking the next event. There will be coordination
- * between worker threads and new events arriving on the job anyway, so the simpler scenario may have the same
- * amount of contention. I can see that the batches of 10 is done, so that no job is allowed to hog the worker
- * pool for too long. I'm not convinced this fairly complex scheme will actually add anything, and it might be
- * better to encapsulate it under a Queue interface anyway, so that different queue implementations can easily
- * be substituted in.
- *
* @todo The static helper methods are pointless. Could just call new.
*/
public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
@@ -96,17 +78,20 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
private final int _maxEvents;
+ private final boolean _readFilter;
+
/**
* Creates a named pooling filter, on the specified shared thread pool.
*
* @param refCountingPool The thread pool reference.
* @param name The identifying name of the filter type.
*/
- public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents)
+ public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents, boolean readFilter)
{
_poolReference = refCountingPool;
_name = name;
_maxEvents = maxEvents;
+ _readFilter = readFilter;
}
/**
@@ -167,7 +152,6 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
void fireAsynchEvent(Job job, Event event)
{
- // job.acquire(); //prevents this job being removed from _jobs
job.add(event);
final ExecutorService pool = _poolReference.getPool();
@@ -201,7 +185,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public void createNewJobForSession(IoSession session)
{
- Job job = new Job(session, this, MAX_JOB_EVENTS);
+ Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter);
session.setAttribute(_name, job);
}
@@ -433,7 +417,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
- super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS));
+ super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS),true);
}
/**
@@ -476,7 +460,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
- super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS));
+ super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS),false);
}
/**
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
new file mode 100644
index 0000000000..8de0f93ce9
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
@@ -0,0 +1,432 @@
+package org.apache.qpid.pool;
+
+import java.util.AbstractQueue;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable>
+{
+
+ private final AtomicInteger _count = new AtomicInteger(0);
+
+ private final ReentrantLock _takeLock = new ReentrantLock();
+
+ private final Condition _notEmpty = _takeLock.newCondition();
+
+ private final ReentrantLock _putLock = new ReentrantLock();
+
+ private final ConcurrentLinkedQueue<ReadWriteRunnable> _readJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
+
+ private final ConcurrentLinkedQueue<ReadWriteRunnable> _writeJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
+
+
+ private class ReadWriteJobIterator implements Iterator<Runnable>
+ {
+
+ private boolean _onReads;
+ private Iterator<ReadWriteRunnable> _iter = _writeJobQueue.iterator();
+
+ public boolean hasNext()
+ {
+ if(!_iter.hasNext())
+ {
+ if(_onReads)
+ {
+ _iter = _readJobQueue.iterator();
+ _onReads = true;
+ return _iter.hasNext();
+ }
+ else
+ {
+ return false;
+ }
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ public Runnable next()
+ {
+ if(_iter.hasNext())
+ {
+ return _iter.next();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public void remove()
+ {
+ _takeLock.lock();
+ try
+ {
+ _iter.remove();
+ _count.decrementAndGet();
+ }
+ finally
+ {
+ _takeLock.unlock();
+ }
+ }
+ }
+
+ public Iterator<Runnable> iterator()
+ {
+ return new ReadWriteJobIterator();
+ }
+
+ public int size()
+ {
+ return _count.get();
+ }
+
+ public boolean offer(final Runnable runnable)
+ {
+ final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
+ final ReentrantLock putLock = _putLock;
+ putLock.lock();
+ try
+ {
+ if(job.isRead())
+ {
+ _readJobQueue.offer(job);
+ }
+ else
+ {
+ _writeJobQueue.offer(job);
+ }
+ if(_count.getAndIncrement() == 0)
+ {
+ _takeLock.lock();
+ try
+ {
+ _notEmpty.signal();
+ }
+ finally
+ {
+ _takeLock.unlock();
+ }
+ }
+ return true;
+ }
+ finally
+ {
+ putLock.unlock();
+ }
+ }
+
+ public void put(final Runnable runnable) throws InterruptedException
+ {
+ final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
+ final ReentrantLock putLock = _putLock;
+ putLock.lock();
+
+ try
+ {
+ if(job.isRead())
+ {
+ _readJobQueue.offer(job);
+ }
+ else
+ {
+ _writeJobQueue.offer(job);
+ }
+ if(_count.getAndIncrement() == 0)
+ {
+ _takeLock.lock();
+ try
+ {
+ _notEmpty.signal();
+ }
+ finally
+ {
+ _takeLock.unlock();
+ }
+ }
+
+ }
+ finally
+ {
+ putLock.unlock();
+ }
+ }
+
+
+
+ public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException
+ {
+ final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
+ final ReentrantLock putLock = _putLock;
+ putLock.lock();
+
+ try
+ {
+ if(job.isRead())
+ {
+ _readJobQueue.offer(job);
+ }
+ else
+ {
+ _writeJobQueue.offer(job);
+ }
+ if(_count.getAndIncrement() == 0)
+ {
+ _takeLock.lock();
+ try
+ {
+ _notEmpty.signal();
+ }
+ finally
+ {
+ _takeLock.unlock();
+ }
+ }
+
+ return true;
+ }
+ finally
+ {
+ putLock.unlock();
+ }
+
+ }
+
+ public Runnable take() throws InterruptedException
+ {
+ final ReentrantLock takeLock = _takeLock;
+ takeLock.lockInterruptibly();
+ try
+ {
+ try
+ {
+ while (_count.get() == 0)
+ {
+ _notEmpty.await();
+ }
+ }
+ catch (InterruptedException ie)
+ {
+ _notEmpty.signal();
+ throw ie;
+ }
+
+ ReadWriteRunnable job = _writeJobQueue.poll();
+ if(job == null)
+ {
+ job = _readJobQueue.poll();
+ }
+ int c = _count.getAndDecrement();
+ if (c > 1)
+ {
+ _notEmpty.signal();
+ }
+ return job;
+ }
+ finally
+ {
+ takeLock.unlock();
+ }
+
+
+ }
+
+ public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException
+ {
+ final ReentrantLock takeLock = _takeLock;
+ final AtomicInteger count = _count;
+ long nanos = unit.toNanos(timeout);
+ takeLock.lockInterruptibly();
+ ReadWriteRunnable job = null;
+ try
+ {
+
+ for (;;)
+ {
+ if (count.get() > 0)
+ {
+ job = _writeJobQueue.poll();
+ if(job == null)
+ {
+ job = _readJobQueue.poll();
+ }
+ int c = count.getAndDecrement();
+ if (c > 1)
+ {
+ _notEmpty.signal();
+ }
+ break;
+ }
+ if (nanos <= 0)
+ {
+ return null;
+ }
+ try
+ {
+ nanos = _notEmpty.awaitNanos(nanos);
+ }
+ catch (InterruptedException ie)
+ {
+ _notEmpty.signal();
+ throw ie;
+ }
+ }
+ }
+ finally
+ {
+ takeLock.unlock();
+ }
+
+ return job;
+ }
+
+ public int remainingCapacity()
+ {
+ return Integer.MAX_VALUE;
+ }
+
+ public int drainTo(final Collection<? super Runnable> c)
+ {
+ int total = 0;
+
+ _putLock.lock();
+ _takeLock.lock();
+ try
+ {
+ ReadWriteRunnable job;
+ while((job = _writeJobQueue.peek())!= null)
+ {
+ c.add(job);
+ _writeJobQueue.poll();
+ _count.decrementAndGet();
+ total++;
+ }
+
+ while((job = _readJobQueue.peek())!= null)
+ {
+ c.add(job);
+ _readJobQueue.poll();
+ _count.decrementAndGet();
+ total++;
+ }
+
+ }
+ finally
+ {
+ _takeLock.unlock();
+ _putLock.unlock();
+ }
+ return total;
+ }
+
+ public int drainTo(final Collection<? super Runnable> c, final int maxElements)
+ {
+ int total = 0;
+
+ _putLock.lock();
+ _takeLock.lock();
+ try
+ {
+ ReadWriteRunnable job;
+ while(total<=maxElements && (job = _writeJobQueue.peek())!= null)
+ {
+ c.add(job);
+ _writeJobQueue.poll();
+ _count.decrementAndGet();
+ total++;
+ }
+
+ while(total<=maxElements && (job = _readJobQueue.peek())!= null)
+ {
+ c.add(job);
+ _readJobQueue.poll();
+ _count.decrementAndGet();
+ total++;
+ }
+
+ }
+ finally
+ {
+ _takeLock.unlock();
+ _putLock.unlock();
+ }
+ return total;
+
+ }
+
+ public Runnable poll()
+ {
+ final ReentrantLock takeLock = _takeLock;
+ takeLock.lock();
+ try
+ {
+ if(_count.get() > 0)
+ {
+ ReadWriteRunnable job = _writeJobQueue.poll();
+ if(job == null)
+ {
+ job = _readJobQueue.poll();
+ }
+ _count.decrementAndGet();
+ return job;
+ }
+ else
+ {
+ return null;
+ }
+ }
+ finally
+ {
+ takeLock.unlock();
+ }
+
+ }
+
+ public Runnable peek()
+ {
+ final ReentrantLock takeLock = _takeLock;
+ takeLock.lock();
+ try
+ {
+ ReadWriteRunnable job = _writeJobQueue.peek();
+ if(job == null)
+ {
+ job = _readJobQueue.peek();
+ }
+ return job;
+ }
+ finally
+ {
+ takeLock.unlock();
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
new file mode 100644
index 0000000000..ad04a923e1
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
@@ -0,0 +1,27 @@
+package org.apache.qpid.pool;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public interface ReadWriteRunnable extends Runnable
+{
+ boolean isRead();
+ boolean isWrite();
+}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
index 84c9e1f465..ce9c6ae4cb 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
@@ -22,6 +22,9 @@ package org.apache.qpid.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts
@@ -84,6 +87,8 @@ public class ReferenceCountingExecutorService
/** Holds the number of executor threads to create. */
private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE);
+ private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool");
+
/**
* Retrieves the singleton instance of this reference counter.
*
@@ -105,15 +110,28 @@ public class ReferenceCountingExecutorService
*
* @return An executor service.
*/
- ExecutorService acquireExecutorService()
+ public ExecutorService acquireExecutorService()
{
synchronized (_lock)
{
if (_refCount++ == 0)
{
- _pool = Executors.newFixedThreadPool(_poolSize);
+// _pool = Executors.newFixedThreadPool(_poolSize);
+
+ // Use a job queue that biases to writes
+ if(_useBiasedPool)
+ {
+ _pool = new ThreadPoolExecutor(_poolSize, _poolSize,
+ 0L, TimeUnit.MILLISECONDS,
+ new ReadWriteJobQueue());
+ }
+ else
+ {
+ _pool = Executors.newFixedThreadPool(_poolSize);
+ }
}
+
return _pool;
}
}
@@ -122,7 +140,7 @@ public class ReferenceCountingExecutorService
* Releases a reference to a shared executor service, decrementing the reference count. If the refence count falls
* to zero, the executor service is shut down.
*/
- void releaseExecutorService()
+ public void releaseExecutorService()
{
synchronized (_lock)
{