diff options
| author | Robert Gemmell <robbie@apache.org> | 2012-01-09 12:30:54 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2012-01-09 12:30:54 +0000 |
| commit | 1d5212bd70b7e4e9837b88f39be06e40a0fc224d (patch) | |
| tree | aa926663a4c84eceade95739c9e5ac0e7dfa3cea /java/common | |
| parent | 592c256cd4f97e7aa2f05676c48e27a3f274054e (diff) | |
| download | qpid-python-1d5212bd70b7e4e9837b88f39be06e40a0fc224d.tar.gz | |
QPID-3730: remove ReadWriteJobQueue, it is no longer necessary as the only things now using the thread pool are write jobs, so there is no need/benefit to request it be write-biased.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1229112 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
3 files changed, 4 insertions, 478 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java deleted file mode 100644 index 8de0f93ce9..0000000000 --- a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java +++ /dev/null @@ -1,432 +0,0 @@ -package org.apache.qpid.pool; - -import java.util.AbstractQueue; -import java.util.Iterator; -import java.util.Collection; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.atomic.AtomicInteger; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> -{ - - private final AtomicInteger _count = new AtomicInteger(0); - - private final ReentrantLock _takeLock = new ReentrantLock(); - - private final Condition _notEmpty = _takeLock.newCondition(); - - private final ReentrantLock _putLock = new ReentrantLock(); - - private final ConcurrentLinkedQueue<ReadWriteRunnable> _readJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>(); - - private final ConcurrentLinkedQueue<ReadWriteRunnable> _writeJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>(); - - - private class ReadWriteJobIterator implements Iterator<Runnable> - { - - private boolean _onReads; - private Iterator<ReadWriteRunnable> _iter = _writeJobQueue.iterator(); - - public boolean hasNext() - { - if(!_iter.hasNext()) - { - if(_onReads) - { - _iter = _readJobQueue.iterator(); - _onReads = true; - return _iter.hasNext(); - } - else - { - return false; - } - } - else - { - return true; - } - } - - public Runnable next() - { - if(_iter.hasNext()) - { - return _iter.next(); - } - else - { - return null; - } - } - - public void remove() - { - _takeLock.lock(); - try - { - _iter.remove(); - _count.decrementAndGet(); - } - finally - { - _takeLock.unlock(); - } - } - } - - public Iterator<Runnable> iterator() - { - return new ReadWriteJobIterator(); - } - - public int size() - { - return _count.get(); - } - - public boolean offer(final Runnable runnable) - { - final ReadWriteRunnable job = (ReadWriteRunnable) runnable; - final ReentrantLock putLock = _putLock; - putLock.lock(); - try - { - if(job.isRead()) - { - _readJobQueue.offer(job); - } - else - { - _writeJobQueue.offer(job); - } - if(_count.getAndIncrement() == 0) - { - _takeLock.lock(); - try - { - _notEmpty.signal(); - } - finally - { - _takeLock.unlock(); - } - } - return true; - } - finally - { - putLock.unlock(); - } - } - - public void put(final Runnable runnable) throws InterruptedException - { - final ReadWriteRunnable job = (ReadWriteRunnable) runnable; - final ReentrantLock putLock = _putLock; - putLock.lock(); - - try - { - if(job.isRead()) - { - _readJobQueue.offer(job); - } - else - { - _writeJobQueue.offer(job); - } - if(_count.getAndIncrement() == 0) - { - _takeLock.lock(); - try - { - _notEmpty.signal(); - } - finally - { - _takeLock.unlock(); - } - } - - } - finally - { - putLock.unlock(); - } - } - - - - public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException - { - final ReadWriteRunnable job = (ReadWriteRunnable) runnable; - final ReentrantLock putLock = _putLock; - putLock.lock(); - - try - { - if(job.isRead()) - { - _readJobQueue.offer(job); - } - else - { - _writeJobQueue.offer(job); - } - if(_count.getAndIncrement() == 0) - { - _takeLock.lock(); - try - { - _notEmpty.signal(); - } - finally - { - _takeLock.unlock(); - } - } - - return true; - } - finally - { - putLock.unlock(); - } - - } - - public Runnable take() throws InterruptedException - { - final ReentrantLock takeLock = _takeLock; - takeLock.lockInterruptibly(); - try - { - try - { - while (_count.get() == 0) - { - _notEmpty.await(); - } - } - catch (InterruptedException ie) - { - _notEmpty.signal(); - throw ie; - } - - ReadWriteRunnable job = _writeJobQueue.poll(); - if(job == null) - { - job = _readJobQueue.poll(); - } - int c = _count.getAndDecrement(); - if (c > 1) - { - _notEmpty.signal(); - } - return job; - } - finally - { - takeLock.unlock(); - } - - - } - - public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException - { - final ReentrantLock takeLock = _takeLock; - final AtomicInteger count = _count; - long nanos = unit.toNanos(timeout); - takeLock.lockInterruptibly(); - ReadWriteRunnable job = null; - try - { - - for (;;) - { - if (count.get() > 0) - { - job = _writeJobQueue.poll(); - if(job == null) - { - job = _readJobQueue.poll(); - } - int c = count.getAndDecrement(); - if (c > 1) - { - _notEmpty.signal(); - } - break; - } - if (nanos <= 0) - { - return null; - } - try - { - nanos = _notEmpty.awaitNanos(nanos); - } - catch (InterruptedException ie) - { - _notEmpty.signal(); - throw ie; - } - } - } - finally - { - takeLock.unlock(); - } - - return job; - } - - public int remainingCapacity() - { - return Integer.MAX_VALUE; - } - - public int drainTo(final Collection<? super Runnable> c) - { - int total = 0; - - _putLock.lock(); - _takeLock.lock(); - try - { - ReadWriteRunnable job; - while((job = _writeJobQueue.peek())!= null) - { - c.add(job); - _writeJobQueue.poll(); - _count.decrementAndGet(); - total++; - } - - while((job = _readJobQueue.peek())!= null) - { - c.add(job); - _readJobQueue.poll(); - _count.decrementAndGet(); - total++; - } - - } - finally - { - _takeLock.unlock(); - _putLock.unlock(); - } - return total; - } - - public int drainTo(final Collection<? super Runnable> c, final int maxElements) - { - int total = 0; - - _putLock.lock(); - _takeLock.lock(); - try - { - ReadWriteRunnable job; - while(total<=maxElements && (job = _writeJobQueue.peek())!= null) - { - c.add(job); - _writeJobQueue.poll(); - _count.decrementAndGet(); - total++; - } - - while(total<=maxElements && (job = _readJobQueue.peek())!= null) - { - c.add(job); - _readJobQueue.poll(); - _count.decrementAndGet(); - total++; - } - - } - finally - { - _takeLock.unlock(); - _putLock.unlock(); - } - return total; - - } - - public Runnable poll() - { - final ReentrantLock takeLock = _takeLock; - takeLock.lock(); - try - { - if(_count.get() > 0) - { - ReadWriteRunnable job = _writeJobQueue.poll(); - if(job == null) - { - job = _readJobQueue.poll(); - } - _count.decrementAndGet(); - return job; - } - else - { - return null; - } - } - finally - { - takeLock.unlock(); - } - - } - - public Runnable peek() - { - final ReentrantLock takeLock = _takeLock; - takeLock.lock(); - try - { - ReadWriteRunnable job = _writeJobQueue.peek(); - if(job == null) - { - job = _readJobQueue.peek(); - } - return job; - } - finally - { - takeLock.unlock(); - } - } -} diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java deleted file mode 100644 index 140c93ca8d..0000000000 --- a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.apache.qpid.pool; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -public interface ReadWriteRunnable extends Runnable -{ - boolean isRead(); -} diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java index 8152a1f5e9..3e99b244c4 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java +++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -96,8 +96,6 @@ public class ReferenceCountingExecutorService */ private ThreadFactory _threadFactory = Executors.defaultThreadFactory(); - private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool"); - /** * Retrieves the singleton instance of this reference counter. * @@ -125,26 +123,12 @@ public class ReferenceCountingExecutorService { if (_refCount++ == 0) { - // Use a job queue that biases to writes - if(_useBiasedPool) - { - _pool = new ThreadPoolExecutor(_poolSize, _poolSize, - 0L, TimeUnit.MILLISECONDS, - new ReadWriteJobQueue(), - _threadFactory); - - } - else - { - _pool = new ThreadPoolExecutor(_poolSize, _poolSize, - 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(), - _threadFactory); - } - + _pool = new ThreadPoolExecutor(_poolSize, _poolSize, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), + _threadFactory); } - return _pool; } } |
