From 913d1a55b290f9a8295d5e8396c696d3cee73bc0 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 19 Jun 2008 09:01:59 +0000 Subject: QPID-950 : Broker refactoring, copied / merged from branch git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@669431 13f79535-47bb-0310-9956-ffa450edef68 --- .../mina/common/FixedSizeByteBufferAllocator.java | 63 +-- .../org/apache/qpid/common/ClientProperties.java | 23 +- .../org/apache/qpid/framing/AMQShortString.java | 56 ++- .../org/apache/qpid/framing/AMQTypedValue.java | 20 + .../java/org/apache/qpid/framing/FieldTable.java | 105 ++++- .../src/main/java/org/apache/qpid/pool/Job.java | 24 +- .../java/org/apache/qpid/pool/PoolingFilter.java | 30 +- .../org/apache/qpid/pool/ReadWriteJobQueue.java | 432 +++++++++++++++++++++ .../org/apache/qpid/pool/ReadWriteRunnable.java | 27 ++ .../pool/ReferenceCountingExecutorService.java | 24 +- 10 files changed, 705 insertions(+), 99 deletions(-) create mode 100644 java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java create mode 100644 java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java (limited to 'java/common') diff --git a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java index bed80d5954..0c311b6645 100644 --- a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java +++ b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java @@ -62,7 +62,6 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator private static final class FixedSizeByteBuffer extends ByteBuffer { private java.nio.ByteBuffer buf; - private int refCount = 1; private int mark = -1; @@ -70,36 +69,14 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator { this.buf = buf; buf.order( ByteOrder.BIG_ENDIAN ); - refCount = 1; } public synchronized void acquire() { - if( refCount <= 0 ) - { - throw new IllegalStateException( "Already released buffer." ); - } - - refCount ++; } public void release() { - synchronized( this ) - { - if( refCount <= 0 ) - { - refCount = 0; - throw new IllegalStateException( - "Already released buffer. You released the buffer too many times." ); - } - - refCount --; - if( refCount > 0) - { - return; - } - } } public java.nio.ByteBuffer buf() @@ -157,50 +134,12 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator { if( newCapacity > capacity() ) { - // Allocate a new buffer and transfer all settings to it. - int pos = position(); - int limit = limit(); - ByteOrder bo = order(); - - capacity0( newCapacity ); - buf.limit( limit ); - if( mark >= 0 ) - { - buf.position( mark ); - buf.mark(); - } - buf.position( pos ); - buf.order( bo ); + throw new IllegalArgumentException(); } return this; } - protected void capacity0( int requestedCapacity ) - { - int newCapacity = MINIMUM_CAPACITY; - while( newCapacity < requestedCapacity ) - { - newCapacity <<= 1; - } - - java.nio.ByteBuffer oldBuf = this.buf; - java.nio.ByteBuffer newBuf; - if( isDirect() ) - { - newBuf = java.nio.ByteBuffer.allocateDirect( newCapacity ); - } - else - { - newBuf = java.nio.ByteBuffer.allocate( newCapacity ); - } - - newBuf.clear(); - oldBuf.clear(); - newBuf.put( oldBuf ); - this.buf = newBuf; - } - public boolean isAutoExpand() diff --git a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java index 67f16e6a87..7371c12519 100644 --- a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java +++ b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.common; +import org.apache.qpid.framing.AMQShortString; + /** * Specifies the available client property types that different clients can use to identify themselves with. * @@ -30,8 +32,21 @@ package org.apache.qpid.common; */ public enum ClientProperties { - instance, - product, - version, - platform + instance("instance"), + product("product"), + version("version"), + platform("platform"); + + private final AMQShortString _amqShortString; + + private ClientProperties(String name) + { + _amqShortString = new AMQShortString(name); + } + + + public AMQShortString toAMQShortString() + { + return _amqShortString; + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index a747aaeda7..2a248bf703 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -224,7 +224,6 @@ public final class AMQShortString implements CharSequence, Comparable> iterator() + { + if(_encodedForm != null) + { + return new FieldTableIterator(_encodedForm.duplicate().rewind(),(int)_encodedSize); + } + else + { + initMapIfNecessary(); + return _properties.entrySet().iterator(); + } + } + + public Object get(AMQShortString key) { @@ -1050,6 +1064,95 @@ public class FieldTable } } + private static final class FieldTableEntry implements Map.Entry + { + private final AMQTypedValue _value; + private final AMQShortString _key; + + public FieldTableEntry(final AMQShortString key, final AMQTypedValue value) + { + _key = key; + _value = value; + } + + public AMQShortString getKey() + { + return _key; + } + + public AMQTypedValue getValue() + { + return _value; + } + + public AMQTypedValue setValue(final AMQTypedValue value) + { + throw new UnsupportedOperationException(); + } + + public boolean equals(Object o) + { + if(o instanceof FieldTableEntry) + { + FieldTableEntry other = (FieldTableEntry) o; + return (_key == null ? other._key == null : _key.equals(other._key)) + && (_value == null ? other._value == null : _value.equals(other._value)); + } + else + { + return false; + } + } + + public int hashCode() + { + return (getKey()==null ? 0 : getKey().hashCode()) + ^ (getValue()==null ? 0 : getValue().hashCode()); + } + + } + + + private static final class FieldTableIterator implements Iterator> + { + + private final ByteBuffer _buffer; + private int _expectedRemaining; + + public FieldTableIterator(ByteBuffer buffer, int length) + { + _buffer = buffer; + _expectedRemaining = buffer.remaining() - length; + } + + public boolean hasNext() + { + return (_buffer.remaining() > _expectedRemaining); + } + + public Map.Entry next() + { + if(hasNext()) + { + final AMQShortString key = EncodingUtils.readAMQShortString(_buffer); + AMQTypedValue value = AMQTypedValue.readFromBuffer(_buffer); + return new FieldTableEntry(key, value); + } + else + { + return null; + } + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + } + + + + public int hashCode() { initMapIfNecessary(); diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java index b2a09ac592..00da005515 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -50,7 +50,7 @@ import org.apache.mina.common.IoSession; * * @todo For better re-usability could make the completion handler optional. Only run it when one is set. */ -public class Job implements Runnable +public class Job implements ReadWriteRunnable { /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */ private final int _maxEvents; @@ -67,18 +67,22 @@ public class Job implements Runnable /** Holds the completion continuation, called upon completion of a run of the job. */ private final JobCompletionHandler _completionHandler; + private final boolean _readJob; + /** * Creates a new job that aggregates many continuations together. * * @param session The Mina session. * @param completionHandler The per job run, terminal continuation. * @param maxEvents The maximum number of aggregated continuations to process per run of the job. + * @param readJob */ - Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents) + Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob) { _session = session; _completionHandler = completionHandler; _maxEvents = maxEvents; + _readJob = readJob; } /** @@ -157,6 +161,22 @@ public class Job implements Runnable } } + public boolean isReadJob() + { + return _readJob; + } + + public boolean isRead() + { + return _readJob; + } + + public boolean isWrite() + { + return !_readJob; + } + + /** * Another interface for a continuation. * diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index 2912e54662..a080cc7e04 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -60,24 +60,6 @@ import java.util.concurrent.ExecutorService; * {@link Job}, {@link Job.JobCompletionHandler} * * - * @todo This seems a bit bizarre. ReadWriteThreadModel creates seperate pooling filters for read and write events. - * The pooling filters themselves batch read and write events into jobs, but hand these jobs to a common thread - * pool for execution. So the same thread pool ends up handling read and write events, albeit with many threads - * so there is concurrency. But why go to the trouble of seperating out the read and write events in that case? - * Why not just batch them into jobs together? Perhaps its so that seperate thread pools could be used for these - * stages. - * - * @todo Why set an event limit of 10 on the Job? This also seems bizarre, as the job can have more than 10 events in - * it. Its just that it runs them 10 at a time, but the completion hander here checks if there are more to run - * and trips off another batch of 10 until they are all done. Why not just have a straight forward - * consumer/producer queue scenario without the batches of 10? So instead of having many jobs with batches of 10 - * in them, just have one queue of events and worker threads taking the next event. There will be coordination - * between worker threads and new events arriving on the job anyway, so the simpler scenario may have the same - * amount of contention. I can see that the batches of 10 is done, so that no job is allowed to hog the worker - * pool for too long. I'm not convinced this fairly complex scheme will actually add anything, and it might be - * better to encapsulate it under a Queue interface anyway, so that different queue implementations can easily - * be substituted in. - * * @todo The static helper methods are pointless. Could just call new. */ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler @@ -96,17 +78,20 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo private final int _maxEvents; + private final boolean _readFilter; + /** * Creates a named pooling filter, on the specified shared thread pool. * * @param refCountingPool The thread pool reference. * @param name The identifying name of the filter type. */ - public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents) + public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents, boolean readFilter) { _poolReference = refCountingPool; _name = name; _maxEvents = maxEvents; + _readFilter = readFilter; } /** @@ -167,7 +152,6 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo void fireAsynchEvent(Job job, Event event) { - // job.acquire(); //prevents this job being removed from _jobs job.add(event); final ExecutorService pool = _poolReference.getPool(); @@ -201,7 +185,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void createNewJobForSession(IoSession session) { - Job job = new Job(session, this, MAX_JOB_EVENTS); + Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter); session.setAttribute(_name, job); } @@ -433,7 +417,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { - super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS)); + super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS),true); } /** @@ -476,7 +460,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { - super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS)); + super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS),false); } /** diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java new file mode 100644 index 0000000000..8de0f93ce9 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java @@ -0,0 +1,432 @@ +package org.apache.qpid.pool; + +import java.util.AbstractQueue; +import java.util.Iterator; +import java.util.Collection; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.atomic.AtomicInteger; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +public class ReadWriteJobQueue extends AbstractQueue implements BlockingQueue +{ + + private final AtomicInteger _count = new AtomicInteger(0); + + private final ReentrantLock _takeLock = new ReentrantLock(); + + private final Condition _notEmpty = _takeLock.newCondition(); + + private final ReentrantLock _putLock = new ReentrantLock(); + + private final ConcurrentLinkedQueue _readJobQueue = new ConcurrentLinkedQueue(); + + private final ConcurrentLinkedQueue _writeJobQueue = new ConcurrentLinkedQueue(); + + + private class ReadWriteJobIterator implements Iterator + { + + private boolean _onReads; + private Iterator _iter = _writeJobQueue.iterator(); + + public boolean hasNext() + { + if(!_iter.hasNext()) + { + if(_onReads) + { + _iter = _readJobQueue.iterator(); + _onReads = true; + return _iter.hasNext(); + } + else + { + return false; + } + } + else + { + return true; + } + } + + public Runnable next() + { + if(_iter.hasNext()) + { + return _iter.next(); + } + else + { + return null; + } + } + + public void remove() + { + _takeLock.lock(); + try + { + _iter.remove(); + _count.decrementAndGet(); + } + finally + { + _takeLock.unlock(); + } + } + } + + public Iterator iterator() + { + return new ReadWriteJobIterator(); + } + + public int size() + { + return _count.get(); + } + + public boolean offer(final Runnable runnable) + { + final ReadWriteRunnable job = (ReadWriteRunnable) runnable; + final ReentrantLock putLock = _putLock; + putLock.lock(); + try + { + if(job.isRead()) + { + _readJobQueue.offer(job); + } + else + { + _writeJobQueue.offer(job); + } + if(_count.getAndIncrement() == 0) + { + _takeLock.lock(); + try + { + _notEmpty.signal(); + } + finally + { + _takeLock.unlock(); + } + } + return true; + } + finally + { + putLock.unlock(); + } + } + + public void put(final Runnable runnable) throws InterruptedException + { + final ReadWriteRunnable job = (ReadWriteRunnable) runnable; + final ReentrantLock putLock = _putLock; + putLock.lock(); + + try + { + if(job.isRead()) + { + _readJobQueue.offer(job); + } + else + { + _writeJobQueue.offer(job); + } + if(_count.getAndIncrement() == 0) + { + _takeLock.lock(); + try + { + _notEmpty.signal(); + } + finally + { + _takeLock.unlock(); + } + } + + } + finally + { + putLock.unlock(); + } + } + + + + public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException + { + final ReadWriteRunnable job = (ReadWriteRunnable) runnable; + final ReentrantLock putLock = _putLock; + putLock.lock(); + + try + { + if(job.isRead()) + { + _readJobQueue.offer(job); + } + else + { + _writeJobQueue.offer(job); + } + if(_count.getAndIncrement() == 0) + { + _takeLock.lock(); + try + { + _notEmpty.signal(); + } + finally + { + _takeLock.unlock(); + } + } + + return true; + } + finally + { + putLock.unlock(); + } + + } + + public Runnable take() throws InterruptedException + { + final ReentrantLock takeLock = _takeLock; + takeLock.lockInterruptibly(); + try + { + try + { + while (_count.get() == 0) + { + _notEmpty.await(); + } + } + catch (InterruptedException ie) + { + _notEmpty.signal(); + throw ie; + } + + ReadWriteRunnable job = _writeJobQueue.poll(); + if(job == null) + { + job = _readJobQueue.poll(); + } + int c = _count.getAndDecrement(); + if (c > 1) + { + _notEmpty.signal(); + } + return job; + } + finally + { + takeLock.unlock(); + } + + + } + + public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException + { + final ReentrantLock takeLock = _takeLock; + final AtomicInteger count = _count; + long nanos = unit.toNanos(timeout); + takeLock.lockInterruptibly(); + ReadWriteRunnable job = null; + try + { + + for (;;) + { + if (count.get() > 0) + { + job = _writeJobQueue.poll(); + if(job == null) + { + job = _readJobQueue.poll(); + } + int c = count.getAndDecrement(); + if (c > 1) + { + _notEmpty.signal(); + } + break; + } + if (nanos <= 0) + { + return null; + } + try + { + nanos = _notEmpty.awaitNanos(nanos); + } + catch (InterruptedException ie) + { + _notEmpty.signal(); + throw ie; + } + } + } + finally + { + takeLock.unlock(); + } + + return job; + } + + public int remainingCapacity() + { + return Integer.MAX_VALUE; + } + + public int drainTo(final Collection c) + { + int total = 0; + + _putLock.lock(); + _takeLock.lock(); + try + { + ReadWriteRunnable job; + while((job = _writeJobQueue.peek())!= null) + { + c.add(job); + _writeJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + while((job = _readJobQueue.peek())!= null) + { + c.add(job); + _readJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + } + finally + { + _takeLock.unlock(); + _putLock.unlock(); + } + return total; + } + + public int drainTo(final Collection c, final int maxElements) + { + int total = 0; + + _putLock.lock(); + _takeLock.lock(); + try + { + ReadWriteRunnable job; + while(total<=maxElements && (job = _writeJobQueue.peek())!= null) + { + c.add(job); + _writeJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + while(total<=maxElements && (job = _readJobQueue.peek())!= null) + { + c.add(job); + _readJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + } + finally + { + _takeLock.unlock(); + _putLock.unlock(); + } + return total; + + } + + public Runnable poll() + { + final ReentrantLock takeLock = _takeLock; + takeLock.lock(); + try + { + if(_count.get() > 0) + { + ReadWriteRunnable job = _writeJobQueue.poll(); + if(job == null) + { + job = _readJobQueue.poll(); + } + _count.decrementAndGet(); + return job; + } + else + { + return null; + } + } + finally + { + takeLock.unlock(); + } + + } + + public Runnable peek() + { + final ReentrantLock takeLock = _takeLock; + takeLock.lock(); + try + { + ReadWriteRunnable job = _writeJobQueue.peek(); + if(job == null) + { + job = _readJobQueue.peek(); + } + return job; + } + finally + { + takeLock.unlock(); + } + } +} diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java new file mode 100644 index 0000000000..ad04a923e1 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java @@ -0,0 +1,27 @@ +package org.apache.qpid.pool; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +public interface ReadWriteRunnable extends Runnable +{ + boolean isRead(); + boolean isWrite(); +} diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java index 84c9e1f465..ce9c6ae4cb 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java +++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -22,6 +22,9 @@ package org.apache.qpid.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.LinkedBlockingQueue; /** * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts @@ -84,6 +87,8 @@ public class ReferenceCountingExecutorService /** Holds the number of executor threads to create. */ private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE); + private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool"); + /** * Retrieves the singleton instance of this reference counter. * @@ -105,15 +110,28 @@ public class ReferenceCountingExecutorService * * @return An executor service. */ - ExecutorService acquireExecutorService() + public ExecutorService acquireExecutorService() { synchronized (_lock) { if (_refCount++ == 0) { - _pool = Executors.newFixedThreadPool(_poolSize); +// _pool = Executors.newFixedThreadPool(_poolSize); + + // Use a job queue that biases to writes + if(_useBiasedPool) + { + _pool = new ThreadPoolExecutor(_poolSize, _poolSize, + 0L, TimeUnit.MILLISECONDS, + new ReadWriteJobQueue()); + } + else + { + _pool = Executors.newFixedThreadPool(_poolSize); + } } + return _pool; } } @@ -122,7 +140,7 @@ public class ReferenceCountingExecutorService * Releases a reference to a shared executor service, decrementing the reference count. If the refence count falls * to zero, the executor service is shut down. */ - void releaseExecutorService() + public void releaseExecutorService() { synchronized (_lock) { -- cgit v1.2.1