diff options
Diffstat (limited to 'trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java')
-rw-r--r-- | trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java | 432 |
1 files changed, 432 insertions, 0 deletions
diff --git a/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java b/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java new file mode 100644 index 0000000000..8de0f93ce9 --- /dev/null +++ b/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java @@ -0,0 +1,432 @@ +package org.apache.qpid.pool; + +import java.util.AbstractQueue; +import java.util.Iterator; +import java.util.Collection; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.atomic.AtomicInteger; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> +{ + + private final AtomicInteger _count = new AtomicInteger(0); + + private final ReentrantLock _takeLock = new ReentrantLock(); + + private final Condition _notEmpty = _takeLock.newCondition(); + + private final ReentrantLock _putLock = new ReentrantLock(); + + private final ConcurrentLinkedQueue<ReadWriteRunnable> _readJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>(); + + private final ConcurrentLinkedQueue<ReadWriteRunnable> _writeJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>(); + + + private class ReadWriteJobIterator implements Iterator<Runnable> + { + + private boolean _onReads; + private Iterator<ReadWriteRunnable> _iter = _writeJobQueue.iterator(); + + public boolean hasNext() + { + if(!_iter.hasNext()) + { + if(_onReads) + { + _iter = _readJobQueue.iterator(); + _onReads = true; + return _iter.hasNext(); + } + else + { + return false; + } + } + else + { + return true; + } + } + + public Runnable next() + { + if(_iter.hasNext()) + { + return _iter.next(); + } + else + { + return null; + } + } + + public void remove() + { + _takeLock.lock(); + try + { + _iter.remove(); + _count.decrementAndGet(); + } + finally + { + _takeLock.unlock(); + } + } + } + + public Iterator<Runnable> iterator() + { + return new ReadWriteJobIterator(); + } + + public int size() + { + return _count.get(); + } + + public boolean offer(final Runnable runnable) + { + final ReadWriteRunnable job = (ReadWriteRunnable) runnable; + final ReentrantLock putLock = _putLock; + putLock.lock(); + try + { + if(job.isRead()) + { + _readJobQueue.offer(job); + } + else + { + _writeJobQueue.offer(job); + } + if(_count.getAndIncrement() == 0) + { + _takeLock.lock(); + try + { + _notEmpty.signal(); + } + finally + { + _takeLock.unlock(); + } + } + return true; + } + finally + { + putLock.unlock(); + } + } + + public void put(final Runnable runnable) throws InterruptedException + { + final ReadWriteRunnable job = (ReadWriteRunnable) runnable; + final ReentrantLock putLock = _putLock; + putLock.lock(); + + try + { + if(job.isRead()) + { + _readJobQueue.offer(job); + } + else + { + _writeJobQueue.offer(job); + } + if(_count.getAndIncrement() == 0) + { + _takeLock.lock(); + try + { + _notEmpty.signal(); + } + finally + { + _takeLock.unlock(); + } + } + + } + finally + { + putLock.unlock(); + } + } + + + + public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException + { + final ReadWriteRunnable job = (ReadWriteRunnable) runnable; + final ReentrantLock putLock = _putLock; + putLock.lock(); + + try + { + if(job.isRead()) + { + _readJobQueue.offer(job); + } + else + { + _writeJobQueue.offer(job); + } + if(_count.getAndIncrement() == 0) + { + _takeLock.lock(); + try + { + _notEmpty.signal(); + } + finally + { + _takeLock.unlock(); + } + } + + return true; + } + finally + { + putLock.unlock(); + } + + } + + public Runnable take() throws InterruptedException + { + final ReentrantLock takeLock = _takeLock; + takeLock.lockInterruptibly(); + try + { + try + { + while (_count.get() == 0) + { + _notEmpty.await(); + } + } + catch (InterruptedException ie) + { + _notEmpty.signal(); + throw ie; + } + + ReadWriteRunnable job = _writeJobQueue.poll(); + if(job == null) + { + job = _readJobQueue.poll(); + } + int c = _count.getAndDecrement(); + if (c > 1) + { + _notEmpty.signal(); + } + return job; + } + finally + { + takeLock.unlock(); + } + + + } + + public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException + { + final ReentrantLock takeLock = _takeLock; + final AtomicInteger count = _count; + long nanos = unit.toNanos(timeout); + takeLock.lockInterruptibly(); + ReadWriteRunnable job = null; + try + { + + for (;;) + { + if (count.get() > 0) + { + job = _writeJobQueue.poll(); + if(job == null) + { + job = _readJobQueue.poll(); + } + int c = count.getAndDecrement(); + if (c > 1) + { + _notEmpty.signal(); + } + break; + } + if (nanos <= 0) + { + return null; + } + try + { + nanos = _notEmpty.awaitNanos(nanos); + } + catch (InterruptedException ie) + { + _notEmpty.signal(); + throw ie; + } + } + } + finally + { + takeLock.unlock(); + } + + return job; + } + + public int remainingCapacity() + { + return Integer.MAX_VALUE; + } + + public int drainTo(final Collection<? super Runnable> c) + { + int total = 0; + + _putLock.lock(); + _takeLock.lock(); + try + { + ReadWriteRunnable job; + while((job = _writeJobQueue.peek())!= null) + { + c.add(job); + _writeJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + while((job = _readJobQueue.peek())!= null) + { + c.add(job); + _readJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + } + finally + { + _takeLock.unlock(); + _putLock.unlock(); + } + return total; + } + + public int drainTo(final Collection<? super Runnable> c, final int maxElements) + { + int total = 0; + + _putLock.lock(); + _takeLock.lock(); + try + { + ReadWriteRunnable job; + while(total<=maxElements && (job = _writeJobQueue.peek())!= null) + { + c.add(job); + _writeJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + while(total<=maxElements && (job = _readJobQueue.peek())!= null) + { + c.add(job); + _readJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + } + finally + { + _takeLock.unlock(); + _putLock.unlock(); + } + return total; + + } + + public Runnable poll() + { + final ReentrantLock takeLock = _takeLock; + takeLock.lock(); + try + { + if(_count.get() > 0) + { + ReadWriteRunnable job = _writeJobQueue.poll(); + if(job == null) + { + job = _readJobQueue.poll(); + } + _count.decrementAndGet(); + return job; + } + else + { + return null; + } + } + finally + { + takeLock.unlock(); + } + + } + + public Runnable peek() + { + final ReentrantLock takeLock = _takeLock; + takeLock.lock(); + try + { + ReadWriteRunnable job = _writeJobQueue.peek(); + if(job == null) + { + job = _readJobQueue.peek(); + } + return job; + } + finally + { + takeLock.unlock(); + } + } +} |