From f0f34d8914a623dc6fe42038ca443ea560a64a28 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 12 May 2008 20:19:50 +0000 Subject: More fixing up of refactoring stuff; getting all maven tests passing and implementing management methods git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@655630 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 3 - .../src/main/java/org/apache/qpid/server/Main.java | 6 + .../java/org/apache/qpid/server/ack/TxAck.java | 1 + .../org/apache/qpid/server/queue/AMQMessage.java | 18 +- .../org/apache/qpid/server/queue/AMQQueue.java | 4 - .../apache/qpid/server/queue/AMQQueueFactory.java | 2 +- .../org/apache/qpid/server/queue/QueueEntry.java | 18 +- .../apache/qpid/server/queue/QueueEntryImpl.java | 19 +- .../apache/qpid/server/queue/SimpleAMQQueue.java | 242 ++++++++++++-- .../server/queue/WeakReferenceMessageHandle.java | 5 +- .../server/transport/ConnectorConfiguration.java | 4 + .../java/org/apache/qpid/client/AMQConnection.java | 3 +- .../qpid/client/transport/TransportConnection.java | 6 + .../common/support/IoServiceListenerSupport.java | 351 +++++++++++++++++++++ .../pool/ReferenceCountingExecutorService.java | 16 +- .../java/org/apache/qpid/server/ack/TxAckTest.java | 3 +- 16 files changed, 643 insertions(+), 58 deletions(-) create mode 100644 java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index deb6ac8d94..64694c2686 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -495,9 +495,6 @@ public class AMQChannel // Deliver Message deliveryContext.requeue(unacked); - // Should we allow access To the DM to directy deliver the message? - // As we don't need to check for Consumers or worry about incrementing the message count? - // unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index d8a8cfb6d1..41d7f6c067 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -279,6 +279,12 @@ public class Main ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator()); } + + if(connectorConfig.useBiasedWrites) + { + System.setProperty("org.apache.qpid.use_write_biased_pool","true"); + } + int port = connectorConfig.port; String portStr = commandLine.getOptionValue("p"); diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index cb3aa5259a..db3a05eb52 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -49,6 +49,7 @@ public class TxAck implements TxnOp public void update(long deliveryTag, boolean multiple) { + _unacked.clear(); if (!multiple) { if(_individual == null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index e790493a82..1df93dd0d8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -278,9 +278,17 @@ public class AMQMessage implements Filterable } /** Threadsafe. Increment the reference count on the message. */ - public void incrementReference() + public boolean incrementReference() { - _referenceCount.incrementAndGet(); + if(_referenceCount.incrementAndGet() <= 1) + { + _referenceCount.decrementAndGet(); + return false; + } + else + { + return true; + } // if (_log.isDebugEnabled()) // { // _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); @@ -298,6 +306,7 @@ public class AMQMessage implements Filterable */ public void decrementReference(StoreContext storeContext) throws MessageCleanupException { + int count = _referenceCount.decrementAndGet(); // note that the operation of decrementing the reference count and then removing the message does not @@ -306,6 +315,11 @@ public class AMQMessage implements Filterable // not relying on the all the increments having taken place before the delivery manager decrements. if (count == 0) { + // set the reference count way below 0 so that we can detect that the message has been deleted + // this is to guard against the message being spontaneously recreated (from the mgmt console) + // by copying from other queues at the same time as it is being removed. + _referenceCount.set(Integer.MIN_VALUE/2); + try { // if (_log.isDebugEnabled()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 98583c03d4..780cd49834 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -103,10 +103,6 @@ public interface AMQQueue extends Managable, Comparable void start(); - void enqueueMovedMessages(final StoreContext storeContext, final List foundMessagesList); - - - long getMaximumMessageSize(); void setMaximumMessageSize(long value); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index e751212272..431b76754f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -38,7 +38,7 @@ public class AMQQueueFactory throws AMQException { - final int priorities = arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1; + final int priorities = arguments == null ? 1 : arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1; if(priorities > 1) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 39c28a7355..dd967a7cb1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -34,7 +34,8 @@ public interface QueueEntry extends Comparable AVAILABLE, ACQUIRED, EXPIRED, - DEQUEUED + DEQUEUED, + DELETED } public static interface StateChangeListener @@ -62,7 +63,7 @@ public interface QueueEntry extends Comparable } - public final class DeletedState extends EntryState + public final class DequeuedState extends EntryState { public State getState() @@ -71,6 +72,16 @@ public interface QueueEntry extends Comparable } } + + public final class DeletedState extends EntryState + { + + public State getState() + { + return State.DELETED; + } + } + public final class ExpiredState extends EntryState { @@ -113,6 +124,7 @@ public interface QueueEntry extends Comparable final static EntryState AVAILABLE_STATE = new AvailableState(); final static EntryState DELETED_STATE = new DeletedState(); + final static EntryState DEQUEUED_STATE = new DequeuedState(); final static EntryState EXPIRED_STATE = new ExpiredState(); final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState(); @@ -165,7 +177,7 @@ public interface QueueEntry extends Comparable void restoreCredit(); - void discard(StoreContext storeContext) throws AMQException; + void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException; boolean isQueueDeleted(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 6225501c72..e3338996e1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -252,13 +252,18 @@ public class QueueEntryImpl implements QueueEntry public void dequeue(final StoreContext storeContext) throws FailedDequeueException { + EntryState state = _state; - - getQueue().dequeue(storeContext, this); - if(_stateChangeListeners != null) + if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) { - notifyStateChange(_state.getState() , QueueEntry.State.DEQUEUED); + getQueue().dequeue(storeContext, this); + if(_stateChangeListeners != null) + { + notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED); + } + } + } private void notifyStateChange(final State oldState, final State newState) @@ -271,8 +276,10 @@ public class QueueEntryImpl implements QueueEntry public void dispose(final StoreContext storeContext) throws MessageCleanupException { - getMessage().decrementReference(storeContext); - delete(); + if(delete()) + { + getMessage().decrementReference(storeContext); + } } public void restoreCredit() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 7e2f0fc56a..25116817d7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -5,6 +5,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.output.ProtocolOutputConverter; @@ -135,9 +136,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private AtomicReference _asynchronousRunner = new AtomicReference(null); private AtomicInteger _deliveredMessages = new AtomicInteger(); - - - protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException { @@ -428,6 +426,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if(entry.immediateAndNotDelivered()) { dequeue(storeContext, entry); + entry.dispose(storeContext); } else if(!entry.isAcquired()) { @@ -582,7 +581,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { _virtualHost.getMessageStore().dequeueMessage(storeContext, getName(), msg.getMessageId()); } - entry.delete(); + //entry.dispose(storeContext); } catch (MessageCleanupException e) @@ -685,7 +684,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public long getOldestMessageArrivalTime() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + QueueEntry entry = getOldestQueueEntry(); + return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime(); + } + + protected QueueEntry getOldestQueueEntry() + { + return _entries.next(_entries.getHead()); } public boolean isDeleted() @@ -809,35 +814,217 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - public void moveMessagesToAnotherQueue(long fromMessageId, - long toMessageId, + public void moveMessagesToAnotherQueue(final long fromMessageId, + final long toMessageId, String queueName, StoreContext storeContext) { - //To change body of implemented methods use File | Settings | File Templates. + + AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); + MessageStore store = getVirtualHost().getMessageStore(); + + + List entries = getMessagesOnTheQueue(new QueueEntryFilter() + { + + public boolean accept(QueueEntry entry) + { + final long messageId = entry.getMessage().getMessageId(); + return (messageId >= fromMessageId) + && (messageId <= toMessageId) + && entry.acquire(); + } + + public boolean filterComplete() + { + return false; + } + }); + + + try + { + store.beginTran(storeContext); + + // Move the messages in on the message store. + for (QueueEntry entry : entries) + { + AMQMessage message = entry.getMessage(); + + if(message.isPersistent() && toQueue.isDurable()) + { + store.enqueueMessage(storeContext, toQueue.getName(), message.getMessageId()); + } + // dequeue does not decrement the refence count + entry.dequeue(storeContext); + } + + // Commit and flush the move transcations. + try + { + store.commitTran(storeContext); + } + catch (AMQException e) + { + throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e); + } + } + catch (AMQException e) + { + try + { + store.abortTran(storeContext); + } + catch (AMQException rollbackEx) + { + _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx); + } + throw new RuntimeException(e); + } + + try + { + for (QueueEntry entry : entries) + { + toQueue.enqueue(storeContext, entry.getMessage()); + + } + } + catch (MessageCleanupException e) + { + throw new RuntimeException(e); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + + } - public void copyMessagesToAnotherQueue(long fromMessageId, - long toMessageId, + public void copyMessagesToAnotherQueue(final long fromMessageId, + final long toMessageId, String queueName, - StoreContext storeContext) + final StoreContext storeContext) { - //To change body of implemented methods use File | Settings | File Templates. + AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); + MessageStore store = getVirtualHost().getMessageStore(); + + + List entries = getMessagesOnTheQueue(new QueueEntryFilter() + { + + public boolean accept(QueueEntry entry) + { + final long messageId = entry.getMessage().getMessageId(); + if((messageId >= fromMessageId) + && (messageId <= toMessageId)) + { + if(!entry.isDeleted()) + { + return entry.getMessage().incrementReference(); + } + } + + return false; + } + + public boolean filterComplete() + { + return false; + } + }); + + try + { + store.beginTran(storeContext); + + // Move the messages in on the message store. + for (QueueEntry entry : entries) + { + AMQMessage message = entry.getMessage(); + + if(message.isReferenced() && message.isPersistent() && toQueue.isDurable()) + { + store.enqueueMessage(storeContext, toQueue.getName(), message.getMessageId()); + } + } + + // Commit and flush the move transcations. + try + { + store.commitTran(storeContext); + } + catch (AMQException e) + { + throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e); + } + } + catch (AMQException e) + { + try + { + store.abortTran(storeContext); + } + catch (AMQException rollbackEx) + { + _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx); + } + throw new RuntimeException(e); + } + + try + { + for (QueueEntry entry : entries) + { + if(entry.getMessage().isReferenced()) + { + toQueue.enqueue(storeContext, entry.getMessage()); + } + } + } + catch (MessageCleanupException e) + { + throw new RuntimeException(e); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + + } public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext) { - //To change body of implemented methods use File | Settings | File Templates. - } + try + { + QueueEntryIterator queueListIterator = _entries.iterator(); - public void enqueueMovedMessages(final StoreContext storeContext, final List foundMessagesList) - { - //To change body of implemented methods use File | Settings | File Templates. - } + while(queueListIterator.advance()) + { + QueueEntry node = queueListIterator.getNode(); + + final long messageId = node.getMessage().getMessageId(); + + if((messageId >= fromMessageId) + && (messageId <= toMessageId) + && !node.isDeleted() + && node.acquire()) + { + node.discard(storeContext); + } + } + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } public void quiesce() { @@ -868,10 +1055,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = queueListIterator.getNode(); if(!node.isDeleted() && node.acquire()) { - node.dequeue(storeContext); - - node.dispose(storeContext); - + node.discard(storeContext); noDeletes = false; } @@ -889,10 +1073,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = queueListIterator.getNode(); if(!node.isDeleted() && node.acquire()) { - node.dequeue(storeContext); - - node.dispose(storeContext); - + node.discard(storeContext); count++; } @@ -1046,9 +1227,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if(node.acquire()) { final StoreContext reapingStoreContext = new StoreContext(); - node.dequeue(reapingStoreContext); - node.dispose(reapingStoreContext); - + node.discard(reapingStoreContext); } } QueueEntry newNode = _entries.next(node); @@ -1209,10 +1388,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if(!node.isDeleted() && node.expired() && node.acquire()) { - node.dequeue(storeContext); - - node.dispose(storeContext); - + node.discard(storeContext); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java index 711497c799..fb70984d99 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java @@ -203,12 +203,15 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle MessageMetaData mmd = new MessageMetaData(publishBody, contentHeaderBody, _contentBodies.size(), arrivalTime); - _messageStore.storeMessageMetaData(storeContext, _messageId, mmd); + _persistent = contentHeaderBody.properties instanceof BasicContentHeaderProperties && ((BasicContentHeaderProperties) contentHeaderBody.properties).getDeliveryMode() == 2; + _messageStore.storeMessageMetaData(storeContext, _messageId, mmd); + + populateFromMessageMetaData(mmd); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java index 23aaf56876..83e348b9f2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java @@ -97,6 +97,10 @@ public class ConnectorConfiguration defaultValue = "false") public boolean _multiThreadNIO; + @Configured(path = "advanced.useWriteBiasedPool", + defaultValue = "true") + public boolean useBiasedWrites; + public IoAcceptor createAcceptor() { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 60f57aaf0e..ad611b217a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -538,7 +538,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { - TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail); + TransportConnection.connect(_protocolHandler,brokerDetail); + // this blocks until the connection has been set up or when an error // has prevented the connection being set up diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 5907bd90af..a4e9191982 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -29,6 +29,7 @@ import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; +import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; import org.slf4j.Logger; @@ -346,4 +347,9 @@ public class TransportConnection } } + public static synchronized void connect(final AMQProtocolHandler protocolHandler, final BrokerDetails brokerDetail) + throws AMQTransportConnectionException, IOException + { + getInstance(brokerDetail).connect(protocolHandler, brokerDetail); + } } diff --git a/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java b/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java new file mode 100644 index 0000000000..5723ffbaa9 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.common.support; + +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +import org.apache.mina.common.IoAcceptorConfig; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoFuture; +import org.apache.mina.common.IoFutureListener; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoService; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.IoServiceListener; +import org.apache.mina.common.IoSession; +import org.apache.mina.util.IdentityHashSet; + +/** + * A helper which provides addition and removal of {@link IoServiceListener}s and firing + * events. + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 446526 $, $Date: 2006-09-15 01:44:11 -0400 (Fri, 15 Sep 2006) $ + */ +public class IoServiceListenerSupport +{ + /** + * A list of {@link IoServiceListener}s. + */ + private final List listeners = new ArrayList(); + + /** + * Tracks managed serviceAddresses. + */ + private final Set managedServiceAddresses = new HashSet(); + + /** + * Tracks managed sesssions with serviceAddress as a key. + */ + private final Map managedSessions = new HashMap(); + + /** + * Creates a new instance. + */ + public IoServiceListenerSupport() + { + } + + /** + * Adds a new listener. + */ + public void add( IoServiceListener listener ) + { + synchronized( listeners ) + { + listeners.add( listener ); + } + } + + /** + * Removes an existing listener. + */ + public void remove( IoServiceListener listener ) + { + synchronized( listeners ) + { + listeners.remove( listener ); + } + } + + public Set getManagedServiceAddresses() + { + return Collections.unmodifiableSet( managedServiceAddresses ); + } + + public boolean isManaged( SocketAddress serviceAddress ) + { + synchronized( managedServiceAddresses ) + { + return managedServiceAddresses.contains( serviceAddress ); + } + } + + public Set getManagedSessions( SocketAddress serviceAddress ) + { + Set sessions; + synchronized( managedSessions ) + { + sessions = ( Set ) managedSessions.get( serviceAddress ); + if( sessions == null ) + { + sessions = new IdentityHashSet(); + } + } + + synchronized( sessions ) + { + return new IdentityHashSet( sessions ); + } + } + + /** + * Calls {@link IoServiceListener#serviceActivated(IoService, SocketAddress, IoHandler, IoServiceConfig)} + * for all registered listeners. + */ + public void fireServiceActivated( + IoService service, SocketAddress serviceAddress, + IoHandler handler, IoServiceConfig config ) + { + synchronized( managedServiceAddresses ) + { + if( !managedServiceAddresses.add( serviceAddress ) ) + { + return; + } + } + + synchronized( listeners ) + { + for( Iterator i = listeners.iterator(); i.hasNext(); ) + { + ( ( IoServiceListener ) i.next() ).serviceActivated( + service, serviceAddress, handler, config ); + } + } + } + + /** + * Calls {@link IoServiceListener#serviceDeactivated(IoService, SocketAddress, IoHandler, IoServiceConfig)} + * for all registered listeners. + */ + public synchronized void fireServiceDeactivated( + IoService service, SocketAddress serviceAddress, + IoHandler handler, IoServiceConfig config ) + { + synchronized( managedServiceAddresses ) + { + if( !managedServiceAddresses.remove( serviceAddress ) ) + { + return; + } + } + + try + { + synchronized( listeners ) + { + for( Iterator i = listeners.iterator(); i.hasNext(); ) + { + ( ( IoServiceListener ) i.next() ).serviceDeactivated( + service, serviceAddress, handler, config ); + } + } + } + finally + { + disconnectSessions( serviceAddress, config ); + } + } + + + /** + * Calls {@link IoServiceListener#sessionCreated(IoSession)} for all registered listeners. + */ + public void fireSessionCreated( IoSession session ) + { + SocketAddress serviceAddress = session.getServiceAddress(); + + // Get the session set. + boolean firstSession = false; + Set sessions; + synchronized( managedSessions ) + { + sessions = ( Set ) managedSessions.get( serviceAddress ); + if( sessions == null ) + { + sessions = new IdentityHashSet(); + managedSessions.put( serviceAddress, sessions ); + firstSession = true; + } + } + + // If already registered, ignore. + synchronized( sessions ) + { + if ( !sessions.add( session ) ) + { + return; + } + } + + // If the first connector session, fire a virtual service activation event. + if( session.getService() instanceof IoConnector && firstSession ) + { + fireServiceActivated( + session.getService(), session.getServiceAddress(), + session.getHandler(), session.getServiceConfig() ); + } + + // Fire session events. + session.getFilterChain().fireSessionCreated( session ); + session.getFilterChain().fireSessionOpened( session); + + // Fire listener events. + synchronized( listeners ) + { + for( Iterator i = listeners.iterator(); i.hasNext(); ) + { + ( ( IoServiceListener ) i.next() ).sessionCreated( session ); + } + } + } + + /** + * Calls {@link IoServiceListener#sessionDestroyed(IoSession)} for all registered listeners. + */ + public void fireSessionDestroyed( IoSession session ) + { + SocketAddress serviceAddress = session.getServiceAddress(); + + // Get the session set. + Set sessions; + boolean lastSession = false; + synchronized( managedSessions ) + { + sessions = ( Set ) managedSessions.get( serviceAddress ); + // Ignore if unknown. + if( sessions == null ) + { + return; + } + + // Try to remove the remaining empty seession set after removal. + synchronized( sessions ) + { + sessions.remove( session ); + if( sessions.isEmpty() ) + { + managedSessions.remove( serviceAddress ); + lastSession = true; + } + } + } + + // Fire session events. + session.getFilterChain().fireSessionClosed( session ); + + // Fire listener events. + try + { + synchronized( listeners ) + { + for( Iterator i = listeners.iterator(); i.hasNext(); ) + { + ( ( IoServiceListener ) i.next() ).sessionDestroyed( session ); + } + } + } + finally + { + // Fire a virtual service deactivation event for the last session of the connector. + //TODO double-check that this is *STILL* the last session. May not be the case + if( session.getService() instanceof IoConnector && lastSession ) + { + fireServiceDeactivated( + session.getService(), session.getServiceAddress(), + session.getHandler(), session.getServiceConfig() ); + } + } + } + + private void disconnectSessions( SocketAddress serviceAddress, IoServiceConfig config ) + { + if( !( config instanceof IoAcceptorConfig ) ) + { + return; + } + + if( !( ( IoAcceptorConfig ) config ).isDisconnectOnUnbind() ) + { + return; + } + + Set sessions; + synchronized( managedSessions ) + { + sessions = ( Set ) managedSessions.get( serviceAddress ); + } + + if( sessions == null ) + { + return; + } + + Set sessionsCopy; + + // Create a copy to avoid ConcurrentModificationException + synchronized( sessions ) + { + sessionsCopy = new IdentityHashSet( sessions ); + } + + final CountDownLatch latch = new CountDownLatch(sessionsCopy.size()); + + for( Iterator i = sessionsCopy.iterator(); i.hasNext(); ) + { + ( ( IoSession ) i.next() ).close().addListener( new IoFutureListener() + { + public void operationComplete( IoFuture future ) + { + latch.countDown(); + } + } ); + } + + try + { + latch.await(); + } + catch( InterruptedException ie ) + { + // Ignored + } + } +} diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java index 1359e56958..d99973cffb 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java +++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -87,6 +87,8 @@ public class ReferenceCountingExecutorService /** Holds the number of executor threads to create. */ private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE); + private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool"); + /** * Retrieves the singleton instance of this reference counter. * @@ -117,11 +119,19 @@ public class ReferenceCountingExecutorService // _pool = Executors.newFixedThreadPool(_poolSize); // Use a job queue that biases to writes - _pool = new ThreadPoolExecutor(_poolSize, _poolSize, - 0L, TimeUnit.MILLISECONDS, - new ReadWriteJobQueue()); + if(_useBiasedPool) + { + _pool = new ThreadPoolExecutor(_poolSize, _poolSize, + 0L, TimeUnit.MILLISECONDS, + new ReadWriteJobQueue()); + } + else + { + _pool = Executors.newFixedThreadPool(_poolSize); + } } + return _pool; } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java index 4e39367da1..45db47a1c3 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java @@ -243,9 +243,10 @@ public class TxAckTest extends TestCase } - public void incrementReference() + public boolean incrementReference() { _count++; + return true; } public void decrementReference(StoreContext context) -- cgit v1.2.1