diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache')
76 files changed, 3427 insertions, 1228 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java index 6dd6c58853..bc5d30a0f0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java @@ -29,9 +29,13 @@ import java.security.PrivilegedExceptionAction; import java.util.HashSet; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.security.auth.Subject; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; @@ -106,11 +110,16 @@ public class Broker implements BrokerShutdownProvider { if(_systemConfig != null) { - _systemConfig.close(); + ListenableFuture<Void> closeResult = _systemConfig.closeAsync(); + closeResult.get(30000l, TimeUnit.MILLISECONDS); } _taskExecutor.stop(); } + catch (TimeoutException | InterruptedException | ExecutionException e) + { + LOGGER.warn("Attempting to cleanly shutdown took too long, exiting immediately"); + } finally { if (_configuringOwnLogging) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java index c2c0cc77fa..06ce48ffa2 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java @@ -28,6 +28,9 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.updater.VoidTask; import org.apache.qpid.server.exchange.AbstractExchange; @@ -196,7 +199,7 @@ public class BindingImpl } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { if(_deleted.compareAndSet(false,true)) { @@ -209,12 +212,14 @@ public class BindingImpl deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } public void addStateChangeListener(StateChangeListener<BindingImpl,State> listener) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java index e0c03fe822..8d572189b3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.configuration.updater; import java.util.concurrent.CancellationException; +import java.util.concurrent.Executor; import java.util.concurrent.Future; public interface TaskExecutor @@ -43,4 +44,7 @@ public interface TaskExecutor <T> Future<T> submit(Task<T> task) throws CancellationException; + boolean isTaskExecutorThread(); + + Executor getExecutor(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java index 96e4e256b2..0f59494850 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -48,6 +49,7 @@ public class TaskExecutorImpl implements TaskExecutor private volatile Thread _taskThread; private final AtomicBoolean _running = new AtomicBoolean(); private volatile ExecutorService _executor; + private final ImmediateIfSameThreadExecutor _wrappedExecutor = new ImmediateIfSameThreadExecutor(); @Override @@ -67,7 +69,7 @@ public class TaskExecutorImpl implements TaskExecutor @Override public Thread newThread(Runnable r) { - _taskThread = new Thread(r, TASK_EXECUTION_THREAD_NAME); + _taskThread = new TaskThread(r, TASK_EXECUTION_THREAD_NAME, TaskExecutorImpl.this); return _taskThread; } }); @@ -277,7 +279,13 @@ public class TaskExecutorImpl implements TaskExecutor } } - private boolean isTaskExecutorThread() + @Override + public Executor getExecutor() + { + return _wrappedExecutor; + } + + public boolean isTaskExecutorThread() { return Thread.currentThread() == _taskThread; } @@ -373,4 +381,41 @@ public class TaskExecutorImpl implements TaskExecutor return get(); } } + + private class ImmediateIfSameThreadExecutor implements Executor + { + + @Override + public void execute(final Runnable command) + { + if(isTaskExecutorThread() + || (_executor == null && (Thread.currentThread() instanceof TaskThread + && ((TaskThread)Thread.currentThread()).getTaskExecutor() == TaskExecutorImpl.this))) + { + command.run(); + } + else + { + _executor.execute(command); + } + + } + } + + private static class TaskThread extends Thread + { + + private final TaskExecutorImpl _taskExecutor; + + public TaskThread(final Runnable r, final String name, final TaskExecutorImpl taskExecutor) + { + super(r, name); + _taskExecutor = taskExecutor; + } + + public TaskExecutorImpl getTaskExecutor() + { + return _taskExecutor; + } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java index 82ae9f6454..237a5b4069 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java @@ -20,11 +20,12 @@ */ package org.apache.qpid.server.connection; +import java.net.SocketAddress; + +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.security.auth.SocketConnectionPrincipal; -import java.net.SocketAddress; - public class ConnectionPrincipal implements SocketConnectionPrincipal { private final AMQConnectionModel _connection; @@ -51,6 +52,11 @@ public class ConnectionPrincipal implements SocketConnectionPrincipal return _connection; } + public VirtualHost<?,?,?> getVirtualHost() + { + return _connection.getVirtualHost(); + } + @Override public boolean equals(final Object o) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index 883785c7ce..a24195075e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -74,7 +74,7 @@ public class ConnectionRegistry implements IConnectionRegistry AMQConnectionModel connection = itr.next(); try { - connection.close(AMQConstant.CONNECTION_FORCED, replyText); + connection.closeAsync(AMQConstant.CONNECTION_FORCED, replyText); } catch (Exception e) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java index 0421a66abf..be4ac9d427 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java @@ -23,17 +23,21 @@ package org.apache.qpid.server.consumer; import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.log4j.Logger; + +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.util.StateChangeListener; public abstract class AbstractConsumerTarget implements ConsumerTarget { - + private static final Logger LOGGER = Logger.getLogger(AbstractConsumerTarget.class); private final AtomicReference<State> _state; private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new @@ -41,6 +45,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget private final Lock _stateChangeLock = new ReentrantLock(); private final AtomicInteger _stateActivates = new AtomicInteger(); + private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue(); protected AbstractConsumerTarget(final State initialState) @@ -48,6 +53,26 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget _state = new AtomicReference<State>(initialState); } + @Override + public void processPending() + { + while(hasMessagesToSend()) + { + sendNextMessage(); + } + + processClosed(); + } + + protected abstract void processClosed(); + + @Override + public final boolean isSuspended() + { + return getSessionModel().getConnectionModel().isMessageAssignmentSuspended() || doIsSuspended(); + } + + protected abstract boolean doIsSuspended(); public final State getState() { @@ -101,6 +126,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget } } + @Override public final void notifyCurrentState() { @@ -136,4 +162,41 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget _stateChangeLock.unlock(); } + @Override + public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + { + _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch)); + + getSessionModel().getConnectionModel().notifyWork(); + return entry.getMessage().getSize(); + } + + protected abstract void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch); + + @Override + public boolean hasMessagesToSend() + { + return !_queue.isEmpty(); + } + + @Override + public void sendNextMessage() + { + ConsumerMessageInstancePair consumerMessage = _queue.peek(); + if (consumerMessage != null) + { + _queue.poll(); + + ConsumerImpl consumer = consumerMessage.getConsumer(); + MessageInstance entry = consumerMessage.getEntry(); + boolean batch = consumerMessage.isBatch(); + doSend(consumer, entry, batch); + + if (consumer.acquires()) + { + entry.unlockAcquisition(); + } + } + + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java index c0db72d498..e17eca8614 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.consumer; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -31,6 +33,8 @@ public interface ConsumerImpl void externalStateChange(); + ConsumerTarget getTarget(); + enum Option { ACQUIRES, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java new file mode 100644 index 0000000000..aa5e419ce2 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.consumer; + +import org.apache.qpid.server.message.MessageInstance; + +public class ConsumerMessageInstancePair +{ + private final ConsumerImpl _consumer; + private final MessageInstance _entry; + private final boolean _batch; + + public ConsumerMessageInstancePair(final ConsumerImpl consumer, final MessageInstance entry, final boolean batch) + { + _consumer = consumer; + _entry = entry; + _batch = batch; + + } + + public ConsumerImpl getConsumer() + { + return _consumer; + } + + public MessageInstance getEntry() + { + return _entry; + } + + public boolean isBatch() + { + return _batch; + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java index 5aef922da5..cef566793f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java @@ -33,6 +33,8 @@ public interface ConsumerTarget void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener); + void processPending(); + enum State { ACTIVE, SUSPENDED, CLOSED @@ -44,6 +46,8 @@ public interface ConsumerTarget void consumerRemoved(ConsumerImpl sub); + void notifyCurrentState(); + void addStateListener(StateChangeListener<ConsumerTarget, State> listener); long getUnacknowledgedBytes(); @@ -54,6 +58,10 @@ public interface ConsumerTarget long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch); + boolean hasMessagesToSend(); + + void sendNextMessage(); + void flushBatched(); void queueDeleted(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index cb026e175b..61bbe6f732 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -36,6 +37,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; @@ -63,12 +68,12 @@ import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; import org.apache.qpid.server.virtualhost.RequiredExchangeException; import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; public abstract class AbstractExchange<T extends AbstractExchange<T>> extends AbstractConfiguredObject<T> @@ -479,7 +484,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> { if (_virtualHost.getState() != State.ACTIVE) { - throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent"); + throw new VirtualHostUnavailableException(this._virtualHost); } List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties); @@ -593,9 +598,18 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } @Override - public boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments) + public boolean addBinding(final String bindingKey, final AMQQueue queue, final Map<String, Object> arguments) { - return makeBinding(null, bindingKey, queue, arguments, false); + return doSync(doOnConfigThread(new Callable<ListenableFuture<Boolean>>() + { + @Override + public ListenableFuture<Boolean> call() throws Exception + { + return makeBindingAsync(null, bindingKey, queue, arguments, false); + } + })); + + } @Override @@ -603,12 +617,20 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> final AMQQueue queue, final Map<String, Object> arguments) { - final BindingImpl existingBinding = getBinding(bindingKey, queue); - return makeBinding(existingBinding == null ? null : existingBinding.getId(), - bindingKey, - queue, - arguments, - true); + return doSync(doOnConfigThread(new Callable<ListenableFuture<Boolean>>() + { + @Override + public ListenableFuture<Boolean> call() throws Exception + { + + final BindingImpl existingBinding = getBinding(bindingKey, queue); + return makeBindingAsync(existingBinding == null ? null : existingBinding.getId(), + bindingKey, + queue, + arguments, + true); + } + })); } @@ -634,7 +656,15 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> doRemoveBinding(b); queue.removeBinding(b); - b.delete(); + // TODO - RG - Fix bindings! + if(getTaskExecutor().isTaskExecutorThread()) + { + b.deleteAsync(); + } + else + { + b.delete(); + } } } @@ -651,7 +681,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> return _bindingsMap.get(new BindingIdentifier(bindingKey,queue)); } - private boolean makeBinding(UUID id, + private ListenableFuture<Boolean> makeBindingAsync(UUID id, String bindingKey, AMQQueue queue, Map<String, Object> arguments, @@ -685,22 +715,45 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> attributes.put(Binding.ID, id); attributes.put(Binding.ARGUMENTS, arguments); - BindingImpl b = new BindingImpl(attributes, queue, this); - b.create(); // Must be called before addBinding as it resolves automated attributes. + final BindingImpl b = new BindingImpl(attributes, queue, this); - addBinding(b); - return true; + final SettableFuture<Boolean> returnVal = SettableFuture.create(); + + Futures.addCallback(b.createAsync(), new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + try + { + addBinding(b); + returnVal.set(true); + } + catch(Throwable t) + { + returnVal.setException(t); + } + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }, getTaskExecutor().getExecutor()); // Must be called before addBinding as it resolves automated attributes. + + return returnVal; } else if(force) { Map<String,Object> oldArguments = existingMapping.getArguments(); existingMapping.setArguments(arguments); onBindingUpdated(existingMapping, oldArguments); - return true; + return Futures.immediateFuture(true); } else { - return false; + return Futures.immediateFuture(false); } } } @@ -723,22 +776,24 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED) - private void doDeleteBeforeInitialize() + private ListenableFuture<Void> doDeleteBeforeInitialize() { preSetAlternateExchange(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { try { @@ -748,8 +803,9 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } catch (ExchangeIsAlternateException e) { - return; + } + return Futures.immediateFuture(null); } @Override @@ -860,4 +916,5 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> return binding; } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java index 3e377ebaa6..be98665df8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java @@ -107,8 +107,4 @@ public interface ExchangeImpl<T extends ExchangeImpl<T>> extends Exchange<T>, Ex void bindingRemoved(ExchangeImpl exchange, BindingImpl binding); } - public void addBindingListener(BindingListener listener); - - public void removeBindingListener(BindingListener listener); - } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java deleted file mode 100644 index be3a13d2d3..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java +++ /dev/null @@ -1,87 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.flow; - - -import java.util.concurrent.atomic.AtomicLong; - -public class BytesOnlyCreditManager extends AbstractFlowCreditManager -{ - private final AtomicLong _bytesCredit; - - public BytesOnlyCreditManager(long initialCredit) - { - _bytesCredit = new AtomicLong(initialCredit); - } - - public long getMessageCredit() - { - return -1L; - } - - public long getBytesCredit() - { - return _bytesCredit.get(); - } - - public void restoreCredit(long messageCredit, long bytesCredit) - { - _bytesCredit.addAndGet(bytesCredit); - setSuspended(false); - } - - public void removeAllCredit() - { - _bytesCredit.set(0L); - } - - public boolean hasCredit() - { - return _bytesCredit.get() > 0L; - } - - public boolean useCreditForMessage(long msgSize) - { - if(hasCredit()) - { - if(_bytesCredit.addAndGet(-msgSize) >= 0) - { - return true; - } - else - { - _bytesCredit.addAndGet(msgSize); - setSuspended(true); - return false; - } - } - else - { - return false; - } - - } - - public void setBytesCredit(long bytesCredit) - { - _bytesCredit.set( bytesCredit ); - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java index 280f2851a4..08aac0b511 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java @@ -24,10 +24,6 @@ package org.apache.qpid.server.flow; public interface FlowCreditManager { - long getMessageCredit(); - - long getBytesCredit(); - public static interface FlowCreditManagerListener { void creditStateChanged(boolean hasCredit); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java deleted file mode 100644 index 89fc60666b..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java +++ /dev/null @@ -1,53 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.flow; - - -public class LimitlessCreditManager extends AbstractFlowCreditManager implements FlowCreditManager -{ - public long getMessageCredit() - { - return -1L; - } - - public long getBytesCredit() - { - return -1L; - } - - public void restoreCredit(long messageCredit, long bytesCredit) - { - } - - public void removeAllCredit() - { - } - - public boolean hasCredit() - { - return true; - } - - public boolean useCreditForMessage(long msgSize) - { - return true; - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java deleted file mode 100644 index 31c1fda968..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java +++ /dev/null @@ -1,90 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.flow; - - -public class MessageAndBytesCreditManager extends AbstractFlowCreditManager implements FlowCreditManager -{ - private long _messageCredit; - private long _bytesCredit; - - public MessageAndBytesCreditManager(final long messageCredit, final long bytesCredit) - { - _messageCredit = messageCredit; - _bytesCredit = bytesCredit; - } - - public synchronized long getMessageCredit() - { - return _messageCredit; - } - - public synchronized long getBytesCredit() - { - return _bytesCredit; - } - - public synchronized void restoreCredit(long messageCredit, long bytesCredit) - { - _messageCredit += messageCredit; - _bytesCredit += bytesCredit; - setSuspended(hasCredit()); - } - - public synchronized void removeAllCredit() - { - _messageCredit = 0L; - _bytesCredit = 0L; - setSuspended(true); - } - - public synchronized boolean hasCredit() - { - return (_messageCredit > 0L) && ( _bytesCredit > 0L ); - } - - public synchronized boolean useCreditForMessage(final long msgSize) - { - if(_messageCredit == 0L) - { - setSuspended(true); - return false; - } - else - { - if(msgSize > _bytesCredit) - { - setSuspended(true); - return false; - } - _messageCredit--; - _bytesCredit -= msgSize; - setSuspended(false); - return true; - } - - } - - public synchronized void setBytesCredit(long bytesCredit) - { - _bytesCredit = bytesCredit; - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java deleted file mode 100644 index 1817e8ad31..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java +++ /dev/null @@ -1,86 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.flow; - -import java.util.concurrent.atomic.AtomicLong; - -public class MessageOnlyCreditManager extends AbstractFlowCreditManager implements FlowCreditManager -{ - private final AtomicLong _messageCredit; - - public MessageOnlyCreditManager(final long initialCredit) - { - _messageCredit = new AtomicLong(initialCredit); - } - - public long getMessageCredit() - { - return _messageCredit.get(); - } - - public long getBytesCredit() - { - return -1L; - } - - public void restoreCredit(long messageCredit, long bytesCredit) - { - _messageCredit.addAndGet(messageCredit); - setSuspended(false); - - } - - public void removeAllCredit() - { - setSuspended(true); - _messageCredit.set(0L); - } - - public boolean hasCredit() - { - return _messageCredit.get() > 0L; - } - - public boolean useCreditForMessage(long msgSize) - { - if(hasCredit()) - { - if(_messageCredit.addAndGet(-1L) >= 0) - { - setSuspended(false); - return true; - } - else - { - _messageCredit.addAndGet(1L); - setSuspended(true); - return false; - } - } - else - { - setSuspended(true); - return false; - } - - } - -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java deleted file mode 100644 index fc2d4bfb53..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java +++ /dev/null @@ -1,192 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.flow; - - -public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager -{ - - private volatile long _bytesCreditLimit; - private volatile long _messageCreditLimit; - - private volatile long _bytesCredit; - private volatile long _messageCredit; - - public Pre0_10CreditManager(long bytesCreditLimit, long messageCreditLimit) - { - _bytesCreditLimit = bytesCreditLimit; - _messageCreditLimit = messageCreditLimit; - _bytesCredit = bytesCreditLimit; - _messageCredit = messageCreditLimit; - } - - - public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit) - { - long bytesCreditChange = bytesCreditLimit - _bytesCreditLimit; - long messageCreditChange = messageCreditLimit - _messageCreditLimit; - - - - if(bytesCreditChange != 0L) - { - if(bytesCreditLimit == 0L) - { - _bytesCredit = 0; - } - else - { - _bytesCredit += bytesCreditChange; - } - } - - - if(messageCreditChange != 0L) - { - if(messageCreditLimit == 0L) - { - _messageCredit = 0; - } - else - { - _messageCredit += messageCreditChange; - } - } - - - _bytesCreditLimit = bytesCreditLimit; - _messageCreditLimit = messageCreditLimit; - - setSuspended(!hasCredit()); - - } - - - public long getMessageCredit() - { - return _messageCredit; - } - - public long getBytesCredit() - { - return _bytesCredit; - } - - public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) - { - final long messageCreditLimit = _messageCreditLimit; - boolean notifyIncrease = true; - if(messageCreditLimit != 0L) - { - notifyIncrease = (_messageCredit != 0); - long newCredit = _messageCredit + messageCredit; - _messageCredit = newCredit > messageCreditLimit ? messageCreditLimit : newCredit; - } - - - final long bytesCreditLimit = _bytesCreditLimit; - if(bytesCreditLimit != 0L) - { - long newCredit = _bytesCredit + bytesCredit; - _bytesCredit = newCredit > bytesCreditLimit ? bytesCreditLimit : newCredit; - if(notifyIncrease && bytesCredit>0) - { - notifyIncreaseBytesCredit(); - } - } - - - - setSuspended(!hasCredit()); - - } - - public synchronized void removeAllCredit() - { - _bytesCredit = 0L; - _messageCredit = 0L; - setSuspended(!hasCredit()); - } - - public synchronized boolean hasCredit() - { - return (_bytesCreditLimit == 0L || _bytesCredit > 0) - && (_messageCreditLimit == 0L || _messageCredit > 0); - } - - public synchronized boolean useCreditForMessage(final long msgSize) - { - if(_messageCreditLimit != 0L) - { - if(_messageCredit != 0L) - { - if(_bytesCreditLimit == 0L) - { - _messageCredit--; - - return true; - } - else - { - if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit)) - { - _messageCredit--; - _bytesCredit -= msgSize; - - return true; - } - else - { - return false; - } - } - } - else - { - setSuspended(true); - return false; - } - } - else - { - if(_bytesCreditLimit == 0L) - { - - return true; - } - else - { - if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit)) - { - _bytesCredit -= msgSize; - - return true; - } - else - { - return false; - } - } - - } - - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index e63638213e..529aa230d4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -41,12 +41,23 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonProcessingException; @@ -68,6 +79,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.security.encryption.ConfigurationSecretEncrypter; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.util.Strings; @@ -162,7 +174,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im private final OwnAttributeResolver _attributeResolver = new OwnAttributeResolver(this); - @ManagedAttributeField( afterSet = "attainStateIfOpenedOrReopenFailed" ) + @ManagedAttributeField private State _desiredState; private boolean _openComplete; private boolean _openFailed; @@ -439,24 +451,84 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im @Override public final void open() { - if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) + doSync(openAsync()); + } + + + public final ListenableFuture<Void> openAsync() + { + return doOnConfigThread(new Callable<ListenableFuture<Void>>() + { + @Override + public ListenableFuture<Void> call() throws Exception + { + if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) + { + _openFailed = false; + OpenExceptionHandler exceptionHandler = new OpenExceptionHandler(); + try + { + doResolution(true, exceptionHandler); + doValidation(true, exceptionHandler); + doOpening(true, exceptionHandler); + return doAttainState(exceptionHandler); + } + catch (RuntimeException e) + { + exceptionHandler.handleException(e, AbstractConfiguredObject.this); + return Futures.immediateFuture(null); + } + } + else + { + return Futures.immediateFuture(null); + } + + } + }); + + } + + protected final <T> ListenableFuture<T> doOnConfigThread(final Callable<ListenableFuture<T>> action) + { + final SettableFuture<T> returnVal = SettableFuture.create(); + + _taskExecutor.submit(new Task<Void>() { - _openFailed = false; - OpenExceptionHandler exceptionHandler = new OpenExceptionHandler(); - try - { - doResolution(true, exceptionHandler); - doValidation(true, exceptionHandler); - doOpening(true, exceptionHandler); - doAttainState(exceptionHandler); - } - catch(RuntimeException e) + + @Override + public Void execute() { - exceptionHandler.handleException(e, this); + try + { + Futures.addCallback(action.call(), new FutureCallback<T>() + { + @Override + public void onSuccess(final T result) + { + returnVal.set(result); + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }); + } + catch (Exception e) + { + returnVal.setException(e); + } + return null; } - } + }); + + return returnVal; } + + public void registerWithParents() { for(ConfiguredObject<?> parent : _parents.values()) @@ -468,17 +540,78 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } - protected void closeChildren() + private class ChildCounter { + private final AtomicInteger _count = new AtomicInteger(); + private final Runnable _task; + + private ChildCounter(final Runnable task) + { + _task = task; + } + + public void incrementCount() + { + _count.incrementAndGet(); + } + + public void decrementCount() + { + if(_count.decrementAndGet() == 0) + { + _task.run(); + } + } + } + + protected final ListenableFuture<Void> closeChildren() + { + final SettableFuture<Void> returnVal = SettableFuture.create(); + final ChildCounter counter = new ChildCounter(new Runnable() + { + @Override + public void run() + { + returnVal.set(null); + LOGGER.debug("All children closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName() ); + + } + }); + counter.incrementCount(); + + applyToChildren(new Action<ConfiguredObject<?>>() { @Override public void performAction(final ConfiguredObject<?> child) { - child.close(); + counter.incrementCount(); + ListenableFuture<Void> close = child.closeAsync(); + Futures.addCallback(close, new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + counter.decrementCount(); + } + + @Override + public void onFailure(final Throwable t) + { + LOGGER.error("Exception occurred while closing " + + child.getClass().getSimpleName() + + " : '" + + child.getName() + + "'", t); + // No need to decrement counter as setting the exception will complete the future + returnVal.setException(t); + } + }, MoreExecutors.sameThreadExecutor()); } }); + counter.decrementCount(); + for(Collection<ConfiguredObject<?>> childList : _children.values()) { childList.clear(); @@ -494,23 +627,60 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im childNameMap.clear(); } + return returnVal; + } + + @Override + public void close() + { + doSync(closeAsync()); } @Override - public final void close() + public final ListenableFuture<Void> closeAsync() { - if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED)) + return doOnConfigThread(new Callable<ListenableFuture<Void>>() { - beforeClose(); - closeChildren(); - onClose(); - unregister(false); + @Override + public ListenableFuture<Void> call() throws Exception + { + LOGGER.debug("Closing " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName()); + + if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED)) + { + + return doAfter(beforeClose(), new Callable<ListenableFuture<Void>>() + { + @Override + public ListenableFuture<Void> call() throws Exception + { + return closeChildren(); + } + }).then(new Runnable() + { + @Override + public void run() + { + onClose(); + unregister(false); + LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName()); + } + }); + } + else + { + LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName()); + + return Futures.immediateFuture(null); + } + } + }); - } } - protected void beforeClose() + protected ListenableFuture<Void> beforeClose() { + return Futures.immediateFuture(null); } protected void onClose() @@ -519,48 +689,65 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im public final void create() { - if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) + doSync(createAsync()); + } + + public final ListenableFuture<Void> createAsync() + { + return doOnConfigThread(new Callable<ListenableFuture<Void>>() { - final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser(); - if(currentUser != null) + @Override + public ListenableFuture<Void> call() throws Exception { - String currentUserName = currentUser.getName(); - _attributes.put(LAST_UPDATED_BY, currentUserName); - _attributes.put(CREATED_BY, currentUserName); - _lastUpdatedBy = currentUserName; - _createdBy = currentUserName; - } - final long currentTime = System.currentTimeMillis(); - _attributes.put(LAST_UPDATED_TIME, currentTime); - _attributes.put(CREATED_TIME, currentTime); - _lastUpdatedTime = currentTime; - _createdTime = currentTime; + if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) + { + final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser(); + if (currentUser != null) + { + String currentUserName = currentUser.getName(); + _attributes.put(LAST_UPDATED_BY, currentUserName); + _attributes.put(CREATED_BY, currentUserName); + _lastUpdatedBy = currentUserName; + _createdBy = currentUserName; + } + final long currentTime = System.currentTimeMillis(); + _attributes.put(LAST_UPDATED_TIME, currentTime); + _attributes.put(CREATED_TIME, currentTime); + _lastUpdatedTime = currentTime; + _createdTime = currentTime; + + CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler(); + try + { + doResolution(true, createExceptionHandler); + doValidation(true, createExceptionHandler); + validateOnCreate(); + registerWithParents(); + } + catch (RuntimeException e) + { + createExceptionHandler.handleException(e, AbstractConfiguredObject.this); + } - CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler(); - try - { - doResolution(true, createExceptionHandler); - doValidation(true, createExceptionHandler); - validateOnCreate(); - registerWithParents(); - } - catch(RuntimeException e) - { - createExceptionHandler.handleException(e, this); - } + final AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler = + new CreateExceptionHandler(true); + + try + { + doCreation(true, unregisteringExceptionHandler); + doOpening(true, unregisteringExceptionHandler); + return doAttainState(unregisteringExceptionHandler); + } + catch (RuntimeException e) + { + unregisteringExceptionHandler.handleException(e, AbstractConfiguredObject.this); + } + } + return Futures.immediateFuture(null); - AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler = new CreateExceptionHandler(true); - try - { - doCreation(true, unregisteringExceptionHandler); - doOpening(true, unregisteringExceptionHandler); - doAttainState(unregisteringExceptionHandler); - } - catch(RuntimeException e) - { - unregisteringExceptionHandler.handleException(e, this); } - } + }); + } protected void validateOnCreate() @@ -610,8 +797,40 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { } - private void doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler) + private ListenableFuture<Void> doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler) { + final SettableFuture<Void> returnVal = SettableFuture.create(); + final ChildCounter counter = new ChildCounter(new Runnable() + { + @Override + public void run() + { + try + { + attainState().addListener(new Runnable() + { + @Override + public void run() + { + returnVal.set(null); + } + }, getTaskExecutor().getExecutor()); + } + catch(RuntimeException e) + { + try + { + exceptionHandler.handleException(e, AbstractConfiguredObject.this); + returnVal.set(null); + } + catch(Throwable t) + { + returnVal.setException(t); + } + } + } + }); + counter.incrementCount(); applyToChildren(new Action<ConfiguredObject<?>>() { @Override @@ -619,22 +838,43 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { if (child instanceof AbstractConfiguredObject) { - AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child; + final AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child; if (configuredObject._dynamicState.get() == DynamicState.OPENED) { - try - { - configuredObject.doAttainState(exceptionHandler); - } - catch (RuntimeException e) - { - exceptionHandler.handleException(e, configuredObject); - } + counter.incrementCount(); + Futures.addCallback(configuredObject.doAttainState(exceptionHandler), + new FutureCallback() + { + @Override + public void onSuccess(final Object result) + { + counter.decrementCount(); + } + + @Override + public void onFailure(final Throwable t) + { + try + { + if (t instanceof RuntimeException) + { + exceptionHandler.handleException((RuntimeException) t, + configuredObject); + } + } + finally + { + counter.decrementCount(); + } + } + },getTaskExecutor().getExecutor()); + } } } }); - attainState(); + counter.decrementCount(); + return returnVal; } protected void doOpening(boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler) @@ -890,16 +1130,17 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } - private void attainStateIfOpenedOrReopenFailed() + private ListenableFuture<Void> attainStateIfOpenedOrReopenFailed() { if (_openComplete || getDesiredState() == State.DELETED) { - attainState(); + return attainState(); } else if (_openFailed) { - open(); + return openAsync(); } + return Futures.immediateFuture(null); } protected void onOpen() @@ -907,10 +1148,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } - protected void attainState() + protected ListenableFuture<Void> attainState() { State currentState = getState(); State desiredState = getDesiredState(); + ListenableFuture<Void> returnVal; if(currentState != desiredState) { Method stateChangingMethod = getStateChangeMethod(currentState, desiredState); @@ -918,7 +1160,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { try { - stateChangingMethod.invoke(this); + returnVal = (ListenableFuture<Void>) stateChangingMethod.invoke(this); } catch (IllegalAccessException e) { @@ -938,7 +1180,16 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im throw new ServerScopedRuntimeException("Unexpected checked exception when calling state transition", underlying); } } + else + { + returnVal = Futures.immediateFuture(null); + } } + else + { + returnVal = Futures.immediateFuture(null); + } + return returnVal; } private Method getStateChangeMethod(final State currentState, final State desiredState) @@ -1013,44 +1264,72 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } - private State setDesiredState(final State desiredState) + private ListenableFuture<Void> setDesiredState(final State desiredState) throws IllegalStateTransitionException, AccessControlException { - - return runTask(new Task<State>() + return doOnConfigThread(new Callable<ListenableFuture<Void>>() + { + @Override + public ListenableFuture<Void> call() throws Exception + { + final State state = getState(); + final State currentDesiredState = getDesiredState(); + if(desiredState == currentDesiredState && desiredState != state) + { + return doAfter(attainStateIfOpenedOrReopenFailed(), new Runnable() + { + @Override + public void run() { - @Override - public State execute() + final State currentState = getState(); + if (currentState != state) { + notifyStateChanged(state, currentState); + } - State state = getState(); - if(desiredState == getDesiredState() && desiredState != state) - { - attainStateIfOpenedOrReopenFailed(); - final State currentState = getState(); - if (currentState != state) - { - notifyStateChanged(state, currentState); - } - return currentState; - } - else - { - setAttributes(Collections.<String, Object>singletonMap(DESIRED_STATE, - desiredState)); + } + }); + } + else + { + ConfiguredObject<?> proxyForValidation = + createProxyForValidation(Collections.<String, Object>singletonMap( + ConfiguredObject.DESIRED_STATE, + desiredState)); + Set<String> desiredStateOnlySet = Collections.unmodifiableSet( + Collections.singleton(ConfiguredObject.DESIRED_STATE)); + authoriseSetAttributes(proxyForValidation, desiredStateOnlySet); + validateChange(proxyForValidation, desiredStateOnlySet); + + if (changeAttribute(ConfiguredObject.DESIRED_STATE, currentDesiredState, desiredState)) + { + attributeSet(ConfiguredObject.DESIRED_STATE, + currentDesiredState, + desiredState); + + return doAfter(attainStateIfOpenedOrReopenFailed(),new Runnable() + { + @Override + public void run() + { + if (getState() == desiredState) + { + notifyStateChanged(state, desiredState); + } + + } + } + ); + } + else + { + return Futures.immediateFuture(null); + } + } + + } + }); - if (getState() == desiredState) - { - notifyStateChanged(state, desiredState); - return desiredState; - } - else - { - return getState(); - } - } - } - }); } @Override @@ -1429,20 +1708,62 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im public final void stop() { - setDesiredState(State.STOPPED); + doSync(setDesiredState(State.STOPPED)); } public final void delete() { - if(getState() == State.UNINITIALIZED) + doSync(deleteAsync()); + } + + protected final <R> R doSync(ListenableFuture<R> async) + { + try + { + return async.get(); + } + catch (InterruptedException e) { - _desiredState = State.DELETED; + throw new ServerScopedRuntimeException(e); + } + catch (ExecutionException e) + { + Throwable cause = e.getCause(); + if(cause instanceof RuntimeException) + { + throw (RuntimeException) cause; + } + else if(cause instanceof Error) + { + throw (Error) cause; + } + else if(cause != null) + { + throw new ServerScopedRuntimeException(cause); + } + else + { + throw new ServerScopedRuntimeException(e); + } + } - setDesiredState(State.DELETED); + } + + public final ListenableFuture<Void> deleteAsync() + { + return setDesiredState(State.DELETED); + } + + public final void start() + { + doSync(startAsync()); + } + public ListenableFuture<Void> startAsync() + { + return setDesiredState(State.ACTIVE); } - public final void start() { setDesiredState(State.ACTIVE); } protected void deleted() { @@ -1527,24 +1848,175 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im _taskExecutor.run(task); } + @Override + public void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException + { + doSync(setAttributesAsync(attributes)); + } + + protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Runnable second) + { + return doAfter(getTaskExecutor().getExecutor(), first, second); + } + + protected static final ChainedListenableFuture doAfter(Executor executor, ListenableFuture<Void> first, final Runnable second) + { + final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor); + Futures.addCallback(first, new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + try + { + second.run(); + returnVal.set(null); + } + catch(Throwable e) + { + returnVal.setException(e); + } + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }, executor); + + return returnVal; + } + + public static interface ChainedListenableFuture extends ListenableFuture<Void> + { + ChainedListenableFuture then(Runnable r); + ChainedListenableFuture then(Callable<ListenableFuture<Void>> r); + } + + public static class ChainedSettableFuture extends AbstractFuture<Void> implements ChainedListenableFuture + { + private final Executor _exector; + + public ChainedSettableFuture(final Executor executor) + { + _exector = executor; + } + + @Override + public boolean set(Void value) + { + return super.set(value); + } + + @Override + public boolean setException(Throwable throwable) + { + return super.setException(throwable); + } + + @Override + public ChainedListenableFuture then(final Runnable r) + { + return doAfter(_exector, this, r); + } + + @Override + public ChainedListenableFuture then(final Callable<ListenableFuture<Void>> r) + { + return doAfter(_exector, this,r); + } + } + + protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second) + { + return doAfter(getTaskExecutor().getExecutor(), first, second); + } + + protected static final ChainedListenableFuture doAfter(final Executor executor, ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second) + { + final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor); + Futures.addCallback(first, new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + try + { + final ListenableFuture<Void> future = second.call(); + Futures.addCallback(future, new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + returnVal.set(null); + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }, executor); + + } + catch(Throwable e) + { + returnVal.setException(e); + } + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }, executor); + + return returnVal; + } @Override - public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException + public ListenableFuture<Void> setAttributesAsync(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException { + final Map<String,Object> updateAttributes = new HashMap<>(attributes); + Object desiredState = updateAttributes.remove(ConfiguredObject.DESIRED_STATE); runTask(new VoidTask() { @Override public void execute() { authoriseSetAttributes(createProxyForValidation(attributes), attributes.keySet()); - changeAttributes(attributes); + validateChange(createProxyForValidation(attributes), attributes.keySet()); + + changeAttributes(updateAttributes); } }); + if(desiredState != null) + { + State state; + if(desiredState instanceof State) + { + state = (State)desiredState; + } + else if(desiredState instanceof String) + { + state = State.valueOf((String)desiredState); + } + else + { + throw new IllegalArgumentException("Cannot convert an object of type " + desiredState.getClass().getName() + " to a State"); + } + return setDesiredState(state); + } + else + { + return Futures.immediateFuture(null); + } } protected void changeAttributes(final Map<String, Object> attributes) { - validateChange(createProxyForValidation(attributes), attributes.keySet()); Collection<String> names = getAttributeNames(); for (String name : names) { @@ -1938,6 +2410,74 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } + private static class CloseResult implements FutureResult + { + private volatile FutureResult _childFutureResult; + + @Override + public boolean isComplete() + { + return _childFutureResult != null && _childFutureResult.isComplete(); + } + + @Override + public void waitForCompletion() + { + synchronized (this) + { + while (_childFutureResult == null) + { + try + { + wait(); + } + catch (InterruptedException e) + { + + } + } + } + _childFutureResult.waitForCompletion(); + + } + + @Override + public void waitForCompletion(final long timeout) throws TimeoutException + { + long startTime = System.currentTimeMillis(); + long remaining = timeout; + + synchronized (this) + { + while (_childFutureResult == null && remaining > 0) + { + try + { + wait(remaining); + } + catch (InterruptedException e) + { + + } + remaining = startTime + timeout - System.currentTimeMillis(); + + if(remaining <= 0) + { + throw new TimeoutException("Completion did not occur within given timeout: " + timeout); + } + } + } + _childFutureResult.waitForCompletion(remaining); + + } + + public synchronized void setChildFutureResult(final FutureResult childFutureResult) + { + _childFutureResult = childFutureResult; + notifyAll(); + } + } + private static class AttributeGettingHandler implements InvocationHandler { @@ -2127,7 +2667,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { if (source.getState() != State.DELETED) { - source.delete(); + // TODO - RG - This isn't right :-( + source.deleteAsync(); } } finally diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java index 5bf5e337ad..f97d2dfe14 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java @@ -23,6 +23,10 @@ package org.apache.qpid.server.model; import java.util.HashMap; import java.util.Map; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; + import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.store.ConfiguredObjectDependency; import org.apache.qpid.server.store.ConfiguredObjectRecord; @@ -59,6 +63,26 @@ abstract public class AbstractConfiguredObjectTypeFactory<X extends AbstractConf return instance; } + + @Override + public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory, + final Map<String, Object> attributes, + final ConfiguredObject<?>... parents) + { + final SettableFuture<X> returnVal = SettableFuture.create(); + final X instance = createInstance(attributes, parents); + final ListenableFuture<Void> createFuture = instance.createAsync(); + createFuture.addListener(new Runnable() + { + @Override + public void run() + { + returnVal.set(instance); + } + }, MoreExecutors.sameThreadExecutor()); + return returnVal; + } + protected abstract X createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents); public final <C extends ConfiguredObject<?>> C getParent(Class<C> parentClass, ConfiguredObject<?>... parents) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java index b421c5aaf1..c6ac7d4073 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java @@ -31,6 +31,9 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + import org.apache.qpid.common.QpidProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.store.ManagementModeStoreHandler; @@ -194,11 +197,11 @@ public abstract class AbstractSystemConfig<X extends SystemConfig<X>> } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE) - protected void activate() + protected ListenableFuture<Void> activate() { final EventLogger eventLogger = _eventLogger; - EventLogger startupLogger; + final EventLogger startupLogger; if (isStartupLoggedToSystemOut()) { //Create the composite (logging+SystemOut MessageLogger to be used during startup @@ -232,17 +235,34 @@ public abstract class AbstractSystemConfig<X extends SystemConfig<X>> BrokerStoreUpgraderAndRecoverer upgrader = new BrokerStoreUpgraderAndRecoverer(this); upgrader.perform(); - Broker broker = getBroker(); + final Broker broker = getBroker(); broker.setEventLogger(startupLogger); - broker.open(); - - if (broker.getState() == State.ACTIVE) - { - startupLogger.message(BrokerMessages.READY()); - broker.setEventLogger(eventLogger); - } - + final SettableFuture<Void> returnVal = SettableFuture.create(); + broker.openAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + + if (broker.getState() == State.ACTIVE) + { + startupLogger.message(BrokerMessages.READY()); + broker.setEventLogger(eventLogger); + } + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + + return returnVal; } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java index 944ed97ccc..c56698c60c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java @@ -45,5 +45,4 @@ public interface Binding<X extends Binding<X>> extends ConfiguredObject<X> @ManagedStatistic long getMatches(); - void delete(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java new file mode 100644 index 0000000000..5e9d794e14 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.model; + + +public interface CloseFuture +{ + public void runWhenComplete(final Runnable closeRunnable); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java index 2d60879861..d2ab317f0e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java @@ -26,6 +26,8 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.store.ConfiguredObjectRecord; @@ -236,6 +238,8 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>> ConfiguredObject... otherParents); void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException; + ListenableFuture<Void> setAttributesAsync(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException; + Class<? extends ConfiguredObject> getCategoryClass(); Class<? extends ConfiguredObject> getTypeClass(); @@ -248,8 +252,12 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>> ConfiguredObjectRecord asObjectRecord(); void open(); + ListenableFuture<Void> openAsync(); void close(); + ListenableFuture<Void> closeAsync(); + + ListenableFuture<Void> deleteAsync(); TaskExecutor getTaskExecutor(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java index 7d4023862b..ed7c841344 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.model; import java.util.Collection; import java.util.Map; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.UnresolvedConfiguredObject; @@ -34,6 +36,8 @@ public interface ConfiguredObjectFactory <X extends ConfiguredObject<X>> X create(Class<X> clazz, Map<String, Object> attributes, ConfiguredObject<?>... parents); + <X extends ConfiguredObject<X>> ListenableFuture<X> createAsync(Class<X> clazz, Map<String, Object> attributes, ConfiguredObject<?>... parents); + <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(Class<X> categoryClass, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java index 5026df0e19..82da0fd206 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java @@ -26,6 +26,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.store.ConfiguredObjectRecord; @@ -112,6 +114,18 @@ public class ConfiguredObjectFactoryImpl implements ConfiguredObjectFactory return factory.create(this, attributes, parents); } + + @Override + public <X extends ConfiguredObject<X>> ListenableFuture<X> createAsync(Class<X> clazz, + final Map<String, Object> attributes, + final ConfiguredObject<?>... parents) + { + ConfiguredObjectTypeFactory<X> factory = getConfiguredObjectTypeFactory(clazz, attributes); + + return factory.createAsync(this, attributes, parents); + } + + @Override public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final Class<X> categoryClass, Map<String, Object> attributes) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java index d0c6fb041e..a93e6a602f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java @@ -40,6 +40,7 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.plugin.ConfiguredObjectRegistration; @@ -801,20 +802,37 @@ public class ConfiguredObjectTypeRegistry { if(m.isAnnotationPresent(StateTransition.class)) { - if(m.getParameterTypes().length == 0) + if(ListenableFuture.class.isAssignableFrom(m.getReturnType())) { - m.setAccessible(true); - StateTransition annotation = m.getAnnotation(StateTransition.class); + if (m.getParameterTypes().length == 0) + { + m.setAccessible(true); + StateTransition annotation = m.getAnnotation(StateTransition.class); + + for (State state : annotation.currentState()) + { + addStateTransition(state, annotation.desiredState(), m, map); + } - for(State state : annotation.currentState()) + } + else { - addStateTransition(state, annotation.desiredState(), m, map); + throw new ServerScopedRuntimeException( + "A state transition method must have no arguments. Method " + + m.getName() + + " on " + + clazz.getName() + + " does not meet this criteria."); } - } else { - throw new ServerScopedRuntimeException("A state transition method must have no arguments. Method " + m.getName() + " on " + clazz.getName() + " does not meet this criteria."); + throw new ServerScopedRuntimeException( + "A state transition method must return a ListenableFuture. Method " + + m.getName() + + " on " + + clazz.getName() + + " does not meet this criteria."); } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java index b28441438d..1c245363a5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java @@ -103,7 +103,6 @@ public interface Connection<X extends Connection<X>> extends ConfiguredObject<X> //children Collection<Session> getSessions(); - void delete(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java index 7318a58640..999a3594b4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.model; import java.util.Collection; import java.util.Set; +import com.google.common.util.concurrent.ListenableFuture; + @ManagedObject public interface Port<X extends Port<X>> extends ConfiguredObject<X> { @@ -76,4 +78,6 @@ public interface Port<X extends Port<X>> extends ConfiguredObject<X> void start(); + ListenableFuture<Void> startAsync(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index cc758ba7c9..c2338c08d8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -147,8 +147,6 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, void stop(); - void delete(); - String getRedirectHost(AmqpPort<?> port); public static interface Transaction diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index be1d6ebf59..036c4d7716 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -35,6 +35,9 @@ import java.util.regex.Pattern; import javax.security.auth.Subject; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.common.QpidProperties; @@ -234,13 +237,40 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple } @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { if(_parent.isManagementMode()) { - _managementModeAuthenticationProvider.open(); + final SettableFuture<Void> returnVal = SettableFuture.create(); + + _managementModeAuthenticationProvider.openAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + activateWithoutManagementMode(); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + return returnVal; } + else + { + activateWithoutManagementMode(); + return Futures.immediateFuture(null); + } + } + private void activateWithoutManagementMode() + { boolean hasBrokerAnyErroredChildren = false; for (final Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass())) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index e03904789d..8bcbba9ac4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -27,10 +27,13 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.CloseFuture; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.Port; @@ -51,6 +54,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection private final Action _underlyingConnectionDeleteTask; private final AtomicBoolean _underlyingClosed = new AtomicBoolean(false); private AMQConnectionModel _underlyingConnection; + private final AtomicBoolean _closing = new AtomicBoolean(); public ConnectionAdapter(final AMQConnectionModel conn) { @@ -156,17 +160,59 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection } @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { - closeUnderlyingConnection(); - deleted(); - setState(State.DELETED); + final SettableFuture<Void> returnVal = SettableFuture.create(); + asyncClose().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + deleted(); + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + return returnVal; + } + + @Override + protected ListenableFuture<Void> beforeClose() + { + _closing.set(true); + + return asyncClose(); + + } + + private ListenableFuture<Void> asyncClose() + { + final SettableFuture<Void> closeFuture = SettableFuture.create(); + + _underlyingConnection.addDeleteTask(new Action() + { + @Override + public void performAction(final Object object) + { + closeFuture.set(null); + } + }); + + _underlyingConnection.closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); + return closeFuture; } @Override protected void onClose() { - closeUnderlyingConnection(); } @Override @@ -233,23 +279,54 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection // SessionAdapter installs delete task to cause session model object to delete } - private void closeUnderlyingConnection() + + private static class ConnectionCloseFuture implements CloseFuture { - if (_underlyingClosed.compareAndSet(false, true)) + private boolean _closed; + + public synchronized void connectionClosed() { - _underlyingConnection.removeDeleteTask(_underlyingConnectionDeleteTask); - try + _closed = true; + notifyAll(); + + } + + @Override + public void runWhenComplete(final Runnable closeRunnable) + { + if (_closed ) { - _underlyingConnection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); + closeRunnable.run(); } - catch (Exception e) + else { - LOGGER.warn("Exception closing connection " - + _underlyingConnection.getConnectionId() - + " from " - + _underlyingConnection.getRemoteAddressString(), e); - } + Thread t = new Thread(new Runnable() + { + @Override + public void run() + { + synchronized (ConnectionCloseFuture.this) + { + while (!_closed) + { + try + { + ConnectionCloseFuture.this.wait(); + } + catch (InterruptedException e) + { + } + } + + closeRunnable.run(); + } + } + }); + + t.setDaemon(true); + t.start(); + } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java index fda8a6f2e9..1a119be32d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java @@ -31,6 +31,9 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.BrokerProperties; @@ -145,7 +148,8 @@ public class FileBasedGroupProviderImpl GroupAdapter groupAdapter = new GroupAdapter(attrMap); principals.add(groupAdapter); groupAdapter.registerWithParents(); - groupAdapter.open(); + // TODO - we know this is safe, but the sync method shouldn't really be called from the management thread + groupAdapter.openAsync(); } } @@ -261,7 +265,7 @@ public class FileBasedGroupProviderImpl } @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { if (_groupDatabase != null) { @@ -278,29 +282,48 @@ public class FileBasedGroupProviderImpl throw new IllegalConfigurationException(String.format("Cannot load groups from '%s'", getPath())); } } + return Futures.immediateFuture(null); } @StateTransition( currentState = { State.QUIESCED, State.ACTIVE, State.ERRORED}, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { - close(); - File file = new File(getPath()); - if (file.exists()) - { - if (!file.delete()) - { - throw new IllegalConfigurationException("Cannot delete group file"); - } - } - - deleted(); - setState(State.DELETED); + final SettableFuture<Void> returnVal = SettableFuture.create(); + closeAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + File file = new File(getPath()); + if (file.exists()) + { + if (!file.delete()) + { + throw new IllegalConfigurationException("Cannot delete group file"); + } + } + + deleted(); + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + return returnVal; } @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED) - private void startQuiesced() + private ListenableFuture<Void> startQuiesced() { setState(State.QUIESCED); + return Futures.immediateFuture(null); } public Set<Principal> getGroupPrincipalsForUser(String username) @@ -352,9 +375,10 @@ public class FileBasedGroupProviderImpl } @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override @@ -371,7 +395,8 @@ public class FileBasedGroupProviderImpl attrMap.put(GroupMember.NAME, principal.getName()); GroupMemberAdapter groupMemberAdapter = new GroupMemberAdapter(attrMap); groupMemberAdapter.registerWithParents(); - groupMemberAdapter.open(); + // todo - this will be safe, but the synchronous open should not be called from the management thread + groupMemberAdapter.openAsync(); members.add(groupMemberAdapter); } _groupPrincipal = new GroupPrincipal(getName()); @@ -432,11 +457,12 @@ public class FileBasedGroupProviderImpl } @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { _groupDatabase.removeGroup(getName()); deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @Override @@ -494,17 +520,19 @@ public class FileBasedGroupProviderImpl } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { _groupDatabase.removeUserFromGroup(getName(), GroupAdapter.this.getName()); deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java index 2b77b0d2a9..500df8cb87 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java @@ -37,16 +37,17 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; -import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.util.BaseAction; -import org.apache.qpid.server.util.FileHelper; import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.JsonProcessingException; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; import org.codehaus.jackson.type.TypeReference; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.AuthenticationProvider; @@ -55,6 +56,8 @@ import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.StateTransition; +import org.apache.qpid.server.util.BaseAction; +import org.apache.qpid.server.util.FileHelper; public class FileSystemPreferencesProviderImpl @@ -128,7 +131,7 @@ public class FileSystemPreferencesProviderImpl } @StateTransition( currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { if (_store != null) { @@ -138,6 +141,7 @@ public class FileSystemPreferencesProviderImpl { throw new IllegalStateException("Cannot open preferences provider " + getName() + " in state " + getState() ); } + return Futures.immediateFuture(null); } @Override @@ -171,33 +175,52 @@ public class FileSystemPreferencesProviderImpl } @StateTransition(currentState = { State.ACTIVE }, desiredState = State.QUIESCED) - private void doQuiesce() + private ListenableFuture<Void> doQuiesce() { if(_store != null) { _store.close(); } setState(State.QUIESCED); + return Futures.immediateFuture(null); } @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED }, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { - close(); + final SettableFuture<Void> returnVal = SettableFuture.create(); + closeAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + if(_store != null) + { + _store.close(); + _store.delete(); + deleted(); + _authenticationProvider.setPreferencesProvider(null); + + } + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); - if(_store != null) - { - _store.close(); - _store.delete(); - deleted(); - _authenticationProvider.setPreferencesProvider(null); + return returnVal; - } - setState(State.DELETED); } @StateTransition(currentState = State.QUIESCED, desiredState = State.ACTIVE ) - private void restart() + private ListenableFuture<Void> restart() { if (_store == null) { @@ -206,6 +229,7 @@ public class FileSystemPreferencesProviderImpl _store.open(); setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java index 7c9b439e93..cb412e8d41 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java @@ -26,6 +26,9 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Consumer; @@ -169,10 +172,11 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java index 791bbe4dd3..0e6f18a70a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java @@ -27,6 +27,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -226,14 +229,24 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo } @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { - close(); - setState(State.DELETED); + final SettableFuture<Void> returnVal = SettableFuture.create(); + closeAsync().addListener(new Runnable() + { + @Override + public void run() + { + setState(State.DELETED); + returnVal.set(null); + + } + }, getTaskExecutor().getExecutor()); + return returnVal; } @StateTransition( currentState = {State.UNINITIALIZED, State.QUIESCED, State.ERRORED}, desiredState = State.ACTIVE ) - protected void activate() + protected ListenableFuture<Void> activate() { try { @@ -244,12 +257,14 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo setState(State.ERRORED); throw new IllegalConfigurationException("Unable to active port '" + getName() + "'of type " + getType() + " on " + getPort(), e); } + return Futures.immediateFuture(null); } @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED) - private void startQuiesced() + private ListenableFuture<Void> startQuiesced() { setState(State.QUIESCED); + return Futures.immediateFuture(null); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java index 43cb5f0c62..350f137a04 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java @@ -40,6 +40,8 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.codehaus.jackson.map.ObjectMapper; import org.apache.qpid.server.configuration.BrokerProperties; @@ -118,6 +120,8 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< private final Broker<?> _broker; private AcceptingTransport _transport; + private final AtomicBoolean _closing = new AtomicBoolean(); + private final SettableFuture _noConnectionsRemain = SettableFuture.create(); @ManagedObjectFactoryConstructor public AmqpPortImpl(Map<String, Object> attributes, Broker<?> broker) @@ -254,6 +258,19 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< } @Override + protected ListenableFuture<Void> beforeClose() + { + _closing.set(true); + + if (_connectionCount.get() == 0) + { + _noConnectionsRemain.set(null); + } + + return _noConnectionsRemain; + } + + @Override protected void onClose() { if (_transport != null) @@ -262,6 +279,8 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< { _broker.getEventLogger().message(BrokerMessages.SHUTTING_DOWN(String.valueOf(transport), getPort())); } + + _transport.close(); } } @@ -500,6 +519,11 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< _connectionCountWarningGiven.compareAndSet(true,false); } + if (_closing.get() && _connectionCount.get() == 0) + { + _noConnectionsRemain.set(null); + } + return openConnections; } @@ -511,7 +535,7 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< @Override public boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress) { - return _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections; + return !_closing.get() && ( _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections ); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java index 870621f292..5c3000db4a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.model.port; import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; @@ -108,6 +110,14 @@ public class PortFactory<X extends Port<X>> implements ConfiguredObjectTypeFacto } @Override + public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory, + final Map<String, Object> attributes, + final ConfiguredObject<?>... parents) + { + return getPortFactory(factory, attributes, (Broker<?>)parents[0]).createAsync(factory, attributes,parents); + } + + @Override public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory, final ConfiguredObjectRecord record, final ConfiguredObject<?>... parents) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java index 0d16b4ffc7..cd0187034e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java @@ -20,19 +20,23 @@ */ package org.apache.qpid.server.plugin; +import java.util.Map; + +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.UnresolvedConfiguredObject; -import java.util.Map; - public interface ConfiguredObjectTypeFactory<X extends ConfiguredObject<X>> extends Pluggable { Class<? super X> getCategoryClass(); X create(final ConfiguredObjectFactory factory, Map<String, Object> attributes, ConfiguredObject<?>... parents); + ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory, Map<String, Object> attributes, ConfiguredObject<?>... parents); + UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory, ConfiguredObjectRecord record, ConfiguredObject<?>... parents); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java index 6e1b6529d8..6100a2eb80 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java @@ -19,7 +19,7 @@ package org.apache.qpid.server.plugin;/* * */ -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index 26e8271d14..95b9bf8970 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -40,7 +40,7 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends * @param cause * @param message */ - public void close(AMQConstant cause, String message); + public void closeAsync(AMQConstant cause, String message); public void block(); @@ -53,7 +53,7 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends * @param cause * @param message */ - public void closeSession(S session, AMQConstant cause, String message); + public void closeSessionAsync(S session, AMQConstant cause, String message); public long getConnectionId(); @@ -107,4 +107,8 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends void removeSessionListener(SessionModelListener listener); + void notifyWork(); + + boolean isMessageAssignmentSuspended(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index f13af479ad..3731ae262b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -113,4 +113,8 @@ public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQCo * @return the time of the last activity or 0 if not in a transaction */ long getTransactionUpdateTime(); + + void transportStateChanged(); + + void processPending(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 49c0812f4a..2ccf595c26 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -24,40 +24,31 @@ package org.apache.qpid.server.protocol; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.security.Principal; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLPeerUnverifiedException; import javax.security.auth.Subject; import org.apache.log4j.Logger; -import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.ProtocolEngineCreator; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.security.SSLStatus; -import org.apache.qpid.transport.network.security.ssl.SSLBufferingSender; -import org.apache.qpid.transport.network.security.ssl.SSLReceiver; -import org.apache.qpid.transport.network.security.ssl.SSLUtil; public class MultiVersionProtocolEngine implements ServerProtocolEngine { private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class); private final long _id; - private final SSLContext _sslContext; - private final boolean _wantClientAuth; - private final boolean _needClientAuth; private final AmqpPort<?> _port; - private final Transport _transport; + private Transport _transport; private final ProtocolEngineCreator[] _creators; private final Runnable _onCloseTask; @@ -65,15 +56,13 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine private String _fqdn; private final Broker<?> _broker; private NetworkConnection _network; - private Sender<ByteBuffer> _sender; + private ByteBufferSender _sender; private final Protocol _defaultSupportedReply; private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine(); + private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>(); public MultiVersionProtocolEngine(final Broker<?> broker, - SSLContext sslContext, - boolean wantClientAuth, - boolean needClientAuth, final Set<Protocol> supported, final Protocol defaultSupportedReply, AmqpPort<?> port, @@ -92,15 +81,23 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _broker = broker; _supported = supported; _defaultSupportedReply = defaultSupportedReply; - _sslContext = sslContext; - _wantClientAuth = wantClientAuth; - _needClientAuth = needClientAuth; _port = port; _transport = transport; _creators = creators; _onCloseTask = onCloseTask; } + @Override + public void setMessageAssignmentSuspended(final boolean value) + { + _delegate.setMessageAssignmentSuspended(value); + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return _delegate.isMessageAssignmentSuspended(); + } public SocketAddress getRemoteAddress() { @@ -147,6 +144,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _delegate.readerIdle(); } + @Override + public void encryptedTransport() + { + _delegate.encryptedTransport(); + } + public void received(ByteBuffer msg) { @@ -169,9 +172,21 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return _delegate.getSubject(); } + @Override + public boolean isTransportBlockedForWriting() + { + return _delegate.isTransportBlockedForWriting(); + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _delegate.setTransportBlockedForWriting(blocked); + } + private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8; - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender) { _network = network; SocketAddress address = _network.getLocalAddress(); @@ -198,10 +213,82 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return _delegate.getLastWriteTime(); } + @Override + public void processPending() + { + _delegate.processPending(); + } + + @Override + public boolean hasWork() + { + return _delegate.hasWork(); + } + + @Override + public void notifyWork() + { + _delegate.notifyWork(); + } + @Override + public void setWorkListener(final Action<ServerProtocolEngine> listener) + { + _workListener.set(listener); + _delegate.setWorkListener(listener); + } + + @Override + public void clearWork() + { + _delegate.clearWork(); + } private class ClosedDelegateProtocolEngine implements ServerProtocolEngine { + + @Override + public void setMessageAssignmentSuspended(final boolean value) + { + + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return false; + } + + @Override + public void processPending() + { + + } + + @Override + public boolean hasWork() + { + return false; + } + + @Override + public void notifyWork() + { + + } + + @Override + public void setWorkListener(final Action<ServerProtocolEngine> listener) + { + + } + + @Override + public void clearWork() + { + + } + public SocketAddress getRemoteAddress() { return _network.getRemoteAddress(); @@ -247,7 +334,13 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + @Override + public void encryptedTransport() + { + + } + + public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender) { } @@ -274,12 +367,24 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine { return new Subject(); } + + @Override + public boolean isTransportBlockedForWriting() + { + return false; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + } } private class SelfDelegateProtocolEngine implements ServerProtocolEngine { private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES); - private long _lastReadTime; + private long _lastReadTime = System.currentTimeMillis(); + private final AtomicBoolean _hasWork = new AtomicBoolean(); public SocketAddress getRemoteAddress() { @@ -301,6 +406,47 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return 0; } + @Override + public void setMessageAssignmentSuspended(final boolean value) + { + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return false; + } + + @Override + public void processPending() + { + + } + + @Override + public boolean hasWork() + { + return _hasWork.get(); + } + + @Override + public void notifyWork() + { + _hasWork.set(true); + } + + @Override + public void setWorkListener(final Action<ServerProtocolEngine> listener) + { + + } + + @Override + public void clearWork() + { + _hasWork.set(false); + } + public void received(ByteBuffer msg) { _lastReadTime = System.currentTimeMillis(); @@ -360,15 +506,6 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } } - - if(newDelegate == null && looksLikeSSL(headerBytes)) - { - if(_sslContext != null) - { - newDelegate = new SslDelegateProtocolEngine(); - } - } - // If no delegate is found then send back a supported protocol version id if(newDelegate == null) { @@ -398,8 +535,13 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } else { + boolean hasWork = _delegate.hasWork(); + if (hasWork) + { + newDelegate.notifyWork(); + } _delegate = newDelegate; - + _delegate.setWorkListener(_workListener.get()); _header.flip(); _delegate.received(_header); if(msg.hasRemaining()) @@ -423,6 +565,17 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return _delegate.getSubject(); } + @Override + public boolean isTransportBlockedForWriting() + { + return false; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + } + public void exception(Throwable t) { _logger.error("Error establishing session", t); @@ -466,132 +619,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _network.close(); } - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) - { - - } - - @Override - public long getLastReadTime() - { - return _lastReadTime; - } - @Override - public long getLastWriteTime() + public void encryptedTransport() { - return 0; - } - } - - private class SslDelegateProtocolEngine implements ServerProtocolEngine - { - private final MultiVersionProtocolEngine _decryptEngine; - private final SSLEngine _engine; - private final SSLReceiver _sslReceiver; - private final SSLBufferingSender _sslSender; - private long _lastReadTime; - - private SslDelegateProtocolEngine() - { - - _decryptEngine = new MultiVersionProtocolEngine(_broker, null, false, false, _supported, - _defaultSupportedReply, _port, Transport.SSL, _id, _creators, - null); - - _engine = _sslContext.createSSLEngine(); - _engine.setUseClientMode(false); - SSLUtil.removeSSLv3Support(_engine); - SSLUtil.updateEnabledCipherSuites(_engine, _port.getEnabledCipherSuites(), _port.getDisabledCipherSuites()); - - if(_needClientAuth) - { - _engine.setNeedClientAuth(true); - } - else if(_wantClientAuth) + if(_transport == Transport.TCP) { - _engine.setWantClientAuth(true); + _transport = Transport.SSL; } - - SSLStatus sslStatus = new SSLStatus(); - _sslReceiver = new SSLReceiver(_engine,_decryptEngine,sslStatus); - _sslSender = new SSLBufferingSender(_engine,_sender,sslStatus); - _decryptEngine.setNetworkConnection(new SSLNetworkConnection(_engine,_network, _sslSender), _sslSender); - } - - @Override - public void received(ByteBuffer msg) - { - _lastReadTime = System.currentTimeMillis(); - _sslReceiver.received(msg); - _sslSender.send(); - _sslSender.flush(); - } - - @Override - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) - { - //TODO - Implement - } - - @Override - public SocketAddress getRemoteAddress() - { - return _decryptEngine.getRemoteAddress(); } - @Override - public SocketAddress getLocalAddress() + public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender) { - return _decryptEngine.getLocalAddress(); - } - @Override - public long getWrittenBytes() - { - return _decryptEngine.getWrittenBytes(); - } - - @Override - public long getReadBytes() - { - return _decryptEngine.getReadBytes(); - } - - @Override - public void closed() - { - _decryptEngine.closed(); - } - - @Override - public void writerIdle() - { - _decryptEngine.writerIdle(); - } - - @Override - public void readerIdle() - { - _decryptEngine.readerIdle(); - } - - @Override - public void exception(Throwable t) - { - _decryptEngine.exception(t); - } - - @Override - public long getConnectionId() - { - return _decryptEngine.getConnectionId(); - } - - @Override - public Subject getSubject() - { - return _decryptEngine.getSubject(); } @Override @@ -603,132 +642,9 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine @Override public long getLastWriteTime() { - return _decryptEngine.getLastWriteTime(); + return 0; } } - private boolean looksLikeSSL(byte[] headerBytes) - { - return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes); - } - - private boolean looksLikeSSLv3ClientHello(byte[] headerBytes) - { - return headerBytes[0] == 22 && // SSL Handshake - (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x - (headerBytes[2] == 0 || // SSL 3.0 - headerBytes[2] == 1 || // TLS 1.0 - headerBytes[2] == 2 || // TLS 1.1 - headerBytes[2] == 3)) && // TLS1.2 - (headerBytes[5] == 1); // client_hello - } - - private boolean looksLikeSSLv2ClientHello(byte[] headerBytes) - { - return headerBytes[0] == -128 && - headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x - (headerBytes[4] == 0 || // SSL 3.0 - headerBytes[4] == 1 || // TLS 1.0 - headerBytes[4] == 2 || // TLS 1.1 - headerBytes[4] == 3); - } - - - private static class SSLNetworkConnection implements NetworkConnection - { - private final NetworkConnection _network; - private final SSLBufferingSender _sslSender; - private final SSLEngine _engine; - private Principal _principal; - private boolean _principalChecked; - private final Object _lock = new Object(); - - public SSLNetworkConnection(SSLEngine engine, NetworkConnection network, - SSLBufferingSender sslSender) - { - _engine = engine; - _network = network; - _sslSender = sslSender; - - } - - @Override - public Sender<ByteBuffer> getSender() - { - return _sslSender; - } - - @Override - public void start() - { - _network.start(); - } - - @Override - public void close() - { - _sslSender.close(); - - _network.close(); - } - - @Override - public SocketAddress getRemoteAddress() - { - return _network.getRemoteAddress(); - } - - @Override - public SocketAddress getLocalAddress() - { - return _network.getLocalAddress(); - } - - @Override - public void setMaxWriteIdle(int sec) - { - _network.setMaxWriteIdle(sec); - } - @Override - public void setMaxReadIdle(int sec) - { - _network.setMaxReadIdle(sec); - } - - @Override - public Principal getPeerPrincipal() - { - synchronized (_lock) - { - if(!_principalChecked) - { - try - { - _principal = _engine.getSession().getPeerPrincipal(); - } - catch (SSLPeerUnverifiedException e) - { - _principal = null; - } - - _principalChecked = true; - } - - return _principal; - } - } - - @Override - public int getMaxReadIdle() - { - return _network.getMaxReadIdle(); - } - - @Override - public int getMaxWriteIdle() - { - return _network.getMaxWriteIdle(); - } - } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java index 5c704c5967..a51717e79e 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java @@ -27,10 +27,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -import javax.net.ssl.SSLContext; - import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.PortMessages; import org.apache.qpid.server.logging.subjects.PortLogSubject; import org.apache.qpid.server.model.Broker; @@ -48,9 +45,6 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory private final Broker<?> _broker; private final Set<Protocol> _supported; private final Protocol _defaultSupportedReply; - private final SSLContext _sslContext; - private final boolean _wantClientAuth; - private final boolean _needClientAuth; private final AmqpPort<?> _port; private final Transport _transport; private final ProtocolEngineCreator[] _creators; @@ -58,9 +52,6 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory _connectionCountDecrementingTask = new ConnectionCountDecrementingTask(); public MultiVersionProtocolEngineFactory(Broker<?> broker, - SSLContext sslContext, - boolean wantClientAuth, - boolean needClientAuth, final Set<Protocol> supportedVersions, final Protocol defaultSupportedReply, AmqpPort<?> port, @@ -73,7 +64,6 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory } _broker = broker; - _sslContext = sslContext; _supported = supportedVersions; _defaultSupportedReply = defaultSupportedReply; final List<ProtocolEngineCreator> creators = new ArrayList<ProtocolEngineCreator>(); @@ -83,18 +73,16 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory } Collections.sort(creators, new ProtocolEngineCreatorComparator()); _creators = creators.toArray(new ProtocolEngineCreator[creators.size()]); - _wantClientAuth = wantClientAuth; - _needClientAuth = needClientAuth; _port = port; _transport = transport; } - public ServerProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress) + public MultiVersionProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress) { if(_port.canAcceptNewConnection(remoteSocketAddress)) { _port.incrementConnectionCount(); - return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth, + return new MultiVersionProtocolEngine(_broker, _supported, _defaultSupportedReply, _port, _transport, ID_GENERATOR.getAndIncrement(), _creators, _connectionCountDecrementingTask); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java new file mode 100644 index 0000000000..eba1f78ad0 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import javax.security.auth.Subject; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.server.util.Action; + +public interface ServerProtocolEngine extends ProtocolEngine +{ + /** + * Gets the connection ID associated with this ProtocolEngine + */ + long getConnectionId(); + + Subject getSubject(); + + boolean isTransportBlockedForWriting(); + + void setTransportBlockedForWriting(boolean blocked); + + void setMessageAssignmentSuspended(boolean value); + + boolean isMessageAssignmentSuspended(); + + void processPending(); + + boolean hasWork(); + + void clearWork(); + + void notifyWork(); + + void setWorkListener(Action<ServerProtocolEngine> listener); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 04d5fef462..2a51657f60 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -43,11 +43,15 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.configuration.updater.Task; +import org.apache.qpid.server.configuration.updater.TaskWithException; import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; @@ -96,6 +100,7 @@ import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; import org.apache.qpid.transport.TransportException; public abstract class AbstractQueue<X extends AbstractQueue<X>> @@ -642,16 +647,51 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> @Override - public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target, - FilterManager filters, + public QueueConsumerImpl addConsumer(final ConsumerTarget target, + final FilterManager filters, final Class<? extends ServerMessage> messageClass, final String consumerName, - EnumSet<ConsumerImpl.Option> optionSet) + final EnumSet<ConsumerImpl.Option> optionSet) throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, ConsumerAccessRefused { + try + { + return getTaskExecutor().run(new TaskWithException<QueueConsumerImpl, Exception>() + { + + @Override + public QueueConsumerImpl execute() + throws Exception + { + + return addConsumerInternal(target, filters, messageClass, consumerName, optionSet); + } + }); + } + catch (ExistingExclusiveConsumer | ConsumerAccessRefused | + ExistingConsumerPreventsExclusive | RuntimeException e) + { + throw e; + } + catch (Exception e) + { + // Should never happen + throw new ServerScopedRuntimeException(e); + } + + } + + private QueueConsumerImpl addConsumerInternal(final ConsumerTarget target, + FilterManager filters, + final Class<? extends ServerMessage> messageClass, + final String consumerName, + EnumSet<ConsumerImpl.Option> optionSet) + throws ExistingExclusiveConsumer, ConsumerAccessRefused, + ExistingConsumerPreventsExclusive + { if (hasExclusiveConsumer()) { throw new ExistingExclusiveConsumer(); @@ -763,7 +803,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> QueueConsumerImpl consumer = new QueueConsumerImpl(this, target, consumerName, - filters, + filters, messageClass, optionSet); @@ -812,19 +852,18 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> deliverAsync(); return consumer; - } @Override - protected void beforeClose() + protected ListenableFuture<Void> beforeClose() { _closing = true; - super.beforeClose(); + return super.beforeClose(); } - synchronized void unregisterConsumer(final QueueConsumerImpl consumer) + void unregisterConsumer(final QueueConsumerImpl consumer) { if (consumer == null) { @@ -835,7 +874,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> if (removed) { - consumer.close(); + consumer.closeAsync(); // No longer can the queue have an exclusive consumer setExclusiveSubscriber(null); @@ -1219,10 +1258,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> else { deliverMessage(sub, entry, false); - if(sub.acquires()) - { - entry.unlockAcquisition(); - } } } } @@ -1798,7 +1833,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> for (BindingImpl b : bindingCopy) { - b.delete(); + // TODO - RG - Need to sort out bindings! + if(getTaskExecutor().isTaskExecutorThread()) + { + b.deleteAsync(); + } + else + { + b.delete(); + } } QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); @@ -1851,7 +1894,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } _deleteTaskList.clear(); - close(); + closeAsync(); deleted(); //Log Queue Deletion getEventLogger().message(_logSubject, QueueMessages.DELETED()); @@ -2050,10 +2093,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> else { deliverMessage(sub, node, batch); - if(sub.acquires()) - { - node.unlockAcquisition(); - } } } @@ -2221,7 +2260,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> if (consumerDone) { sub.flushBatched(); - if (lastLoop && getNextAvailableEntry(sub) == null) + boolean noMore = getNextAvailableEntry(sub) == null; + if (lastLoop && noMore) { sub.queueEmpty(); } @@ -2595,7 +2635,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { if (_virtualHost.getState() != State.ACTIVE) { - throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent"); + throw new VirtualHostUnavailableException(this._virtualHost); } if(!message.isReferenced(this)) @@ -2660,7 +2700,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> return allowed; } - private synchronized void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy) + private void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy) throws ExistingConsumerPreventsExclusive { if(desiredPolicy == null) @@ -2862,24 +2902,27 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> //============= @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED) - private void doDeleteBeforeInitialize() + private ListenableFuture<Void> doDeleteBeforeInitialize() { preSetAlternateExchange(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { _virtualHost.removeQueue(this); preSetAlternateExchange(); setState(State.DELETED); + return Futures.immediateFuture(null); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index a2c275e797..c459737c46 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -52,5 +52,4 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl, QueueContext getQueueContext(); - ConsumerTarget getTarget(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index 12ab353c8a..b409b638b3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -198,7 +198,7 @@ class QueueConsumerImpl if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get()) { - close(); + closeAsync(); } final StateChangeListener<? super QueueConsumerImpl, State> stateListener = getStateListener(); if(stateListener != null) @@ -323,6 +323,7 @@ class QueueConsumerImpl public final void flush() { _queue.flushConsumer(this); + _target.processPending(); } public boolean resend(final QueueEntry entry) @@ -514,6 +515,7 @@ class QueueConsumerImpl return _selector; } + @Override public String toLogString() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java index 19265ef453..b9ff6505fc 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.queue; import java.util.Map; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.Port; @@ -49,6 +51,14 @@ public class QueueFactory<X extends Queue<X>> implements ConfiguredObjectTypeFa } @Override + public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory, + final Map<String, Object> attributes, + final ConfiguredObject<?>... parents) + { + return getQueueFactory(factory, attributes).createAsync(factory, attributes, parents); + } + + @Override public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory, final ConfiguredObjectRecord record, final ConfiguredObject<?>... parents) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java index bf648186d2..aedfb3670a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java @@ -37,6 +37,9 @@ import java.util.Set; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; @@ -96,7 +99,7 @@ public class FileKeyStoreImpl extends AbstractConfiguredObject<FileKeyStoreImpl> } @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { // verify that it is not in use String storeName = getName(); @@ -111,12 +114,14 @@ public class FileKeyStoreImpl extends AbstractConfiguredObject<FileKeyStoreImpl> } deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java index b53dcf9ea1..ce5a394591 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java @@ -37,6 +37,9 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509TrustManager; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.AuthenticationProvider; @@ -96,7 +99,7 @@ public class FileTrustStoreImpl extends AbstractConfiguredObject<FileTrustStoreI } @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { // verify that it is not in use String storeName = getName(); @@ -137,12 +140,14 @@ public class FileTrustStoreImpl extends AbstractConfiguredObject<FileTrustStoreI } deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java index 0c68cb467e..ea6a0394f0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java @@ -56,6 +56,8 @@ import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; import javax.xml.bind.DatatypeConverter; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -181,7 +183,7 @@ public class NonJavaKeyStoreImpl extends AbstractConfiguredObject<NonJavaKeyStor } @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { // verify that it is not in use String storeName = getName(); @@ -199,12 +201,14 @@ public class NonJavaKeyStoreImpl extends AbstractConfiguredObject<NonJavaKeyStor } deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java index bd46b76a66..c18189785d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java @@ -44,6 +44,8 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import javax.security.auth.x500.X500Principal; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -169,7 +171,7 @@ public class NonJavaTrustStoreImpl } @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { // verify that it is not in use String storeName = getName(); @@ -212,12 +214,14 @@ public class NonJavaTrustStoreImpl } deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java index e0eb083bd4..3bd44a92ea 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java @@ -32,6 +32,7 @@ import java.security.AccessController; import java.security.Principal; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java index 88a761fe19..255457a846 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java @@ -27,6 +27,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -151,13 +154,14 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica } @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED ) - protected void startQuiesced() + protected ListenableFuture<Void> startQuiesced() { setState(State.QUIESCED); + return Futures.immediateFuture(null); } @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.QUIESCED }, desiredState = State.ACTIVE ) - protected void activate() + protected ListenableFuture<Void> activate() { try { @@ -175,11 +179,11 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica throw e; } } - + return Futures.immediateFuture(null); } @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { String providerName = getName(); @@ -195,15 +199,50 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica } } - close(); - if (_preferencesProvider != null) - { - _preferencesProvider.delete(); - } - deleted(); + final SettableFuture<Void> returnVal = SettableFuture.create(); - setState(State.DELETED); + final ListenableFuture<Void> future = closeAsync(); + future.addListener(new Runnable() + { + @Override + public void run() + { + if (_preferencesProvider != null) + { + _preferencesProvider.deleteAsync().addListener(new Runnable() + { + @Override + public void run() + { + try + { + deleted(); + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor()); + } + else + { + try + { + deleted(); + + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + } + }, getTaskExecutor().getExecutor()); + return returnVal; } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java index 7773d9e98d..50a2a36130 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java @@ -92,22 +92,15 @@ public abstract class ConfigModelPasswordManagingAuthenticationProvider<X extend @Override public void deleteUser(final String user) throws AccountNotFoundException { - runTask(new VoidTaskWithException<AccountNotFoundException>() + final ManagedUser authUser = getUser(user); + if(authUser != null) { - @Override - public void execute() throws AccountNotFoundException - { - final ManagedUser authUser = getUser(user); - if(authUser != null) - { - authUser.delete(); - } - else - { - throw new AccountNotFoundException("No such user: '" + user + "'"); - } - } - }); + authUser.delete(); + } + else + { + throw new AccountNotFoundException("No such user: '" + user + "'"); + } } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java index b317b93d71..db69445c41 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java @@ -27,6 +27,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.updater.VoidTask; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.ConfiguredObject; @@ -85,10 +88,11 @@ class ManagedUser extends AbstractConfiguredObject<ManagedUser> implements User< } @StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { _authenticationManager.getUserMap().remove(getName()); deleted(); + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java index 0fb8938233..0fcab33f5d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java @@ -40,6 +40,9 @@ import javax.security.auth.login.AccountNotFoundException; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.BrokerProperties; @@ -119,16 +122,9 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal super.onOpen(); _principalDatabase = createDatabase(); initialise(); - List<Principal> users = _principalDatabase == null ? Collections.<Principal>emptyList() : _principalDatabase.getUsers(); - for (Principal user : users) - { - PrincipalAdapter principalAdapter = new PrincipalAdapter(user); - principalAdapter.registerWithParents(); - principalAdapter.open(); - _userMap.put(user, principalAdapter); - } } + protected abstract PrincipalDatabase createDatabase(); @@ -217,9 +213,44 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal return _principalDatabase; } + @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE) + public ListenableFuture<Void> activate() + { + final SettableFuture<Void> returnVal = SettableFuture.create(); + final List<Principal> users = _principalDatabase == null ? Collections.<Principal>emptyList() : _principalDatabase.getUsers(); + _userMap.clear(); + if(!users.isEmpty()) + { + for (final Principal user : users) + { + final PrincipalAdapter principalAdapter = new PrincipalAdapter(user); + principalAdapter.registerWithParents(); + principalAdapter.openAsync().addListener(new Runnable() + { + @Override + public void run() + { + _userMap.put(user, principalAdapter); + if (_userMap.size() == users.size()) + { + setState(State.ACTIVE); + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor()); + + } - @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED) - public void doDelete() + return returnVal; + } + else + { + return Futures.immediateFuture(null); + } + } + + @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED, State.UNINITIALIZED}, desiredState = State.DELETED) + public ListenableFuture<Void> doDelete() { File file = new File(_path); if (file.exists() && file.isFile()) @@ -228,6 +259,7 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal } deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @Override @@ -479,13 +511,14 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal } @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { try { @@ -503,7 +536,7 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal { LOGGER.warn("Failed to delete user " + _user, e); } - + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java index 98607d2490..96d32f4179 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java @@ -22,6 +22,9 @@ package org.apache.qpid.server.security.group; import java.util.Map; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Group; @@ -77,16 +80,18 @@ public class GroupImpl extends AbstractConfiguredObject<GroupImpl> implements Gr @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { deleted(); + return Futures.immediateFuture(null); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java index ea17db6ce7..a86d380b2b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java @@ -23,6 +23,9 @@ package org.apache.qpid.server.security.group; import java.security.Principal; import java.util.Map; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Group; import org.apache.qpid.server.model.GroupMember; @@ -61,15 +64,17 @@ public class GroupMemberImpl extends AbstractConfiguredObject<GroupMemberImpl> i @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { deleted(); + return Futures.immediateFuture(null); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java index ecc166f8fc..7dc032cc90 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java @@ -26,6 +26,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; @@ -89,16 +92,18 @@ public class GroupProviderImpl extends AbstractConfiguredObject<GroupProviderImp } @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { deleted(); + return Futures.immediateFuture(null); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 4dfaa716cf..5868ae61c5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -45,6 +45,7 @@ import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.store.handler.DistributedTransactionHandler; import org.apache.qpid.server.store.handler.MessageHandler; import org.apache.qpid.server.store.handler.MessageInstanceHandler; +import org.apache.qpid.server.util.FutureResult; public abstract class AbstractJDBCMessageStore implements MessageStore { @@ -834,10 +835,10 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } } - private StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws StoreException + private FutureResult commitTranAsync(ConnectionWrapper connWrapper) throws StoreException { commitTran(connWrapper); - return StoreFuture.IMMEDIATE_FUTURE; + return FutureResult.IMMEDIATE_FUTURE; } private void abortTran(ConnectionWrapper connWrapper) throws StoreException @@ -1231,14 +1232,14 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } @Override - public StoreFuture commitTranAsync() + public FutureResult commitTranAsync() { checkMessageStoreOpen(); doPreCommitActions(); - StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper); + FutureResult futureResult = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper); storedSizeChange(_storeSizeIncrease); doPostCommitActions(); - return storeFuture; + return futureResult; } private void doPreCommitActions() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index efe040fbb3..eb887b4ef5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.handler.DistributedTransactionHandler; import org.apache.qpid.server.store.handler.MessageHandler; import org.apache.qpid.server.store.handler.MessageInstanceHandler; +import org.apache.qpid.server.util.FutureResult; /** A simple message store that stores the messages in a thread-safe structure in memory. */ public class MemoryMessageStore implements MessageStore @@ -58,9 +59,9 @@ public class MemoryMessageStore implements MessageStore private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>(); @Override - public StoreFuture commitTranAsync() + public FutureResult commitTranAsync() { - return StoreFuture.IMMEDIATE_FUTURE; + return FutureResult.IMMEDIATE_FUTURE; } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java index 6f7afccac0..007f3ab796 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.store; import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.util.FutureResult; public interface Transaction { @@ -53,7 +54,7 @@ public interface Transaction * Commits all operations performed within a given transactional context. * */ - StoreFuture commitTranAsync(); + FutureResult commitTranAsync(); /** * Abandons all operations performed within a given transactional context. @@ -72,4 +73,4 @@ public interface Transaction void recordXid(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues, Transaction.Record[] dequeues); -}
\ No newline at end of file +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java new file mode 100644 index 0000000000..ae5816a0d1 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java @@ -0,0 +1,642 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.security.Principal; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLPeerUnverifiedException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.server.protocol.ServerProtocolEngine; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.Ticker; +import org.apache.qpid.transport.network.TransportEncryption; +import org.apache.qpid.transport.network.security.ssl.SSLUtil; +import org.apache.qpid.util.SystemUtils; + +public class NonBlockingConnection implements NetworkConnection, ByteBufferSender +{ + private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class); + private final SocketChannel _socketChannel; + private final long _timeout; + private final Ticker _ticker; + private final SelectorThread _selector; + private int _maxReadIdle; + private int _maxWriteIdle; + private Principal _principal; + private boolean _principalChecked; + private final Object _lock = new Object(); + + public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6; + + private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>(); + private final List<ByteBuffer> _encryptedOutput = new ArrayList<>(); + + private final String _remoteSocketAddress; + private final AtomicBoolean _closed = new AtomicBoolean(false); + private final ServerProtocolEngine _protocolEngine; + private final int _receiveBufSize; + private final Set<TransportEncryption> _encryptionSet; + private final SSLContext _sslContext; + private final Runnable _onTransportEncryptionAction; + private ByteBuffer _netInputBuffer; + private SSLEngine _sslEngine; + + private ByteBuffer _currentBuffer; + + private TransportEncryption _transportEncryption; + private SSLEngineResult _status; + private volatile boolean _fullyWritten = true; + private boolean _workDone; + + + public NonBlockingConnection(SocketChannel socketChannel, + ServerProtocolEngine delegate, + int sendBufferSize, + int receiveBufferSize, + long timeout, + Ticker ticker, + final Set<TransportEncryption> encryptionSet, + final SSLContext sslContext, + final boolean wantClientAuth, + final boolean needClientAuth, + final Collection<String> enabledCipherSuites, + final Collection<String> disabledCipherSuites, + final Runnable onTransportEncryptionAction, + final SelectorThread selectorThread) + { + _socketChannel = socketChannel; + _timeout = timeout; + _ticker = ticker; + _selector = selectorThread; + + _protocolEngine = delegate; + _receiveBufSize = receiveBufferSize; + _encryptionSet = encryptionSet; + _sslContext = sslContext; + _onTransportEncryptionAction = onTransportEncryptionAction; + + delegate.setWorkListener(new Action<ServerProtocolEngine>() + { + @Override + public void performAction(final ServerProtocolEngine object) + { + _selector.wakeup(); + } + }); + + if(encryptionSet.size() == 1) + { + _transportEncryption = _encryptionSet.iterator().next(); + if (_transportEncryption == TransportEncryption.TLS) + { + onTransportEncryptionAction.run(); + } + } + + if(encryptionSet.contains(TransportEncryption.TLS)) + { + _sslEngine = _sslContext.createSSLEngine(); + _sslEngine.setUseClientMode(false); + SSLUtil.removeSSLv3Support(_sslEngine); + SSLUtil.updateEnabledCipherSuites(_sslEngine, enabledCipherSuites, disabledCipherSuites); + + if(needClientAuth) + { + _sslEngine.setNeedClientAuth(true); + } + else if(wantClientAuth) + { + _sslEngine.setWantClientAuth(true); + } + _netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2)); + } + + try + { + _remoteSocketAddress = _socketChannel.getRemoteAddress().toString(); + _socketChannel.configureBlocking(false); + } + catch (IOException e) + { + throw new SenderException("Unable to prepare the channel for non-blocking IO", e); + } + + + } + + + public Ticker getTicker() + { + return _ticker; + } + + public SocketChannel getSocketChannel() + { + return _socketChannel; + } + + public void start() + { + } + + public ByteBufferSender getSender() + { + return this; + } + + public void close() + { + LOGGER.debug("Closing " + _remoteSocketAddress); + if(_closed.compareAndSet(false,true)) + { + _protocolEngine.notifyWork(); + getSelector().wakeup(); + } + } + + public SocketAddress getRemoteAddress() + { + return _socketChannel.socket().getRemoteSocketAddress(); + } + + public SocketAddress getLocalAddress() + { + return _socketChannel.socket().getLocalSocketAddress(); + } + + public void setMaxWriteIdle(int sec) + { + _maxWriteIdle = sec; + } + + public void setMaxReadIdle(int sec) + { + _maxReadIdle = sec; + } + + @Override + public Principal getPeerPrincipal() + { + synchronized (_lock) + { + if(!_principalChecked) + { + if (_sslEngine != null) + { + try + { + _principal = _sslEngine.getSession().getPeerPrincipal(); + } + catch (SSLPeerUnverifiedException e) + { + return null; + } + } + + _principalChecked = true; + } + + return _principal; + } + } + + @Override + public int getMaxReadIdle() + { + return _maxReadIdle; + } + + @Override + public int getMaxWriteIdle() + { + return _maxWriteIdle; + } + + public boolean canRead() + { + return true; + } + + public boolean waitingForWrite() + { + return !_fullyWritten; + } + + public boolean isStateChanged() + { + + return _protocolEngine.hasWork(); + } + + public boolean doWork() + { + _protocolEngine.clearWork(); + final boolean closed = _closed.get(); + if (!closed) + { + try + { + _workDone = false; + + long currentTime = System.currentTimeMillis(); + int tick = _ticker.getTimeToNextTick(currentTime); + if (tick <= 0) + { + _ticker.tick(currentTime); + } + + _protocolEngine.setMessageAssignmentSuspended(true); + + _protocolEngine.processPending(); + + _protocolEngine.setTransportBlockedForWriting(!doWrite()); + boolean dataRead = doRead(); + _fullyWritten = doWrite(); + _protocolEngine.setTransportBlockedForWriting(!_fullyWritten); + + if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0)) + { + _protocolEngine.notifyWork(); + } + + // tell all consumer targets that it is okay to accept more + _protocolEngine.setMessageAssignmentSuspended(false); + } + catch (IOException e) + { + LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e); + LOGGER.debug("Closing " + _remoteSocketAddress); + if(_closed.compareAndSet(false,true)) + { + _protocolEngine.notifyWork(); + } + } + } + else + { + + if(!SystemUtils.isWindows()) + { + try + { + _socketChannel.shutdownInput(); + } + catch (IOException e) + { + LOGGER.info("Exception shutting down input for thread '" + _remoteSocketAddress + "': " + e); + + } + } + try + { + while(!doWrite()) + { + } + } + catch (IOException e) + { + LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e); + + } + LOGGER.debug("Closing receiver"); + _protocolEngine.closed(); + + try + { + if(!SystemUtils.isWindows()) + { + _socketChannel.shutdownOutput(); + } + + _socketChannel.close(); + } + catch (IOException e) + { + LOGGER.info("Exception closing socket thread '" + _remoteSocketAddress + "': " + e); + } + } + + return closed; + + } + + public SelectorThread getSelector() + { + return _selector; + } + + public boolean looksLikeSSLv2ClientHello(final byte[] headerBytes) + { + return headerBytes[0] == -128 && + headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x + (headerBytes[4] == 0 || // SSL 3.0 + headerBytes[4] == 1 || // TLS 1.0 + headerBytes[4] == 2 || // TLS 1.1 + headerBytes[4] == 3); + } + + public boolean doRead() throws IOException + { + boolean readData = false; + if(_transportEncryption == TransportEncryption.NONE) + { + int remaining = 0; + while (remaining == 0 && !_closed.get()) + { + if (_currentBuffer == null || _currentBuffer.remaining() == 0) + { + _currentBuffer = ByteBuffer.allocate(_receiveBufSize); + } + int read = _socketChannel.read(_currentBuffer); + if(read > 0) + { + readData = true; + } + if (read == -1) + { + _closed.set(true); + } + remaining = _currentBuffer.remaining(); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Read " + read + " byte(s)"); + } + ByteBuffer dup = _currentBuffer.duplicate(); + dup.flip(); + _currentBuffer = _currentBuffer.slice(); + _protocolEngine.received(dup); + } + } + else if(_transportEncryption == TransportEncryption.TLS) + { + int read = 1; + while(!_closed.get() && read > 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED)) + { + read = _socketChannel.read(_netInputBuffer); + if (read == -1) + { + _closed.set(true); + } + else if(read > 0) + { + readData = true; + } + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Read " + read + " encrypted bytes "); + } + + _netInputBuffer.flip(); + + + int unwrapped = 0; + boolean tasksRun; + do + { + ByteBuffer appInputBuffer = + ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50); + + _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer); + tasksRun = runSSLEngineTasks(_status); + + appInputBuffer.flip(); + unwrapped = appInputBuffer.remaining(); + if(unwrapped > 0) + { + readData = true; + } + _protocolEngine.received(appInputBuffer); + } + while(unwrapped > 0 || tasksRun); + + _netInputBuffer.compact(); + + } + } + else + { + int read = 1; + while (!_closed.get() && read > 0) + { + + read = _socketChannel.read(_netInputBuffer); + if (read == -1) + { + _closed.set(true); + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer); + } + + if (_netInputBuffer.position() >= NUMBER_OF_BYTES_FOR_TLS_CHECK) + { + _netInputBuffer.flip(); + final byte[] headerBytes = new byte[NUMBER_OF_BYTES_FOR_TLS_CHECK]; + ByteBuffer dup = _netInputBuffer.duplicate(); + dup.get(headerBytes); + + _transportEncryption = looksLikeSSL(headerBytes) ? TransportEncryption.TLS : TransportEncryption.NONE; + LOGGER.debug("Identified transport encryption as " + _transportEncryption); + + if (_transportEncryption == TransportEncryption.NONE) + { + _protocolEngine.received(_netInputBuffer); + } + else + { + _onTransportEncryptionAction.run(); + _netInputBuffer.compact(); + readData = doRead(); + } + break; + } + } + } + return readData; + } + + public boolean doWrite() throws IOException + { + + ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()]; + Iterator<ByteBuffer> bufferIterator = _buffers.iterator(); + for (int i = 0; i < bufArray.length; i++) + { + bufArray[i] = bufferIterator.next(); + } + + int byteBuffersWritten = 0; + + if(_transportEncryption == TransportEncryption.NONE) + { + + + long written = _socketChannel.write(bufArray); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Written " + written + " bytes"); + } + + for (ByteBuffer buf : bufArray) + { + if (buf.remaining() == 0) + { + byteBuffersWritten++; + _buffers.poll(); + } + } + + + return bufArray.length == byteBuffersWritten; + } + else if(_transportEncryption == TransportEncryption.TLS) + { + int remaining = 0; + do + { + if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) + { + _workDone = true; + final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize()); + _status = _sslEngine.wrap(bufArray, netBuffer); + runSSLEngineTasks(_status); + + netBuffer.flip(); + remaining = netBuffer.remaining(); + if (remaining != 0) + { + _encryptedOutput.add(netBuffer); + } + for (ByteBuffer buf : bufArray) + { + if (buf.remaining() == 0) + { + byteBuffersWritten++; + _buffers.poll(); + } + } + } + + } + while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP); + ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]); + long written = _socketChannel.write(encryptedBuffers); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Written " + written + " encrypted bytes"); + } + ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator(); + while(iter.hasNext()) + { + ByteBuffer buf = iter.next(); + if(buf.remaining() == 0) + { + iter.remove(); + } + else + { + break; + } + } + + return bufArray.length == byteBuffersWritten; + + } + else + { + return true; + } + } + + public boolean looksLikeSSLv3ClientHello(final byte[] headerBytes) + { + return headerBytes[0] == 22 && // SSL Handshake + (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x + (headerBytes[2] == 0 || // SSL 3.0 + headerBytes[2] == 1 || // TLS 1.0 + headerBytes[2] == 2 || // TLS 1.1 + headerBytes[2] == 3)) && // TLS1.2 + (headerBytes[5] == 1); // client_hello + } + + public boolean runSSLEngineTasks(final SSLEngineResult status) + { + if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) + { + Runnable task; + while((task = _sslEngine.getDelegatedTask()) != null) + { + task.run(); + } + return true; + } + return false; + } + + public boolean looksLikeSSL(final byte[] headerBytes) + { + return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes); + } + + @Override + public void send(final ByteBuffer msg) + { + assert Thread.currentThread().getName().startsWith(SelectorThread.IO_THREAD_NAME_PREFIX) : "Send called by unexpected thread " + Thread.currentThread().getName(); + + if (_closed.get()) + { + LOGGER.warn("Send ignored as the connection is already closed"); + } + else + { + _buffers.add(msg); + _protocolEngine.notifyWork(); + } + } + + @Override + public void flush() + { + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java new file mode 100644 index 0000000000..79313712a5 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java @@ -0,0 +1,187 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.StandardSocketOptions; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Set; + +import javax.net.ssl.SSLContext; + +import org.slf4j.LoggerFactory; + +import org.apache.qpid.configuration.CommonProperties; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.server.protocol.ServerProtocolEngine; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.TransportEncryption; +import org.apache.qpid.transport.network.io.AbstractNetworkTransport; +import org.apache.qpid.transport.network.io.IdleTimeoutTicker; + +public class NonBlockingNetworkTransport +{ + + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class); + private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, + CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); + private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , + CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); + private SelectorThread _selector; + + + private Set<TransportEncryption> _encryptionSet; + private volatile boolean _closed = false; + private NetworkTransportConfiguration _config; + private ProtocolEngineFactory _factory; + private SSLContext _sslContext; + private ServerSocketChannel _serverSocket; + private int _timeout; + + public void close() + { + if(_selector != null) + { + try + { + if (_serverSocket != null) + { + _selector.cancelAcceptingSocket(_serverSocket); + _serverSocket.close(); + } + } + catch (IOException e) + { + // TODO + e.printStackTrace(); + } + finally + { + + _selector.close(); + } + } + } + + public void accept(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext, + final Set<TransportEncryption> encryptionSet) + { + try + { + + _config = config; + _factory = factory; + _sslContext = sslContext; + _timeout = TIMEOUT; + + InetSocketAddress address = config.getAddress(); + + _serverSocket = ServerSocketChannel.open(); + + _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true); + _serverSocket.bind(address); + _serverSocket.configureBlocking(false); + _encryptionSet = encryptionSet; + + _selector = new SelectorThread(config.getAddress().toString(), this); + _selector.start(); + _selector.addAcceptingSocket(_serverSocket); + } + catch (IOException e) + { + throw new TransportException("Failed to start AMQP on port : " + config, e); + } + + + } + + public int getAcceptingPort() + { + return _serverSocket == null ? -1 : _serverSocket.socket().getLocalPort(); + } + + public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException + { + final ServerProtocolEngine engine = + (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket() + .getRemoteSocketAddress()); + + if(engine != null) + { + socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); + socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT); + + final Integer sendBufferSize = _config.getSendBufferSize(); + final Integer receiveBufferSize = _config.getReceiveBufferSize(); + + socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); + socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); + + + final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); + + NonBlockingConnection connection = + new NonBlockingConnection(socketChannel, + engine, + sendBufferSize, + receiveBufferSize, + _timeout, + ticker, + _encryptionSet, + _sslContext, + _config.wantClientAuth(), + _config.needClientAuth(), + _config.getEnabledCipherSuites(), + _config.getDisabledCipherSuites(), + new Runnable() + { + + @Override + public void run() + { + engine.encryptedTransport(); + } + }, + _selector); + + engine.setNetworkConnection(connection, connection.getSender()); + connection.setMaxReadIdle(HANDSHAKE_TIMEOUT); + + ticker.setConnection(connection); + + connection.start(); + + _selector.addConnection(connection); + + } + else + { + socketChannel.close(); + } + } + + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java new file mode 100644 index 0000000000..774888e934 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.transport; + +import java.io.IOException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.LoggerFactory; + +import org.apache.qpid.thread.LoggingUncaughtExceptionHandler; + + +public class SelectorThread extends Thread +{ + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class); + + public static final String IO_THREAD_NAME_PREFIX = "NCS-"; + private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>(); + private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>(); + private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>(); + private final Selector _selector; + private final AtomicBoolean _closed = new AtomicBoolean(); + private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler(); + private final NonBlockingNetworkTransport _transport; + + SelectorThread(final String name, final NonBlockingNetworkTransport nonBlockingNetworkTransport) + { + super("SelectorThread-"+name); + _transport = nonBlockingNetworkTransport; + try + { + _selector = Selector.open(); + } + catch (IOException e) + { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + public void addAcceptingSocket(final ServerSocketChannel socketChannel) + { + _tasks.add(new Runnable() + { + @Override + public void run() + { + + try + { + socketChannel.register(_selector, SelectionKey.OP_ACCEPT); + } + catch (ClosedChannelException e) + { + // TODO + e.printStackTrace(); + } + } + }); + _selector.wakeup(); + } + + public void cancelAcceptingSocket(final ServerSocketChannel socketChannel) + { + _tasks.add(new Runnable() + { + @Override + public void run() + { + SelectionKey selectionKey = socketChannel.keyFor(_selector); + if(selectionKey != null) + { + selectionKey.cancel(); + } + } + }); + _selector.wakeup(); + } + + @Override + public void run() + { + + long nextTimeout = 0; + + try + { + while (!_closed.get()) + { + + _selector.select(nextTimeout); + + while(_tasks.peek() != null) + { + Runnable task = _tasks.poll(); + task.run(); + } + + List<NonBlockingConnection> toBeScheduled = new ArrayList<>(); + + + Set<SelectionKey> selectionKeys = _selector.selectedKeys(); + for (SelectionKey key : selectionKeys) + { + if(key.isAcceptable()) + { + // todo - should we schedule this rather than running in this thread? + SocketChannel acceptedChannel = ((ServerSocketChannel)key.channel()).accept(); + _transport.acceptSocketChannel(acceptedChannel); + } + else + { + NonBlockingConnection connection = (NonBlockingConnection) key.attachment(); + + key.channel().register(_selector, 0); + + toBeScheduled.add(connection); + _unscheduledConnections.remove(connection); + } + + } + selectionKeys.clear(); + + while (_unregisteredConnections.peek() != null) + { + NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll(); + _unscheduledConnections.add(unregisteredConnection); + + + final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0) + | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0); + unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection); + + } + + long currentTime = System.currentTimeMillis(); + Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator(); + nextTimeout = Integer.MAX_VALUE; + while (iterator.hasNext()) + { + NonBlockingConnection connection = iterator.next(); + + int period = connection.getTicker().getTimeToNextTick(currentTime); + + if (period <= 0 || connection.isStateChanged()) + { + toBeScheduled.add(connection); + connection.getSocketChannel().register(_selector, 0).cancel(); + iterator.remove(); + } + else + { + nextTimeout = Math.min(period, nextTimeout); + } + } + + for (NonBlockingConnection connection : toBeScheduled) + { + _scheduler.schedule(connection); + } + + } + } + catch (IOException e) + { + //TODO + e.printStackTrace(); + } + finally + { + try + { + _selector.close(); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + + + + + } + + public void addConnection(final NonBlockingConnection connection) + { + _unregisteredConnections.add(connection); + _selector.wakeup(); + + } + + public void wakeup() + { + _selector.wakeup(); + } + + public void close() + { + _closed.set(true); + _selector.wakeup(); + _scheduler.close(); + } + + private class NetworkConnectionScheduler + { + private final ScheduledThreadPoolExecutor _executor; + private final AtomicInteger _running = new AtomicInteger(); + private final int _poolSize; + + private NetworkConnectionScheduler() + { + _poolSize = Runtime.getRuntime().availableProcessors(); + _executor = new ScheduledThreadPoolExecutor(_poolSize); + _executor.prestartAllCoreThreads(); + } + + public void processConnection(final NonBlockingConnection connection) + { + try + { + _running.incrementAndGet(); + boolean rerun; + do + { + rerun = false; + boolean closed = connection.doWork(); + + if (!closed) + { + + if (connection.isStateChanged()) + { + if (_running.get() == _poolSize) + { + schedule(connection); + } + else + { + rerun = true; + } + } + else + { + SelectorThread.this.addConnection(connection); + } + } + + } while (rerun); + } + finally + { + _running.decrementAndGet(); + } + } + + public void schedule(final NonBlockingConnection connection) + { + _executor.submit(new Runnable() + { + @Override + public void run() + { + String currentName = Thread.currentThread().getName(); + try + { + Thread.currentThread().setName( + IO_THREAD_NAME_PREFIX + connection.getRemoteAddress().toString()); + processConnection(connection); + } + finally + { + Thread.currentThread().setName(currentName); + } + } + }); + } + + public void close() + { + _executor.shutdown(); + } + + + + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java index 8f7a267771..7874437a2f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.transport; import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; import java.net.InetSocketAddress; +import java.util.EnumSet; import java.util.Collection; import java.util.Set; @@ -34,11 +35,11 @@ import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; import org.apache.qpid.transport.NetworkTransportConfiguration; -import org.apache.qpid.transport.network.IncomingNetworkTransport; +import org.apache.qpid.transport.network.TransportEncryption; class TCPandSSLTransport implements AcceptingTransport { - private IncomingNetworkTransport _networkTransport; + private NonBlockingNetworkTransport _networkTransport; private Set<Transport> _transports; private SSLContext _sslContext; private InetSocketAddress _bindingSocketAddress; @@ -62,7 +63,7 @@ class TCPandSSLTransport implements AcceptingTransport @Override public void start() { - String bindingAddress = ((AmqpPort<?>)_port).getBindingAddress(); + String bindingAddress = _port.getBindingAddress(); if (WILDCARD_ADDRESS.equals(bindingAddress)) { bindingAddress = null; @@ -78,17 +79,25 @@ class TCPandSSLTransport implements AcceptingTransport } final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(); - _networkTransport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance(); + _networkTransport = new NonBlockingNetworkTransport(); final MultiVersionProtocolEngineFactory protocolEngineFactory = new MultiVersionProtocolEngineFactory( - _port.getParent(Broker.class), _transports.contains(Transport.TCP) ? _sslContext : null, - settings.wantClientAuth(), settings.needClientAuth(), + _port.getParent(Broker.class), _supported, _defaultSupportedProtocolReply, _port, _transports.contains(Transport.TCP) ? Transport.TCP : Transport.SSL); - _networkTransport.accept(settings, protocolEngineFactory, _transports.contains(Transport.TCP) ? null : _sslContext); + EnumSet<TransportEncryption> encryptionSet = EnumSet.noneOf(TransportEncryption.class); + if(_transports.contains(Transport.TCP)) + { + encryptionSet.add(TransportEncryption.NONE); + } + if(_transports.contains(Transport.SSL)) + { + encryptionSet.add(TransportEncryption.TLS); + } + _networkTransport.accept(settings, protocolEngineFactory, _sslContext, encryptionSet); } public int getAcceptingPort() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index 65064b015c..809c234cc6 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -30,7 +30,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; @@ -55,7 +55,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction public static interface FutureRecorder { - public void recordFuture(StoreFuture future, Action action); + public void recordFuture(FutureResult future, Action action); } @@ -83,7 +83,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction */ public void addPostTransactionAction(final Action immediateAction) { - addFuture(StoreFuture.IMMEDIATE_FUTURE, immediateAction); + addFuture(FutureResult.IMMEDIATE_FUTURE, immediateAction); } @@ -92,7 +92,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction Transaction txn = null; try { - StoreFuture future; + FutureResult future; if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) @@ -108,7 +108,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = StoreFuture.IMMEDIATE_FUTURE; + future = FutureResult.IMMEDIATE_FUTURE; } addFuture(future, postTransactionAction); postTransactionAction = null; @@ -120,7 +120,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } - private void addFuture(final StoreFuture future, final Action action) + private void addFuture(final FutureResult future, final Action action) { if(action != null) { @@ -135,7 +135,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } - private void addEnqueueFuture(final StoreFuture future, final Action action, boolean persistent) + private void addEnqueueFuture(final FutureResult future, final Action action, boolean persistent) { if(action != null) { @@ -178,7 +178,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } - StoreFuture future; + FutureResult future; if(txn != null) { future = txn.commitTranAsync(); @@ -186,7 +186,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = StoreFuture.IMMEDIATE_FUTURE; + future = FutureResult.IMMEDIATE_FUTURE; } addFuture(future, postTransactionAction); postTransactionAction = null; @@ -204,7 +204,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction Transaction txn = null; try { - StoreFuture future; + FutureResult future; if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) @@ -219,7 +219,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = StoreFuture.IMMEDIATE_FUTURE; + future = FutureResult.IMMEDIATE_FUTURE; } addEnqueueFuture(future, postTransactionAction, message.isPersistent()); postTransactionAction = null; @@ -255,7 +255,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } - StoreFuture future; + FutureResult future; if (txn != null) { future = txn.commitTranAsync(); @@ -263,7 +263,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = StoreFuture.IMMEDIATE_FUTURE; + future = FutureResult.IMMEDIATE_FUTURE; } addEnqueueFuture(future, postTransactionAction, message.isPersistent()); postTransactionAction = null; @@ -281,7 +281,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction { if(immediatePostTransactionAction != null) { - addFuture(StoreFuture.IMMEDIATE_FUTURE, new Action() + addFuture(FutureResult.IMMEDIATE_FUTURE, new Action() { public void postCommit() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 349ec793fe..b800556312 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.txn; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +33,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; @@ -53,7 +54,7 @@ public class LocalTransaction implements ServerTransaction private final MessageStore _transactionLog; private volatile long _txnStartTime = 0L; private volatile long _txnUpdateTime = 0l; - private StoreFuture _asyncTran; + private FutureResult _asyncTran; public LocalTransaction(MessageStore transactionLog) { @@ -271,16 +272,16 @@ public class LocalTransaction implements ServerTransaction } } - public StoreFuture commitAsync(final Runnable deferred) + public FutureResult commitAsync(final Runnable deferred) { sync(); - StoreFuture future = StoreFuture.IMMEDIATE_FUTURE; + FutureResult future = FutureResult.IMMEDIATE_FUTURE; if(_transaction != null) { - future = new StoreFuture() + future = new FutureResult() { private volatile boolean _completed = false; - private StoreFuture _underlying = _transaction.commitTranAsync(); + private FutureResult _underlying = _transaction.commitTranAsync(); @Override public boolean isComplete() @@ -298,6 +299,17 @@ public class LocalTransaction implements ServerTransaction } } + @Override + public void waitForCompletion(final long timeout) throws TimeoutException + { + + if(!_completed) + { + _underlying.waitForCompletion(timeout); + checkUnderlyingCompletion(); + } + } + private synchronized boolean checkUnderlyingCompletion() { if(!_completed && _underlying.isComplete()) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreFuture.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FutureResult.java index 7d3bf90a75..2aab3081ee 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreFuture.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FutureResult.java @@ -18,11 +18,13 @@ * under the License. * */ -package org.apache.qpid.server.store; +package org.apache.qpid.server.util; -public interface StoreFuture +import java.util.concurrent.TimeoutException; + +public interface FutureResult { - StoreFuture IMMEDIATE_FUTURE = new StoreFuture() + FutureResult IMMEDIATE_FUTURE = new FutureResult() { public boolean isComplete() { @@ -32,9 +34,17 @@ public interface StoreFuture public void waitForCompletion() { } + + @Override + public void waitForCompletion(final long timeout) throws TimeoutException + { + + } }; boolean isComplete(); void waitForCompletion(); + + void waitForCompletion(long timeout) throws TimeoutException; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 3680e476c7..220beb20f8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -37,10 +37,15 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; @@ -61,6 +66,7 @@ import org.apache.qpid.server.message.MessageNode; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.*; +import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.adapter.ConnectionAdapter; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.ConnectionValidator; @@ -384,27 +390,65 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte return isStoreEmptyHandler.isEmpty(); } - protected void createDefaultExchanges() + protected ListenableFuture<Void> createDefaultExchanges() { - Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<Void>() + return Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<ListenableFuture<Void>>() { + private static final int TOTAL_STANDARD_EXCHANGES = 4; + private final AtomicInteger _createdExchangeCount = new AtomicInteger(); + private SettableFuture<Void> _future = SettableFuture.create(); + @Override - public Void run() + public ListenableFuture<Void> run() { addStandardExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); addStandardExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS); addStandardExchange(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); addStandardExchange(ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); - return null; + return _future; + } + + private void standardExchangeCreated() + { + if(_createdExchangeCount.incrementAndGet() == TOTAL_STANDARD_EXCHANGES) + { + _future.set(null); + } } - void addStandardExchange(String name, String type) + ListenableFuture<Void> addStandardExchange(String name, String type) { + Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(Exchange.NAME, name); attributes.put(Exchange.TYPE, type); attributes.put(Exchange.ID, UUIDGenerator.generateExchangeUUID(name, getName())); - childAdded(addExchange(attributes)); + final ListenableFuture<ExchangeImpl> future = addExchangeAsync(attributes); + final SettableFuture<Void> returnVal = SettableFuture.create(); + Futures.addCallback(future, new FutureCallback<ExchangeImpl>() + { + @Override + public void onSuccess(final ExchangeImpl result) + { + try + { + childAdded(result); + } + finally + { + standardExchangeCreated(); + } + + } + + @Override + public void onFailure(final Throwable t) + { + standardExchangeCreated(); + } + }, getTaskExecutor().getExecutor()); + + return returnVal; } }); } @@ -747,6 +791,23 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } + private ListenableFuture<ExchangeImpl> addExchangeAsync(Map<String,Object> attributes) + throws ExchangeExistsException, ReservedExchangeNameException, + NoFactoryForTypeException + { + try + { + ListenableFuture result = getObjectFactory().createAsync(Exchange.class, attributes, this); + return result; + } + catch (DuplicateNameException e) + { + throw new ExchangeExistsException(getExchange(e.getName())); + } + + } + + @Override public void removeExchange(ExchangeImpl exchange, boolean force) throws ExchangeIsAlternateException, RequiredExchangeException @@ -777,9 +838,11 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } @Override - protected void beforeClose() + protected ListenableFuture<Void> beforeClose() { setState(State.UNAVAILABLE); + + return super.beforeClose(); } @Override @@ -1277,37 +1340,76 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED ) - protected void doStop() + protected ListenableFuture<Void> doStop() { - closeChildren(); - shutdownHouseKeeping(); - closeMessageStore(); - setState(State.STOPPED); + final SettableFuture<Void> returnVal = SettableFuture.create(); + closeChildren().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + shutdownHouseKeeping(); + closeMessageStore(); + setState(State.STOPPED); + + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + return returnVal; } @StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { if(_deleted.compareAndSet(false,true)) { + final SettableFuture<Void> returnVal = SettableFuture.create(); String hostName = getName(); - close(); + closeAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + MessageStore ms = getMessageStore(); + if (ms != null) + { + try + { + ms.onDelete(AbstractVirtualHost.this); + } + catch (Exception e) + { + _logger.warn("Exception occurred on message store deletion", e); + } + } + deleted(); + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); - MessageStore ms = getMessageStore(); - if (ms != null) - { - try - { - ms.onDelete(this); - } - catch (Exception e) - { - _logger.warn("Exception occurred on message store deletion", e); - } - } - deleted(); - setState(State.DELETED); + return returnVal; + } + else + { + return Futures.immediateFuture(null); } } @@ -1496,7 +1598,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } @StateTransition( currentState = { State.UNINITIALIZED,State.ERRORED }, desiredState = State.ACTIVE ) - private void onActivate() + private ListenableFuture<Void> onActivate() { _houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount(), new SuppressingInheritedAccessControlContextThreadFactory()); @@ -1516,9 +1618,28 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte if (isStoreEmpty()) { - createDefaultExchanges(); + final SettableFuture<Void> returnVal = SettableFuture.create(); + createDefaultExchanges().addListener(new Runnable() + { + @Override + public void run() + { + postCreateDefaultExchangeTasks(); + returnVal.set(null); + } + }, getTaskExecutor().getExecutor()); + return returnVal; } + else + { + postCreateDefaultExchangeTasks(); + return Futures.immediateFuture(null); + } + } + + private void postCreateDefaultExchangeTasks() + { if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY)) { _messageStoreRecoverer = new AsynchronousMessageStoreRecoverer(); @@ -1553,9 +1674,32 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte scheduleHouseKeepingTask(getHousekeepingCheckPeriod(), _fileSystemSpaceChecker); } } + private static class ChildCounter + { + private final AtomicInteger _count = new AtomicInteger(); + private final Runnable _task; + + private ChildCounter(final Runnable task) + { + _task = task; + } + + public void incrementCount() + { + _count.incrementAndGet(); + } + + public void decrementCount() + { + if(_count.decrementAndGet() == 0) + { + _task.run(); + } + } + } @StateTransition( currentState = { State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) - private void onRestart() + private ListenableFuture<Void> onRestart() { resetStatistics(); @@ -1586,6 +1730,25 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte new GenericRecoverer(this).recover(records); + final SettableFuture<Void> returnVal = SettableFuture.create(); + final ChildCounter counter = new ChildCounter(new Runnable() + { + @Override + public void run() + { + onActivate().addListener( + new Runnable() + { + @Override + public void run() + { + returnVal.set(null); + } + }, getTaskExecutor().getExecutor() + ); + } + }); + counter.incrementCount(); Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>() { @Override @@ -1596,14 +1759,22 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte @Override public void performAction(final ConfiguredObject<?> object) { - object.open(); + counter.incrementCount(); + object.openAsync().addListener(new Runnable() + { + @Override + public void run() + { + counter.decrementCount(); + } + }, getTaskExecutor().getExecutor()); } }); return null; } }); - - onActivate(); + counter.decrementCount(); + return returnVal; } private class FileSystemSpaceChecker extends HouseKeepingTask diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java new file mode 100644 index 0000000000..a0bab0b0a6 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; + +public class VirtualHostUnavailableException extends ConnectionScopedRuntimeException +{ + public VirtualHostUnavailableException(VirtualHostImpl<?, ?, ?> host) + { + super("Virtualhost state " + + host.getState() + + " prevents the message from being sent"); + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java index 03c30a9cd4..fd73963b68 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java @@ -29,6 +29,8 @@ import java.util.Map; import javax.security.auth.Subject; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -68,7 +70,7 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard } @Override - protected void activate() + protected ListenableFuture<Void> activate() { if (LOGGER.isDebugEnabled()) { @@ -107,15 +109,21 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard if (host != null) { final VirtualHost<?,?,?> recoveredHost = host; - Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>() - { - @Override - public Object run() - { - recoveredHost.open(); - return null; - } - }); + final ListenableFuture<Void> openFuture = Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), + new PrivilegedAction<ListenableFuture<Void>>() + { + @Override + public ListenableFuture<Void> run() + { + return recoveredHost.openAsync(); + + } + }); + return openFuture; + } + else + { + return Futures.immediateFuture(null); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java index a343b71501..8e08554358 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java @@ -37,6 +37,10 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; @@ -119,16 +123,47 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< } @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { + final SettableFuture<Void> returnVal = SettableFuture.create(); + try { - activate(); - setState(State.ACTIVE); + Futures.addCallback(activate(), + new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + try + { + setState(State.ACTIVE); + } + finally + { + returnVal.set(null); + } + + } + + @Override + public void onFailure(final Throwable t) + { + + setState(State.ERRORED); + returnVal.set(null); + if (_broker.isManagementMode()) + { + LOGGER.warn("Failed to make " + this + " active.", t); + } + } + }, getTaskExecutor().getExecutor() + ); } catch(RuntimeException e) { setState(State.ERRORED); + returnVal.set(null); if (_broker.isManagementMode()) { LOGGER.warn("Failed to make " + this + " active.", e); @@ -138,6 +173,7 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< throw e; } } + return returnVal; } @Override @@ -180,39 +216,73 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< } @StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED ) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { + final SettableFuture<Void> returnVal = SettableFuture.create(); setState(State.DELETED); deleteVirtualHostIfExists(); - close(); - deleted(); - DurableConfigurationStore configurationStore = getConfigurationStore(); - if (configurationStore != null) + final ListenableFuture<Void> closeFuture = closeAsync(); + closeFuture.addListener(new Runnable() { - configurationStore.onDelete(this); - } + @Override + public void run() + { + try + { + deleted(); + DurableConfigurationStore configurationStore = getConfigurationStore(); + if (configurationStore != null) + { + configurationStore.onDelete(AbstractVirtualHostNode.this); + } + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor()); + + return returnVal; + } - protected void deleteVirtualHostIfExists() + protected ListenableFuture<Void> deleteVirtualHostIfExists() { VirtualHost<?, ?, ?> virtualHost = getVirtualHost(); if (virtualHost != null) { - virtualHost.delete(); + return virtualHost.deleteAsync(); + } + else + { + return Futures.immediateFuture(null); } } @StateTransition( currentState = { State.ACTIVE, State.ERRORED, State.UNINITIALIZED }, desiredState = State.STOPPED ) - protected void doStop() + protected ListenableFuture<Void> doStop() { - stopAndSetStateTo(State.STOPPED); + return stopAndSetStateTo(State.STOPPED); } - protected void stopAndSetStateTo(State stoppedState) + protected ListenableFuture<Void> stopAndSetStateTo(final State stoppedState) { - closeChildren(); - closeConfigurationStoreSafely(); - setState(stoppedState); + final SettableFuture<Void> returnVal = SettableFuture.create(); + + ListenableFuture<Void> childCloseFuture = closeChildren(); + childCloseFuture.addListener(new Runnable() + { + @Override + public void run() + { + closeConfigurationStoreSafely(); + setState(stoppedState); + returnVal.set(null); + } + }, getTaskExecutor().getExecutor()); + + return returnVal; } @Override @@ -270,7 +340,7 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< protected abstract DurableConfigurationStore createConfigurationStore(); - protected abstract void activate(); + protected abstract ListenableFuture<Void> activate(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java index c94d113514..8a160f83d7 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java @@ -24,6 +24,8 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +65,7 @@ public class RedirectingVirtualHostNodeImpl } @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { try { @@ -83,6 +85,7 @@ public class RedirectingVirtualHostNodeImpl throw e; } } + return Futures.immediateFuture(null); } @Override |