summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-05-12 20:19:50 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-05-12 20:19:50 +0000
commitf0f34d8914a623dc6fe42038ca443ea560a64a28 (patch)
tree30896cdbb2b640aff604d326a79bf933e3c63d18
parenteb12fe81f29d3d50598eafd01a4eda1fad6275cb (diff)
downloadqpid-python-f0f34d8914a623dc6fe42038ca443ea560a64a28.tar.gz
More fixing up of refactoring stuff; getting all maven tests passing and implementing management methods
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@655630 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java242
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java6
-rw-r--r--java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java351
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java16
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java3
16 files changed, 643 insertions, 58 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index deb6ac8d94..64694c2686 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -495,9 +495,6 @@ public class AMQChannel
// Deliver Message
deliveryContext.requeue(unacked);
- // Should we allow access To the DM to directy deliver the message?
- // As we don't need to check for Consumers or worry about incrementing the message count?
- // unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index d8a8cfb6d1..41d7f6c067 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -279,6 +279,12 @@ public class Main
ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator());
}
+
+ if(connectorConfig.useBiasedWrites)
+ {
+ System.setProperty("org.apache.qpid.use_write_biased_pool","true");
+ }
+
int port = connectorConfig.port;
String portStr = commandLine.getOptionValue("p");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index cb3aa5259a..db3a05eb52 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -49,6 +49,7 @@ public class TxAck implements TxnOp
public void update(long deliveryTag, boolean multiple)
{
+ _unacked.clear();
if (!multiple)
{
if(_individual == null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index e790493a82..1df93dd0d8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -278,9 +278,17 @@ public class AMQMessage implements Filterable<AMQException>
}
/** Threadsafe. Increment the reference count on the message. */
- public void incrementReference()
+ public boolean incrementReference()
{
- _referenceCount.incrementAndGet();
+ if(_referenceCount.incrementAndGet() <= 1)
+ {
+ _referenceCount.decrementAndGet();
+ return false;
+ }
+ else
+ {
+ return true;
+ }
// if (_log.isDebugEnabled())
// {
// _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
@@ -298,6 +306,7 @@ public class AMQMessage implements Filterable<AMQException>
*/
public void decrementReference(StoreContext storeContext) throws MessageCleanupException
{
+
int count = _referenceCount.decrementAndGet();
// note that the operation of decrementing the reference count and then removing the message does not
@@ -306,6 +315,11 @@ public class AMQMessage implements Filterable<AMQException>
// not relying on the all the increments having taken place before the delivery manager decrements.
if (count == 0)
{
+ // set the reference count way below 0 so that we can detect that the message has been deleted
+ // this is to guard against the message being spontaneously recreated (from the mgmt console)
+ // by copying from other queues at the same time as it is being removed.
+ _referenceCount.set(Integer.MIN_VALUE/2);
+
try
{
// if (_log.isDebugEnabled())
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 98583c03d4..780cd49834 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -103,10 +103,6 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void start();
- void enqueueMovedMessages(final StoreContext storeContext, final List<QueueEntry> foundMessagesList);
-
-
-
long getMaximumMessageSize();
void setMaximumMessageSize(long value);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index e751212272..431b76754f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -38,7 +38,7 @@ public class AMQQueueFactory
throws AMQException
{
- final int priorities = arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1;
+ final int priorities = arguments == null ? 1 : arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1;
if(priorities > 1)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 39c28a7355..dd967a7cb1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -34,7 +34,8 @@ public interface QueueEntry extends Comparable<QueueEntry>
AVAILABLE,
ACQUIRED,
EXPIRED,
- DEQUEUED
+ DEQUEUED,
+ DELETED
}
public static interface StateChangeListener
@@ -62,7 +63,7 @@ public interface QueueEntry extends Comparable<QueueEntry>
}
- public final class DeletedState extends EntryState
+ public final class DequeuedState extends EntryState
{
public State getState()
@@ -71,6 +72,16 @@ public interface QueueEntry extends Comparable<QueueEntry>
}
}
+
+ public final class DeletedState extends EntryState
+ {
+
+ public State getState()
+ {
+ return State.DELETED;
+ }
+ }
+
public final class ExpiredState extends EntryState
{
@@ -113,6 +124,7 @@ public interface QueueEntry extends Comparable<QueueEntry>
final static EntryState AVAILABLE_STATE = new AvailableState();
final static EntryState DELETED_STATE = new DeletedState();
+ final static EntryState DEQUEUED_STATE = new DequeuedState();
final static EntryState EXPIRED_STATE = new ExpiredState();
final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
@@ -165,7 +177,7 @@ public interface QueueEntry extends Comparable<QueueEntry>
void restoreCredit();
- void discard(StoreContext storeContext) throws AMQException;
+ void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException;
boolean isQueueDeleted();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 6225501c72..e3338996e1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -252,13 +252,18 @@ public class QueueEntryImpl implements QueueEntry
public void dequeue(final StoreContext storeContext) throws FailedDequeueException
{
+ EntryState state = _state;
-
- getQueue().dequeue(storeContext, this);
- if(_stateChangeListeners != null)
+ if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
- notifyStateChange(_state.getState() , QueueEntry.State.DEQUEUED);
+ getQueue().dequeue(storeContext, this);
+ if(_stateChangeListeners != null)
+ {
+ notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
+ }
+
}
+
}
private void notifyStateChange(final State oldState, final State newState)
@@ -271,8 +276,10 @@ public class QueueEntryImpl implements QueueEntry
public void dispose(final StoreContext storeContext) throws MessageCleanupException
{
- getMessage().decrementReference(storeContext);
- delete();
+ if(delete())
+ {
+ getMessage().decrementReference(storeContext);
+ }
}
public void restoreCredit()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 7e2f0fc56a..25116817d7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -5,6 +5,7 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -135,9 +136,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private AtomicReference _asynchronousRunner = new AtomicReference(null);
private AtomicInteger _deliveredMessages = new AtomicInteger();
-
-
-
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
@@ -428,6 +426,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if(entry.immediateAndNotDelivered())
{
dequeue(storeContext, entry);
+ entry.dispose(storeContext);
}
else if(!entry.isAcquired())
{
@@ -582,7 +581,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
_virtualHost.getMessageStore().dequeueMessage(storeContext, getName(), msg.getMessageId());
}
- entry.delete();
+ //entry.dispose(storeContext);
}
catch (MessageCleanupException e)
@@ -685,7 +684,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public long getOldestMessageArrivalTime()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ QueueEntry entry = getOldestQueueEntry();
+ return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
+ }
+
+ protected QueueEntry getOldestQueueEntry()
+ {
+ return _entries.next(_entries.getHead());
}
public boolean isDeleted()
@@ -809,35 +814,217 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
- public void moveMessagesToAnotherQueue(long fromMessageId,
- long toMessageId,
+ public void moveMessagesToAnotherQueue(final long fromMessageId,
+ final long toMessageId,
String queueName,
StoreContext storeContext)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
+ AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ MessageStore store = getVirtualHost().getMessageStore();
+
+
+ List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+ {
+
+ public boolean accept(QueueEntry entry)
+ {
+ final long messageId = entry.getMessage().getMessageId();
+ return (messageId >= fromMessageId)
+ && (messageId <= toMessageId)
+ && entry.acquire();
+ }
+
+ public boolean filterComplete()
+ {
+ return false;
+ }
+ });
+
+
+ try
+ {
+ store.beginTran(storeContext);
+
+ // Move the messages in on the message store.
+ for (QueueEntry entry : entries)
+ {
+ AMQMessage message = entry.getMessage();
+
+ if(message.isPersistent() && toQueue.isDurable())
+ {
+ store.enqueueMessage(storeContext, toQueue.getName(), message.getMessageId());
+ }
+ // dequeue does not decrement the refence count
+ entry.dequeue(storeContext);
+ }
+
+ // Commit and flush the move transcations.
+ try
+ {
+ store.commitTran(storeContext);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+ }
+ }
+ catch (AMQException e)
+ {
+ try
+ {
+ store.abortTran(storeContext);
+ }
+ catch (AMQException rollbackEx)
+ {
+ _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx);
+ }
+ throw new RuntimeException(e);
+ }
+
+ try
+ {
+ for (QueueEntry entry : entries)
+ {
+ toQueue.enqueue(storeContext, entry.getMessage());
+
+ }
+ }
+ catch (MessageCleanupException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+
}
- public void copyMessagesToAnotherQueue(long fromMessageId,
- long toMessageId,
+ public void copyMessagesToAnotherQueue(final long fromMessageId,
+ final long toMessageId,
String queueName,
- StoreContext storeContext)
+ final StoreContext storeContext)
{
- //To change body of implemented methods use File | Settings | File Templates.
+ AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ MessageStore store = getVirtualHost().getMessageStore();
+
+
+ List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+ {
+
+ public boolean accept(QueueEntry entry)
+ {
+ final long messageId = entry.getMessage().getMessageId();
+ if((messageId >= fromMessageId)
+ && (messageId <= toMessageId))
+ {
+ if(!entry.isDeleted())
+ {
+ return entry.getMessage().incrementReference();
+ }
+ }
+
+ return false;
+ }
+
+ public boolean filterComplete()
+ {
+ return false;
+ }
+ });
+
+ try
+ {
+ store.beginTran(storeContext);
+
+ // Move the messages in on the message store.
+ for (QueueEntry entry : entries)
+ {
+ AMQMessage message = entry.getMessage();
+
+ if(message.isReferenced() && message.isPersistent() && toQueue.isDurable())
+ {
+ store.enqueueMessage(storeContext, toQueue.getName(), message.getMessageId());
+ }
+ }
+
+ // Commit and flush the move transcations.
+ try
+ {
+ store.commitTran(storeContext);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+ }
+ }
+ catch (AMQException e)
+ {
+ try
+ {
+ store.abortTran(storeContext);
+ }
+ catch (AMQException rollbackEx)
+ {
+ _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx);
+ }
+ throw new RuntimeException(e);
+ }
+
+ try
+ {
+ for (QueueEntry entry : entries)
+ {
+ if(entry.getMessage().isReferenced())
+ {
+ toQueue.enqueue(storeContext, entry.getMessage());
+ }
+ }
+ }
+ catch (MessageCleanupException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+
}
public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
{
- //To change body of implemented methods use File | Settings | File Templates.
- }
+ try
+ {
+ QueueEntryIterator queueListIterator = _entries.iterator();
- public void enqueueMovedMessages(final StoreContext storeContext, final List<QueueEntry> foundMessagesList)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
+ while(queueListIterator.advance())
+ {
+ QueueEntry node = queueListIterator.getNode();
+
+ final long messageId = node.getMessage().getMessageId();
+
+ if((messageId >= fromMessageId)
+ && (messageId <= toMessageId)
+ && !node.isDeleted()
+ && node.acquire())
+ {
+ node.discard(storeContext);
+ }
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
public void quiesce()
{
@@ -868,10 +1055,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry node = queueListIterator.getNode();
if(!node.isDeleted() && node.acquire())
{
- node.dequeue(storeContext);
-
- node.dispose(storeContext);
-
+ node.discard(storeContext);
noDeletes = false;
}
@@ -889,10 +1073,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry node = queueListIterator.getNode();
if(!node.isDeleted() && node.acquire())
{
- node.dequeue(storeContext);
-
- node.dispose(storeContext);
-
+ node.discard(storeContext);
count++;
}
@@ -1046,9 +1227,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if(node.acquire())
{
final StoreContext reapingStoreContext = new StoreContext();
- node.dequeue(reapingStoreContext);
- node.dispose(reapingStoreContext);
-
+ node.discard(reapingStoreContext);
}
}
QueueEntry newNode = _entries.next(node);
@@ -1209,10 +1388,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if(!node.isDeleted() && node.expired() && node.acquire())
{
- node.dequeue(storeContext);
-
- node.dispose(storeContext);
-
+ node.discard(storeContext);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
index 711497c799..fb70984d99 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
@@ -203,12 +203,15 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
MessageMetaData mmd = new MessageMetaData(publishBody, contentHeaderBody, _contentBodies.size(), arrivalTime);
- _messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
+
_persistent = contentHeaderBody.properties instanceof BasicContentHeaderProperties &&
((BasicContentHeaderProperties) contentHeaderBody.properties).getDeliveryMode() == 2;
+ _messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
+
+
populateFromMessageMetaData(mmd);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
index 23aaf56876..83e348b9f2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
@@ -97,6 +97,10 @@ public class ConnectorConfiguration
defaultValue = "false")
public boolean _multiThreadNIO;
+ @Configured(path = "advanced.useWriteBiasedPool",
+ defaultValue = "true")
+ public boolean useBiasedWrites;
+
public IoAcceptor createAcceptor()
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 60f57aaf0e..ad611b217a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -538,7 +538,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
try
{
- TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail);
+ TransportConnection.connect(_protocolHandler,brokerDetail);
+
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 5907bd90af..a4e9191982 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -29,6 +29,7 @@ import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.slf4j.Logger;
@@ -346,4 +347,9 @@ public class TransportConnection
}
}
+ public static synchronized void connect(final AMQProtocolHandler protocolHandler, final BrokerDetails brokerDetail)
+ throws AMQTransportConnectionException, IOException
+ {
+ getInstance(brokerDetail).connect(protocolHandler, brokerDetail);
+ }
}
diff --git a/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java b/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java
new file mode 100644
index 0000000000..5723ffbaa9
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.common.support;
+
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.mina.common.IoAcceptorConfig;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFuture;
+import org.apache.mina.common.IoFutureListener;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.IoServiceListener;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.util.IdentityHashSet;
+
+/**
+ * A helper which provides addition and removal of {@link IoServiceListener}s and firing
+ * events.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 446526 $, $Date: 2006-09-15 01:44:11 -0400 (Fri, 15 Sep 2006) $
+ */
+public class IoServiceListenerSupport
+{
+ /**
+ * A list of {@link IoServiceListener}s.
+ */
+ private final List listeners = new ArrayList();
+
+ /**
+ * Tracks managed <tt>serviceAddress</tt>es.
+ */
+ private final Set managedServiceAddresses = new HashSet();
+
+ /**
+ * Tracks managed sesssions with <tt>serviceAddress</tt> as a key.
+ */
+ private final Map managedSessions = new HashMap();
+
+ /**
+ * Creates a new instance.
+ */
+ public IoServiceListenerSupport()
+ {
+ }
+
+ /**
+ * Adds a new listener.
+ */
+ public void add( IoServiceListener listener )
+ {
+ synchronized( listeners )
+ {
+ listeners.add( listener );
+ }
+ }
+
+ /**
+ * Removes an existing listener.
+ */
+ public void remove( IoServiceListener listener )
+ {
+ synchronized( listeners )
+ {
+ listeners.remove( listener );
+ }
+ }
+
+ public Set getManagedServiceAddresses()
+ {
+ return Collections.unmodifiableSet( managedServiceAddresses );
+ }
+
+ public boolean isManaged( SocketAddress serviceAddress )
+ {
+ synchronized( managedServiceAddresses )
+ {
+ return managedServiceAddresses.contains( serviceAddress );
+ }
+ }
+
+ public Set getManagedSessions( SocketAddress serviceAddress )
+ {
+ Set sessions;
+ synchronized( managedSessions )
+ {
+ sessions = ( Set ) managedSessions.get( serviceAddress );
+ if( sessions == null )
+ {
+ sessions = new IdentityHashSet();
+ }
+ }
+
+ synchronized( sessions )
+ {
+ return new IdentityHashSet( sessions );
+ }
+ }
+
+ /**
+ * Calls {@link IoServiceListener#serviceActivated(IoService, SocketAddress, IoHandler, IoServiceConfig)}
+ * for all registered listeners.
+ */
+ public void fireServiceActivated(
+ IoService service, SocketAddress serviceAddress,
+ IoHandler handler, IoServiceConfig config )
+ {
+ synchronized( managedServiceAddresses )
+ {
+ if( !managedServiceAddresses.add( serviceAddress ) )
+ {
+ return;
+ }
+ }
+
+ synchronized( listeners )
+ {
+ for( Iterator i = listeners.iterator(); i.hasNext(); )
+ {
+ ( ( IoServiceListener ) i.next() ).serviceActivated(
+ service, serviceAddress, handler, config );
+ }
+ }
+ }
+
+ /**
+ * Calls {@link IoServiceListener#serviceDeactivated(IoService, SocketAddress, IoHandler, IoServiceConfig)}
+ * for all registered listeners.
+ */
+ public synchronized void fireServiceDeactivated(
+ IoService service, SocketAddress serviceAddress,
+ IoHandler handler, IoServiceConfig config )
+ {
+ synchronized( managedServiceAddresses )
+ {
+ if( !managedServiceAddresses.remove( serviceAddress ) )
+ {
+ return;
+ }
+ }
+
+ try
+ {
+ synchronized( listeners )
+ {
+ for( Iterator i = listeners.iterator(); i.hasNext(); )
+ {
+ ( ( IoServiceListener ) i.next() ).serviceDeactivated(
+ service, serviceAddress, handler, config );
+ }
+ }
+ }
+ finally
+ {
+ disconnectSessions( serviceAddress, config );
+ }
+ }
+
+
+ /**
+ * Calls {@link IoServiceListener#sessionCreated(IoSession)} for all registered listeners.
+ */
+ public void fireSessionCreated( IoSession session )
+ {
+ SocketAddress serviceAddress = session.getServiceAddress();
+
+ // Get the session set.
+ boolean firstSession = false;
+ Set sessions;
+ synchronized( managedSessions )
+ {
+ sessions = ( Set ) managedSessions.get( serviceAddress );
+ if( sessions == null )
+ {
+ sessions = new IdentityHashSet();
+ managedSessions.put( serviceAddress, sessions );
+ firstSession = true;
+ }
+ }
+
+ // If already registered, ignore.
+ synchronized( sessions )
+ {
+ if ( !sessions.add( session ) )
+ {
+ return;
+ }
+ }
+
+ // If the first connector session, fire a virtual service activation event.
+ if( session.getService() instanceof IoConnector && firstSession )
+ {
+ fireServiceActivated(
+ session.getService(), session.getServiceAddress(),
+ session.getHandler(), session.getServiceConfig() );
+ }
+
+ // Fire session events.
+ session.getFilterChain().fireSessionCreated( session );
+ session.getFilterChain().fireSessionOpened( session);
+
+ // Fire listener events.
+ synchronized( listeners )
+ {
+ for( Iterator i = listeners.iterator(); i.hasNext(); )
+ {
+ ( ( IoServiceListener ) i.next() ).sessionCreated( session );
+ }
+ }
+ }
+
+ /**
+ * Calls {@link IoServiceListener#sessionDestroyed(IoSession)} for all registered listeners.
+ */
+ public void fireSessionDestroyed( IoSession session )
+ {
+ SocketAddress serviceAddress = session.getServiceAddress();
+
+ // Get the session set.
+ Set sessions;
+ boolean lastSession = false;
+ synchronized( managedSessions )
+ {
+ sessions = ( Set ) managedSessions.get( serviceAddress );
+ // Ignore if unknown.
+ if( sessions == null )
+ {
+ return;
+ }
+
+ // Try to remove the remaining empty seession set after removal.
+ synchronized( sessions )
+ {
+ sessions.remove( session );
+ if( sessions.isEmpty() )
+ {
+ managedSessions.remove( serviceAddress );
+ lastSession = true;
+ }
+ }
+ }
+
+ // Fire session events.
+ session.getFilterChain().fireSessionClosed( session );
+
+ // Fire listener events.
+ try
+ {
+ synchronized( listeners )
+ {
+ for( Iterator i = listeners.iterator(); i.hasNext(); )
+ {
+ ( ( IoServiceListener ) i.next() ).sessionDestroyed( session );
+ }
+ }
+ }
+ finally
+ {
+ // Fire a virtual service deactivation event for the last session of the connector.
+ //TODO double-check that this is *STILL* the last session. May not be the case
+ if( session.getService() instanceof IoConnector && lastSession )
+ {
+ fireServiceDeactivated(
+ session.getService(), session.getServiceAddress(),
+ session.getHandler(), session.getServiceConfig() );
+ }
+ }
+ }
+
+ private void disconnectSessions( SocketAddress serviceAddress, IoServiceConfig config )
+ {
+ if( !( config instanceof IoAcceptorConfig ) )
+ {
+ return;
+ }
+
+ if( !( ( IoAcceptorConfig ) config ).isDisconnectOnUnbind() )
+ {
+ return;
+ }
+
+ Set sessions;
+ synchronized( managedSessions )
+ {
+ sessions = ( Set ) managedSessions.get( serviceAddress );
+ }
+
+ if( sessions == null )
+ {
+ return;
+ }
+
+ Set sessionsCopy;
+
+ // Create a copy to avoid ConcurrentModificationException
+ synchronized( sessions )
+ {
+ sessionsCopy = new IdentityHashSet( sessions );
+ }
+
+ final CountDownLatch latch = new CountDownLatch(sessionsCopy.size());
+
+ for( Iterator i = sessionsCopy.iterator(); i.hasNext(); )
+ {
+ ( ( IoSession ) i.next() ).close().addListener( new IoFutureListener()
+ {
+ public void operationComplete( IoFuture future )
+ {
+ latch.countDown();
+ }
+ } );
+ }
+
+ try
+ {
+ latch.await();
+ }
+ catch( InterruptedException ie )
+ {
+ // Ignored
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
index 1359e56958..d99973cffb 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
@@ -87,6 +87,8 @@ public class ReferenceCountingExecutorService
/** Holds the number of executor threads to create. */
private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE);
+ private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool");
+
/**
* Retrieves the singleton instance of this reference counter.
*
@@ -117,11 +119,19 @@ public class ReferenceCountingExecutorService
// _pool = Executors.newFixedThreadPool(_poolSize);
// Use a job queue that biases to writes
- _pool = new ThreadPoolExecutor(_poolSize, _poolSize,
- 0L, TimeUnit.MILLISECONDS,
- new ReadWriteJobQueue());
+ if(_useBiasedPool)
+ {
+ _pool = new ThreadPoolExecutor(_poolSize, _poolSize,
+ 0L, TimeUnit.MILLISECONDS,
+ new ReadWriteJobQueue());
+ }
+ else
+ {
+ _pool = Executors.newFixedThreadPool(_poolSize);
+ }
}
+
return _pool;
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
index 4e39367da1..45db47a1c3 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -243,9 +243,10 @@ public class TxAckTest extends TestCase
}
- public void incrementReference()
+ public boolean incrementReference()
{
_count++;
+ return true;
}
public void decrementReference(StoreContext context)