summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java49
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java65
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java52
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java101
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java87
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java53
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java90
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java86
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java192
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java777
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java24
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java42
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java26
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java14
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java32
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java34
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java109
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java70
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java56
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java25
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java26
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java4
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java468
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java16
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java54
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java93
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java8
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java61
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java23
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java59
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java642
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java187
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java320
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java23
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java28
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java24
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FutureResult.java (renamed from qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreFuture.java)16
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java239
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java33
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java28
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java108
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java5
76 files changed, 3427 insertions, 1228 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
index 6dd6c58853..bc5d30a0f0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
@@ -29,9 +29,13 @@ import java.security.PrivilegedExceptionAction;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
@@ -106,11 +110,16 @@ public class Broker implements BrokerShutdownProvider
{
if(_systemConfig != null)
{
- _systemConfig.close();
+ ListenableFuture<Void> closeResult = _systemConfig.closeAsync();
+ closeResult.get(30000l, TimeUnit.MILLISECONDS);
}
_taskExecutor.stop();
}
+ catch (TimeoutException | InterruptedException | ExecutionException e)
+ {
+ LOGGER.warn("Attempting to cleanly shutdown took too long, exiting immediately");
+ }
finally
{
if (_configuringOwnLogging)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
index c2c0cc77fa..06ce48ffa2 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
@@ -28,6 +28,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.VoidTask;
import org.apache.qpid.server.exchange.AbstractExchange;
@@ -196,7 +199,7 @@ public class BindingImpl
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
if(_deleted.compareAndSet(false,true))
{
@@ -209,12 +212,14 @@ public class BindingImpl
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
public void addStateChangeListener(StateChangeListener<BindingImpl,State> listener)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
index e0c03fe822..8d572189b3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.configuration.updater;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
public interface TaskExecutor
@@ -43,4 +44,7 @@ public interface TaskExecutor
<T> Future<T> submit(Task<T> task) throws CancellationException;
+ boolean isTaskExecutorThread();
+
+ Executor getExecutor();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
index 96e4e256b2..0f59494850 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -48,6 +49,7 @@ public class TaskExecutorImpl implements TaskExecutor
private volatile Thread _taskThread;
private final AtomicBoolean _running = new AtomicBoolean();
private volatile ExecutorService _executor;
+ private final ImmediateIfSameThreadExecutor _wrappedExecutor = new ImmediateIfSameThreadExecutor();
@Override
@@ -67,7 +69,7 @@ public class TaskExecutorImpl implements TaskExecutor
@Override
public Thread newThread(Runnable r)
{
- _taskThread = new Thread(r, TASK_EXECUTION_THREAD_NAME);
+ _taskThread = new TaskThread(r, TASK_EXECUTION_THREAD_NAME, TaskExecutorImpl.this);
return _taskThread;
}
});
@@ -277,7 +279,13 @@ public class TaskExecutorImpl implements TaskExecutor
}
}
- private boolean isTaskExecutorThread()
+ @Override
+ public Executor getExecutor()
+ {
+ return _wrappedExecutor;
+ }
+
+ public boolean isTaskExecutorThread()
{
return Thread.currentThread() == _taskThread;
}
@@ -373,4 +381,41 @@ public class TaskExecutorImpl implements TaskExecutor
return get();
}
}
+
+ private class ImmediateIfSameThreadExecutor implements Executor
+ {
+
+ @Override
+ public void execute(final Runnable command)
+ {
+ if(isTaskExecutorThread()
+ || (_executor == null && (Thread.currentThread() instanceof TaskThread
+ && ((TaskThread)Thread.currentThread()).getTaskExecutor() == TaskExecutorImpl.this)))
+ {
+ command.run();
+ }
+ else
+ {
+ _executor.execute(command);
+ }
+
+ }
+ }
+
+ private static class TaskThread extends Thread
+ {
+
+ private final TaskExecutorImpl _taskExecutor;
+
+ public TaskThread(final Runnable r, final String name, final TaskExecutorImpl taskExecutor)
+ {
+ super(r, name);
+ _taskExecutor = taskExecutor;
+ }
+
+ public TaskExecutorImpl getTaskExecutor()
+ {
+ return _taskExecutor;
+ }
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java
index 82ae9f6454..237a5b4069 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java
@@ -20,11 +20,12 @@
*/
package org.apache.qpid.server.connection;
+import java.net.SocketAddress;
+
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.security.auth.SocketConnectionPrincipal;
-import java.net.SocketAddress;
-
public class ConnectionPrincipal implements SocketConnectionPrincipal
{
private final AMQConnectionModel _connection;
@@ -51,6 +52,11 @@ public class ConnectionPrincipal implements SocketConnectionPrincipal
return _connection;
}
+ public VirtualHost<?,?,?> getVirtualHost()
+ {
+ return _connection.getVirtualHost();
+ }
+
@Override
public boolean equals(final Object o)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index 883785c7ce..a24195075e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -74,7 +74,7 @@ public class ConnectionRegistry implements IConnectionRegistry
AMQConnectionModel connection = itr.next();
try
{
- connection.close(AMQConstant.CONNECTION_FORCED, replyText);
+ connection.closeAsync(AMQConstant.CONNECTION_FORCED, replyText);
}
catch (Exception e)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 0421a66abf..be4ac9d427 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -23,17 +23,21 @@ package org.apache.qpid.server.consumer;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.util.StateChangeListener;
public abstract class AbstractConsumerTarget implements ConsumerTarget
{
-
+ private static final Logger LOGGER = Logger.getLogger(AbstractConsumerTarget.class);
private final AtomicReference<State> _state;
private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new
@@ -41,6 +45,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
private final Lock _stateChangeLock = new ReentrantLock();
private final AtomicInteger _stateActivates = new AtomicInteger();
+ private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue();
protected AbstractConsumerTarget(final State initialState)
@@ -48,6 +53,26 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
_state = new AtomicReference<State>(initialState);
}
+ @Override
+ public void processPending()
+ {
+ while(hasMessagesToSend())
+ {
+ sendNextMessage();
+ }
+
+ processClosed();
+ }
+
+ protected abstract void processClosed();
+
+ @Override
+ public final boolean isSuspended()
+ {
+ return getSessionModel().getConnectionModel().isMessageAssignmentSuspended() || doIsSuspended();
+ }
+
+ protected abstract boolean doIsSuspended();
public final State getState()
{
@@ -101,6 +126,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
}
}
+ @Override
public final void notifyCurrentState()
{
@@ -136,4 +162,41 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
_stateChangeLock.unlock();
}
+ @Override
+ public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ {
+ _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch));
+
+ getSessionModel().getConnectionModel().notifyWork();
+ return entry.getMessage().getSize();
+ }
+
+ protected abstract void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
+
+ @Override
+ public boolean hasMessagesToSend()
+ {
+ return !_queue.isEmpty();
+ }
+
+ @Override
+ public void sendNextMessage()
+ {
+ ConsumerMessageInstancePair consumerMessage = _queue.peek();
+ if (consumerMessage != null)
+ {
+ _queue.poll();
+
+ ConsumerImpl consumer = consumerMessage.getConsumer();
+ MessageInstance entry = consumerMessage.getEntry();
+ boolean batch = consumerMessage.isBatch();
+ doSend(consumer, entry, batch);
+
+ if (consumer.acquires())
+ {
+ entry.unlockAcquisition();
+ }
+ }
+
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
index c0db72d498..e17eca8614 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.consumer;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -31,6 +33,8 @@ public interface ConsumerImpl
void externalStateChange();
+ ConsumerTarget getTarget();
+
enum Option
{
ACQUIRES,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
new file mode 100644
index 0000000000..aa5e419ce2
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.consumer;
+
+import org.apache.qpid.server.message.MessageInstance;
+
+public class ConsumerMessageInstancePair
+{
+ private final ConsumerImpl _consumer;
+ private final MessageInstance _entry;
+ private final boolean _batch;
+
+ public ConsumerMessageInstancePair(final ConsumerImpl consumer, final MessageInstance entry, final boolean batch)
+ {
+ _consumer = consumer;
+ _entry = entry;
+ _batch = batch;
+
+ }
+
+ public ConsumerImpl getConsumer()
+ {
+ return _consumer;
+ }
+
+ public MessageInstance getEntry()
+ {
+ return _entry;
+ }
+
+ public boolean isBatch()
+ {
+ return _batch;
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index 5aef922da5..cef566793f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -33,6 +33,8 @@ public interface ConsumerTarget
void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
+ void processPending();
+
enum State
{
ACTIVE, SUSPENDED, CLOSED
@@ -44,6 +46,8 @@ public interface ConsumerTarget
void consumerRemoved(ConsumerImpl sub);
+ void notifyCurrentState();
+
void addStateListener(StateChangeListener<ConsumerTarget, State> listener);
long getUnacknowledgedBytes();
@@ -54,6 +58,10 @@ public interface ConsumerTarget
long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
+ boolean hasMessagesToSend();
+
+ void sendNextMessage();
+
void flushBatched();
void queueDeleted();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index cb026e175b..61bbe6f732 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -36,6 +37,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -63,12 +68,12 @@ import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
public abstract class AbstractExchange<T extends AbstractExchange<T>>
extends AbstractConfiguredObject<T>
@@ -479,7 +484,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
{
if (_virtualHost.getState() != State.ACTIVE)
{
- throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
+ throw new VirtualHostUnavailableException(this._virtualHost);
}
List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
@@ -593,9 +598,18 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
@Override
- public boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
+ public boolean addBinding(final String bindingKey, final AMQQueue queue, final Map<String, Object> arguments)
{
- return makeBinding(null, bindingKey, queue, arguments, false);
+ return doSync(doOnConfigThread(new Callable<ListenableFuture<Boolean>>()
+ {
+ @Override
+ public ListenableFuture<Boolean> call() throws Exception
+ {
+ return makeBindingAsync(null, bindingKey, queue, arguments, false);
+ }
+ }));
+
+
}
@Override
@@ -603,12 +617,20 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
final AMQQueue queue,
final Map<String, Object> arguments)
{
- final BindingImpl existingBinding = getBinding(bindingKey, queue);
- return makeBinding(existingBinding == null ? null : existingBinding.getId(),
- bindingKey,
- queue,
- arguments,
- true);
+ return doSync(doOnConfigThread(new Callable<ListenableFuture<Boolean>>()
+ {
+ @Override
+ public ListenableFuture<Boolean> call() throws Exception
+ {
+
+ final BindingImpl existingBinding = getBinding(bindingKey, queue);
+ return makeBindingAsync(existingBinding == null ? null : existingBinding.getId(),
+ bindingKey,
+ queue,
+ arguments,
+ true);
+ }
+ }));
}
@@ -634,7 +656,15 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
doRemoveBinding(b);
queue.removeBinding(b);
- b.delete();
+ // TODO - RG - Fix bindings!
+ if(getTaskExecutor().isTaskExecutorThread())
+ {
+ b.deleteAsync();
+ }
+ else
+ {
+ b.delete();
+ }
}
}
@@ -651,7 +681,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
return _bindingsMap.get(new BindingIdentifier(bindingKey,queue));
}
- private boolean makeBinding(UUID id,
+ private ListenableFuture<Boolean> makeBindingAsync(UUID id,
String bindingKey,
AMQQueue queue,
Map<String, Object> arguments,
@@ -685,22 +715,45 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
attributes.put(Binding.ID, id);
attributes.put(Binding.ARGUMENTS, arguments);
- BindingImpl b = new BindingImpl(attributes, queue, this);
- b.create(); // Must be called before addBinding as it resolves automated attributes.
+ final BindingImpl b = new BindingImpl(attributes, queue, this);
- addBinding(b);
- return true;
+ final SettableFuture<Boolean> returnVal = SettableFuture.create();
+
+ Futures.addCallback(b.createAsync(), new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ try
+ {
+ addBinding(b);
+ returnVal.set(true);
+ }
+ catch(Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }, getTaskExecutor().getExecutor()); // Must be called before addBinding as it resolves automated attributes.
+
+ return returnVal;
}
else if(force)
{
Map<String,Object> oldArguments = existingMapping.getArguments();
existingMapping.setArguments(arguments);
onBindingUpdated(existingMapping, oldArguments);
- return true;
+ return Futures.immediateFuture(true);
}
else
{
- return false;
+ return Futures.immediateFuture(false);
}
}
}
@@ -723,22 +776,24 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
@StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED)
- private void doDeleteBeforeInitialize()
+ private ListenableFuture<Void> doDeleteBeforeInitialize()
{
preSetAlternateExchange();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
try
{
@@ -748,8 +803,9 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
catch (ExchangeIsAlternateException e)
{
- return;
+
}
+ return Futures.immediateFuture(null);
}
@Override
@@ -860,4 +916,5 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
return binding;
}
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
index 3e377ebaa6..be98665df8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
@@ -107,8 +107,4 @@ public interface ExchangeImpl<T extends ExchangeImpl<T>> extends Exchange<T>, Ex
void bindingRemoved(ExchangeImpl exchange, BindingImpl binding);
}
- public void addBindingListener(BindingListener listener);
-
- public void removeBindingListener(BindingListener listener);
-
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
deleted file mode 100644
index be3a13d2d3..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.flow;
-
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class BytesOnlyCreditManager extends AbstractFlowCreditManager
-{
- private final AtomicLong _bytesCredit;
-
- public BytesOnlyCreditManager(long initialCredit)
- {
- _bytesCredit = new AtomicLong(initialCredit);
- }
-
- public long getMessageCredit()
- {
- return -1L;
- }
-
- public long getBytesCredit()
- {
- return _bytesCredit.get();
- }
-
- public void restoreCredit(long messageCredit, long bytesCredit)
- {
- _bytesCredit.addAndGet(bytesCredit);
- setSuspended(false);
- }
-
- public void removeAllCredit()
- {
- _bytesCredit.set(0L);
- }
-
- public boolean hasCredit()
- {
- return _bytesCredit.get() > 0L;
- }
-
- public boolean useCreditForMessage(long msgSize)
- {
- if(hasCredit())
- {
- if(_bytesCredit.addAndGet(-msgSize) >= 0)
- {
- return true;
- }
- else
- {
- _bytesCredit.addAndGet(msgSize);
- setSuspended(true);
- return false;
- }
- }
- else
- {
- return false;
- }
-
- }
-
- public void setBytesCredit(long bytesCredit)
- {
- _bytesCredit.set( bytesCredit );
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
index 280f2851a4..08aac0b511 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
@@ -24,10 +24,6 @@ package org.apache.qpid.server.flow;
public interface FlowCreditManager
{
- long getMessageCredit();
-
- long getBytesCredit();
-
public static interface FlowCreditManagerListener
{
void creditStateChanged(boolean hasCredit);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
deleted file mode 100644
index 89fc60666b..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.flow;
-
-
-public class LimitlessCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
-{
- public long getMessageCredit()
- {
- return -1L;
- }
-
- public long getBytesCredit()
- {
- return -1L;
- }
-
- public void restoreCredit(long messageCredit, long bytesCredit)
- {
- }
-
- public void removeAllCredit()
- {
- }
-
- public boolean hasCredit()
- {
- return true;
- }
-
- public boolean useCreditForMessage(long msgSize)
- {
- return true;
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
deleted file mode 100644
index 31c1fda968..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.flow;
-
-
-public class MessageAndBytesCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
-{
- private long _messageCredit;
- private long _bytesCredit;
-
- public MessageAndBytesCreditManager(final long messageCredit, final long bytesCredit)
- {
- _messageCredit = messageCredit;
- _bytesCredit = bytesCredit;
- }
-
- public synchronized long getMessageCredit()
- {
- return _messageCredit;
- }
-
- public synchronized long getBytesCredit()
- {
- return _bytesCredit;
- }
-
- public synchronized void restoreCredit(long messageCredit, long bytesCredit)
- {
- _messageCredit += messageCredit;
- _bytesCredit += bytesCredit;
- setSuspended(hasCredit());
- }
-
- public synchronized void removeAllCredit()
- {
- _messageCredit = 0L;
- _bytesCredit = 0L;
- setSuspended(true);
- }
-
- public synchronized boolean hasCredit()
- {
- return (_messageCredit > 0L) && ( _bytesCredit > 0L );
- }
-
- public synchronized boolean useCreditForMessage(final long msgSize)
- {
- if(_messageCredit == 0L)
- {
- setSuspended(true);
- return false;
- }
- else
- {
- if(msgSize > _bytesCredit)
- {
- setSuspended(true);
- return false;
- }
- _messageCredit--;
- _bytesCredit -= msgSize;
- setSuspended(false);
- return true;
- }
-
- }
-
- public synchronized void setBytesCredit(long bytesCredit)
- {
- _bytesCredit = bytesCredit;
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
deleted file mode 100644
index 1817e8ad31..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.flow;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class MessageOnlyCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
-{
- private final AtomicLong _messageCredit;
-
- public MessageOnlyCreditManager(final long initialCredit)
- {
- _messageCredit = new AtomicLong(initialCredit);
- }
-
- public long getMessageCredit()
- {
- return _messageCredit.get();
- }
-
- public long getBytesCredit()
- {
- return -1L;
- }
-
- public void restoreCredit(long messageCredit, long bytesCredit)
- {
- _messageCredit.addAndGet(messageCredit);
- setSuspended(false);
-
- }
-
- public void removeAllCredit()
- {
- setSuspended(true);
- _messageCredit.set(0L);
- }
-
- public boolean hasCredit()
- {
- return _messageCredit.get() > 0L;
- }
-
- public boolean useCreditForMessage(long msgSize)
- {
- if(hasCredit())
- {
- if(_messageCredit.addAndGet(-1L) >= 0)
- {
- setSuspended(false);
- return true;
- }
- else
- {
- _messageCredit.addAndGet(1L);
- setSuspended(true);
- return false;
- }
- }
- else
- {
- setSuspended(true);
- return false;
- }
-
- }
-
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
deleted file mode 100644
index fc2d4bfb53..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.flow;
-
-
-public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager
-{
-
- private volatile long _bytesCreditLimit;
- private volatile long _messageCreditLimit;
-
- private volatile long _bytesCredit;
- private volatile long _messageCredit;
-
- public Pre0_10CreditManager(long bytesCreditLimit, long messageCreditLimit)
- {
- _bytesCreditLimit = bytesCreditLimit;
- _messageCreditLimit = messageCreditLimit;
- _bytesCredit = bytesCreditLimit;
- _messageCredit = messageCreditLimit;
- }
-
-
- public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
- {
- long bytesCreditChange = bytesCreditLimit - _bytesCreditLimit;
- long messageCreditChange = messageCreditLimit - _messageCreditLimit;
-
-
-
- if(bytesCreditChange != 0L)
- {
- if(bytesCreditLimit == 0L)
- {
- _bytesCredit = 0;
- }
- else
- {
- _bytesCredit += bytesCreditChange;
- }
- }
-
-
- if(messageCreditChange != 0L)
- {
- if(messageCreditLimit == 0L)
- {
- _messageCredit = 0;
- }
- else
- {
- _messageCredit += messageCreditChange;
- }
- }
-
-
- _bytesCreditLimit = bytesCreditLimit;
- _messageCreditLimit = messageCreditLimit;
-
- setSuspended(!hasCredit());
-
- }
-
-
- public long getMessageCredit()
- {
- return _messageCredit;
- }
-
- public long getBytesCredit()
- {
- return _bytesCredit;
- }
-
- public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
- {
- final long messageCreditLimit = _messageCreditLimit;
- boolean notifyIncrease = true;
- if(messageCreditLimit != 0L)
- {
- notifyIncrease = (_messageCredit != 0);
- long newCredit = _messageCredit + messageCredit;
- _messageCredit = newCredit > messageCreditLimit ? messageCreditLimit : newCredit;
- }
-
-
- final long bytesCreditLimit = _bytesCreditLimit;
- if(bytesCreditLimit != 0L)
- {
- long newCredit = _bytesCredit + bytesCredit;
- _bytesCredit = newCredit > bytesCreditLimit ? bytesCreditLimit : newCredit;
- if(notifyIncrease && bytesCredit>0)
- {
- notifyIncreaseBytesCredit();
- }
- }
-
-
-
- setSuspended(!hasCredit());
-
- }
-
- public synchronized void removeAllCredit()
- {
- _bytesCredit = 0L;
- _messageCredit = 0L;
- setSuspended(!hasCredit());
- }
-
- public synchronized boolean hasCredit()
- {
- return (_bytesCreditLimit == 0L || _bytesCredit > 0)
- && (_messageCreditLimit == 0L || _messageCredit > 0);
- }
-
- public synchronized boolean useCreditForMessage(final long msgSize)
- {
- if(_messageCreditLimit != 0L)
- {
- if(_messageCredit != 0L)
- {
- if(_bytesCreditLimit == 0L)
- {
- _messageCredit--;
-
- return true;
- }
- else
- {
- if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit))
- {
- _messageCredit--;
- _bytesCredit -= msgSize;
-
- return true;
- }
- else
- {
- return false;
- }
- }
- }
- else
- {
- setSuspended(true);
- return false;
- }
- }
- else
- {
- if(_bytesCreditLimit == 0L)
- {
-
- return true;
- }
- else
- {
- if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit))
- {
- _bytesCredit -= msgSize;
-
- return true;
- }
- else
- {
- return false;
- }
- }
-
- }
-
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index e63638213e..529aa230d4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -41,12 +41,23 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonProcessingException;
@@ -68,6 +79,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.encryption.ConfigurationSecretEncrypter;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.util.Strings;
@@ -162,7 +174,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
private final OwnAttributeResolver _attributeResolver = new OwnAttributeResolver(this);
- @ManagedAttributeField( afterSet = "attainStateIfOpenedOrReopenFailed" )
+ @ManagedAttributeField
private State _desiredState;
private boolean _openComplete;
private boolean _openFailed;
@@ -439,24 +451,84 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
@Override
public final void open()
{
- if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ doSync(openAsync());
+ }
+
+
+ public final ListenableFuture<Void> openAsync()
+ {
+ return doOnConfigThread(new Callable<ListenableFuture<Void>>()
+ {
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ {
+ _openFailed = false;
+ OpenExceptionHandler exceptionHandler = new OpenExceptionHandler();
+ try
+ {
+ doResolution(true, exceptionHandler);
+ doValidation(true, exceptionHandler);
+ doOpening(true, exceptionHandler);
+ return doAttainState(exceptionHandler);
+ }
+ catch (RuntimeException e)
+ {
+ exceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ return Futures.immediateFuture(null);
+ }
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
+ }
+
+ }
+ });
+
+ }
+
+ protected final <T> ListenableFuture<T> doOnConfigThread(final Callable<ListenableFuture<T>> action)
+ {
+ final SettableFuture<T> returnVal = SettableFuture.create();
+
+ _taskExecutor.submit(new Task<Void>()
{
- _openFailed = false;
- OpenExceptionHandler exceptionHandler = new OpenExceptionHandler();
- try
- {
- doResolution(true, exceptionHandler);
- doValidation(true, exceptionHandler);
- doOpening(true, exceptionHandler);
- doAttainState(exceptionHandler);
- }
- catch(RuntimeException e)
+
+ @Override
+ public Void execute()
{
- exceptionHandler.handleException(e, this);
+ try
+ {
+ Futures.addCallback(action.call(), new FutureCallback<T>()
+ {
+ @Override
+ public void onSuccess(final T result)
+ {
+ returnVal.set(result);
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ });
+ }
+ catch (Exception e)
+ {
+ returnVal.setException(e);
+ }
+ return null;
}
- }
+ });
+
+ return returnVal;
}
+
+
public void registerWithParents()
{
for(ConfiguredObject<?> parent : _parents.values())
@@ -468,17 +540,78 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
- protected void closeChildren()
+ private class ChildCounter
{
+ private final AtomicInteger _count = new AtomicInteger();
+ private final Runnable _task;
+
+ private ChildCounter(final Runnable task)
+ {
+ _task = task;
+ }
+
+ public void incrementCount()
+ {
+ _count.incrementAndGet();
+ }
+
+ public void decrementCount()
+ {
+ if(_count.decrementAndGet() == 0)
+ {
+ _task.run();
+ }
+ }
+ }
+
+ protected final ListenableFuture<Void> closeChildren()
+ {
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ final ChildCounter counter = new ChildCounter(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ returnVal.set(null);
+ LOGGER.debug("All children closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName() );
+
+ }
+ });
+ counter.incrementCount();
+
+
applyToChildren(new Action<ConfiguredObject<?>>()
{
@Override
public void performAction(final ConfiguredObject<?> child)
{
- child.close();
+ counter.incrementCount();
+ ListenableFuture<Void> close = child.closeAsync();
+ Futures.addCallback(close, new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ counter.decrementCount();
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ LOGGER.error("Exception occurred while closing "
+ + child.getClass().getSimpleName()
+ + " : '"
+ + child.getName()
+ + "'", t);
+ // No need to decrement counter as setting the exception will complete the future
+ returnVal.setException(t);
+ }
+ }, MoreExecutors.sameThreadExecutor());
}
});
+ counter.decrementCount();
+
for(Collection<ConfiguredObject<?>> childList : _children.values())
{
childList.clear();
@@ -494,23 +627,60 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
childNameMap.clear();
}
+ return returnVal;
+ }
+
+ @Override
+ public void close()
+ {
+ doSync(closeAsync());
}
@Override
- public final void close()
+ public final ListenableFuture<Void> closeAsync()
{
- if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
+ return doOnConfigThread(new Callable<ListenableFuture<Void>>()
{
- beforeClose();
- closeChildren();
- onClose();
- unregister(false);
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ LOGGER.debug("Closing " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
+
+ if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
+ {
+
+ return doAfter(beforeClose(), new Callable<ListenableFuture<Void>>()
+ {
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ return closeChildren();
+ }
+ }).then(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ onClose();
+ unregister(false);
+ LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
+ }
+ });
+ }
+ else
+ {
+ LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
+
+ return Futures.immediateFuture(null);
+ }
+ }
+ });
- }
}
- protected void beforeClose()
+ protected ListenableFuture<Void> beforeClose()
{
+ return Futures.immediateFuture(null);
}
protected void onClose()
@@ -519,48 +689,65 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
public final void create()
{
- if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ doSync(createAsync());
+ }
+
+ public final ListenableFuture<Void> createAsync()
+ {
+ return doOnConfigThread(new Callable<ListenableFuture<Void>>()
{
- final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser();
- if(currentUser != null)
+ @Override
+ public ListenableFuture<Void> call() throws Exception
{
- String currentUserName = currentUser.getName();
- _attributes.put(LAST_UPDATED_BY, currentUserName);
- _attributes.put(CREATED_BY, currentUserName);
- _lastUpdatedBy = currentUserName;
- _createdBy = currentUserName;
- }
- final long currentTime = System.currentTimeMillis();
- _attributes.put(LAST_UPDATED_TIME, currentTime);
- _attributes.put(CREATED_TIME, currentTime);
- _lastUpdatedTime = currentTime;
- _createdTime = currentTime;
+ if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ {
+ final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser();
+ if (currentUser != null)
+ {
+ String currentUserName = currentUser.getName();
+ _attributes.put(LAST_UPDATED_BY, currentUserName);
+ _attributes.put(CREATED_BY, currentUserName);
+ _lastUpdatedBy = currentUserName;
+ _createdBy = currentUserName;
+ }
+ final long currentTime = System.currentTimeMillis();
+ _attributes.put(LAST_UPDATED_TIME, currentTime);
+ _attributes.put(CREATED_TIME, currentTime);
+ _lastUpdatedTime = currentTime;
+ _createdTime = currentTime;
+
+ CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler();
+ try
+ {
+ doResolution(true, createExceptionHandler);
+ doValidation(true, createExceptionHandler);
+ validateOnCreate();
+ registerWithParents();
+ }
+ catch (RuntimeException e)
+ {
+ createExceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ }
- CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler();
- try
- {
- doResolution(true, createExceptionHandler);
- doValidation(true, createExceptionHandler);
- validateOnCreate();
- registerWithParents();
- }
- catch(RuntimeException e)
- {
- createExceptionHandler.handleException(e, this);
- }
+ final AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler =
+ new CreateExceptionHandler(true);
+
+ try
+ {
+ doCreation(true, unregisteringExceptionHandler);
+ doOpening(true, unregisteringExceptionHandler);
+ return doAttainState(unregisteringExceptionHandler);
+ }
+ catch (RuntimeException e)
+ {
+ unregisteringExceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ }
+ }
+ return Futures.immediateFuture(null);
- AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler = new CreateExceptionHandler(true);
- try
- {
- doCreation(true, unregisteringExceptionHandler);
- doOpening(true, unregisteringExceptionHandler);
- doAttainState(unregisteringExceptionHandler);
- }
- catch(RuntimeException e)
- {
- unregisteringExceptionHandler.handleException(e, this);
}
- }
+ });
+
}
protected void validateOnCreate()
@@ -610,8 +797,40 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
}
- private void doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler)
+ private ListenableFuture<Void> doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler)
{
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ final ChildCounter counter = new ChildCounter(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ attainState().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ returnVal.set(null);
+ }
+ }, getTaskExecutor().getExecutor());
+ }
+ catch(RuntimeException e)
+ {
+ try
+ {
+ exceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ returnVal.set(null);
+ }
+ catch(Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }
+ }
+ });
+ counter.incrementCount();
applyToChildren(new Action<ConfiguredObject<?>>()
{
@Override
@@ -619,22 +838,43 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
if (child instanceof AbstractConfiguredObject)
{
- AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child;
+ final AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child;
if (configuredObject._dynamicState.get() == DynamicState.OPENED)
{
- try
- {
- configuredObject.doAttainState(exceptionHandler);
- }
- catch (RuntimeException e)
- {
- exceptionHandler.handleException(e, configuredObject);
- }
+ counter.incrementCount();
+ Futures.addCallback(configuredObject.doAttainState(exceptionHandler),
+ new FutureCallback()
+ {
+ @Override
+ public void onSuccess(final Object result)
+ {
+ counter.decrementCount();
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ try
+ {
+ if (t instanceof RuntimeException)
+ {
+ exceptionHandler.handleException((RuntimeException) t,
+ configuredObject);
+ }
+ }
+ finally
+ {
+ counter.decrementCount();
+ }
+ }
+ },getTaskExecutor().getExecutor());
+
}
}
}
});
- attainState();
+ counter.decrementCount();
+ return returnVal;
}
protected void doOpening(boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler)
@@ -890,16 +1130,17 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
- private void attainStateIfOpenedOrReopenFailed()
+ private ListenableFuture<Void> attainStateIfOpenedOrReopenFailed()
{
if (_openComplete || getDesiredState() == State.DELETED)
{
- attainState();
+ return attainState();
}
else if (_openFailed)
{
- open();
+ return openAsync();
}
+ return Futures.immediateFuture(null);
}
protected void onOpen()
@@ -907,10 +1148,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
- protected void attainState()
+ protected ListenableFuture<Void> attainState()
{
State currentState = getState();
State desiredState = getDesiredState();
+ ListenableFuture<Void> returnVal;
if(currentState != desiredState)
{
Method stateChangingMethod = getStateChangeMethod(currentState, desiredState);
@@ -918,7 +1160,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
try
{
- stateChangingMethod.invoke(this);
+ returnVal = (ListenableFuture<Void>) stateChangingMethod.invoke(this);
}
catch (IllegalAccessException e)
{
@@ -938,7 +1180,16 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
throw new ServerScopedRuntimeException("Unexpected checked exception when calling state transition", underlying);
}
}
+ else
+ {
+ returnVal = Futures.immediateFuture(null);
+ }
}
+ else
+ {
+ returnVal = Futures.immediateFuture(null);
+ }
+ return returnVal;
}
private Method getStateChangeMethod(final State currentState, final State desiredState)
@@ -1013,44 +1264,72 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
- private State setDesiredState(final State desiredState)
+ private ListenableFuture<Void> setDesiredState(final State desiredState)
throws IllegalStateTransitionException, AccessControlException
{
-
- return runTask(new Task<State>()
+ return doOnConfigThread(new Callable<ListenableFuture<Void>>()
+ {
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ final State state = getState();
+ final State currentDesiredState = getDesiredState();
+ if(desiredState == currentDesiredState && desiredState != state)
+ {
+ return doAfter(attainStateIfOpenedOrReopenFailed(), new Runnable()
+ {
+ @Override
+ public void run()
{
- @Override
- public State execute()
+ final State currentState = getState();
+ if (currentState != state)
{
+ notifyStateChanged(state, currentState);
+ }
- State state = getState();
- if(desiredState == getDesiredState() && desiredState != state)
- {
- attainStateIfOpenedOrReopenFailed();
- final State currentState = getState();
- if (currentState != state)
- {
- notifyStateChanged(state, currentState);
- }
- return currentState;
- }
- else
- {
- setAttributes(Collections.<String, Object>singletonMap(DESIRED_STATE,
- desiredState));
+ }
+ });
+ }
+ else
+ {
+ ConfiguredObject<?> proxyForValidation =
+ createProxyForValidation(Collections.<String, Object>singletonMap(
+ ConfiguredObject.DESIRED_STATE,
+ desiredState));
+ Set<String> desiredStateOnlySet = Collections.unmodifiableSet(
+ Collections.singleton(ConfiguredObject.DESIRED_STATE));
+ authoriseSetAttributes(proxyForValidation, desiredStateOnlySet);
+ validateChange(proxyForValidation, desiredStateOnlySet);
+
+ if (changeAttribute(ConfiguredObject.DESIRED_STATE, currentDesiredState, desiredState))
+ {
+ attributeSet(ConfiguredObject.DESIRED_STATE,
+ currentDesiredState,
+ desiredState);
+
+ return doAfter(attainStateIfOpenedOrReopenFailed(),new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ if (getState() == desiredState)
+ {
+ notifyStateChanged(state, desiredState);
+ }
+
+ }
+ }
+ );
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
+ }
+ }
+
+ }
+ });
- if (getState() == desiredState)
- {
- notifyStateChanged(state, desiredState);
- return desiredState;
- }
- else
- {
- return getState();
- }
- }
- }
- });
}
@Override
@@ -1429,20 +1708,62 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
public final void stop()
{
- setDesiredState(State.STOPPED);
+ doSync(setDesiredState(State.STOPPED));
}
public final void delete()
{
- if(getState() == State.UNINITIALIZED)
+ doSync(deleteAsync());
+ }
+
+ protected final <R> R doSync(ListenableFuture<R> async)
+ {
+ try
+ {
+ return async.get();
+ }
+ catch (InterruptedException e)
{
- _desiredState = State.DELETED;
+ throw new ServerScopedRuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ Throwable cause = e.getCause();
+ if(cause instanceof RuntimeException)
+ {
+ throw (RuntimeException) cause;
+ }
+ else if(cause instanceof Error)
+ {
+ throw (Error) cause;
+ }
+ else if(cause != null)
+ {
+ throw new ServerScopedRuntimeException(cause);
+ }
+ else
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
+
}
- setDesiredState(State.DELETED);
+ }
+
+ public final ListenableFuture<Void> deleteAsync()
+ {
+ return setDesiredState(State.DELETED);
+ }
+
+ public final void start()
+ {
+ doSync(startAsync());
+ }
+ public ListenableFuture<Void> startAsync()
+ {
+ return setDesiredState(State.ACTIVE);
}
- public final void start() { setDesiredState(State.ACTIVE); }
protected void deleted()
{
@@ -1527,24 +1848,175 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
_taskExecutor.run(task);
}
+ @Override
+ public void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ doSync(setAttributesAsync(attributes));
+ }
+
+ protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Runnable second)
+ {
+ return doAfter(getTaskExecutor().getExecutor(), first, second);
+ }
+
+ protected static final ChainedListenableFuture doAfter(Executor executor, ListenableFuture<Void> first, final Runnable second)
+ {
+ final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor);
+ Futures.addCallback(first, new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ try
+ {
+ second.run();
+ returnVal.set(null);
+ }
+ catch(Throwable e)
+ {
+ returnVal.setException(e);
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }, executor);
+
+ return returnVal;
+ }
+
+ public static interface ChainedListenableFuture extends ListenableFuture<Void>
+ {
+ ChainedListenableFuture then(Runnable r);
+ ChainedListenableFuture then(Callable<ListenableFuture<Void>> r);
+ }
+
+ public static class ChainedSettableFuture extends AbstractFuture<Void> implements ChainedListenableFuture
+ {
+ private final Executor _exector;
+
+ public ChainedSettableFuture(final Executor executor)
+ {
+ _exector = executor;
+ }
+
+ @Override
+ public boolean set(Void value)
+ {
+ return super.set(value);
+ }
+
+ @Override
+ public boolean setException(Throwable throwable)
+ {
+ return super.setException(throwable);
+ }
+
+ @Override
+ public ChainedListenableFuture then(final Runnable r)
+ {
+ return doAfter(_exector, this, r);
+ }
+
+ @Override
+ public ChainedListenableFuture then(final Callable<ListenableFuture<Void>> r)
+ {
+ return doAfter(_exector, this,r);
+ }
+ }
+
+ protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second)
+ {
+ return doAfter(getTaskExecutor().getExecutor(), first, second);
+ }
+
+ protected static final ChainedListenableFuture doAfter(final Executor executor, ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second)
+ {
+ final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor);
+ Futures.addCallback(first, new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ try
+ {
+ final ListenableFuture<Void> future = second.call();
+ Futures.addCallback(future, new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ returnVal.set(null);
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }, executor);
+
+ }
+ catch(Throwable e)
+ {
+ returnVal.setException(e);
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }, executor);
+
+ return returnVal;
+ }
@Override
- public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ public ListenableFuture<Void> setAttributesAsync(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
{
+ final Map<String,Object> updateAttributes = new HashMap<>(attributes);
+ Object desiredState = updateAttributes.remove(ConfiguredObject.DESIRED_STATE);
runTask(new VoidTask()
{
@Override
public void execute()
{
authoriseSetAttributes(createProxyForValidation(attributes), attributes.keySet());
- changeAttributes(attributes);
+ validateChange(createProxyForValidation(attributes), attributes.keySet());
+
+ changeAttributes(updateAttributes);
}
});
+ if(desiredState != null)
+ {
+ State state;
+ if(desiredState instanceof State)
+ {
+ state = (State)desiredState;
+ }
+ else if(desiredState instanceof String)
+ {
+ state = State.valueOf((String)desiredState);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Cannot convert an object of type " + desiredState.getClass().getName() + " to a State");
+ }
+ return setDesiredState(state);
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
+ }
}
protected void changeAttributes(final Map<String, Object> attributes)
{
- validateChange(createProxyForValidation(attributes), attributes.keySet());
Collection<String> names = getAttributeNames();
for (String name : names)
{
@@ -1938,6 +2410,74 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
+ private static class CloseResult implements FutureResult
+ {
+ private volatile FutureResult _childFutureResult;
+
+ @Override
+ public boolean isComplete()
+ {
+ return _childFutureResult != null && _childFutureResult.isComplete();
+ }
+
+ @Override
+ public void waitForCompletion()
+ {
+ synchronized (this)
+ {
+ while (_childFutureResult == null)
+ {
+ try
+ {
+ wait();
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+ }
+ }
+ _childFutureResult.waitForCompletion();
+
+ }
+
+ @Override
+ public void waitForCompletion(final long timeout) throws TimeoutException
+ {
+ long startTime = System.currentTimeMillis();
+ long remaining = timeout;
+
+ synchronized (this)
+ {
+ while (_childFutureResult == null && remaining > 0)
+ {
+ try
+ {
+ wait(remaining);
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+ remaining = startTime + timeout - System.currentTimeMillis();
+
+ if(remaining <= 0)
+ {
+ throw new TimeoutException("Completion did not occur within given timeout: " + timeout);
+ }
+ }
+ }
+ _childFutureResult.waitForCompletion(remaining);
+
+ }
+
+ public synchronized void setChildFutureResult(final FutureResult childFutureResult)
+ {
+ _childFutureResult = childFutureResult;
+ notifyAll();
+ }
+ }
+
private static class AttributeGettingHandler implements InvocationHandler
{
@@ -2127,7 +2667,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
if (source.getState() != State.DELETED)
{
- source.delete();
+ // TODO - RG - This isn't right :-(
+ source.deleteAsync();
}
}
finally
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
index 5bf5e337ad..f97d2dfe14 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
@@ -23,6 +23,10 @@ package org.apache.qpid.server.model;
import java.util.HashMap;
import java.util.Map;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.store.ConfiguredObjectDependency;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -59,6 +63,26 @@ abstract public class AbstractConfiguredObjectTypeFactory<X extends AbstractConf
return instance;
}
+
+ @Override
+ public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory,
+ final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
+ {
+ final SettableFuture<X> returnVal = SettableFuture.create();
+ final X instance = createInstance(attributes, parents);
+ final ListenableFuture<Void> createFuture = instance.createAsync();
+ createFuture.addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ returnVal.set(instance);
+ }
+ }, MoreExecutors.sameThreadExecutor());
+ return returnVal;
+ }
+
protected abstract X createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents);
public final <C extends ConfiguredObject<?>> C getParent(Class<C> parentClass, ConfiguredObject<?>... parents)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
index b421c5aaf1..c6ac7d4073 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
@@ -31,6 +31,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.store.ManagementModeStoreHandler;
@@ -194,11 +197,11 @@ public abstract class AbstractSystemConfig<X extends SystemConfig<X>>
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
- protected void activate()
+ protected ListenableFuture<Void> activate()
{
final EventLogger eventLogger = _eventLogger;
- EventLogger startupLogger;
+ final EventLogger startupLogger;
if (isStartupLoggedToSystemOut())
{
//Create the composite (logging+SystemOut MessageLogger to be used during startup
@@ -232,17 +235,34 @@ public abstract class AbstractSystemConfig<X extends SystemConfig<X>>
BrokerStoreUpgraderAndRecoverer upgrader = new BrokerStoreUpgraderAndRecoverer(this);
upgrader.perform();
- Broker broker = getBroker();
+ final Broker broker = getBroker();
broker.setEventLogger(startupLogger);
- broker.open();
-
- if (broker.getState() == State.ACTIVE)
- {
- startupLogger.message(BrokerMessages.READY());
- broker.setEventLogger(eventLogger);
- }
-
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ broker.openAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+
+ if (broker.getState() == State.ACTIVE)
+ {
+ startupLogger.message(BrokerMessages.READY());
+ broker.setEventLogger(eventLogger);
+ }
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+
+ return returnVal;
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
index 944ed97ccc..c56698c60c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
@@ -45,5 +45,4 @@ public interface Binding<X extends Binding<X>> extends ConfiguredObject<X>
@ManagedStatistic
long getMatches();
- void delete();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java
new file mode 100644
index 0000000000..5e9d794e14
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.model;
+
+
+public interface CloseFuture
+{
+ public void runWhenComplete(final Runnable closeRunnable);
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
index 2d60879861..d2ab317f0e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
@@ -26,6 +26,8 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -236,6 +238,8 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>>
ConfiguredObject... otherParents);
void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException;
+ ListenableFuture<Void> setAttributesAsync(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException;
+
Class<? extends ConfiguredObject> getCategoryClass();
Class<? extends ConfiguredObject> getTypeClass();
@@ -248,8 +252,12 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>>
ConfiguredObjectRecord asObjectRecord();
void open();
+ ListenableFuture<Void> openAsync();
void close();
+ ListenableFuture<Void> closeAsync();
+
+ ListenableFuture<Void> deleteAsync();
TaskExecutor getTaskExecutor();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
index 7d4023862b..ed7c841344 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.model;
import java.util.Collection;
import java.util.Map;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.UnresolvedConfiguredObject;
@@ -34,6 +36,8 @@ public interface ConfiguredObjectFactory
<X extends ConfiguredObject<X>> X create(Class<X> clazz, Map<String, Object> attributes, ConfiguredObject<?>... parents);
+ <X extends ConfiguredObject<X>> ListenableFuture<X> createAsync(Class<X> clazz, Map<String, Object> attributes, ConfiguredObject<?>... parents);
+
<X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(Class<X> categoryClass,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
index 5026df0e19..82da0fd206 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
@@ -26,6 +26,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -112,6 +114,18 @@ public class ConfiguredObjectFactoryImpl implements ConfiguredObjectFactory
return factory.create(this, attributes, parents);
}
+
+ @Override
+ public <X extends ConfiguredObject<X>> ListenableFuture<X> createAsync(Class<X> clazz,
+ final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
+ {
+ ConfiguredObjectTypeFactory<X> factory = getConfiguredObjectTypeFactory(clazz, attributes);
+
+ return factory.createAsync(this, attributes, parents);
+ }
+
+
@Override
public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final Class<X> categoryClass,
Map<String, Object> attributes)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
index d0c6fb041e..a93e6a602f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
@@ -40,6 +40,7 @@ import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.plugin.ConfiguredObjectRegistration;
@@ -801,20 +802,37 @@ public class ConfiguredObjectTypeRegistry
{
if(m.isAnnotationPresent(StateTransition.class))
{
- if(m.getParameterTypes().length == 0)
+ if(ListenableFuture.class.isAssignableFrom(m.getReturnType()))
{
- m.setAccessible(true);
- StateTransition annotation = m.getAnnotation(StateTransition.class);
+ if (m.getParameterTypes().length == 0)
+ {
+ m.setAccessible(true);
+ StateTransition annotation = m.getAnnotation(StateTransition.class);
+
+ for (State state : annotation.currentState())
+ {
+ addStateTransition(state, annotation.desiredState(), m, map);
+ }
- for(State state : annotation.currentState())
+ }
+ else
{
- addStateTransition(state, annotation.desiredState(), m, map);
+ throw new ServerScopedRuntimeException(
+ "A state transition method must have no arguments. Method "
+ + m.getName()
+ + " on "
+ + clazz.getName()
+ + " does not meet this criteria.");
}
-
}
else
{
- throw new ServerScopedRuntimeException("A state transition method must have no arguments. Method " + m.getName() + " on " + clazz.getName() + " does not meet this criteria.");
+ throw new ServerScopedRuntimeException(
+ "A state transition method must return a ListenableFuture. Method "
+ + m.getName()
+ + " on "
+ + clazz.getName()
+ + " does not meet this criteria.");
}
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
index b28441438d..1c245363a5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
@@ -103,7 +103,6 @@ public interface Connection<X extends Connection<X>> extends ConfiguredObject<X>
//children
Collection<Session> getSessions();
- void delete();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
index 7318a58640..999a3594b4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.model;
import java.util.Collection;
import java.util.Set;
+import com.google.common.util.concurrent.ListenableFuture;
+
@ManagedObject
public interface Port<X extends Port<X>> extends ConfiguredObject<X>
{
@@ -76,4 +78,6 @@ public interface Port<X extends Port<X>> extends ConfiguredObject<X>
void start();
+ ListenableFuture<Void> startAsync();
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index cc758ba7c9..c2338c08d8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -147,8 +147,6 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>,
void stop();
- void delete();
-
String getRedirectHost(AmqpPort<?> port);
public static interface Transaction
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index be1d6ebf59..036c4d7716 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -35,6 +35,9 @@ import java.util.regex.Pattern;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.common.QpidProperties;
@@ -234,13 +237,40 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
if(_parent.isManagementMode())
{
- _managementModeAuthenticationProvider.open();
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+
+ _managementModeAuthenticationProvider.openAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ activateWithoutManagementMode();
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
}
+ else
+ {
+ activateWithoutManagementMode();
+ return Futures.immediateFuture(null);
+ }
+ }
+ private void activateWithoutManagementMode()
+ {
boolean hasBrokerAnyErroredChildren = false;
for (final Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass()))
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
index e03904789d..8bcbba9ac4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
@@ -27,10 +27,13 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.CloseFuture;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Port;
@@ -51,6 +54,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
private final Action _underlyingConnectionDeleteTask;
private final AtomicBoolean _underlyingClosed = new AtomicBoolean(false);
private AMQConnectionModel _underlyingConnection;
+ private final AtomicBoolean _closing = new AtomicBoolean();
public ConnectionAdapter(final AMQConnectionModel conn)
{
@@ -156,17 +160,59 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
}
@StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
- closeUnderlyingConnection();
- deleted();
- setState(State.DELETED);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ asyncClose().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ deleted();
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
+ }
+
+ @Override
+ protected ListenableFuture<Void> beforeClose()
+ {
+ _closing.set(true);
+
+ return asyncClose();
+
+ }
+
+ private ListenableFuture<Void> asyncClose()
+ {
+ final SettableFuture<Void> closeFuture = SettableFuture.create();
+
+ _underlyingConnection.addDeleteTask(new Action()
+ {
+ @Override
+ public void performAction(final Object object)
+ {
+ closeFuture.set(null);
+ }
+ });
+
+ _underlyingConnection.closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ return closeFuture;
}
@Override
protected void onClose()
{
- closeUnderlyingConnection();
}
@Override
@@ -233,23 +279,54 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
// SessionAdapter installs delete task to cause session model object to delete
}
- private void closeUnderlyingConnection()
+
+ private static class ConnectionCloseFuture implements CloseFuture
{
- if (_underlyingClosed.compareAndSet(false, true))
+ private boolean _closed;
+
+ public synchronized void connectionClosed()
{
- _underlyingConnection.removeDeleteTask(_underlyingConnectionDeleteTask);
- try
+ _closed = true;
+ notifyAll();
+
+ }
+
+ @Override
+ public void runWhenComplete(final Runnable closeRunnable)
+ {
+ if (_closed )
{
- _underlyingConnection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ closeRunnable.run();
}
- catch (Exception e)
+ else
{
- LOGGER.warn("Exception closing connection "
- + _underlyingConnection.getConnectionId()
- + " from "
- + _underlyingConnection.getRemoteAddressString(), e);
- }
+ Thread t = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ synchronized (ConnectionCloseFuture.this)
+ {
+ while (!_closed)
+ {
+ try
+ {
+ ConnectionCloseFuture.this.wait();
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ closeRunnable.run();
+ }
+ }
+ });
+
+ t.setDaemon(true);
+ t.start();
+ }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
index fda8a6f2e9..1a119be32d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
@@ -31,6 +31,9 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.BrokerProperties;
@@ -145,7 +148,8 @@ public class FileBasedGroupProviderImpl
GroupAdapter groupAdapter = new GroupAdapter(attrMap);
principals.add(groupAdapter);
groupAdapter.registerWithParents();
- groupAdapter.open();
+ // TODO - we know this is safe, but the sync method shouldn't really be called from the management thread
+ groupAdapter.openAsync();
}
}
@@ -261,7 +265,7 @@ public class FileBasedGroupProviderImpl
}
@StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
if (_groupDatabase != null)
{
@@ -278,29 +282,48 @@ public class FileBasedGroupProviderImpl
throw new IllegalConfigurationException(String.format("Cannot load groups from '%s'", getPath()));
}
}
+ return Futures.immediateFuture(null);
}
@StateTransition( currentState = { State.QUIESCED, State.ACTIVE, State.ERRORED}, desiredState = State.DELETED )
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
- close();
- File file = new File(getPath());
- if (file.exists())
- {
- if (!file.delete())
- {
- throw new IllegalConfigurationException("Cannot delete group file");
- }
- }
-
- deleted();
- setState(State.DELETED);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ closeAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ File file = new File(getPath());
+ if (file.exists())
+ {
+ if (!file.delete())
+ {
+ throw new IllegalConfigurationException("Cannot delete group file");
+ }
+ }
+
+ deleted();
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED)
- private void startQuiesced()
+ private ListenableFuture<Void> startQuiesced()
{
setState(State.QUIESCED);
+ return Futures.immediateFuture(null);
}
public Set<Principal> getGroupPrincipalsForUser(String username)
@@ -352,9 +375,10 @@ public class FileBasedGroupProviderImpl
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
@@ -371,7 +395,8 @@ public class FileBasedGroupProviderImpl
attrMap.put(GroupMember.NAME, principal.getName());
GroupMemberAdapter groupMemberAdapter = new GroupMemberAdapter(attrMap);
groupMemberAdapter.registerWithParents();
- groupMemberAdapter.open();
+ // todo - this will be safe, but the synchronous open should not be called from the management thread
+ groupMemberAdapter.openAsync();
members.add(groupMemberAdapter);
}
_groupPrincipal = new GroupPrincipal(getName());
@@ -432,11 +457,12 @@ public class FileBasedGroupProviderImpl
}
@StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED )
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
_groupDatabase.removeGroup(getName());
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@Override
@@ -494,17 +520,19 @@ public class FileBasedGroupProviderImpl
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
_groupDatabase.removeUserFromGroup(getName(), GroupAdapter.this.getName());
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
index 2b77b0d2a9..500df8cb87 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
@@ -37,16 +37,17 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.util.BaseAction;
-import org.apache.qpid.server.util.FileHelper;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.type.TypeReference;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.AuthenticationProvider;
@@ -55,6 +56,8 @@ import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.util.BaseAction;
+import org.apache.qpid.server.util.FileHelper;
public class FileSystemPreferencesProviderImpl
@@ -128,7 +131,7 @@ public class FileSystemPreferencesProviderImpl
}
@StateTransition( currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
if (_store != null)
{
@@ -138,6 +141,7 @@ public class FileSystemPreferencesProviderImpl
{
throw new IllegalStateException("Cannot open preferences provider " + getName() + " in state " + getState() );
}
+ return Futures.immediateFuture(null);
}
@Override
@@ -171,33 +175,52 @@ public class FileSystemPreferencesProviderImpl
}
@StateTransition(currentState = { State.ACTIVE }, desiredState = State.QUIESCED)
- private void doQuiesce()
+ private ListenableFuture<Void> doQuiesce()
{
if(_store != null)
{
_store.close();
}
setState(State.QUIESCED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED }, desiredState = State.DELETED )
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
- close();
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ closeAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ if(_store != null)
+ {
+ _store.close();
+ _store.delete();
+ deleted();
+ _authenticationProvider.setPreferencesProvider(null);
+
+ }
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
- if(_store != null)
- {
- _store.close();
- _store.delete();
- deleted();
- _authenticationProvider.setPreferencesProvider(null);
+ return returnVal;
- }
- setState(State.DELETED);
}
@StateTransition(currentState = State.QUIESCED, desiredState = State.ACTIVE )
- private void restart()
+ private ListenableFuture<Void> restart()
{
if (_store == null)
{
@@ -206,6 +229,7 @@ public class FileSystemPreferencesProviderImpl
_store.open();
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
index 7c9b439e93..cb412e8d41 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
@@ -26,6 +26,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
@@ -169,10 +172,11 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
index 791bbe4dd3..0e6f18a70a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
@@ -27,6 +27,9 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -226,14 +229,24 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo
}
@StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED )
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
- close();
- setState(State.DELETED);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ closeAsync().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ setState(State.DELETED);
+ returnVal.set(null);
+
+ }
+ }, getTaskExecutor().getExecutor());
+ return returnVal;
}
@StateTransition( currentState = {State.UNINITIALIZED, State.QUIESCED, State.ERRORED}, desiredState = State.ACTIVE )
- protected void activate()
+ protected ListenableFuture<Void> activate()
{
try
{
@@ -244,12 +257,14 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo
setState(State.ERRORED);
throw new IllegalConfigurationException("Unable to active port '" + getName() + "'of type " + getType() + " on " + getPort(), e);
}
+ return Futures.immediateFuture(null);
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED)
- private void startQuiesced()
+ private ListenableFuture<Void> startQuiesced()
{
setState(State.QUIESCED);
+ return Futures.immediateFuture(null);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
index 43cb5f0c62..350f137a04 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
@@ -40,6 +40,8 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.codehaus.jackson.map.ObjectMapper;
import org.apache.qpid.server.configuration.BrokerProperties;
@@ -118,6 +120,8 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
private final Broker<?> _broker;
private AcceptingTransport _transport;
+ private final AtomicBoolean _closing = new AtomicBoolean();
+ private final SettableFuture _noConnectionsRemain = SettableFuture.create();
@ManagedObjectFactoryConstructor
public AmqpPortImpl(Map<String, Object> attributes, Broker<?> broker)
@@ -254,6 +258,19 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
}
@Override
+ protected ListenableFuture<Void> beforeClose()
+ {
+ _closing.set(true);
+
+ if (_connectionCount.get() == 0)
+ {
+ _noConnectionsRemain.set(null);
+ }
+
+ return _noConnectionsRemain;
+ }
+
+ @Override
protected void onClose()
{
if (_transport != null)
@@ -262,6 +279,8 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
{
_broker.getEventLogger().message(BrokerMessages.SHUTTING_DOWN(String.valueOf(transport), getPort()));
}
+
+
_transport.close();
}
}
@@ -500,6 +519,11 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
_connectionCountWarningGiven.compareAndSet(true,false);
}
+ if (_closing.get() && _connectionCount.get() == 0)
+ {
+ _noConnectionsRemain.set(null);
+ }
+
return openConnections;
}
@@ -511,7 +535,7 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
@Override
public boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress)
{
- return _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections;
+ return !_closing.get() && ( _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections );
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
index 870621f292..5c3000db4a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.model.port;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -108,6 +110,14 @@ public class PortFactory<X extends Port<X>> implements ConfiguredObjectTypeFacto
}
@Override
+ public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory,
+ final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
+ {
+ return getPortFactory(factory, attributes, (Broker<?>)parents[0]).createAsync(factory, attributes,parents);
+ }
+
+ @Override
public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory,
final ConfiguredObjectRecord record,
final ConfiguredObject<?>... parents)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java
index 0d16b4ffc7..cd0187034e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java
@@ -20,19 +20,23 @@
*/
package org.apache.qpid.server.plugin;
+import java.util.Map;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.UnresolvedConfiguredObject;
-import java.util.Map;
-
public interface ConfiguredObjectTypeFactory<X extends ConfiguredObject<X>> extends Pluggable
{
Class<? super X> getCategoryClass();
X create(final ConfiguredObjectFactory factory, Map<String, Object> attributes, ConfiguredObject<?>... parents);
+ ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory, Map<String, Object> attributes, ConfiguredObject<?>... parents);
+
UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory,
ConfiguredObjectRecord record,
ConfiguredObject<?>... parents);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
index 6e1b6529d8..6100a2eb80 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
@@ -19,7 +19,7 @@ package org.apache.qpid.server.plugin;/*
*
*/
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index 26e8271d14..95b9bf8970 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -40,7 +40,7 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends
* @param cause
* @param message
*/
- public void close(AMQConstant cause, String message);
+ public void closeAsync(AMQConstant cause, String message);
public void block();
@@ -53,7 +53,7 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends
* @param cause
* @param message
*/
- public void closeSession(S session, AMQConstant cause, String message);
+ public void closeSessionAsync(S session, AMQConstant cause, String message);
public long getConnectionId();
@@ -107,4 +107,8 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends
void removeSessionListener(SessionModelListener listener);
+ void notifyWork();
+
+ boolean isMessageAssignmentSuspended();
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index f13af479ad..3731ae262b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -113,4 +113,8 @@ public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQCo
* @return the time of the last activity or 0 if not in a transaction
*/
long getTransactionUpdateTime();
+
+ void transportStateChanged();
+
+ void processPending();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 49c0812f4a..2ccf595c26 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -24,40 +24,31 @@ package org.apache.qpid.server.protocol;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.security.Principal;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.auth.Subject;
import org.apache.log4j.Logger;
-import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.network.security.ssl.SSLBufferingSender;
-import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
private final long _id;
- private final SSLContext _sslContext;
- private final boolean _wantClientAuth;
- private final boolean _needClientAuth;
private final AmqpPort<?> _port;
- private final Transport _transport;
+ private Transport _transport;
private final ProtocolEngineCreator[] _creators;
private final Runnable _onCloseTask;
@@ -65,15 +56,13 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
private String _fqdn;
private final Broker<?> _broker;
private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
+ private ByteBufferSender _sender;
private final Protocol _defaultSupportedReply;
private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
+ private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
public MultiVersionProtocolEngine(final Broker<?> broker,
- SSLContext sslContext,
- boolean wantClientAuth,
- boolean needClientAuth,
final Set<Protocol> supported,
final Protocol defaultSupportedReply,
AmqpPort<?> port,
@@ -92,15 +81,23 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_broker = broker;
_supported = supported;
_defaultSupportedReply = defaultSupportedReply;
- _sslContext = sslContext;
- _wantClientAuth = wantClientAuth;
- _needClientAuth = needClientAuth;
_port = port;
_transport = transport;
_creators = creators;
_onCloseTask = onCloseTask;
}
+ @Override
+ public void setMessageAssignmentSuspended(final boolean value)
+ {
+ _delegate.setMessageAssignmentSuspended(value);
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return _delegate.isMessageAssignmentSuspended();
+ }
public SocketAddress getRemoteAddress()
{
@@ -147,6 +144,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_delegate.readerIdle();
}
+ @Override
+ public void encryptedTransport()
+ {
+ _delegate.encryptedTransport();
+ }
+
public void received(ByteBuffer msg)
{
@@ -169,9 +172,21 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
return _delegate.getSubject();
}
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return _delegate.isTransportBlockedForWriting();
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ _delegate.setTransportBlockedForWriting(blocked);
+ }
+
private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
_network = network;
SocketAddress address = _network.getLocalAddress();
@@ -198,10 +213,82 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
return _delegate.getLastWriteTime();
}
+ @Override
+ public void processPending()
+ {
+ _delegate.processPending();
+ }
+
+ @Override
+ public boolean hasWork()
+ {
+ return _delegate.hasWork();
+ }
+
+ @Override
+ public void notifyWork()
+ {
+ _delegate.notifyWork();
+ }
+ @Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+ _workListener.set(listener);
+ _delegate.setWorkListener(listener);
+ }
+
+ @Override
+ public void clearWork()
+ {
+ _delegate.clearWork();
+ }
private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
{
+
+ @Override
+ public void setMessageAssignmentSuspended(final boolean value)
+ {
+
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return false;
+ }
+
+ @Override
+ public void processPending()
+ {
+
+ }
+
+ @Override
+ public boolean hasWork()
+ {
+ return false;
+ }
+
+ @Override
+ public void notifyWork()
+ {
+
+ }
+
+ @Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+
+ }
+
+ @Override
+ public void clearWork()
+ {
+
+ }
+
public SocketAddress getRemoteAddress()
{
return _network.getRemoteAddress();
@@ -247,7 +334,13 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ @Override
+ public void encryptedTransport()
+ {
+
+ }
+
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
}
@@ -274,12 +367,24 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
return new Subject();
}
+
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return false;
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ }
}
private class SelfDelegateProtocolEngine implements ServerProtocolEngine
{
private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
- private long _lastReadTime;
+ private long _lastReadTime = System.currentTimeMillis();
+ private final AtomicBoolean _hasWork = new AtomicBoolean();
public SocketAddress getRemoteAddress()
{
@@ -301,6 +406,47 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
return 0;
}
+ @Override
+ public void setMessageAssignmentSuspended(final boolean value)
+ {
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return false;
+ }
+
+ @Override
+ public void processPending()
+ {
+
+ }
+
+ @Override
+ public boolean hasWork()
+ {
+ return _hasWork.get();
+ }
+
+ @Override
+ public void notifyWork()
+ {
+ _hasWork.set(true);
+ }
+
+ @Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+
+ }
+
+ @Override
+ public void clearWork()
+ {
+ _hasWork.set(false);
+ }
+
public void received(ByteBuffer msg)
{
_lastReadTime = System.currentTimeMillis();
@@ -360,15 +506,6 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
}
-
- if(newDelegate == null && looksLikeSSL(headerBytes))
- {
- if(_sslContext != null)
- {
- newDelegate = new SslDelegateProtocolEngine();
- }
- }
-
// If no delegate is found then send back a supported protocol version id
if(newDelegate == null)
{
@@ -398,8 +535,13 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
else
{
+ boolean hasWork = _delegate.hasWork();
+ if (hasWork)
+ {
+ newDelegate.notifyWork();
+ }
_delegate = newDelegate;
-
+ _delegate.setWorkListener(_workListener.get());
_header.flip();
_delegate.received(_header);
if(msg.hasRemaining())
@@ -423,6 +565,17 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
return _delegate.getSubject();
}
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return false;
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ }
+
public void exception(Throwable t)
{
_logger.error("Error establishing session", t);
@@ -466,132 +619,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_network.close();
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
- {
-
- }
-
- @Override
- public long getLastReadTime()
- {
- return _lastReadTime;
- }
-
@Override
- public long getLastWriteTime()
+ public void encryptedTransport()
{
- return 0;
- }
- }
-
- private class SslDelegateProtocolEngine implements ServerProtocolEngine
- {
- private final MultiVersionProtocolEngine _decryptEngine;
- private final SSLEngine _engine;
- private final SSLReceiver _sslReceiver;
- private final SSLBufferingSender _sslSender;
- private long _lastReadTime;
-
- private SslDelegateProtocolEngine()
- {
-
- _decryptEngine = new MultiVersionProtocolEngine(_broker, null, false, false, _supported,
- _defaultSupportedReply, _port, Transport.SSL, _id, _creators,
- null);
-
- _engine = _sslContext.createSSLEngine();
- _engine.setUseClientMode(false);
- SSLUtil.removeSSLv3Support(_engine);
- SSLUtil.updateEnabledCipherSuites(_engine, _port.getEnabledCipherSuites(), _port.getDisabledCipherSuites());
-
- if(_needClientAuth)
- {
- _engine.setNeedClientAuth(true);
- }
- else if(_wantClientAuth)
+ if(_transport == Transport.TCP)
{
- _engine.setWantClientAuth(true);
+ _transport = Transport.SSL;
}
-
- SSLStatus sslStatus = new SSLStatus();
- _sslReceiver = new SSLReceiver(_engine,_decryptEngine,sslStatus);
- _sslSender = new SSLBufferingSender(_engine,_sender,sslStatus);
- _decryptEngine.setNetworkConnection(new SSLNetworkConnection(_engine,_network, _sslSender), _sslSender);
- }
-
- @Override
- public void received(ByteBuffer msg)
- {
- _lastReadTime = System.currentTimeMillis();
- _sslReceiver.received(msg);
- _sslSender.send();
- _sslSender.flush();
- }
-
- @Override
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
- {
- //TODO - Implement
- }
-
- @Override
- public SocketAddress getRemoteAddress()
- {
- return _decryptEngine.getRemoteAddress();
}
- @Override
- public SocketAddress getLocalAddress()
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
- return _decryptEngine.getLocalAddress();
- }
- @Override
- public long getWrittenBytes()
- {
- return _decryptEngine.getWrittenBytes();
- }
-
- @Override
- public long getReadBytes()
- {
- return _decryptEngine.getReadBytes();
- }
-
- @Override
- public void closed()
- {
- _decryptEngine.closed();
- }
-
- @Override
- public void writerIdle()
- {
- _decryptEngine.writerIdle();
- }
-
- @Override
- public void readerIdle()
- {
- _decryptEngine.readerIdle();
- }
-
- @Override
- public void exception(Throwable t)
- {
- _decryptEngine.exception(t);
- }
-
- @Override
- public long getConnectionId()
- {
- return _decryptEngine.getConnectionId();
- }
-
- @Override
- public Subject getSubject()
- {
- return _decryptEngine.getSubject();
}
@Override
@@ -603,132 +642,9 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
@Override
public long getLastWriteTime()
{
- return _decryptEngine.getLastWriteTime();
+ return 0;
}
}
- private boolean looksLikeSSL(byte[] headerBytes)
- {
- return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
- }
-
- private boolean looksLikeSSLv3ClientHello(byte[] headerBytes)
- {
- return headerBytes[0] == 22 && // SSL Handshake
- (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
- (headerBytes[2] == 0 || // SSL 3.0
- headerBytes[2] == 1 || // TLS 1.0
- headerBytes[2] == 2 || // TLS 1.1
- headerBytes[2] == 3)) && // TLS1.2
- (headerBytes[5] == 1); // client_hello
- }
-
- private boolean looksLikeSSLv2ClientHello(byte[] headerBytes)
- {
- return headerBytes[0] == -128 &&
- headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
- (headerBytes[4] == 0 || // SSL 3.0
- headerBytes[4] == 1 || // TLS 1.0
- headerBytes[4] == 2 || // TLS 1.1
- headerBytes[4] == 3);
- }
-
-
- private static class SSLNetworkConnection implements NetworkConnection
- {
- private final NetworkConnection _network;
- private final SSLBufferingSender _sslSender;
- private final SSLEngine _engine;
- private Principal _principal;
- private boolean _principalChecked;
- private final Object _lock = new Object();
-
- public SSLNetworkConnection(SSLEngine engine, NetworkConnection network,
- SSLBufferingSender sslSender)
- {
- _engine = engine;
- _network = network;
- _sslSender = sslSender;
-
- }
-
- @Override
- public Sender<ByteBuffer> getSender()
- {
- return _sslSender;
- }
-
- @Override
- public void start()
- {
- _network.start();
- }
-
- @Override
- public void close()
- {
- _sslSender.close();
-
- _network.close();
- }
-
- @Override
- public SocketAddress getRemoteAddress()
- {
- return _network.getRemoteAddress();
- }
-
- @Override
- public SocketAddress getLocalAddress()
- {
- return _network.getLocalAddress();
- }
-
- @Override
- public void setMaxWriteIdle(int sec)
- {
- _network.setMaxWriteIdle(sec);
- }
- @Override
- public void setMaxReadIdle(int sec)
- {
- _network.setMaxReadIdle(sec);
- }
-
- @Override
- public Principal getPeerPrincipal()
- {
- synchronized (_lock)
- {
- if(!_principalChecked)
- {
- try
- {
- _principal = _engine.getSession().getPeerPrincipal();
- }
- catch (SSLPeerUnverifiedException e)
- {
- _principal = null;
- }
-
- _principalChecked = true;
- }
-
- return _principal;
- }
- }
-
- @Override
- public int getMaxReadIdle()
- {
- return _network.getMaxReadIdle();
- }
-
- @Override
- public int getMaxWriteIdle()
- {
- return _network.getMaxWriteIdle();
- }
- }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
index 5c704c5967..a51717e79e 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
@@ -27,10 +27,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
-import javax.net.ssl.SSLContext;
-
import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.PortMessages;
import org.apache.qpid.server.logging.subjects.PortLogSubject;
import org.apache.qpid.server.model.Broker;
@@ -48,9 +45,6 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
private final Broker<?> _broker;
private final Set<Protocol> _supported;
private final Protocol _defaultSupportedReply;
- private final SSLContext _sslContext;
- private final boolean _wantClientAuth;
- private final boolean _needClientAuth;
private final AmqpPort<?> _port;
private final Transport _transport;
private final ProtocolEngineCreator[] _creators;
@@ -58,9 +52,6 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
_connectionCountDecrementingTask = new ConnectionCountDecrementingTask();
public MultiVersionProtocolEngineFactory(Broker<?> broker,
- SSLContext sslContext,
- boolean wantClientAuth,
- boolean needClientAuth,
final Set<Protocol> supportedVersions,
final Protocol defaultSupportedReply,
AmqpPort<?> port,
@@ -73,7 +64,6 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
}
_broker = broker;
- _sslContext = sslContext;
_supported = supportedVersions;
_defaultSupportedReply = defaultSupportedReply;
final List<ProtocolEngineCreator> creators = new ArrayList<ProtocolEngineCreator>();
@@ -83,18 +73,16 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
}
Collections.sort(creators, new ProtocolEngineCreatorComparator());
_creators = creators.toArray(new ProtocolEngineCreator[creators.size()]);
- _wantClientAuth = wantClientAuth;
- _needClientAuth = needClientAuth;
_port = port;
_transport = transport;
}
- public ServerProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress)
+ public MultiVersionProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress)
{
if(_port.canAcceptNewConnection(remoteSocketAddress))
{
_port.incrementConnectionCount();
- return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth,
+ return new MultiVersionProtocolEngine(_broker,
_supported, _defaultSupportedReply, _port, _transport,
ID_GENERATOR.getAndIncrement(),
_creators, _connectionCountDecrementingTask);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
new file mode 100644
index 0000000000..eba1f78ad0
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import javax.security.auth.Subject;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.util.Action;
+
+public interface ServerProtocolEngine extends ProtocolEngine
+{
+ /**
+ * Gets the connection ID associated with this ProtocolEngine
+ */
+ long getConnectionId();
+
+ Subject getSubject();
+
+ boolean isTransportBlockedForWriting();
+
+ void setTransportBlockedForWriting(boolean blocked);
+
+ void setMessageAssignmentSuspended(boolean value);
+
+ boolean isMessageAssignmentSuspended();
+
+ void processPending();
+
+ boolean hasWork();
+
+ void clearWork();
+
+ void notifyWork();
+
+ void setWorkListener(Action<ServerProtocolEngine> listener);
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 04d5fef462..2a51657f60 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -43,11 +43,15 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.Task;
+import org.apache.qpid.server.configuration.updater.TaskWithException;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -96,6 +100,7 @@ import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.transport.TransportException;
public abstract class AbstractQueue<X extends AbstractQueue<X>>
@@ -642,16 +647,51 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
@Override
- public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target,
- FilterManager filters,
+ public QueueConsumerImpl addConsumer(final ConsumerTarget target,
+ final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
- EnumSet<ConsumerImpl.Option> optionSet)
+ final EnumSet<ConsumerImpl.Option> optionSet)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
ConsumerAccessRefused
{
+ try
+ {
+ return getTaskExecutor().run(new TaskWithException<QueueConsumerImpl, Exception>()
+ {
+
+ @Override
+ public QueueConsumerImpl execute()
+ throws Exception
+ {
+
+ return addConsumerInternal(target, filters, messageClass, consumerName, optionSet);
+ }
+ });
+ }
+ catch (ExistingExclusiveConsumer | ConsumerAccessRefused |
+ ExistingConsumerPreventsExclusive | RuntimeException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ // Should never happen
+ throw new ServerScopedRuntimeException(e);
+ }
+
+ }
+
+ private QueueConsumerImpl addConsumerInternal(final ConsumerTarget target,
+ FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName,
+ EnumSet<ConsumerImpl.Option> optionSet)
+ throws ExistingExclusiveConsumer, ConsumerAccessRefused,
+ ExistingConsumerPreventsExclusive
+ {
if (hasExclusiveConsumer())
{
throw new ExistingExclusiveConsumer();
@@ -763,7 +803,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
QueueConsumerImpl consumer = new QueueConsumerImpl(this,
target,
consumerName,
- filters,
+ filters,
messageClass,
optionSet);
@@ -812,19 +852,18 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
deliverAsync();
return consumer;
-
}
@Override
- protected void beforeClose()
+ protected ListenableFuture<Void> beforeClose()
{
_closing = true;
- super.beforeClose();
+ return super.beforeClose();
}
- synchronized void unregisterConsumer(final QueueConsumerImpl consumer)
+ void unregisterConsumer(final QueueConsumerImpl consumer)
{
if (consumer == null)
{
@@ -835,7 +874,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if (removed)
{
- consumer.close();
+ consumer.closeAsync();
// No longer can the queue have an exclusive consumer
setExclusiveSubscriber(null);
@@ -1219,10 +1258,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
else
{
deliverMessage(sub, entry, false);
- if(sub.acquires())
- {
- entry.unlockAcquisition();
- }
}
}
}
@@ -1798,7 +1833,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
for (BindingImpl b : bindingCopy)
{
- b.delete();
+ // TODO - RG - Need to sort out bindings!
+ if(getTaskExecutor().isTaskExecutorThread())
+ {
+ b.deleteAsync();
+ }
+ else
+ {
+ b.delete();
+ }
}
QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
@@ -1851,7 +1894,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
_deleteTaskList.clear();
- close();
+ closeAsync();
deleted();
//Log Queue Deletion
getEventLogger().message(_logSubject, QueueMessages.DELETED());
@@ -2050,10 +2093,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
else
{
deliverMessage(sub, node, batch);
- if(sub.acquires())
- {
- node.unlockAcquisition();
- }
}
}
@@ -2221,7 +2260,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if (consumerDone)
{
sub.flushBatched();
- if (lastLoop && getNextAvailableEntry(sub) == null)
+ boolean noMore = getNextAvailableEntry(sub) == null;
+ if (lastLoop && noMore)
{
sub.queueEmpty();
}
@@ -2595,7 +2635,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
if (_virtualHost.getState() != State.ACTIVE)
{
- throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
+ throw new VirtualHostUnavailableException(this._virtualHost);
}
if(!message.isReferenced(this))
@@ -2660,7 +2700,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
return allowed;
}
- private synchronized void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy)
+ private void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy)
throws ExistingConsumerPreventsExclusive
{
if(desiredPolicy == null)
@@ -2862,24 +2902,27 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
//=============
@StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED)
- private void doDeleteBeforeInitialize()
+ private ListenableFuture<Void> doDeleteBeforeInitialize()
{
preSetAlternateExchange();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
_virtualHost.removeQueue(this);
preSetAlternateExchange();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
index a2c275e797..c459737c46 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
@@ -52,5 +52,4 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl,
QueueContext getQueueContext();
- ConsumerTarget getTarget();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 12ab353c8a..b409b638b3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -198,7 +198,7 @@ class QueueConsumerImpl
if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get())
{
- close();
+ closeAsync();
}
final StateChangeListener<? super QueueConsumerImpl, State> stateListener = getStateListener();
if(stateListener != null)
@@ -323,6 +323,7 @@ class QueueConsumerImpl
public final void flush()
{
_queue.flushConsumer(this);
+ _target.processPending();
}
public boolean resend(final QueueEntry entry)
@@ -514,6 +515,7 @@ class QueueConsumerImpl
return _selector;
}
+
@Override
public String toLogString()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
index 19265ef453..b9ff6505fc 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.queue;
import java.util.Map;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.Port;
@@ -49,6 +51,14 @@ public class QueueFactory<X extends Queue<X>> implements ConfiguredObjectTypeFa
}
@Override
+ public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory,
+ final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
+ {
+ return getQueueFactory(factory, attributes).createAsync(factory, attributes, parents);
+ }
+
+ @Override
public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory,
final ConfiguredObjectRecord record,
final ConfiguredObject<?>... parents)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
index bf648186d2..aedfb3670a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
@@ -37,6 +37,9 @@ import java.util.Set;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
@@ -96,7 +99,7 @@ public class FileKeyStoreImpl extends AbstractConfiguredObject<FileKeyStoreImpl>
}
@StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED)
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
// verify that it is not in use
String storeName = getName();
@@ -111,12 +114,14 @@ public class FileKeyStoreImpl extends AbstractConfiguredObject<FileKeyStoreImpl>
}
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
- protected void doActivate()
+ protected ListenableFuture<Void> doActivate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
index b53dcf9ea1..ce5a394591 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
@@ -37,6 +37,9 @@ import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.AuthenticationProvider;
@@ -96,7 +99,7 @@ public class FileTrustStoreImpl extends AbstractConfiguredObject<FileTrustStoreI
}
@StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED)
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
// verify that it is not in use
String storeName = getName();
@@ -137,12 +140,14 @@ public class FileTrustStoreImpl extends AbstractConfiguredObject<FileTrustStoreI
}
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
- protected void doActivate()
+ protected ListenableFuture<Void> doActivate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java
index 0c68cb467e..ea6a0394f0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java
@@ -56,6 +56,8 @@ import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.xml.bind.DatatypeConverter;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -181,7 +183,7 @@ public class NonJavaKeyStoreImpl extends AbstractConfiguredObject<NonJavaKeyStor
}
@StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED)
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
// verify that it is not in use
String storeName = getName();
@@ -199,12 +201,14 @@ public class NonJavaKeyStoreImpl extends AbstractConfiguredObject<NonJavaKeyStor
}
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
- protected void doActivate()
+ protected ListenableFuture<Void> doActivate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java
index bd46b76a66..c18189785d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java
@@ -44,6 +44,8 @@ import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.security.auth.x500.X500Principal;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -169,7 +171,7 @@ public class NonJavaTrustStoreImpl
}
@StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED)
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
// verify that it is not in use
String storeName = getName();
@@ -212,12 +214,14 @@ public class NonJavaTrustStoreImpl
}
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
- protected void doActivate()
+ protected ListenableFuture<Void> doActivate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
index e0eb083bd4..3bd44a92ea 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
@@ -32,6 +32,7 @@ import java.security.AccessController;
import java.security.Principal;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
index 88a761fe19..255457a846 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
@@ -27,6 +27,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -151,13 +154,14 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED )
- protected void startQuiesced()
+ protected ListenableFuture<Void> startQuiesced()
{
setState(State.QUIESCED);
+ return Futures.immediateFuture(null);
}
@StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.QUIESCED }, desiredState = State.ACTIVE )
- protected void activate()
+ protected ListenableFuture<Void> activate()
{
try
{
@@ -175,11 +179,11 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica
throw e;
}
}
-
+ return Futures.immediateFuture(null);
}
@StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED)
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
String providerName = getName();
@@ -195,15 +199,50 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica
}
}
- close();
- if (_preferencesProvider != null)
- {
- _preferencesProvider.delete();
- }
- deleted();
+ final SettableFuture<Void> returnVal = SettableFuture.create();
- setState(State.DELETED);
+ final ListenableFuture<Void> future = closeAsync();
+ future.addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ if (_preferencesProvider != null)
+ {
+ _preferencesProvider.deleteAsync().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ deleted();
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor());
+ }
+ else
+ {
+ try
+ {
+ deleted();
+
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }
+ }, getTaskExecutor().getExecutor());
+ return returnVal;
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java
index 7773d9e98d..50a2a36130 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java
@@ -92,22 +92,15 @@ public abstract class ConfigModelPasswordManagingAuthenticationProvider<X extend
@Override
public void deleteUser(final String user) throws AccountNotFoundException
{
- runTask(new VoidTaskWithException<AccountNotFoundException>()
+ final ManagedUser authUser = getUser(user);
+ if(authUser != null)
{
- @Override
- public void execute() throws AccountNotFoundException
- {
- final ManagedUser authUser = getUser(user);
- if(authUser != null)
- {
- authUser.delete();
- }
- else
- {
- throw new AccountNotFoundException("No such user: '" + user + "'");
- }
- }
- });
+ authUser.delete();
+ }
+ else
+ {
+ throw new AccountNotFoundException("No such user: '" + user + "'");
+ }
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java
index b317b93d71..db69445c41 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java
@@ -27,6 +27,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.updater.VoidTask;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -85,10 +88,11 @@ class ManagedUser extends AbstractConfiguredObject<ManagedUser> implements User<
}
@StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
_authenticationManager.getUserMap().remove(getName());
deleted();
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
index 0fb8938233..0fcab33f5d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
@@ -40,6 +40,9 @@ import javax.security.auth.login.AccountNotFoundException;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.BrokerProperties;
@@ -119,16 +122,9 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
super.onOpen();
_principalDatabase = createDatabase();
initialise();
- List<Principal> users = _principalDatabase == null ? Collections.<Principal>emptyList() : _principalDatabase.getUsers();
- for (Principal user : users)
- {
- PrincipalAdapter principalAdapter = new PrincipalAdapter(user);
- principalAdapter.registerWithParents();
- principalAdapter.open();
- _userMap.put(user, principalAdapter);
- }
}
+
protected abstract PrincipalDatabase createDatabase();
@@ -217,9 +213,44 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
return _principalDatabase;
}
+ @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
+ public ListenableFuture<Void> activate()
+ {
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ final List<Principal> users = _principalDatabase == null ? Collections.<Principal>emptyList() : _principalDatabase.getUsers();
+ _userMap.clear();
+ if(!users.isEmpty())
+ {
+ for (final Principal user : users)
+ {
+ final PrincipalAdapter principalAdapter = new PrincipalAdapter(user);
+ principalAdapter.registerWithParents();
+ principalAdapter.openAsync().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ _userMap.put(user, principalAdapter);
+ if (_userMap.size() == users.size())
+ {
+ setState(State.ACTIVE);
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor());
+
+ }
- @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED)
- public void doDelete()
+ return returnVal;
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
+ }
+ }
+
+ @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED, State.UNINITIALIZED}, desiredState = State.DELETED)
+ public ListenableFuture<Void> doDelete()
{
File file = new File(_path);
if (file.exists() && file.isFile())
@@ -228,6 +259,7 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
}
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@Override
@@ -479,13 +511,14 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
}
@StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
try
{
@@ -503,7 +536,7 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
{
LOGGER.warn("Failed to delete user " + _user, e);
}
-
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java
index 98607d2490..96d32f4179 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java
@@ -22,6 +22,9 @@ package org.apache.qpid.server.security.group;
import java.util.Map;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Group;
@@ -77,16 +80,18 @@ public class GroupImpl extends AbstractConfiguredObject<GroupImpl> implements Gr
@StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
deleted();
+ return Futures.immediateFuture(null);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java
index ea17db6ce7..a86d380b2b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java
@@ -23,6 +23,9 @@ package org.apache.qpid.server.security.group;
import java.security.Principal;
import java.util.Map;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Group;
import org.apache.qpid.server.model.GroupMember;
@@ -61,15 +64,17 @@ public class GroupMemberImpl extends AbstractConfiguredObject<GroupMemberImpl> i
@StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
deleted();
+ return Futures.immediateFuture(null);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java
index ecc166f8fc..7dc032cc90 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java
@@ -26,6 +26,9 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -89,16 +92,18 @@ public class GroupProviderImpl extends AbstractConfiguredObject<GroupProviderImp
}
@StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
deleted();
+ return Futures.immediateFuture(null);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 4dfaa716cf..5868ae61c5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -45,6 +45,7 @@ import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.server.util.FutureResult;
public abstract class AbstractJDBCMessageStore implements MessageStore
{
@@ -834,10 +835,10 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
}
- private StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws StoreException
+ private FutureResult commitTranAsync(ConnectionWrapper connWrapper) throws StoreException
{
commitTran(connWrapper);
- return StoreFuture.IMMEDIATE_FUTURE;
+ return FutureResult.IMMEDIATE_FUTURE;
}
private void abortTran(ConnectionWrapper connWrapper) throws StoreException
@@ -1231,14 +1232,14 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
@Override
- public StoreFuture commitTranAsync()
+ public FutureResult commitTranAsync()
{
checkMessageStoreOpen();
doPreCommitActions();
- StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
+ FutureResult futureResult = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
storedSizeChange(_storeSizeIncrease);
doPostCommitActions();
- return storeFuture;
+ return futureResult;
}
private void doPreCommitActions()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index efe040fbb3..eb887b4ef5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.server.util.FutureResult;
/** A simple message store that stores the messages in a thread-safe structure in memory. */
public class MemoryMessageStore implements MessageStore
@@ -58,9 +59,9 @@ public class MemoryMessageStore implements MessageStore
private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>();
@Override
- public StoreFuture commitTranAsync()
+ public FutureResult commitTranAsync()
{
- return StoreFuture.IMMEDIATE_FUTURE;
+ return FutureResult.IMMEDIATE_FUTURE;
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
index 6f7afccac0..007f3ab796 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.util.FutureResult;
public interface Transaction
{
@@ -53,7 +54,7 @@ public interface Transaction
* Commits all operations performed within a given transactional context.
*
*/
- StoreFuture commitTranAsync();
+ FutureResult commitTranAsync();
/**
* Abandons all operations performed within a given transactional context.
@@ -72,4 +73,4 @@ public interface Transaction
void recordXid(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues,
Transaction.Record[] dequeues);
-} \ No newline at end of file
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
new file mode 100644
index 0000000000..ae5816a0d1
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
@@ -0,0 +1,642 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLPeerUnverifiedException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.transport.network.TransportEncryption;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+import org.apache.qpid.util.SystemUtils;
+
+public class NonBlockingConnection implements NetworkConnection, ByteBufferSender
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
+ private final SocketChannel _socketChannel;
+ private final long _timeout;
+ private final Ticker _ticker;
+ private final SelectorThread _selector;
+ private int _maxReadIdle;
+ private int _maxWriteIdle;
+ private Principal _principal;
+ private boolean _principalChecked;
+ private final Object _lock = new Object();
+
+ public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
+
+ private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
+ private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
+
+ private final String _remoteSocketAddress;
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+ private final ServerProtocolEngine _protocolEngine;
+ private final int _receiveBufSize;
+ private final Set<TransportEncryption> _encryptionSet;
+ private final SSLContext _sslContext;
+ private final Runnable _onTransportEncryptionAction;
+ private ByteBuffer _netInputBuffer;
+ private SSLEngine _sslEngine;
+
+ private ByteBuffer _currentBuffer;
+
+ private TransportEncryption _transportEncryption;
+ private SSLEngineResult _status;
+ private volatile boolean _fullyWritten = true;
+ private boolean _workDone;
+
+
+ public NonBlockingConnection(SocketChannel socketChannel,
+ ServerProtocolEngine delegate,
+ int sendBufferSize,
+ int receiveBufferSize,
+ long timeout,
+ Ticker ticker,
+ final Set<TransportEncryption> encryptionSet,
+ final SSLContext sslContext,
+ final boolean wantClientAuth,
+ final boolean needClientAuth,
+ final Collection<String> enabledCipherSuites,
+ final Collection<String> disabledCipherSuites,
+ final Runnable onTransportEncryptionAction,
+ final SelectorThread selectorThread)
+ {
+ _socketChannel = socketChannel;
+ _timeout = timeout;
+ _ticker = ticker;
+ _selector = selectorThread;
+
+ _protocolEngine = delegate;
+ _receiveBufSize = receiveBufferSize;
+ _encryptionSet = encryptionSet;
+ _sslContext = sslContext;
+ _onTransportEncryptionAction = onTransportEncryptionAction;
+
+ delegate.setWorkListener(new Action<ServerProtocolEngine>()
+ {
+ @Override
+ public void performAction(final ServerProtocolEngine object)
+ {
+ _selector.wakeup();
+ }
+ });
+
+ if(encryptionSet.size() == 1)
+ {
+ _transportEncryption = _encryptionSet.iterator().next();
+ if (_transportEncryption == TransportEncryption.TLS)
+ {
+ onTransportEncryptionAction.run();
+ }
+ }
+
+ if(encryptionSet.contains(TransportEncryption.TLS))
+ {
+ _sslEngine = _sslContext.createSSLEngine();
+ _sslEngine.setUseClientMode(false);
+ SSLUtil.removeSSLv3Support(_sslEngine);
+ SSLUtil.updateEnabledCipherSuites(_sslEngine, enabledCipherSuites, disabledCipherSuites);
+
+ if(needClientAuth)
+ {
+ _sslEngine.setNeedClientAuth(true);
+ }
+ else if(wantClientAuth)
+ {
+ _sslEngine.setWantClientAuth(true);
+ }
+ _netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2));
+ }
+
+ try
+ {
+ _remoteSocketAddress = _socketChannel.getRemoteAddress().toString();
+ _socketChannel.configureBlocking(false);
+ }
+ catch (IOException e)
+ {
+ throw new SenderException("Unable to prepare the channel for non-blocking IO", e);
+ }
+
+
+ }
+
+
+ public Ticker getTicker()
+ {
+ return _ticker;
+ }
+
+ public SocketChannel getSocketChannel()
+ {
+ return _socketChannel;
+ }
+
+ public void start()
+ {
+ }
+
+ public ByteBufferSender getSender()
+ {
+ return this;
+ }
+
+ public void close()
+ {
+ LOGGER.debug("Closing " + _remoteSocketAddress);
+ if(_closed.compareAndSet(false,true))
+ {
+ _protocolEngine.notifyWork();
+ getSelector().wakeup();
+ }
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _socketChannel.socket().getRemoteSocketAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _socketChannel.socket().getLocalSocketAddress();
+ }
+
+ public void setMaxWriteIdle(int sec)
+ {
+ _maxWriteIdle = sec;
+ }
+
+ public void setMaxReadIdle(int sec)
+ {
+ _maxReadIdle = sec;
+ }
+
+ @Override
+ public Principal getPeerPrincipal()
+ {
+ synchronized (_lock)
+ {
+ if(!_principalChecked)
+ {
+ if (_sslEngine != null)
+ {
+ try
+ {
+ _principal = _sslEngine.getSession().getPeerPrincipal();
+ }
+ catch (SSLPeerUnverifiedException e)
+ {
+ return null;
+ }
+ }
+
+ _principalChecked = true;
+ }
+
+ return _principal;
+ }
+ }
+
+ @Override
+ public int getMaxReadIdle()
+ {
+ return _maxReadIdle;
+ }
+
+ @Override
+ public int getMaxWriteIdle()
+ {
+ return _maxWriteIdle;
+ }
+
+ public boolean canRead()
+ {
+ return true;
+ }
+
+ public boolean waitingForWrite()
+ {
+ return !_fullyWritten;
+ }
+
+ public boolean isStateChanged()
+ {
+
+ return _protocolEngine.hasWork();
+ }
+
+ public boolean doWork()
+ {
+ _protocolEngine.clearWork();
+ final boolean closed = _closed.get();
+ if (!closed)
+ {
+ try
+ {
+ _workDone = false;
+
+ long currentTime = System.currentTimeMillis();
+ int tick = _ticker.getTimeToNextTick(currentTime);
+ if (tick <= 0)
+ {
+ _ticker.tick(currentTime);
+ }
+
+ _protocolEngine.setMessageAssignmentSuspended(true);
+
+ _protocolEngine.processPending();
+
+ _protocolEngine.setTransportBlockedForWriting(!doWrite());
+ boolean dataRead = doRead();
+ _fullyWritten = doWrite();
+ _protocolEngine.setTransportBlockedForWriting(!_fullyWritten);
+
+ if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0))
+ {
+ _protocolEngine.notifyWork();
+ }
+
+ // tell all consumer targets that it is okay to accept more
+ _protocolEngine.setMessageAssignmentSuspended(false);
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e);
+ LOGGER.debug("Closing " + _remoteSocketAddress);
+ if(_closed.compareAndSet(false,true))
+ {
+ _protocolEngine.notifyWork();
+ }
+ }
+ }
+ else
+ {
+
+ if(!SystemUtils.isWindows())
+ {
+ try
+ {
+ _socketChannel.shutdownInput();
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Exception shutting down input for thread '" + _remoteSocketAddress + "': " + e);
+
+ }
+ }
+ try
+ {
+ while(!doWrite())
+ {
+ }
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e);
+
+ }
+ LOGGER.debug("Closing receiver");
+ _protocolEngine.closed();
+
+ try
+ {
+ if(!SystemUtils.isWindows())
+ {
+ _socketChannel.shutdownOutput();
+ }
+
+ _socketChannel.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Exception closing socket thread '" + _remoteSocketAddress + "': " + e);
+ }
+ }
+
+ return closed;
+
+ }
+
+ public SelectorThread getSelector()
+ {
+ return _selector;
+ }
+
+ public boolean looksLikeSSLv2ClientHello(final byte[] headerBytes)
+ {
+ return headerBytes[0] == -128 &&
+ headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
+ (headerBytes[4] == 0 || // SSL 3.0
+ headerBytes[4] == 1 || // TLS 1.0
+ headerBytes[4] == 2 || // TLS 1.1
+ headerBytes[4] == 3);
+ }
+
+ public boolean doRead() throws IOException
+ {
+ boolean readData = false;
+ if(_transportEncryption == TransportEncryption.NONE)
+ {
+ int remaining = 0;
+ while (remaining == 0 && !_closed.get())
+ {
+ if (_currentBuffer == null || _currentBuffer.remaining() == 0)
+ {
+ _currentBuffer = ByteBuffer.allocate(_receiveBufSize);
+ }
+ int read = _socketChannel.read(_currentBuffer);
+ if(read > 0)
+ {
+ readData = true;
+ }
+ if (read == -1)
+ {
+ _closed.set(true);
+ }
+ remaining = _currentBuffer.remaining();
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Read " + read + " byte(s)");
+ }
+ ByteBuffer dup = _currentBuffer.duplicate();
+ dup.flip();
+ _currentBuffer = _currentBuffer.slice();
+ _protocolEngine.received(dup);
+ }
+ }
+ else if(_transportEncryption == TransportEncryption.TLS)
+ {
+ int read = 1;
+ while(!_closed.get() && read > 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
+ {
+ read = _socketChannel.read(_netInputBuffer);
+ if (read == -1)
+ {
+ _closed.set(true);
+ }
+ else if(read > 0)
+ {
+ readData = true;
+ }
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Read " + read + " encrypted bytes ");
+ }
+
+ _netInputBuffer.flip();
+
+
+ int unwrapped = 0;
+ boolean tasksRun;
+ do
+ {
+ ByteBuffer appInputBuffer =
+ ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50);
+
+ _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer);
+ tasksRun = runSSLEngineTasks(_status);
+
+ appInputBuffer.flip();
+ unwrapped = appInputBuffer.remaining();
+ if(unwrapped > 0)
+ {
+ readData = true;
+ }
+ _protocolEngine.received(appInputBuffer);
+ }
+ while(unwrapped > 0 || tasksRun);
+
+ _netInputBuffer.compact();
+
+ }
+ }
+ else
+ {
+ int read = 1;
+ while (!_closed.get() && read > 0)
+ {
+
+ read = _socketChannel.read(_netInputBuffer);
+ if (read == -1)
+ {
+ _closed.set(true);
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer);
+ }
+
+ if (_netInputBuffer.position() >= NUMBER_OF_BYTES_FOR_TLS_CHECK)
+ {
+ _netInputBuffer.flip();
+ final byte[] headerBytes = new byte[NUMBER_OF_BYTES_FOR_TLS_CHECK];
+ ByteBuffer dup = _netInputBuffer.duplicate();
+ dup.get(headerBytes);
+
+ _transportEncryption = looksLikeSSL(headerBytes) ? TransportEncryption.TLS : TransportEncryption.NONE;
+ LOGGER.debug("Identified transport encryption as " + _transportEncryption);
+
+ if (_transportEncryption == TransportEncryption.NONE)
+ {
+ _protocolEngine.received(_netInputBuffer);
+ }
+ else
+ {
+ _onTransportEncryptionAction.run();
+ _netInputBuffer.compact();
+ readData = doRead();
+ }
+ break;
+ }
+ }
+ }
+ return readData;
+ }
+
+ public boolean doWrite() throws IOException
+ {
+
+ ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
+ Iterator<ByteBuffer> bufferIterator = _buffers.iterator();
+ for (int i = 0; i < bufArray.length; i++)
+ {
+ bufArray[i] = bufferIterator.next();
+ }
+
+ int byteBuffersWritten = 0;
+
+ if(_transportEncryption == TransportEncryption.NONE)
+ {
+
+
+ long written = _socketChannel.write(bufArray);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Written " + written + " bytes");
+ }
+
+ for (ByteBuffer buf : bufArray)
+ {
+ if (buf.remaining() == 0)
+ {
+ byteBuffersWritten++;
+ _buffers.poll();
+ }
+ }
+
+
+ return bufArray.length == byteBuffersWritten;
+ }
+ else if(_transportEncryption == TransportEncryption.TLS)
+ {
+ int remaining = 0;
+ do
+ {
+ if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
+ {
+ _workDone = true;
+ final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
+ _status = _sslEngine.wrap(bufArray, netBuffer);
+ runSSLEngineTasks(_status);
+
+ netBuffer.flip();
+ remaining = netBuffer.remaining();
+ if (remaining != 0)
+ {
+ _encryptedOutput.add(netBuffer);
+ }
+ for (ByteBuffer buf : bufArray)
+ {
+ if (buf.remaining() == 0)
+ {
+ byteBuffersWritten++;
+ _buffers.poll();
+ }
+ }
+ }
+
+ }
+ while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
+ ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]);
+ long written = _socketChannel.write(encryptedBuffers);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Written " + written + " encrypted bytes");
+ }
+ ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator();
+ while(iter.hasNext())
+ {
+ ByteBuffer buf = iter.next();
+ if(buf.remaining() == 0)
+ {
+ iter.remove();
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ return bufArray.length == byteBuffersWritten;
+
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ public boolean looksLikeSSLv3ClientHello(final byte[] headerBytes)
+ {
+ return headerBytes[0] == 22 && // SSL Handshake
+ (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
+ (headerBytes[2] == 0 || // SSL 3.0
+ headerBytes[2] == 1 || // TLS 1.0
+ headerBytes[2] == 2 || // TLS 1.1
+ headerBytes[2] == 3)) && // TLS1.2
+ (headerBytes[5] == 1); // client_hello
+ }
+
+ public boolean runSSLEngineTasks(final SSLEngineResult status)
+ {
+ if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK)
+ {
+ Runnable task;
+ while((task = _sslEngine.getDelegatedTask()) != null)
+ {
+ task.run();
+ }
+ return true;
+ }
+ return false;
+ }
+
+ public boolean looksLikeSSL(final byte[] headerBytes)
+ {
+ return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
+ }
+
+ @Override
+ public void send(final ByteBuffer msg)
+ {
+ assert Thread.currentThread().getName().startsWith(SelectorThread.IO_THREAD_NAME_PREFIX) : "Send called by unexpected thread " + Thread.currentThread().getName();
+
+ if (_closed.get())
+ {
+ LOGGER.warn("Send ignored as the connection is already closed");
+ }
+ else
+ {
+ _buffers.add(msg);
+ _protocolEngine.notifyWork();
+ }
+ }
+
+ @Override
+ public void flush()
+ {
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
new file mode 100644
index 0000000000..79313712a5
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
@@ -0,0 +1,187 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Set;
+
+import javax.net.ssl.SSLContext;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.configuration.CommonProperties;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.TransportEncryption;
+import org.apache.qpid.transport.network.io.AbstractNetworkTransport;
+import org.apache.qpid.transport.network.io.IdleTimeoutTicker;
+
+public class NonBlockingNetworkTransport
+{
+
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
+ private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
+ CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
+ private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
+ CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
+ private SelectorThread _selector;
+
+
+ private Set<TransportEncryption> _encryptionSet;
+ private volatile boolean _closed = false;
+ private NetworkTransportConfiguration _config;
+ private ProtocolEngineFactory _factory;
+ private SSLContext _sslContext;
+ private ServerSocketChannel _serverSocket;
+ private int _timeout;
+
+ public void close()
+ {
+ if(_selector != null)
+ {
+ try
+ {
+ if (_serverSocket != null)
+ {
+ _selector.cancelAcceptingSocket(_serverSocket);
+ _serverSocket.close();
+ }
+ }
+ catch (IOException e)
+ {
+ // TODO
+ e.printStackTrace();
+ }
+ finally
+ {
+
+ _selector.close();
+ }
+ }
+ }
+
+ public void accept(NetworkTransportConfiguration config,
+ ProtocolEngineFactory factory,
+ SSLContext sslContext,
+ final Set<TransportEncryption> encryptionSet)
+ {
+ try
+ {
+
+ _config = config;
+ _factory = factory;
+ _sslContext = sslContext;
+ _timeout = TIMEOUT;
+
+ InetSocketAddress address = config.getAddress();
+
+ _serverSocket = ServerSocketChannel.open();
+
+ _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+ _serverSocket.bind(address);
+ _serverSocket.configureBlocking(false);
+ _encryptionSet = encryptionSet;
+
+ _selector = new SelectorThread(config.getAddress().toString(), this);
+ _selector.start();
+ _selector.addAcceptingSocket(_serverSocket);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Failed to start AMQP on port : " + config, e);
+ }
+
+
+ }
+
+ public int getAcceptingPort()
+ {
+ return _serverSocket == null ? -1 : _serverSocket.socket().getLocalPort();
+ }
+
+ public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException
+ {
+ final ServerProtocolEngine engine =
+ (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket()
+ .getRemoteSocketAddress());
+
+ if(engine != null)
+ {
+ socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
+ socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
+
+ final Integer sendBufferSize = _config.getSendBufferSize();
+ final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+ socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
+ socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
+
+
+ final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+
+ NonBlockingConnection connection =
+ new NonBlockingConnection(socketChannel,
+ engine,
+ sendBufferSize,
+ receiveBufferSize,
+ _timeout,
+ ticker,
+ _encryptionSet,
+ _sslContext,
+ _config.wantClientAuth(),
+ _config.needClientAuth(),
+ _config.getEnabledCipherSuites(),
+ _config.getDisabledCipherSuites(),
+ new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ engine.encryptedTransport();
+ }
+ },
+ _selector);
+
+ engine.setNetworkConnection(connection, connection.getSender());
+ connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
+
+ ticker.setConnection(connection);
+
+ connection.start();
+
+ _selector.addConnection(connection);
+
+ }
+ else
+ {
+ socketChannel.close();
+ }
+ }
+
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
new file mode 100644
index 0000000000..774888e934
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.transport;
+
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.thread.LoggingUncaughtExceptionHandler;
+
+
+public class SelectorThread extends Thread
+{
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class);
+
+ public static final String IO_THREAD_NAME_PREFIX = "NCS-";
+ private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>();
+ private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
+ private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
+ private final Selector _selector;
+ private final AtomicBoolean _closed = new AtomicBoolean();
+ private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler();
+ private final NonBlockingNetworkTransport _transport;
+
+ SelectorThread(final String name, final NonBlockingNetworkTransport nonBlockingNetworkTransport)
+ {
+ super("SelectorThread-"+name);
+ _transport = nonBlockingNetworkTransport;
+ try
+ {
+ _selector = Selector.open();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void addAcceptingSocket(final ServerSocketChannel socketChannel)
+ {
+ _tasks.add(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+
+ try
+ {
+ socketChannel.register(_selector, SelectionKey.OP_ACCEPT);
+ }
+ catch (ClosedChannelException e)
+ {
+ // TODO
+ e.printStackTrace();
+ }
+ }
+ });
+ _selector.wakeup();
+ }
+
+ public void cancelAcceptingSocket(final ServerSocketChannel socketChannel)
+ {
+ _tasks.add(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ SelectionKey selectionKey = socketChannel.keyFor(_selector);
+ if(selectionKey != null)
+ {
+ selectionKey.cancel();
+ }
+ }
+ });
+ _selector.wakeup();
+ }
+
+ @Override
+ public void run()
+ {
+
+ long nextTimeout = 0;
+
+ try
+ {
+ while (!_closed.get())
+ {
+
+ _selector.select(nextTimeout);
+
+ while(_tasks.peek() != null)
+ {
+ Runnable task = _tasks.poll();
+ task.run();
+ }
+
+ List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
+
+
+ Set<SelectionKey> selectionKeys = _selector.selectedKeys();
+ for (SelectionKey key : selectionKeys)
+ {
+ if(key.isAcceptable())
+ {
+ // todo - should we schedule this rather than running in this thread?
+ SocketChannel acceptedChannel = ((ServerSocketChannel)key.channel()).accept();
+ _transport.acceptSocketChannel(acceptedChannel);
+ }
+ else
+ {
+ NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
+
+ key.channel().register(_selector, 0);
+
+ toBeScheduled.add(connection);
+ _unscheduledConnections.remove(connection);
+ }
+
+ }
+ selectionKeys.clear();
+
+ while (_unregisteredConnections.peek() != null)
+ {
+ NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
+ _unscheduledConnections.add(unregisteredConnection);
+
+
+ final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
+ | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
+ unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
+
+ }
+
+ long currentTime = System.currentTimeMillis();
+ Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
+ nextTimeout = Integer.MAX_VALUE;
+ while (iterator.hasNext())
+ {
+ NonBlockingConnection connection = iterator.next();
+
+ int period = connection.getTicker().getTimeToNextTick(currentTime);
+
+ if (period <= 0 || connection.isStateChanged())
+ {
+ toBeScheduled.add(connection);
+ connection.getSocketChannel().register(_selector, 0).cancel();
+ iterator.remove();
+ }
+ else
+ {
+ nextTimeout = Math.min(period, nextTimeout);
+ }
+ }
+
+ for (NonBlockingConnection connection : toBeScheduled)
+ {
+ _scheduler.schedule(connection);
+ }
+
+ }
+ }
+ catch (IOException e)
+ {
+ //TODO
+ e.printStackTrace();
+ }
+ finally
+ {
+ try
+ {
+ _selector.close();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+
+
+
+ }
+
+ public void addConnection(final NonBlockingConnection connection)
+ {
+ _unregisteredConnections.add(connection);
+ _selector.wakeup();
+
+ }
+
+ public void wakeup()
+ {
+ _selector.wakeup();
+ }
+
+ public void close()
+ {
+ _closed.set(true);
+ _selector.wakeup();
+ _scheduler.close();
+ }
+
+ private class NetworkConnectionScheduler
+ {
+ private final ScheduledThreadPoolExecutor _executor;
+ private final AtomicInteger _running = new AtomicInteger();
+ private final int _poolSize;
+
+ private NetworkConnectionScheduler()
+ {
+ _poolSize = Runtime.getRuntime().availableProcessors();
+ _executor = new ScheduledThreadPoolExecutor(_poolSize);
+ _executor.prestartAllCoreThreads();
+ }
+
+ public void processConnection(final NonBlockingConnection connection)
+ {
+ try
+ {
+ _running.incrementAndGet();
+ boolean rerun;
+ do
+ {
+ rerun = false;
+ boolean closed = connection.doWork();
+
+ if (!closed)
+ {
+
+ if (connection.isStateChanged())
+ {
+ if (_running.get() == _poolSize)
+ {
+ schedule(connection);
+ }
+ else
+ {
+ rerun = true;
+ }
+ }
+ else
+ {
+ SelectorThread.this.addConnection(connection);
+ }
+ }
+
+ } while (rerun);
+ }
+ finally
+ {
+ _running.decrementAndGet();
+ }
+ }
+
+ public void schedule(final NonBlockingConnection connection)
+ {
+ _executor.submit(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ String currentName = Thread.currentThread().getName();
+ try
+ {
+ Thread.currentThread().setName(
+ IO_THREAD_NAME_PREFIX + connection.getRemoteAddress().toString());
+ processConnection(connection);
+ }
+ finally
+ {
+ Thread.currentThread().setName(currentName);
+ }
+ }
+ });
+ }
+
+ public void close()
+ {
+ _executor.shutdown();
+ }
+
+
+
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
index 8f7a267771..7874437a2f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.transport;
import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
import java.net.InetSocketAddress;
+import java.util.EnumSet;
import java.util.Collection;
import java.util.Set;
@@ -34,11 +35,11 @@ import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.TransportEncryption;
class TCPandSSLTransport implements AcceptingTransport
{
- private IncomingNetworkTransport _networkTransport;
+ private NonBlockingNetworkTransport _networkTransport;
private Set<Transport> _transports;
private SSLContext _sslContext;
private InetSocketAddress _bindingSocketAddress;
@@ -62,7 +63,7 @@ class TCPandSSLTransport implements AcceptingTransport
@Override
public void start()
{
- String bindingAddress = ((AmqpPort<?>)_port).getBindingAddress();
+ String bindingAddress = _port.getBindingAddress();
if (WILDCARD_ADDRESS.equals(bindingAddress))
{
bindingAddress = null;
@@ -78,17 +79,25 @@ class TCPandSSLTransport implements AcceptingTransport
}
final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration();
- _networkTransport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance();
+ _networkTransport = new NonBlockingNetworkTransport();
final MultiVersionProtocolEngineFactory protocolEngineFactory =
new MultiVersionProtocolEngineFactory(
- _port.getParent(Broker.class), _transports.contains(Transport.TCP) ? _sslContext : null,
- settings.wantClientAuth(), settings.needClientAuth(),
+ _port.getParent(Broker.class),
_supported,
_defaultSupportedProtocolReply,
_port,
_transports.contains(Transport.TCP) ? Transport.TCP : Transport.SSL);
- _networkTransport.accept(settings, protocolEngineFactory, _transports.contains(Transport.TCP) ? null : _sslContext);
+ EnumSet<TransportEncryption> encryptionSet = EnumSet.noneOf(TransportEncryption.class);
+ if(_transports.contains(Transport.TCP))
+ {
+ encryptionSet.add(TransportEncryption.NONE);
+ }
+ if(_transports.contains(Transport.SSL))
+ {
+ encryptionSet.add(TransportEncryption.TLS);
+ }
+ _networkTransport.accept(settings, protocolEngineFactory, _sslContext, encryptionSet);
}
public int getAcceptingPort()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
index 65064b015c..809c234cc6 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
@@ -30,7 +30,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -55,7 +55,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
public static interface FutureRecorder
{
- public void recordFuture(StoreFuture future, Action action);
+ public void recordFuture(FutureResult future, Action action);
}
@@ -83,7 +83,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
*/
public void addPostTransactionAction(final Action immediateAction)
{
- addFuture(StoreFuture.IMMEDIATE_FUTURE, immediateAction);
+ addFuture(FutureResult.IMMEDIATE_FUTURE, immediateAction);
}
@@ -92,7 +92,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
Transaction txn = null;
try
{
- StoreFuture future;
+ FutureResult future;
if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
@@ -108,7 +108,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
else
{
- future = StoreFuture.IMMEDIATE_FUTURE;
+ future = FutureResult.IMMEDIATE_FUTURE;
}
addFuture(future, postTransactionAction);
postTransactionAction = null;
@@ -120,7 +120,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
- private void addFuture(final StoreFuture future, final Action action)
+ private void addFuture(final FutureResult future, final Action action)
{
if(action != null)
{
@@ -135,7 +135,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
}
- private void addEnqueueFuture(final StoreFuture future, final Action action, boolean persistent)
+ private void addEnqueueFuture(final FutureResult future, final Action action, boolean persistent)
{
if(action != null)
{
@@ -178,7 +178,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
}
- StoreFuture future;
+ FutureResult future;
if(txn != null)
{
future = txn.commitTranAsync();
@@ -186,7 +186,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
else
{
- future = StoreFuture.IMMEDIATE_FUTURE;
+ future = FutureResult.IMMEDIATE_FUTURE;
}
addFuture(future, postTransactionAction);
postTransactionAction = null;
@@ -204,7 +204,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
Transaction txn = null;
try
{
- StoreFuture future;
+ FutureResult future;
if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
@@ -219,7 +219,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
else
{
- future = StoreFuture.IMMEDIATE_FUTURE;
+ future = FutureResult.IMMEDIATE_FUTURE;
}
addEnqueueFuture(future, postTransactionAction, message.isPersistent());
postTransactionAction = null;
@@ -255,7 +255,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
}
- StoreFuture future;
+ FutureResult future;
if (txn != null)
{
future = txn.commitTranAsync();
@@ -263,7 +263,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
else
{
- future = StoreFuture.IMMEDIATE_FUTURE;
+ future = FutureResult.IMMEDIATE_FUTURE;
}
addEnqueueFuture(future, postTransactionAction, message.isPersistent());
postTransactionAction = null;
@@ -281,7 +281,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
{
if(immediatePostTransactionAction != null)
{
- addFuture(StoreFuture.IMMEDIATE_FUTURE, new Action()
+ addFuture(FutureResult.IMMEDIATE_FUTURE, new Action()
{
public void postCommit()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 349ec793fe..b800556312 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.txn;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +33,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -53,7 +54,7 @@ public class LocalTransaction implements ServerTransaction
private final MessageStore _transactionLog;
private volatile long _txnStartTime = 0L;
private volatile long _txnUpdateTime = 0l;
- private StoreFuture _asyncTran;
+ private FutureResult _asyncTran;
public LocalTransaction(MessageStore transactionLog)
{
@@ -271,16 +272,16 @@ public class LocalTransaction implements ServerTransaction
}
}
- public StoreFuture commitAsync(final Runnable deferred)
+ public FutureResult commitAsync(final Runnable deferred)
{
sync();
- StoreFuture future = StoreFuture.IMMEDIATE_FUTURE;
+ FutureResult future = FutureResult.IMMEDIATE_FUTURE;
if(_transaction != null)
{
- future = new StoreFuture()
+ future = new FutureResult()
{
private volatile boolean _completed = false;
- private StoreFuture _underlying = _transaction.commitTranAsync();
+ private FutureResult _underlying = _transaction.commitTranAsync();
@Override
public boolean isComplete()
@@ -298,6 +299,17 @@ public class LocalTransaction implements ServerTransaction
}
}
+ @Override
+ public void waitForCompletion(final long timeout) throws TimeoutException
+ {
+
+ if(!_completed)
+ {
+ _underlying.waitForCompletion(timeout);
+ checkUnderlyingCompletion();
+ }
+ }
+
private synchronized boolean checkUnderlyingCompletion()
{
if(!_completed && _underlying.isComplete())
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreFuture.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FutureResult.java
index 7d3bf90a75..2aab3081ee 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreFuture.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FutureResult.java
@@ -18,11 +18,13 @@
* under the License.
*
*/
-package org.apache.qpid.server.store;
+package org.apache.qpid.server.util;
-public interface StoreFuture
+import java.util.concurrent.TimeoutException;
+
+public interface FutureResult
{
- StoreFuture IMMEDIATE_FUTURE = new StoreFuture()
+ FutureResult IMMEDIATE_FUTURE = new FutureResult()
{
public boolean isComplete()
{
@@ -32,9 +34,17 @@ public interface StoreFuture
public void waitForCompletion()
{
}
+
+ @Override
+ public void waitForCompletion(final long timeout) throws TimeoutException
+ {
+
+ }
};
boolean isComplete();
void waitForCompletion();
+
+ void waitForCompletion(long timeout) throws TimeoutException;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 3680e476c7..220beb20f8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -37,10 +37,15 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -61,6 +66,7 @@ import org.apache.qpid.server.message.MessageNode;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.adapter.ConnectionAdapter;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ConnectionValidator;
@@ -384,27 +390,65 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return isStoreEmptyHandler.isEmpty();
}
- protected void createDefaultExchanges()
+ protected ListenableFuture<Void> createDefaultExchanges()
{
- Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<Void>()
+ return Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<ListenableFuture<Void>>()
{
+ private static final int TOTAL_STANDARD_EXCHANGES = 4;
+ private final AtomicInteger _createdExchangeCount = new AtomicInteger();
+ private SettableFuture<Void> _future = SettableFuture.create();
+
@Override
- public Void run()
+ public ListenableFuture<Void> run()
{
addStandardExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
addStandardExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
addStandardExchange(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
addStandardExchange(ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
- return null;
+ return _future;
+ }
+
+ private void standardExchangeCreated()
+ {
+ if(_createdExchangeCount.incrementAndGet() == TOTAL_STANDARD_EXCHANGES)
+ {
+ _future.set(null);
+ }
}
- void addStandardExchange(String name, String type)
+ ListenableFuture<Void> addStandardExchange(String name, String type)
{
+
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(Exchange.NAME, name);
attributes.put(Exchange.TYPE, type);
attributes.put(Exchange.ID, UUIDGenerator.generateExchangeUUID(name, getName()));
- childAdded(addExchange(attributes));
+ final ListenableFuture<ExchangeImpl> future = addExchangeAsync(attributes);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ Futures.addCallback(future, new FutureCallback<ExchangeImpl>()
+ {
+ @Override
+ public void onSuccess(final ExchangeImpl result)
+ {
+ try
+ {
+ childAdded(result);
+ }
+ finally
+ {
+ standardExchangeCreated();
+ }
+
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ standardExchangeCreated();
+ }
+ }, getTaskExecutor().getExecutor());
+
+ return returnVal;
}
});
}
@@ -747,6 +791,23 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
+ private ListenableFuture<ExchangeImpl> addExchangeAsync(Map<String,Object> attributes)
+ throws ExchangeExistsException, ReservedExchangeNameException,
+ NoFactoryForTypeException
+ {
+ try
+ {
+ ListenableFuture result = getObjectFactory().createAsync(Exchange.class, attributes, this);
+ return result;
+ }
+ catch (DuplicateNameException e)
+ {
+ throw new ExchangeExistsException(getExchange(e.getName()));
+ }
+
+ }
+
+
@Override
public void removeExchange(ExchangeImpl exchange, boolean force)
throws ExchangeIsAlternateException, RequiredExchangeException
@@ -777,9 +838,11 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
- protected void beforeClose()
+ protected ListenableFuture<Void> beforeClose()
{
setState(State.UNAVAILABLE);
+
+ return super.beforeClose();
}
@Override
@@ -1277,37 +1340,76 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
- protected void doStop()
+ protected ListenableFuture<Void> doStop()
{
- closeChildren();
- shutdownHouseKeeping();
- closeMessageStore();
- setState(State.STOPPED);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ closeChildren().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ shutdownHouseKeeping();
+ closeMessageStore();
+ setState(State.STOPPED);
+
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
}
@StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.DELETED )
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
if(_deleted.compareAndSet(false,true))
{
+ final SettableFuture<Void> returnVal = SettableFuture.create();
String hostName = getName();
- close();
+ closeAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ MessageStore ms = getMessageStore();
+ if (ms != null)
+ {
+ try
+ {
+ ms.onDelete(AbstractVirtualHost.this);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Exception occurred on message store deletion", e);
+ }
+ }
+ deleted();
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
- MessageStore ms = getMessageStore();
- if (ms != null)
- {
- try
- {
- ms.onDelete(this);
- }
- catch (Exception e)
- {
- _logger.warn("Exception occurred on message store deletion", e);
- }
- }
- deleted();
- setState(State.DELETED);
+ return returnVal;
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
}
}
@@ -1496,7 +1598,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@StateTransition( currentState = { State.UNINITIALIZED,State.ERRORED }, desiredState = State.ACTIVE )
- private void onActivate()
+ private ListenableFuture<Void> onActivate()
{
_houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount(), new SuppressingInheritedAccessControlContextThreadFactory());
@@ -1516,9 +1618,28 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
if (isStoreEmpty())
{
- createDefaultExchanges();
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ createDefaultExchanges().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ postCreateDefaultExchangeTasks();
+ returnVal.set(null);
+ }
+ }, getTaskExecutor().getExecutor());
+ return returnVal;
}
+ else
+ {
+ postCreateDefaultExchangeTasks();
+ return Futures.immediateFuture(null);
+ }
+ }
+
+ private void postCreateDefaultExchangeTasks()
+ {
if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY))
{
_messageStoreRecoverer = new AsynchronousMessageStoreRecoverer();
@@ -1553,9 +1674,32 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
scheduleHouseKeepingTask(getHousekeepingCheckPeriod(), _fileSystemSpaceChecker);
}
}
+ private static class ChildCounter
+ {
+ private final AtomicInteger _count = new AtomicInteger();
+ private final Runnable _task;
+
+ private ChildCounter(final Runnable task)
+ {
+ _task = task;
+ }
+
+ public void incrementCount()
+ {
+ _count.incrementAndGet();
+ }
+
+ public void decrementCount()
+ {
+ if(_count.decrementAndGet() == 0)
+ {
+ _task.run();
+ }
+ }
+ }
@StateTransition( currentState = { State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
- private void onRestart()
+ private ListenableFuture<Void> onRestart()
{
resetStatistics();
@@ -1586,6 +1730,25 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
new GenericRecoverer(this).recover(records);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ final ChildCounter counter = new ChildCounter(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ onActivate().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ returnVal.set(null);
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ }
+ });
+ counter.incrementCount();
Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>()
{
@Override
@@ -1596,14 +1759,22 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@Override
public void performAction(final ConfiguredObject<?> object)
{
- object.open();
+ counter.incrementCount();
+ object.openAsync().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ counter.decrementCount();
+ }
+ }, getTaskExecutor().getExecutor());
}
});
return null;
}
});
-
- onActivate();
+ counter.decrementCount();
+ return returnVal;
}
private class FileSystemSpaceChecker extends HouseKeepingTask
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
new file mode 100644
index 0000000000..a0bab0b0a6
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+
+public class VirtualHostUnavailableException extends ConnectionScopedRuntimeException
+{
+ public VirtualHostUnavailableException(VirtualHostImpl<?, ?, ?> host)
+ {
+ super("Virtualhost state "
+ + host.getState()
+ + " prevents the message from being sent");
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
index 03c30a9cd4..fd73963b68 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
@@ -29,6 +29,8 @@ import java.util.Map;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -68,7 +70,7 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard
}
@Override
- protected void activate()
+ protected ListenableFuture<Void> activate()
{
if (LOGGER.isDebugEnabled())
{
@@ -107,15 +109,21 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard
if (host != null)
{
final VirtualHost<?,?,?> recoveredHost = host;
- Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
- recoveredHost.open();
- return null;
- }
- });
+ final ListenableFuture<Void> openFuture = Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(),
+ new PrivilegedAction<ListenableFuture<Void>>()
+ {
+ @Override
+ public ListenableFuture<Void> run()
+ {
+ return recoveredHost.openAsync();
+
+ }
+ });
+ return openFuture;
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
index a343b71501..8e08554358 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
@@ -37,6 +37,10 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -119,16 +123,47 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
}
@StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
- protected void doActivate()
+ protected ListenableFuture<Void> doActivate()
{
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+
try
{
- activate();
- setState(State.ACTIVE);
+ Futures.addCallback(activate(),
+ new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ try
+ {
+ setState(State.ACTIVE);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+
+ setState(State.ERRORED);
+ returnVal.set(null);
+ if (_broker.isManagementMode())
+ {
+ LOGGER.warn("Failed to make " + this + " active.", t);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
}
catch(RuntimeException e)
{
setState(State.ERRORED);
+ returnVal.set(null);
if (_broker.isManagementMode())
{
LOGGER.warn("Failed to make " + this + " active.", e);
@@ -138,6 +173,7 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
throw e;
}
}
+ return returnVal;
}
@Override
@@ -180,39 +216,73 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
}
@StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED )
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
+ final SettableFuture<Void> returnVal = SettableFuture.create();
setState(State.DELETED);
deleteVirtualHostIfExists();
- close();
- deleted();
- DurableConfigurationStore configurationStore = getConfigurationStore();
- if (configurationStore != null)
+ final ListenableFuture<Void> closeFuture = closeAsync();
+ closeFuture.addListener(new Runnable()
{
- configurationStore.onDelete(this);
- }
+ @Override
+ public void run()
+ {
+ try
+ {
+ deleted();
+ DurableConfigurationStore configurationStore = getConfigurationStore();
+ if (configurationStore != null)
+ {
+ configurationStore.onDelete(AbstractVirtualHostNode.this);
+ }
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor());
+
+ return returnVal;
+
}
- protected void deleteVirtualHostIfExists()
+ protected ListenableFuture<Void> deleteVirtualHostIfExists()
{
VirtualHost<?, ?, ?> virtualHost = getVirtualHost();
if (virtualHost != null)
{
- virtualHost.delete();
+ return virtualHost.deleteAsync();
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
}
}
@StateTransition( currentState = { State.ACTIVE, State.ERRORED, State.UNINITIALIZED }, desiredState = State.STOPPED )
- protected void doStop()
+ protected ListenableFuture<Void> doStop()
{
- stopAndSetStateTo(State.STOPPED);
+ return stopAndSetStateTo(State.STOPPED);
}
- protected void stopAndSetStateTo(State stoppedState)
+ protected ListenableFuture<Void> stopAndSetStateTo(final State stoppedState)
{
- closeChildren();
- closeConfigurationStoreSafely();
- setState(stoppedState);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+
+ ListenableFuture<Void> childCloseFuture = closeChildren();
+ childCloseFuture.addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ closeConfigurationStoreSafely();
+ setState(stoppedState);
+ returnVal.set(null);
+ }
+ }, getTaskExecutor().getExecutor());
+
+ return returnVal;
}
@Override
@@ -270,7 +340,7 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
protected abstract DurableConfigurationStore createConfigurationStore();
- protected abstract void activate();
+ protected abstract ListenableFuture<Void> activate();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
index c94d113514..8a160f83d7 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
@@ -24,6 +24,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +65,7 @@ public class RedirectingVirtualHostNodeImpl
}
@StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
- protected void doActivate()
+ protected ListenableFuture<Void> doActivate()
{
try
{
@@ -83,6 +85,7 @@ public class RedirectingVirtualHostNodeImpl
throw e;
}
}
+ return Futures.immediateFuture(null);
}
@Override