summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-01-09 12:30:54 +0000
committerRobert Gemmell <robbie@apache.org>2012-01-09 12:30:54 +0000
commit1d5212bd70b7e4e9837b88f39be06e40a0fc224d (patch)
treeaa926663a4c84eceade95739c9e5ac0e7dfa3cea /java/common
parent592c256cd4f97e7aa2f05676c48e27a3f274054e (diff)
downloadqpid-python-1d5212bd70b7e4e9837b88f39be06e40a0fc224d.tar.gz
QPID-3730: remove ReadWriteJobQueue, it is no longer necessary as the only things now using the thread pool are write jobs, so there is no need/benefit to request it be write-biased.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1229112 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java432
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java26
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java24
3 files changed, 4 insertions, 478 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
deleted file mode 100644
index 8de0f93ce9..0000000000
--- a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
+++ /dev/null
@@ -1,432 +0,0 @@
-package org.apache.qpid.pool;
-
-import java.util.AbstractQueue;
-import java.util.Iterator;
-import java.util.Collection;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable>
-{
-
- private final AtomicInteger _count = new AtomicInteger(0);
-
- private final ReentrantLock _takeLock = new ReentrantLock();
-
- private final Condition _notEmpty = _takeLock.newCondition();
-
- private final ReentrantLock _putLock = new ReentrantLock();
-
- private final ConcurrentLinkedQueue<ReadWriteRunnable> _readJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
-
- private final ConcurrentLinkedQueue<ReadWriteRunnable> _writeJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
-
-
- private class ReadWriteJobIterator implements Iterator<Runnable>
- {
-
- private boolean _onReads;
- private Iterator<ReadWriteRunnable> _iter = _writeJobQueue.iterator();
-
- public boolean hasNext()
- {
- if(!_iter.hasNext())
- {
- if(_onReads)
- {
- _iter = _readJobQueue.iterator();
- _onReads = true;
- return _iter.hasNext();
- }
- else
- {
- return false;
- }
- }
- else
- {
- return true;
- }
- }
-
- public Runnable next()
- {
- if(_iter.hasNext())
- {
- return _iter.next();
- }
- else
- {
- return null;
- }
- }
-
- public void remove()
- {
- _takeLock.lock();
- try
- {
- _iter.remove();
- _count.decrementAndGet();
- }
- finally
- {
- _takeLock.unlock();
- }
- }
- }
-
- public Iterator<Runnable> iterator()
- {
- return new ReadWriteJobIterator();
- }
-
- public int size()
- {
- return _count.get();
- }
-
- public boolean offer(final Runnable runnable)
- {
- final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
- final ReentrantLock putLock = _putLock;
- putLock.lock();
- try
- {
- if(job.isRead())
- {
- _readJobQueue.offer(job);
- }
- else
- {
- _writeJobQueue.offer(job);
- }
- if(_count.getAndIncrement() == 0)
- {
- _takeLock.lock();
- try
- {
- _notEmpty.signal();
- }
- finally
- {
- _takeLock.unlock();
- }
- }
- return true;
- }
- finally
- {
- putLock.unlock();
- }
- }
-
- public void put(final Runnable runnable) throws InterruptedException
- {
- final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
- final ReentrantLock putLock = _putLock;
- putLock.lock();
-
- try
- {
- if(job.isRead())
- {
- _readJobQueue.offer(job);
- }
- else
- {
- _writeJobQueue.offer(job);
- }
- if(_count.getAndIncrement() == 0)
- {
- _takeLock.lock();
- try
- {
- _notEmpty.signal();
- }
- finally
- {
- _takeLock.unlock();
- }
- }
-
- }
- finally
- {
- putLock.unlock();
- }
- }
-
-
-
- public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException
- {
- final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
- final ReentrantLock putLock = _putLock;
- putLock.lock();
-
- try
- {
- if(job.isRead())
- {
- _readJobQueue.offer(job);
- }
- else
- {
- _writeJobQueue.offer(job);
- }
- if(_count.getAndIncrement() == 0)
- {
- _takeLock.lock();
- try
- {
- _notEmpty.signal();
- }
- finally
- {
- _takeLock.unlock();
- }
- }
-
- return true;
- }
- finally
- {
- putLock.unlock();
- }
-
- }
-
- public Runnable take() throws InterruptedException
- {
- final ReentrantLock takeLock = _takeLock;
- takeLock.lockInterruptibly();
- try
- {
- try
- {
- while (_count.get() == 0)
- {
- _notEmpty.await();
- }
- }
- catch (InterruptedException ie)
- {
- _notEmpty.signal();
- throw ie;
- }
-
- ReadWriteRunnable job = _writeJobQueue.poll();
- if(job == null)
- {
- job = _readJobQueue.poll();
- }
- int c = _count.getAndDecrement();
- if (c > 1)
- {
- _notEmpty.signal();
- }
- return job;
- }
- finally
- {
- takeLock.unlock();
- }
-
-
- }
-
- public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException
- {
- final ReentrantLock takeLock = _takeLock;
- final AtomicInteger count = _count;
- long nanos = unit.toNanos(timeout);
- takeLock.lockInterruptibly();
- ReadWriteRunnable job = null;
- try
- {
-
- for (;;)
- {
- if (count.get() > 0)
- {
- job = _writeJobQueue.poll();
- if(job == null)
- {
- job = _readJobQueue.poll();
- }
- int c = count.getAndDecrement();
- if (c > 1)
- {
- _notEmpty.signal();
- }
- break;
- }
- if (nanos <= 0)
- {
- return null;
- }
- try
- {
- nanos = _notEmpty.awaitNanos(nanos);
- }
- catch (InterruptedException ie)
- {
- _notEmpty.signal();
- throw ie;
- }
- }
- }
- finally
- {
- takeLock.unlock();
- }
-
- return job;
- }
-
- public int remainingCapacity()
- {
- return Integer.MAX_VALUE;
- }
-
- public int drainTo(final Collection<? super Runnable> c)
- {
- int total = 0;
-
- _putLock.lock();
- _takeLock.lock();
- try
- {
- ReadWriteRunnable job;
- while((job = _writeJobQueue.peek())!= null)
- {
- c.add(job);
- _writeJobQueue.poll();
- _count.decrementAndGet();
- total++;
- }
-
- while((job = _readJobQueue.peek())!= null)
- {
- c.add(job);
- _readJobQueue.poll();
- _count.decrementAndGet();
- total++;
- }
-
- }
- finally
- {
- _takeLock.unlock();
- _putLock.unlock();
- }
- return total;
- }
-
- public int drainTo(final Collection<? super Runnable> c, final int maxElements)
- {
- int total = 0;
-
- _putLock.lock();
- _takeLock.lock();
- try
- {
- ReadWriteRunnable job;
- while(total<=maxElements && (job = _writeJobQueue.peek())!= null)
- {
- c.add(job);
- _writeJobQueue.poll();
- _count.decrementAndGet();
- total++;
- }
-
- while(total<=maxElements && (job = _readJobQueue.peek())!= null)
- {
- c.add(job);
- _readJobQueue.poll();
- _count.decrementAndGet();
- total++;
- }
-
- }
- finally
- {
- _takeLock.unlock();
- _putLock.unlock();
- }
- return total;
-
- }
-
- public Runnable poll()
- {
- final ReentrantLock takeLock = _takeLock;
- takeLock.lock();
- try
- {
- if(_count.get() > 0)
- {
- ReadWriteRunnable job = _writeJobQueue.poll();
- if(job == null)
- {
- job = _readJobQueue.poll();
- }
- _count.decrementAndGet();
- return job;
- }
- else
- {
- return null;
- }
- }
- finally
- {
- takeLock.unlock();
- }
-
- }
-
- public Runnable peek()
- {
- final ReentrantLock takeLock = _takeLock;
- takeLock.lock();
- try
- {
- ReadWriteRunnable job = _writeJobQueue.peek();
- if(job == null)
- {
- job = _readJobQueue.peek();
- }
- return job;
- }
- finally
- {
- takeLock.unlock();
- }
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
deleted file mode 100644
index 140c93ca8d..0000000000
--- a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.qpid.pool;
-
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-public interface ReadWriteRunnable extends Runnable
-{
- boolean isRead();
-}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
index 8152a1f5e9..3e99b244c4 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
@@ -96,8 +96,6 @@ public class ReferenceCountingExecutorService
*/
private ThreadFactory _threadFactory = Executors.defaultThreadFactory();
- private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool");
-
/**
* Retrieves the singleton instance of this reference counter.
*
@@ -125,26 +123,12 @@ public class ReferenceCountingExecutorService
{
if (_refCount++ == 0)
{
- // Use a job queue that biases to writes
- if(_useBiasedPool)
- {
- _pool = new ThreadPoolExecutor(_poolSize, _poolSize,
- 0L, TimeUnit.MILLISECONDS,
- new ReadWriteJobQueue(),
- _threadFactory);
-
- }
- else
- {
- _pool = new ThreadPoolExecutor(_poolSize, _poolSize,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(),
- _threadFactory);
- }
-
+ _pool = new ThreadPoolExecutor(_poolSize, _poolSize,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ _threadFactory);
}
-
return _pool;
}
}