diff options
Diffstat (limited to 'M4-RCs/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java')
-rw-r--r-- | M4-RCs/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java | 432 |
1 files changed, 0 insertions, 432 deletions
diff --git a/M4-RCs/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java b/M4-RCs/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java deleted file mode 100644 index 8de0f93ce9..0000000000 --- a/M4-RCs/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java +++ /dev/null @@ -1,432 +0,0 @@ -package org.apache.qpid.pool; - -import java.util.AbstractQueue; -import java.util.Iterator; -import java.util.Collection; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.atomic.AtomicInteger; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> -{ - - private final AtomicInteger _count = new AtomicInteger(0); - - private final ReentrantLock _takeLock = new ReentrantLock(); - - private final Condition _notEmpty = _takeLock.newCondition(); - - private final ReentrantLock _putLock = new ReentrantLock(); - - private final ConcurrentLinkedQueue<ReadWriteRunnable> _readJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>(); - - private final ConcurrentLinkedQueue<ReadWriteRunnable> _writeJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>(); - - - private class ReadWriteJobIterator implements Iterator<Runnable> - { - - private boolean _onReads; - private Iterator<ReadWriteRunnable> _iter = _writeJobQueue.iterator(); - - public boolean hasNext() - { - if(!_iter.hasNext()) - { - if(_onReads) - { - _iter = _readJobQueue.iterator(); - _onReads = true; - return _iter.hasNext(); - } - else - { - return false; - } - } - else - { - return true; - } - } - - public Runnable next() - { - if(_iter.hasNext()) - { - return _iter.next(); - } - else - { - return null; - } - } - - public void remove() - { - _takeLock.lock(); - try - { - _iter.remove(); - _count.decrementAndGet(); - } - finally - { - _takeLock.unlock(); - } - } - } - - public Iterator<Runnable> iterator() - { - return new ReadWriteJobIterator(); - } - - public int size() - { - return _count.get(); - } - - public boolean offer(final Runnable runnable) - { - final ReadWriteRunnable job = (ReadWriteRunnable) runnable; - final ReentrantLock putLock = _putLock; - putLock.lock(); - try - { - if(job.isRead()) - { - _readJobQueue.offer(job); - } - else - { - _writeJobQueue.offer(job); - } - if(_count.getAndIncrement() == 0) - { - _takeLock.lock(); - try - { - _notEmpty.signal(); - } - finally - { - _takeLock.unlock(); - } - } - return true; - } - finally - { - putLock.unlock(); - } - } - - public void put(final Runnable runnable) throws InterruptedException - { - final ReadWriteRunnable job = (ReadWriteRunnable) runnable; - final ReentrantLock putLock = _putLock; - putLock.lock(); - - try - { - if(job.isRead()) - { - _readJobQueue.offer(job); - } - else - { - _writeJobQueue.offer(job); - } - if(_count.getAndIncrement() == 0) - { - _takeLock.lock(); - try - { - _notEmpty.signal(); - } - finally - { - _takeLock.unlock(); - } - } - - } - finally - { - putLock.unlock(); - } - } - - - - public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException - { - final ReadWriteRunnable job = (ReadWriteRunnable) runnable; - final ReentrantLock putLock = _putLock; - putLock.lock(); - - try - { - if(job.isRead()) - { - _readJobQueue.offer(job); - } - else - { - _writeJobQueue.offer(job); - } - if(_count.getAndIncrement() == 0) - { - _takeLock.lock(); - try - { - _notEmpty.signal(); - } - finally - { - _takeLock.unlock(); - } - } - - return true; - } - finally - { - putLock.unlock(); - } - - } - - public Runnable take() throws InterruptedException - { - final ReentrantLock takeLock = _takeLock; - takeLock.lockInterruptibly(); - try - { - try - { - while (_count.get() == 0) - { - _notEmpty.await(); - } - } - catch (InterruptedException ie) - { - _notEmpty.signal(); - throw ie; - } - - ReadWriteRunnable job = _writeJobQueue.poll(); - if(job == null) - { - job = _readJobQueue.poll(); - } - int c = _count.getAndDecrement(); - if (c > 1) - { - _notEmpty.signal(); - } - return job; - } - finally - { - takeLock.unlock(); - } - - - } - - public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException - { - final ReentrantLock takeLock = _takeLock; - final AtomicInteger count = _count; - long nanos = unit.toNanos(timeout); - takeLock.lockInterruptibly(); - ReadWriteRunnable job = null; - try - { - - for (;;) - { - if (count.get() > 0) - { - job = _writeJobQueue.poll(); - if(job == null) - { - job = _readJobQueue.poll(); - } - int c = count.getAndDecrement(); - if (c > 1) - { - _notEmpty.signal(); - } - break; - } - if (nanos <= 0) - { - return null; - } - try - { - nanos = _notEmpty.awaitNanos(nanos); - } - catch (InterruptedException ie) - { - _notEmpty.signal(); - throw ie; - } - } - } - finally - { - takeLock.unlock(); - } - - return job; - } - - public int remainingCapacity() - { - return Integer.MAX_VALUE; - } - - public int drainTo(final Collection<? super Runnable> c) - { - int total = 0; - - _putLock.lock(); - _takeLock.lock(); - try - { - ReadWriteRunnable job; - while((job = _writeJobQueue.peek())!= null) - { - c.add(job); - _writeJobQueue.poll(); - _count.decrementAndGet(); - total++; - } - - while((job = _readJobQueue.peek())!= null) - { - c.add(job); - _readJobQueue.poll(); - _count.decrementAndGet(); - total++; - } - - } - finally - { - _takeLock.unlock(); - _putLock.unlock(); - } - return total; - } - - public int drainTo(final Collection<? super Runnable> c, final int maxElements) - { - int total = 0; - - _putLock.lock(); - _takeLock.lock(); - try - { - ReadWriteRunnable job; - while(total<=maxElements && (job = _writeJobQueue.peek())!= null) - { - c.add(job); - _writeJobQueue.poll(); - _count.decrementAndGet(); - total++; - } - - while(total<=maxElements && (job = _readJobQueue.peek())!= null) - { - c.add(job); - _readJobQueue.poll(); - _count.decrementAndGet(); - total++; - } - - } - finally - { - _takeLock.unlock(); - _putLock.unlock(); - } - return total; - - } - - public Runnable poll() - { - final ReentrantLock takeLock = _takeLock; - takeLock.lock(); - try - { - if(_count.get() > 0) - { - ReadWriteRunnable job = _writeJobQueue.poll(); - if(job == null) - { - job = _readJobQueue.poll(); - } - _count.decrementAndGet(); - return job; - } - else - { - return null; - } - } - finally - { - takeLock.unlock(); - } - - } - - public Runnable peek() - { - final ReentrantLock takeLock = _takeLock; - takeLock.lock(); - try - { - ReadWriteRunnable job = _writeJobQueue.peek(); - if(job == null) - { - job = _readJobQueue.peek(); - } - return job; - } - finally - { - takeLock.unlock(); - } - } -} |