diff options
Diffstat (limited to 'qpid/java/broker-plugins')
41 files changed, 2061 insertions, 455 deletions
diff --git a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java index 0904379ab4..99db75ac91 100644 --- a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java +++ b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java @@ -25,6 +25,9 @@ import java.util.Collections; import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -149,7 +152,7 @@ public class ACLFileAccessControlProviderImpl @StateTransition(currentState = {State.UNINITIALIZED, State.QUIESCED, State.ERRORED}, desiredState = State.ACTIVE) - private void activate() + private ListenableFuture<Void> activate() { if(_broker.isManagementMode()) @@ -177,6 +180,7 @@ public class ACLFileAccessControlProviderImpl } } } + return Futures.immediateFuture(null); } @Override @@ -190,17 +194,36 @@ public class ACLFileAccessControlProviderImpl } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.QUIESCED) - private void startQuiesced() + private ListenableFuture<Void> startQuiesced() { setState(State.QUIESCED); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { - close(); - setState(State.DELETED); - deleted(); + final SettableFuture<Void> returnVal = SettableFuture.create(); + closeAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + + setState(State.DELETED); + deleted(); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + return returnVal; } public AccessControl getAccessControl() diff --git a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java index a34ac16e80..2a691b3652 100644 --- a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java +++ b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java @@ -30,6 +30,7 @@ import java.util.UUID; import java.util.regex.Pattern; import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.AccessControlProvider; import org.apache.qpid.server.model.Broker; @@ -54,7 +55,9 @@ public class ACLFileAccessControlProviderFactoryTest extends QpidTestCase when(_broker.getObjectFactory()).thenReturn(_objectFactory); when(_broker.getModel()).thenReturn(_objectFactory.getModel()); when(_broker.getCategoryClass()).thenReturn(Broker.class); - when(_broker.getTaskExecutor()).thenReturn(mock(TaskExecutor.class)); + TaskExecutor taskExecutor = new CurrentThreadTaskExecutor(); + taskExecutor.start(); + when(_broker.getTaskExecutor()).thenReturn(taskExecutor); } public void testCreateInstanceWhenAclFileIsNotPresent() diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index 89d681111b..5affe3019c 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -104,7 +104,8 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC _name = name; } - public boolean isSuspended() + @Override + public boolean doIsSuspended() { return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension } @@ -158,6 +159,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC return _name; } + public void transportStateChanged() + { + _creditManager.restoreCredit(0, 0); + } public static class AddMessageDispositionListenerAction implements Runnable { @@ -191,7 +196,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC private final AddMessageDispositionListenerAction _postIdSettingAction; - public long send(final ConsumerImpl consumer, final MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch) { ServerMessage serverMsg = entry.getMessage(); @@ -342,7 +347,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC { recordUnacknowledged(entry); } - return size; } void recordUnacknowledged(MessageInstance entry) @@ -555,10 +559,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC switch(flowMode) { case CREDIT: - _creditManager = new CreditCreditManager(0l,0l); + _creditManager = new CreditCreditManager(0l, 0l, _session.getConnection().getProtocolEngine()); break; case WINDOW: - _creditManager = new WindowCreditManager(0l,0l); + _creditManager = new WindowCreditManager(0l, 0l, _session.getConnection().getProtocolEngine()); break; default: // this should never happen, as 0-10 is finalised and so the enum should never change @@ -628,7 +632,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC public void flushBatched() { - _session.getConnection().flush(); } @@ -657,4 +660,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC { return _unacknowledgedCount.longValue(); } + + @Override + protected void processClosed() + { + + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java index 8dddac9809..dd43ae7e11 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java @@ -21,48 +21,27 @@ package org.apache.qpid.server.protocol.v0_10; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.flow.AbstractFlowCreditManager; public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 { + private final ServerProtocolEngine _serverProtocolEngine; private volatile long _bytesCredit; private volatile long _messageCredit; - public CreditCreditManager(long bytesCredit, long messageCredit) + public CreditCreditManager(long bytesCredit, long messageCredit, final ServerProtocolEngine serverProtocolEngine) { + _serverProtocolEngine = serverProtocolEngine; _bytesCredit = bytesCredit; _messageCredit = messageCredit; setSuspended(!hasCredit()); } - - public synchronized void setCreditLimits(final long bytesCredit, final long messageCredit) - { - _bytesCredit = bytesCredit; - _messageCredit = messageCredit; - - setSuspended(!hasCredit()); - - } - - - public long getMessageCredit() - { - return _messageCredit == -1L - ? Long.MAX_VALUE - : _messageCredit; - } - - public long getBytesCredit() - { - return _bytesCredit == -1L - ? Long.MAX_VALUE - : _bytesCredit; - } - public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) { + setSuspended(!hasCredit()); } @@ -107,12 +86,17 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl public synchronized boolean hasCredit() { // Note !=, if credit is < 0 that indicates infinite credit - return (_bytesCredit != 0L && _messageCredit != 0L); + return (_bytesCredit != 0L && _messageCredit != 0L && !_serverProtocolEngine.isTransportBlockedForWriting()); } public synchronized boolean useCreditForMessage(long msgSize) { - if(_messageCredit >= 0L) + if (_serverProtocolEngine.isTransportBlockedForWriting()) + { + setSuspended(true); + return false; + } + else if(_messageCredit >= 0L) { if(_messageCredit > 0) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java index 30aecdb2d2..4231045afd 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java @@ -23,7 +23,7 @@ package org.apache.qpid.server.protocol.v0_10; import java.net.InetSocketAddress; import java.net.SocketAddress; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; @@ -86,7 +86,10 @@ public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator conn.setRemoteAddress(network.getRemoteAddress()); conn.setLocalAddress(network.getLocalAddress()); - return new ProtocolEngine_0_10( conn, network); + ProtocolEngine_0_10 protocolEngine = new ProtocolEngine_0_10(conn, network); + conn.setProtocolEngine(protocolEngine); + + return protocolEngine; } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 854cd388b9..e391bd6771 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -24,18 +24,23 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; import org.apache.log4j.Logger; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.logging.messages.ConnectionMessages; +import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.Constant; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.Assembler; -import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.network.NetworkConnection; @@ -52,13 +57,20 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol private ServerConnection _connection; private long _createTime = System.currentTimeMillis(); - private long _lastReadTime; - private long _lastWriteTime; + private long _lastReadTime = _createTime; + private long _lastWriteTime = _createTime; + private volatile boolean _transportBlockedForWriting; + + private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>(); + + private final AtomicBoolean _stateChanged = new AtomicBoolean(); + private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>(); + public ProtocolEngine_0_10(ServerConnection conn, NetworkConnection network) { - super(new Assembler(conn)); + super(new ServerAssembler(conn)); _connection = conn; if(network != null) @@ -67,7 +79,33 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } } - public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender) + @Override + public boolean isMessageAssignmentSuspended() + { + Thread lock = _messageAssignmentSuspended.get(); + return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread(); + } + + @Override + public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) + { + _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null); + + if(!messageAssignmentSuspended) + { + for(AMQSessionModel<?,?> session : _connection.getSessionModels()) + { + for(Consumer<?> consumer : session.getConsumers()) + { + ((ConsumerImpl)consumer).getTarget().notifyCurrentState(); + } + } + } + } + + + + public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender) { if(!getSubject().equals(Subject.getSubject(AccessController.getContext()))) { @@ -87,7 +125,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol _network = network; _connection.setNetworkConnection(network); - Disassembler disassembler = new Disassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE); + ServerDisassembler disassembler = new ServerDisassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE); _connection.setSender(disassembler); _connection.addFrameSizeObserver(disassembler); // FIXME Two log messages to maintain compatibility with earlier protocol versions @@ -96,23 +134,15 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } } - private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender) + private ByteBufferSender wrapSender(final ByteBufferSender sender) { - return new Sender<ByteBuffer>() + return new ByteBufferSender() { @Override - public void setIdleTimeout(int i) - { - sender.setIdleTimeout(i); - - } - - @Override public void send(ByteBuffer msg) { _lastWriteTime = System.currentTimeMillis(); sender.send(msg); - } @Override @@ -190,6 +220,11 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol return _writtenBytes; } + @Override + public void encryptedTransport() + { + } + public void writerIdle() { _connection.doHeartBeat(); @@ -215,11 +250,6 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol return getRemoteAddress().toString(); } - public String getAuthId() - { - return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName(); - } - public boolean isDurable() { return false; @@ -246,4 +276,54 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol { return _connection.getAuthorizedSubject(); } + + @Override + public boolean isTransportBlockedForWriting() + { + return _transportBlockedForWriting; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _transportBlockedForWriting = blocked; + _connection.transportStateChanged(); + } + + @Override + public void processPending() + { + _connection.processPending(); + + } + + @Override + public boolean hasWork() + { + return _stateChanged.get(); + } + + @Override + public void notifyWork() + { + _stateChanged.set(true); + + final Action<ServerProtocolEngine> listener = _workListener.get(); + if(listener != null) + { + listener.performAction(this); + } + } + + @Override + public void clearWork() + { + _stateChanged.set(false); + } + + @Override + public void setWorkListener(final Action<ServerProtocolEngine> listener) + { + _workListener.set(listener); + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java new file mode 100644 index 0000000000..456c9d36d9 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_10; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.transport.network.NetworkEvent; + +public class ServerAssembler extends Assembler +{ + private static final Logger LOGGER = LoggerFactory.getLogger(ServerAssembler.class); + + + private final ServerConnection _connection; + + public ServerAssembler(final ServerConnection connection) + { + super(connection); + _connection = connection; + } + + @Override + public void received(final NetworkEvent event) + { + if (!_connection.isIgnoreFutureInput()) + { + super.received(event); + } + else + { + LOGGER.debug("Ignored network event " + event + " as connection is ignoring further input "); + } + } + + +} diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 8567be37f0..2280377fca 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.v0_10; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT; +import static org.apache.qpid.transport.Connection.State.CLOSING; import java.net.SocketAddress; import java.security.Principal; @@ -30,6 +31,8 @@ import java.security.PrivilegedAction; import java.text.MessageFormat; import java.util.ArrayList; import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -37,6 +40,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogSubject; @@ -66,7 +70,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { private final Broker<?> _broker; - private Runnable _onOpenTask; private AtomicBoolean _logClosed = new AtomicBoolean(false); private final Subject _authorizedSubject = new Subject(); @@ -75,20 +78,26 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private final long _connectionId; private final Object _reference = new Object(); private VirtualHostImpl<?,?,?> _virtualHost; - private AmqpPort<?> _port; - private AtomicLong _lastIoTime = new AtomicLong(); + private final AmqpPort<?> _port; + private final AtomicLong _lastIoTime = new AtomicLong(); private boolean _blocking; - private Transport _transport; + private final Transport _transport; - private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList = + private final CopyOnWriteArrayList<Action<? super ServerConnection>> _connectionCloseTaskList = new CopyOnWriteArrayList<Action<? super ServerConnection>>(); + private final Queue<Action<? super ServerConnection>> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners = new CopyOnWriteArrayList<SessionModelListener>(); private volatile boolean _stopped; private int _messageCompressionThreshold; - private int _maxMessageSize; + private final int _maxMessageSize; + + private ServerProtocolEngine _serverProtocolEngine; + private boolean _ignoreFutureInput; public ServerConnection(final long connectionId, Broker<?> broker, @@ -140,10 +149,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S if (state == State.OPEN) { - if (_onOpenTask != null) - { - _onOpenTask.run(); - } getEventLogger().message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), @@ -189,6 +194,16 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S super.setConnectionDelegate(delegate); } + public ServerProtocolEngine getProtocolEngine() + { + return _serverProtocolEngine; + } + + public void setProtocolEngine(final ServerProtocolEngine serverProtocolEngine) + { + _serverProtocolEngine = serverProtocolEngine; + } + public VirtualHostImpl<?,?,?> getVirtualHost() { return _virtualHost; @@ -237,28 +252,32 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S return _stopped; } - public void onOpen(final Runnable task) - { - _onOpenTask = task; - } - - public void closeSession(ServerSession session, AMQConstant cause, String message) + public void closeSessionAsync(final ServerSession session, final AMQConstant cause, final String message) { - ExecutionException ex = new ExecutionException(); - ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR; - try - { - code = ExecutionErrorCode.get(cause.getCode()); - } - catch (IllegalArgumentException iae) + addAsyncTask(new Action<ServerConnection>() { - // Ignore, already set to INTERNAL_ERROR - } - ex.setErrorCode(code); - ex.setDescription(message); - session.invoke(ex); - session.close(cause, message); + @Override + public void performAction(final ServerConnection conn) + { + ExecutionException ex = new ExecutionException(); + ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR; + try + { + code = ExecutionErrorCode.get(cause.getCode()); + } + catch (IllegalArgumentException iae) + { + // Ignore, already set to INTERNAL_ERROR + } + ex.setErrorCode(code); + ex.setDescription(message); + session.invoke(ex); + + session.close(cause, message); + } + }); + } public LogSubject getLogSubject() @@ -355,25 +374,35 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S } } - public void close(AMQConstant cause, String message) + public void closeAsync(final AMQConstant cause, final String message) { - closeSubscriptions(); - performDeleteTasks(); - ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; - try - { - replyCode = ConnectionCloseCode.get(cause.getCode()); - } - catch (IllegalArgumentException iae) + + addAsyncTask(new Action<ServerConnection>() { - // Ignore - } - close(replyCode, message); + @Override + public void performAction(final ServerConnection object) + { + closeSubscriptions(); + performDeleteTasks(); + + setState(CLOSING); + ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; + try + { + replyCode = ConnectionCloseCode.get(cause.getCode()); + } + catch (IllegalArgumentException iae) + { + // Ignore + } + sendConnectionClose(replyCode, message); + } + }); } protected void performDeleteTasks() { - for(Action<? super ServerConnection> task : _taskList) + for(Action<? super ServerConnection> task : _connectionCloseTaskList) { task.performAction(this); } @@ -646,13 +675,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S @Override public void addDeleteTask(final Action<? super ServerConnection> task) { - _taskList.add(task); + _connectionCloseTaskList.add(task); + } + + private void addAsyncTask(final Action<ServerConnection> action) + { + _asyncTaskList.add(action); + notifyWork(); } @Override public void removeDeleteTask(final Action<? super ServerConnection> task) { - _taskList.remove(task); + _connectionCloseTaskList.remove(task); } public int getMessageCompressionThreshold() @@ -664,4 +699,51 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { return _maxMessageSize; } + + public void transportStateChanged() + { + for (AMQSessionModel ssn : getSessionModels()) + { + ssn.transportStateChanged(); + } + } + + @Override + public void notifyWork() + { + _serverProtocolEngine.notifyWork(); + } + + + @Override + public boolean isMessageAssignmentSuspended() + { + return _serverProtocolEngine.isMessageAssignmentSuspended(); + } + + public void processPending() + { + while(_asyncTaskList.peek() != null) + { + Action<? super ServerConnection> asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } + + for (AMQSessionModel session : getSessionModels()) + { + session.processPending(); + } + + } + + public void closeAndIgnoreFutureInput() + { + _ignoreFutureInput = true; + getSender().close(); + } + + public boolean isIgnoreFutureInput() + { + return _ignoreFutureInput; + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index 6e2a6cac7d..7f646b43b4 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -250,7 +250,7 @@ public class ServerConnectionDelegate extends ServerDelegate ") above the server's offered limit (" + getChannelMax() +")"); //Due to the error we must forcefully close the connection without negotiation - sconn.getSender().close(); + sconn.closeAndIgnoreFutureInput(); return; } @@ -261,7 +261,8 @@ public class ServerConnectionDelegate extends ServerDelegate ") above the server's offered limit (" + getFrameMax() +")"); //Due to the error we must forcefully close the connection without negotiation - sconn.getSender().close(); + sconn.closeAndIgnoreFutureInput(); + return; } else if(okMaxFrameSize > 0 && okMaxFrameSize < Constant.MIN_MAX_FRAME_SIZE) @@ -271,7 +272,7 @@ public class ServerConnectionDelegate extends ServerDelegate ") below the minimum permitted size (" + Constant.MIN_MAX_FRAME_SIZE +")"); //Due to the error we must forcefully close the connection without negotiation - sconn.getSender().close(); + sconn.closeAndIgnoreFutureInput(); return; } else if(okMaxFrameSize == 0) diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java new file mode 100644 index 0000000000..a42238a40d --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java @@ -0,0 +1,248 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_10; + +import static java.lang.Math.min; +import static org.apache.qpid.transport.network.Frame.FIRST_FRAME; +import static org.apache.qpid.transport.network.Frame.FIRST_SEG; +import static org.apache.qpid.transport.network.Frame.HEADER_SIZE; +import static org.apache.qpid.transport.network.Frame.LAST_FRAME; +import static org.apache.qpid.transport.network.Frame.LAST_SEG; + +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.transport.FrameSizeObserver; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.ProtocolDelegate; +import org.apache.qpid.transport.ProtocolError; +import org.apache.qpid.transport.ProtocolEvent; +import org.apache.qpid.transport.ProtocolEventSender; +import org.apache.qpid.transport.ProtocolHeader; +import org.apache.qpid.transport.SegmentType; +import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.codec.Encoder; +import org.apache.qpid.transport.network.Frame; + +/** + * Disassembler + */ +public final class ServerDisassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver +{ + private final ByteBufferSender _sender; + private int _maxPayload; + private final Object _sendLock = new Object(); + private final Encoder _encoder = new ServerEncoder(); + + public ServerDisassembler(ByteBufferSender sender, int maxFrame) + { + _sender = sender; + if (maxFrame <= HEADER_SIZE || maxFrame >= 64 * 1024) + { + throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame); + } + _maxPayload = maxFrame - HEADER_SIZE; + } + + public void send(ProtocolEvent event) + { + synchronized (_sendLock) + { + event.delegate(null, this); + } + } + + public void flush() + { + synchronized (_sendLock) + { + _sender.flush(); + } + } + + public void close() + { + synchronized (_sendLock) + { + _sender.close(); + } + } + + private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf) + { + ByteBuffer data = ByteBuffer.wrap(new byte[HEADER_SIZE]); + + data.put(0, flags); + data.put(1, type); + data.putShort(2, (short) (size + HEADER_SIZE)); + data.put(5, track); + data.putShort(6, (short) channel); + + + ByteBuffer dup = buf.duplicate(); + dup.limit(dup.position() + size); + buf.position(buf.position() + size); + _sender.send(data); + _sender.send(dup); + + + } + + private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buf) + { + byte typeb = (byte) type.getValue(); + byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0; + + int remaining = buf.remaining(); + boolean first = true; + while (true) + { + int size = min(_maxPayload, remaining); + remaining -= size; + + byte newflags = flags; + if (first) + { + newflags |= FIRST_FRAME; + first = false; + } + if (remaining == 0) + { + newflags |= LAST_FRAME; + } + + frame(newflags, typeb, track, event.getChannel(), size, buf); + + if (remaining == 0) + { + break; + } + } + } + + public void init(Void v, ProtocolHeader header) + { + _sender.send(header.toByteBuffer()); + _sender.flush(); +} + + public void control(Void v, Method method) + { + method(method, SegmentType.CONTROL); + } + + public void command(Void v, Method method) + { + method(method, SegmentType.COMMAND); + } + + private void method(Method method, SegmentType type) + { + Encoder enc = _encoder; + enc.init(); + enc.writeUint16(method.getEncodedType()); + if (type == SegmentType.COMMAND) + { + if (method.isSync()) + { + enc.writeUint16(0x0101); + } + else + { + enc.writeUint16(0x0100); + } + } + method.write(enc); + int methodLimit = enc.position(); + + byte flags = FIRST_SEG; + + boolean payload = method.hasPayload(); + if (!payload) + { + flags |= LAST_SEG; + } + + int headerLimit = -1; + if (payload) + { + final Header hdr = method.getHeader(); + if (hdr != null) + { + if (hdr.getDeliveryProperties() != null) + { + enc.writeStruct32(hdr.getDeliveryProperties()); + } + if (hdr.getMessageProperties() != null) + { + enc.writeStruct32(hdr.getMessageProperties()); + } + if (hdr.getNonStandardProperties() != null) + { + for (Struct st : hdr.getNonStandardProperties()) + { + enc.writeStruct32(st); + } + } + } + + headerLimit = enc.position(); + } + synchronized (_sendLock) + { + ByteBuffer buf = enc.underlyingBuffer(); + buf.position(0); + buf.limit(methodLimit); + + fragment(flags, type, method, buf.duplicate()); + if (payload) + { + ByteBuffer body = method.getBody(); + buf.limit(headerLimit); + buf.position(methodLimit); + + fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, buf.duplicate()); + if (body != null) + { + fragment(LAST_SEG, SegmentType.BODY, method, body.duplicate()); + } + + } + } + } + + public void error(Void v, ProtocolError error) + { + throw new IllegalArgumentException(String.valueOf(error)); + } + + @Override + public void setMaxFrameSize(final int maxFrame) + { + if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024) + { + throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame); + } + this._maxPayload = maxFrame - HEADER_SIZE; + + } +} diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java new file mode 100644 index 0000000000..6437015208 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java @@ -0,0 +1,371 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_10; + +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.util.UUID; + +import org.apache.qpid.transport.codec.AbstractEncoder; + + +public final class ServerEncoder extends AbstractEncoder +{ + public static final int DEFAULT_CAPACITY = 8192; + private final int _threshold; + private ByteBuffer _out; + private int _segment; + private int _initialCapacity; + + public ServerEncoder() + { + this(DEFAULT_CAPACITY); + } + + public ServerEncoder(int capacity) + { + _initialCapacity = capacity; + _threshold = capacity/16; + _out = ByteBuffer.allocate(capacity); + _segment = 0; + } + + public void init() + { + _out.position(_out.limit()); + _out.limit(_out.capacity()); + _out = _out.slice(); + if(_out.remaining() < _threshold) + { + _out = ByteBuffer.allocate(_initialCapacity); + } + _segment = 0; + } + + public ByteBuffer buffer() + { + int pos = _out.position(); + _out.position(_segment); + ByteBuffer slice = _out.slice(); + slice.limit(pos - _segment); + _out.position(pos); + return slice; + } + + public int position() + { + return _out.position(); + } + + public ByteBuffer underlyingBuffer() + { + return _out; + } + + private void grow(int size) + { + ByteBuffer old = _out; + int capacity = old.capacity(); + _out = ByteBuffer.allocate(Math.max(Math.max(capacity + size, 2*capacity), _initialCapacity)); + old.flip(); + _out.put(old); + } + + protected void doPut(byte b) + { + try + { + _out.put(b); + } + catch (BufferOverflowException e) + { + grow(1); + _out.put(b); + } + } + + protected void doPut(ByteBuffer src) + { + try + { + _out.put(src); + } + catch (BufferOverflowException e) + { + grow(src.remaining()); + _out.put(src); + } + } + + protected void put(byte[] bytes) + { + try + { + _out.put(bytes); + } + catch (BufferOverflowException e) + { + grow(bytes.length); + _out.put(bytes); + } + } + + public void writeUint8(short b) + { + assert b < 0x100; + + try + { + _out.put((byte) b); + } + catch (BufferOverflowException e) + { + grow(1); + _out.put((byte) b); + } + } + + public void writeUint16(int s) + { + assert s < 0x10000; + + try + { + _out.putShort((short) s); + } + catch (BufferOverflowException e) + { + grow(2); + _out.putShort((short) s); + } + } + + public void writeUint32(long i) + { + assert i < 0x100000000L; + + try + { + _out.putInt((int) i); + } + catch (BufferOverflowException e) + { + grow(4); + _out.putInt((int) i); + } + } + + public void writeUint64(long l) + { + try + { + _out.putLong(l); + } + catch (BufferOverflowException e) + { + grow(8); + _out.putLong(l); + } + } + + public int beginSize8() + { + int pos = _out.position(); + try + { + _out.put((byte) 0); + } + catch (BufferOverflowException e) + { + grow(1); + _out.put((byte) 0); + } + return pos; + } + + public void endSize8(int pos) + { + int cur = _out.position(); + _out.put(pos, (byte) (cur - pos - 1)); + } + + public int beginSize16() + { + int pos = _out.position(); + try + { + _out.putShort((short) 0); + } + catch (BufferOverflowException e) + { + grow(2); + _out.putShort((short) 0); + } + return pos; + } + + public void endSize16(int pos) + { + int cur = _out.position(); + _out.putShort(pos, (short) (cur - pos - 2)); + } + + public int beginSize32() + { + int pos = _out.position(); + try + { + _out.putInt(0); + } + catch (BufferOverflowException e) + { + grow(4); + _out.putInt(0); + } + return pos; + + } + + public void endSize32(int pos) + { + int cur = _out.position(); + _out.putInt(pos, (cur - pos - 4)); + + } + + public void writeDouble(double aDouble) + { + try + { + _out.putDouble(aDouble); + } + catch(BufferOverflowException exception) + { + grow(8); + _out.putDouble(aDouble); + } + } + + public void writeInt16(short aShort) + { + try + { + _out.putShort(aShort); + } + catch(BufferOverflowException exception) + { + grow(2); + _out.putShort(aShort); + } + } + + public void writeInt32(int anInt) + { + try + { + _out.putInt(anInt); + } + catch(BufferOverflowException exception) + { + grow(4); + _out.putInt(anInt); + } + } + + public void writeInt64(long aLong) + { + try + { + _out.putLong(aLong); + } + catch(BufferOverflowException exception) + { + grow(8); + _out.putLong(aLong); + } + } + + public void writeInt8(byte aByte) + { + try + { + _out.put(aByte); + } + catch(BufferOverflowException exception) + { + grow(1); + _out.put(aByte); + } + } + + public void writeBin128(byte[] byteArray) + { + byteArray = (byteArray != null) ? byteArray : new byte [16]; + + assert byteArray.length == 16; + + try + { + _out.put(byteArray); + } + catch(BufferOverflowException exception) + { + grow(16); + _out.put(byteArray); + } + } + + public void writeBin128(UUID id) + { + byte[] data = new byte[16]; + + long msb = id.getMostSignificantBits(); + long lsb = id.getLeastSignificantBits(); + + assert data.length == 16; + for (int i=7; i>=0; i--) + { + data[i] = (byte)(msb & 0xff); + msb = msb >> 8; + } + + for (int i=15; i>=8; i--) + { + data[i] = (byte)(lsb & 0xff); + lsb = (lsb >> 8); + } + writeBin128(data); + } + + public void writeFloat(float aFloat) + { + try + { + _out.putFloat(aFloat); + } + catch(BufferOverflowException exception) + { + grow(4); + _out.putFloat(aFloat); + } + } + +} diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 223de4f84e..67204427fb 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -56,6 +56,7 @@ import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ChannelMessages; @@ -74,7 +75,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AlreadyKnownDtxException; @@ -136,6 +137,7 @@ public class ServerSession extends Session private org.apache.qpid.server.model.Session<?> _modelObject; private long _blockTime; private long _blockingTimeout; + private boolean _wireBlockingState; public static interface MessageDispositionChangeListener { @@ -188,7 +190,7 @@ public class ServerSession extends Session @Override public void doTimeoutAction(String reason) { - getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason); + getConnectionModel().closeSessionAsync(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason); } }, getVirtualHost()); @@ -207,10 +209,6 @@ public class ServerSession extends Session if (state == State.OPEN) { getVirtualHost().getEventLogger().message(ChannelMessages.CREATE()); - if(_blocking.get()) - { - invokeBlock(); - } } } else @@ -244,6 +242,17 @@ public class ServerSession extends Session invoke(new MessageStop("")); } + private void invokeUnblock() + { + MessageFlow mf = new MessageFlow(); + mf.setUnit(MessageCreditUnit.MESSAGE); + mf.setDestination(""); + _outstandingCredit.set(Integer.MAX_VALUE); + mf.setValue(Integer.MAX_VALUE); + invoke(mf); + } + + @Override protected boolean isFull(int id) { @@ -823,12 +832,11 @@ public class ServerSession extends Session if(_blocking.compareAndSet(false,true)) { + getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); if(getState() == State.OPEN) { - invokeBlock(); + getConnection().notifyWork(); } - _blockTime = System.currentTimeMillis(); - getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); } @@ -852,28 +860,30 @@ public class ServerSession extends Session { if(_blocking.compareAndSet(true,false) && !isClosing()) { - _blockTime = 0l; getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED()); - MessageFlow mf = new MessageFlow(); - mf.setUnit(MessageCreditUnit.MESSAGE); - mf.setDestination(""); - _outstandingCredit.set(Integer.MAX_VALUE); - mf.setValue(Integer.MAX_VALUE); - invoke(mf); - - + getConnection().notifyWork(); } } } + boolean blockingTimeoutExceeded() { long blockTime = _blockTime; - boolean b = _blocking.get() && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout; + boolean b = _wireBlockingState && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout; return b; } @Override + public void transportStateChanged() + { + for(ConsumerTarget_0_10 consumerTarget : getSubscriptions()) + { + consumerTarget.transportStateChanged(); + } + } + + @Override public Object getConnectionReference() { return getConnection().getReference(); @@ -1002,17 +1012,17 @@ public class ServerSession extends Session return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast(); } - public void recordFuture(final StoreFuture future, final ServerTransaction.Action action) + public void recordFuture(final FutureResult future, final ServerTransaction.Action action) { _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } private static class AsyncCommand { - private final StoreFuture _future; + private final FutureResult _future; private ServerTransaction.Action _action; - public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action) + public AsyncCommand(final FutureResult future, final ServerTransaction.Action action) { _future = future; _action = action; @@ -1125,6 +1135,32 @@ public class ServerSession extends Session } } + @Override + public void processPending() + { + boolean desiredBlockingState = _blocking.get(); + if (desiredBlockingState != _wireBlockingState) + { + _wireBlockingState = desiredBlockingState; + + if (desiredBlockingState) + { + invokeBlock(); + } + else + { + invokeUnblock(); + } + _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0; + } + + + for(ConsumerTarget target : getSubscriptions()) + { + target.processPending(); + } + } + public final long getMaxUncommittedInMemorySize() { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 8632d04048..dd634c36ff 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -36,8 +36,10 @@ import org.apache.log4j.Logger; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.ArrivalTimeFilter; import org.apache.qpid.server.filter.FilterManager; @@ -58,7 +60,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.DtxNotSelectedException; @@ -133,7 +135,7 @@ public class ServerSessionDelegate extends SessionDelegate serverSession.accept(method.getTransfers()); if(!serverSession.isTransactional()) { - serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, + serverSession.recordFuture(FutureResult.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, method)); } } @@ -246,8 +248,8 @@ public class ServerSessionDelegate extends SessionDelegate } else { - - FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L); + ServerProtocolEngine serverProtocolEngine = getServerConnection(session).getProtocolEngine(); + FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L, serverProtocolEngine); FilterManager filterManager = null; try @@ -421,58 +423,69 @@ public class ServerSessionDelegate extends SessionDelegate new MessageTransferMessage(storeMessage, serverSession.getReference()); MessageReference<MessageTransferMessage> reference = message.newReference(); - final InstanceProperties instanceProperties = new InstanceProperties() + try { - @Override - public Object getProperty(final Property prop) + final InstanceProperties instanceProperties = new InstanceProperties() { - switch (prop) + @Override + public Object getProperty(final Property prop) { - case EXPIRATION: - return message.getExpiration(); - case IMMEDIATE: - return message.isImmediate(); - case MANDATORY: - return (delvProps == null || !delvProps.getDiscardUnroutable()) - && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT; - case PERSISTENT: - return message.isPersistent(); - case REDELIVERED: - return delvProps.getRedelivered(); + switch (prop) + { + case EXPIRATION: + return message.getExpiration(); + case IMMEDIATE: + return message.isImmediate(); + case MANDATORY: + return (delvProps == null || !delvProps.getDiscardUnroutable()) + && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT; + case PERSISTENT: + return message.isPersistent(); + case REDELIVERED: + return delvProps.getRedelivered(); + } + return null; } - return null; - } - }; + }; - int enqueues = serverSession.enqueue(message, instanceProperties, destination); + int enqueues = serverSession.enqueue(message, instanceProperties, destination); - if (enqueues == 0) - { - if ((delvProps == null || !delvProps.getDiscardUnroutable()) - && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT) + if (enqueues == 0) { - RangeSet rejects = RangeSetFactory.createRangeSet(); - rejects.add(xfr.getId()); - MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable"); - ssn.invoke(reject); + if ((delvProps == null || !delvProps.getDiscardUnroutable()) + && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT) + { + RangeSet rejects = RangeSetFactory.createRangeSet(); + rejects.add(xfr.getId()); + MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable"); + ssn.invoke(reject); + } + else + { + virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(destination.getName(), + messageMetaData.getRoutingKey())); + } + } + + if (serverSession.isTransactional()) + { + serverSession.processed(xfr); } else { - virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(destination.getName(), - messageMetaData.getRoutingKey())); + serverSession.recordFuture(FutureResult.IMMEDIATE_FUTURE, + new CommandProcessedAction(serverSession, xfr)); } } - - if (serverSession.isTransactional()) + catch (VirtualHostUnavailableException e) { - serverSession.processed(xfr); + getServerConnection(serverSession).closeAsync(AMQConstant.CONNECTION_FORCED, e.getMessage()); } - else + finally { - serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, - new CommandProcessedAction(serverSession, xfr)); + reference.release(); } - reference.release(); + } } @@ -589,7 +602,7 @@ public class ServerSessionDelegate extends SessionDelegate { try { - ((ServerSession)session).endDtx(method.getXid(), method.getFail(), method.getSuspend()); + ((ServerSession) session).endDtx(method.getXid(), method.getFail(), method.getSuspend()); } catch (TimeoutDtxException e) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java index 8e48741b91..a7b08e3f83 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java @@ -21,11 +21,14 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.log4j.Logger; + +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.flow.AbstractFlowCreditManager; public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 { private static final Logger LOGGER = Logger.getLogger(WindowCreditManager.class); + private final ServerProtocolEngine _serverProtocolEngine; private volatile long _bytesCreditLimit; private volatile long _messageCreditLimit; @@ -33,39 +36,22 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl private volatile long _bytesUsed; private volatile long _messageUsed; - public WindowCreditManager() - { - this(0L, 0L); - } - - public WindowCreditManager(long bytesCreditLimit, long messageCreditLimit) + public WindowCreditManager(long bytesCreditLimit, + long messageCreditLimit, + ServerProtocolEngine serverProtocolEngine) { + _serverProtocolEngine = serverProtocolEngine; _bytesCreditLimit = bytesCreditLimit; _messageCreditLimit = messageCreditLimit; setSuspended(!hasCredit()); } - public long getBytesCreditLimit() - { - return _bytesCreditLimit; - } - public long getMessageCreditLimit() { return _messageCreditLimit; } - public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit) - { - _bytesCreditLimit = bytesCreditLimit; - _messageCreditLimit = messageCreditLimit; - - setSuspended(!hasCredit()); - - } - - public long getMessageCredit() { return _messageCreditLimit == -1L @@ -121,12 +107,18 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl public synchronized boolean hasCredit() { return (_bytesCreditLimit < 0L || _bytesCreditLimit > _bytesUsed) - && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed); + && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed) + && !_serverProtocolEngine.isTransportBlockedForWriting(); } public synchronized boolean useCreditForMessage(final long msgSize) { - if(_messageCreditLimit >= 0L) + if (_serverProtocolEngine.isTransportBlockedForWriting()) + { + setSuspended(true); + return false; + } + else if(_messageCreditLimit >= 0L) { if(_messageUsed < _messageCreditLimit) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java index 1c4a694be6..b9f013d253 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java @@ -20,17 +20,25 @@ */ package org.apache.qpid.server.protocol.v0_10; -import org.apache.qpid.server.protocol.v0_10.WindowCreditManager; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.test.utils.QpidTestCase; public class WindowCreditManagerTest extends QpidTestCase { private WindowCreditManager _creditManager; + private ServerProtocolEngine _protocolEngine; protected void setUp() throws Exception { super.setUp(); - _creditManager = new WindowCreditManager(); + + _protocolEngine = mock(ServerProtocolEngine.class); + when(_protocolEngine.isTransportBlockedForWriting()).thenReturn(false); + + _creditManager = new WindowCreditManager(0l, 0l, _protocolEngine); } /** diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/pom.xml b/qpid/java/broker-plugins/amqp-0-8-protocol/pom.xml index 0c5d20a0a6..e09a3ba922 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/pom.xml +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/pom.xml @@ -29,6 +29,10 @@ <name>Qpid AMQP 0-8 Protocol Broker Plug-in</name> <description>AMQP 0-8, 0-9 and 0-9-1 protocol broker plug-in</description> + <properties> + <qpid.home>${basedir}/../</qpid.home> <!-- override for broker tests --> + </properties> + <dependencies> <dependency> <groupId>org.apache.qpid</groupId> diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 9afa7c393f..2a1fbe6881 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -40,7 +40,6 @@ import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; import javax.security.auth.Subject; @@ -66,8 +65,6 @@ import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.flow.MessageOnlyCreditManager; -import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ChannelMessages; @@ -99,7 +96,6 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; @@ -108,6 +104,7 @@ import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; import org.apache.qpid.server.virtualhost.QueueExistsException; @@ -133,7 +130,8 @@ public class AMQChannel private final int _channelId; - private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l); + private final Pre0_10CreditManager _creditManager; + private final FlowCreditManager _noAckCreditManager; /** * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that @@ -211,8 +209,13 @@ public class AMQChannel private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<>(); private long _maxUncommittedInMemorySize; + private boolean _wireBlockingState; + public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore) { + _creditManager = new Pre0_10CreditManager(0l,0l, connection); + _noAckCreditManager = new NoAckCreditManager(connection); + _connection = connection; _channelId = channelId; @@ -699,7 +702,7 @@ public class AMQChannel if(arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue()))) { - target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _creditManager); + target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _noAckCreditManager); } else if(acks) { @@ -709,7 +712,7 @@ public class AMQChannel } else { - target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _creditManager); + target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _noAckCreditManager); options.add(ConsumerImpl.Option.ACQUIRES); options.add(ConsumerImpl.Option.SEES_REQUEUES); } @@ -1274,7 +1277,8 @@ public class AMQChannel // stop all subscriptions _rollingBack = true; - boolean requiresSuspend = _suspended.compareAndSet(false,true); + boolean requiresSuspend = _suspended.compareAndSet(false,true); // TODO This is probably superfluous owing to the + // message assignment suspended logic in NBC. // ensure all subscriptions have seen the change to the channel state for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values()) @@ -1653,12 +1657,14 @@ public class AMQChannel { if(_blockingEntities.add(this)) { + if(_blocking.compareAndSet(false,true)) { getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **")); - flow(false); - _blockTime = System.currentTimeMillis(); + + + getConnection().notifyWork(); } } } @@ -1670,12 +1676,12 @@ public class AMQChannel if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false)) { getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED()); - - flow(true); + getConnection().notifyWork(); } } } + public synchronized void block(AMQQueue queue) { if(_blockingEntities.add(queue)) @@ -1684,8 +1690,7 @@ public class AMQChannel if(_blocking.compareAndSet(false,true)) { getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName())); - flow(false); - _blockTime = System.currentTimeMillis(); + getConnection().notifyWork(); } } @@ -1698,12 +1703,19 @@ public class AMQChannel if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing()) { getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED()); - flow(true); + getConnection().notifyWork(); } } } @Override + public void transportStateChanged() + { + _creditManager.restoreCredit(0, 0); + _noAckCreditManager.restoreCredit(0, 0); + } + + @Override public Object getConnectionReference() { return getConnection().getReference(); @@ -1743,16 +1755,7 @@ public class AMQChannel */ private void closeConnection(String reason) throws AMQException { - Lock receivedLock = _connection.getReceivedLock(); - receivedLock.lock(); - try - { - _connection.close(AMQConstant.RESOURCE_ERROR, reason); - } - finally - { - receivedLock.unlock(); - } + _connection.closeAsync(AMQConstant.RESOURCE_ERROR, reason); } public void deadLetter(long deliveryTag) @@ -1815,7 +1818,7 @@ public class AMQChannel } } - public void recordFuture(final StoreFuture future, final ServerTransaction.Action action) + public void recordFuture(final FutureResult future, final ServerTransaction.Action action) { _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } @@ -1841,10 +1844,10 @@ public class AMQChannel private static class AsyncCommand { - private final StoreFuture _future; + private final FutureResult _future; private ServerTransaction.Action _action; - public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action) + public AsyncCommand(final FutureResult future, final ServerTransaction.Action action) { _future = future; _action = action; @@ -2305,7 +2308,7 @@ public class AMQChannel private boolean blockingTimeoutExceeded() { - return _blocking.get() && (System.currentTimeMillis() - _blockTime) > _blockingTimeout; + return _wireBlockingState && (System.currentTimeMillis() - _blockTime) > _blockingTimeout; } @Override @@ -3639,4 +3642,22 @@ public class AMQChannel } } } + + @Override + public void processPending() + { + + boolean desiredBlockingState = _blocking.get(); + if (desiredBlockingState != _wireBlockingState) + { + _wireBlockingState = desiredBlockingState; + flow(!desiredBlockingState); + _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0; + } + + for(ConsumerTarget target : _tag2SubscriptionTargetMap.values()) + { + target.processPending(); + } + } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index cb145aac88..d7b5b00b26 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -36,13 +36,14 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; import javax.security.sasl.SaslException; @@ -58,7 +59,7 @@ import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.framing.*; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; @@ -69,6 +70,7 @@ import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Transport; @@ -85,7 +87,7 @@ import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; @@ -96,6 +98,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQConnectionModel<AMQProtocolEngine, AMQChannel>, ServerMethodProcessor<ServerChannelMethodProcessor> { + + + enum ConnectionState { INIT, @@ -117,6 +122,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private static final long AWAIT_CLOSED_TIMEOUT = 60000; private final AmqpPort<?> _port; private final long _creationTime; + private final AtomicBoolean _stateChanged = new AtomicBoolean(); + private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>(); private AMQShortString _contextKey; @@ -139,11 +146,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, * The channels that the latest call to {@link #received(ByteBuffer)} applied to. * Used so we know which channels we need to call {@link AMQChannel#receivedComplete()} * on after handling the frames. - * - * Thread-safety: guarded by {@link #_receivedLock}. */ - private final Set<AMQChannel> _channelsForCurrentMessage = - new HashSet<>(); + private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<>(); private AMQDecoder _decoder; @@ -157,9 +161,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion); - private final List<Action<? super AMQProtocolEngine>> _taskList = + private final List<Action<? super AMQProtocolEngine>> _connectionCloseTaskList = new CopyOnWriteArrayList<>(); + private final Queue<Action<? super AMQProtocolEngine>> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<>(); private ProtocolOutputConverter _protocolOutputConverter; private final Subject _authorizedSubject = new Subject(); @@ -179,13 +186,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private NetworkConnection _network; - private Sender<ByteBuffer> _sender; + private ByteBufferSender _sender; private volatile boolean _deferFlush; - private long _lastReceivedTime; + private long _lastReceivedTime = System.currentTimeMillis(); // TODO consider if this is what we want? private boolean _blocking; - private final ReentrantLock _receivedLock; private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis()); private final Broker<?> _broker; private final Transport _transport; @@ -200,6 +206,34 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private int _currentMethodId; private int _binaryDataLimit; private long _maxMessageSize; + private volatile boolean _transportBlockedForWriting; + + private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>(); + + + @Override + public boolean isMessageAssignmentSuspended() + { + Thread lock = _messageAssignmentSuspended.get(); + return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread(); + } + + @Override + public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) + { + _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null); + if(!messageAssignmentSuspended) + { + for(AMQSessionModel<?,?> session : getSessionModels()) + { + for(Consumer<?> consumer : session.getConsumers()) + { + ((ConsumerImpl)consumer).getTarget().notifyCurrentState(); + } + } + } + } + public AMQProtocolEngine(Broker<?> broker, final NetworkConnection network, @@ -211,7 +245,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _port = port; _transport = transport; _maxNoOfChannels = broker.getConnection_sessionCountLimit(); - _receivedLock = new ReentrantLock(); _decoder = new BrokerDecoder(this); _connectionID = connectionId; _logSubject = new ConnectionLogSubject(this); @@ -262,12 +295,28 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _authorizedSubject; } + @Override + public boolean isTransportBlockedForWriting() + { + return _transportBlockedForWriting; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _transportBlockedForWriting = blocked; + for(AMQChannel channel : _channelMap.values()) + { + channel.transportStateChanged(); + } + } + public void setNetworkConnection(NetworkConnection network) { setNetworkConnection(network, network.getSender()); } - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender) { _network = network; _sender = sender; @@ -294,10 +343,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _closing.get(); } - public synchronized void flushBatched() - { - _sender.flush(); - } public ClientDeliveryMethod createDeliveryMethod(int channelId) @@ -314,9 +359,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { final long arrivalTime = System.currentTimeMillis(); - if(!_authenticated && - (arrivalTime - _creationTime) > _port.getContextValue(Long.class, - Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)) + if (!_authenticated && + (arrivalTime - _creationTime) > _port.getContextValue(Long.class, + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)) { _logger.warn("Connection has taken more than " + _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) @@ -328,7 +373,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _lastIoTime = arrivalTime; _readBytes += msg.remaining(); - _receivedLock.lock(); try { _decoder.decodeBuffer(msg); @@ -371,7 +415,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } catch (StoreException e) { - if(_virtualHost.getState() == State.ACTIVE) + if (_virtualHost.getState() == State.ACTIVE) { throw e; } @@ -380,10 +424,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _logger.error("Store Exception ignored as virtual host no longer active", e); } } - finally - { - _receivedLock.unlock(); - } return null; } }); @@ -484,64 +524,22 @@ public class AMQProtocolEngine implements ServerProtocolEngine, writeFrame(responseBody.generateFrame(0)); _state = ConnectionState.AWAIT_START_OK; + _sender.flush(); + } catch (AMQException e) { _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion()); writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); + _sender.flush(); } } private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY]; - private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes); private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes); - private ByteBuffer asByteBuffer(AMQDataBlock block) - { - final int size = (int) block.getSize(); - - final byte[] data; - - - if(size > REUSABLE_BYTE_BUFFER_CAPACITY) - { - data= new byte[size]; - } - else - { - - data = _reusableBytes; - } - _reusableDataOutput.setBuffer(data); - - try - { - block.writePayload(_reusableDataOutput); - } - catch (IOException e) - { - throw new ServerScopedRuntimeException(e); - } - - final ByteBuffer buf; - - if(size <= REUSABLE_BYTE_BUFFER_CAPACITY) - { - buf = _reusableByteBuffer; - buf.position(0); - } - else - { - buf = ByteBuffer.wrap(data); - } - buf.limit(_reusableDataOutput.length()); - - return buf; - } - - /** * Convenience method that writes a frame to the protocol session. Equivalent to calling * getProtocolSession().write(). @@ -550,16 +548,21 @@ public class AMQProtocolEngine implements ServerProtocolEngine, */ public synchronized void writeFrame(AMQDataBlock frame) { - - final ByteBuffer buf = asByteBuffer(frame); - _writtenBytes += buf.remaining(); - if(_logger.isDebugEnabled()) { _logger.debug("SEND: " + frame); } - _sender.send(buf); + try + { + _writtenBytes += frame.writePayload(_sender); + } + catch (IOException e) + { + throw new ServerScopedRuntimeException(e); + } + + final long time = System.currentTimeMillis(); _lastIoTime = time; _lastWriteTime.set(time); @@ -796,14 +799,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine, if(_closing.compareAndSet(false,true)) { // force sync of outstanding async work - _receivedLock.lock(); try { receivedComplete(); } finally { - _receivedLock.unlock(); + finishClose(connectionDropped); } @@ -845,7 +847,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { try { - for (Action<? super AMQProtocolEngine> task : _taskList) + for (Action<? super AMQProtocolEngine> task : _connectionCloseTaskList) { task.performAction(this); } @@ -867,17 +869,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { synchronized(this) { - final boolean lockHeld = _receivedLock.isHeldByCurrentThread(); final long endTime = System.currentTimeMillis() + AWAIT_CLOSED_TIMEOUT; while(!_closed && endTime > System.currentTimeMillis()) { try { - if(lockHeld) - { - _receivedLock.unlock(); - } wait(1000); } catch (InterruptedException e) @@ -885,13 +882,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Thread.currentThread().interrupt(); break; } - finally - { - if(lockHeld) - { - _receivedLock.lock(); - } - } } if (!_closed) @@ -1088,12 +1078,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, public void addDeleteTask(Action<? super AMQProtocolEngine> task) { - _taskList.add(task); + _connectionCloseTaskList.add(task); } public void removeDeleteTask(Action<? super AMQProtocolEngine> task) { - _taskList.remove(task); + _connectionCloseTaskList.remove(task); } public ProtocolOutputConverter getProtocolOutputConverter() @@ -1171,6 +1161,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } + @Override + public void encryptedTransport() + { + } + public void readerIdle() { Subject.doAs(_authorizedSubject, new PrivilegedAction<Object>() @@ -1323,26 +1318,50 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return String.valueOf(getRemoteAddress()); } - public void closeSession(AMQChannel session, AMQConstant cause, String message) + public void closeSessionAsync(final AMQChannel session, final AMQConstant cause, final String message) { - int channelId = session.getChannelId(); - closeChannel(channelId, cause, message); + addAsyncTask(new Action<AMQProtocolEngine>() + { - MethodRegistry methodRegistry = getMethodRegistry(); - ChannelCloseBody responseBody = - methodRegistry.createChannelCloseBody( - cause.getCode(), - AMQShortString.validValueOf(message), - 0, 0); + @Override + public void performAction(final AMQProtocolEngine object) + { + int channelId = session.getChannelId(); + closeChannel(channelId, cause, message); + + MethodRegistry methodRegistry = getMethodRegistry(); + ChannelCloseBody responseBody = + methodRegistry.createChannelCloseBody( + cause.getCode(), + AMQShortString.validValueOf(message), + 0, 0); + + writeFrame(responseBody.generateFrame(channelId)); + } + }); - writeFrame(responseBody.generateFrame(channelId)); } - public void close(AMQConstant cause, String message) + public void closeAsync(final AMQConstant cause, final String message) { - closeConnection(0, new AMQConnectionException(cause, message, 0, 0, - getMethodRegistry(), - null)); + Action<AMQProtocolEngine> action = new Action<AMQProtocolEngine>() + { + @Override + public void performAction(final AMQProtocolEngine object) + { + closeConnection(0, new AMQConnectionException(cause, message, 0, 0, + getMethodRegistry(), + null)); + + } + }; + addAsyncTask(action); + } + + private void addAsyncTask(final Action<AMQProtocolEngine> action) + { + _asyncTaskList.add(action); + notifyWork(); } public void block() @@ -1922,11 +1941,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _reference; } - public Lock getReceivedLock() - { - return _receivedLock; - } - @Override public long getLastReadTime() { @@ -2045,4 +2059,51 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _closing.get(); } + @Override + public void processPending() + { + + + while(_asyncTaskList.peek() != null) + { + Action<? super AMQProtocolEngine> asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } + + for (AMQSessionModel session : getSessionModels()) + { + session.processPending(); + } + } + + @Override + public boolean hasWork() + { + return _stateChanged.get(); + } + + @Override + public void notifyWork() + { + _stateChanged.set(true); + + final Action<ServerProtocolEngine> listener = _workListener.get(); + if(listener != null) + { + + listener.performAction(this); + } + } + + @Override + public void clearWork() + { + _stateChanged.set(false); + } + + @Override + public void setWorkListener(final Action<ServerProtocolEngine> listener) + { + _workListener.set(listener); + } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 43982db2fd..a2113de8ea 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -75,6 +75,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen private final AtomicLong _unacknowledgedCount = new AtomicLong(0); private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>(); + private final AtomicBoolean _needToClose = new AtomicBoolean(); public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel, @@ -99,6 +100,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen return _consumers; } + static final class BrowserConsumer extends ConsumerTarget_0_8 { public BrowserConsumer(AMQChannel channel, @@ -123,7 +125,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @throws org.apache.qpid.AMQException */ @Override - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. @@ -131,17 +133,11 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen synchronized (getChannel()) { long deliveryTag = getChannel().getNextDeliveryTag(); - return sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag); } } - @Override - public boolean allocateCredit(ServerMessage msg) - { - return true; - } - } public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel, @@ -184,7 +180,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @param batch */ @Override - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. @@ -211,14 +207,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } ref.release(); - return size; - - } - @Override - public boolean allocateCredit(ServerMessage msg) - { - return true; } private static final ServerTransaction.Action NOOP = @@ -250,11 +239,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); } - public boolean allocateCredit(ServerMessage msg) - { - return getCreditManager().useCreditForMessage(msg.getSize()); - } - } @@ -295,9 +279,10 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @param batch */ @Override - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { + // put queue entry on a list and then notify the connection to read list. synchronized (getChannel()) { @@ -309,12 +294,15 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen entry.addStateChangeListener(getReleasedStateChangeListener()); long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag); entry.incrementDeliveryCount(); - return size; } + + } + + } @@ -399,7 +387,8 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen return subscriber + "]"; } - public boolean isSuspended() + @Override + public boolean doIsSuspended() { return getState()!=State.ACTIVE || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped(); } @@ -525,6 +514,16 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen { if (isAutoClose()) { + _needToClose.set(true); + getChannel().getConnection().notifyWork(); + } + } + + @Override + protected void processClosed() + { + if (_needToClose.get() && getState() != State.CLOSED) + { close(); confirmAutoClose(); } @@ -533,8 +532,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen public void flushBatched() { _channel.getConnection().setDeferFlush(false); - - _channel.getConnection().flushBatched(); } protected void addUnacknowledgedMessage(MessageInstance entry) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java new file mode 100644 index 0000000000..af54c911dc --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java @@ -0,0 +1,73 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.protocol.v0_8; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.qpid.server.flow.AbstractFlowCreditManager; +import org.apache.qpid.server.flow.FlowCreditManager; + +public class MessageOnlyCreditManager extends AbstractFlowCreditManager implements FlowCreditManager +{ + private final AtomicLong _messageCredit; + + public MessageOnlyCreditManager(final long initialCredit) + { + _messageCredit = new AtomicLong(initialCredit); + } + + public void restoreCredit(long messageCredit, long bytesCredit) + { + _messageCredit.addAndGet(messageCredit); + setSuspended(false); + + } + + public boolean hasCredit() + { + return _messageCredit.get() > 0L; + } + + public boolean useCreditForMessage(long msgSize) + { + if(hasCredit()) + { + if(_messageCredit.addAndGet(-1L) >= 0) + { + setSuspended(false); + return true; + } + else + { + _messageCredit.addAndGet(1L); + setSuspended(true); + return false; + } + } + else + { + setSuspended(true); + return false; + } + + } + +} diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java new file mode 100644 index 0000000000..6e5aab2dd5 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.server.protocol.ServerProtocolEngine; +import org.apache.qpid.server.flow.AbstractFlowCreditManager; + +public class NoAckCreditManager extends AbstractFlowCreditManager +{ + private final ServerProtocolEngine _serverProtocolEngine; + + public NoAckCreditManager(ServerProtocolEngine serverProtocolEngine) + { + _serverProtocolEngine = serverProtocolEngine; + } + + @Override + public void restoreCredit(final long messageCredit, final long bytesCredit) + { + setSuspended(!hasCredit()); + } + + @Override + public boolean hasCredit() + { + return !_serverProtocolEngine.isTransportBlockedForWriting(); + } + + @Override + public boolean useCreditForMessage(final long msgSize) + { + if (!hasCredit()) + { + setSuspended(true); + return false; + } + return true; + } +} diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java new file mode 100644 index 0000000000..a869a707e1 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java @@ -0,0 +1,190 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.protocol.v0_8; + + +import org.apache.qpid.server.protocol.ServerProtocolEngine; +import org.apache.qpid.server.flow.AbstractFlowCreditManager; +import org.apache.qpid.server.flow.FlowCreditManager; + +public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager +{ + + private final ServerProtocolEngine _protocolEngine; + private volatile long _bytesCreditLimit; + private volatile long _messageCreditLimit; + + private volatile long _bytesCredit; + private volatile long _messageCredit; + + public Pre0_10CreditManager(long bytesCreditLimit, + long messageCreditLimit, + ServerProtocolEngine protocolEngine) + { + _protocolEngine = protocolEngine; + _bytesCreditLimit = bytesCreditLimit; + _messageCreditLimit = messageCreditLimit; + _bytesCredit = bytesCreditLimit; + _messageCredit = messageCreditLimit; + } + + + + public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit) + { + long bytesCreditChange = bytesCreditLimit - _bytesCreditLimit; + long messageCreditChange = messageCreditLimit - _messageCreditLimit; + + + + if(bytesCreditChange != 0L) + { + if(bytesCreditLimit == 0L) + { + _bytesCredit = 0; + } + else + { + _bytesCredit += bytesCreditChange; + } + } + + + if(messageCreditChange != 0L) + { + if(messageCreditLimit == 0L) + { + _messageCredit = 0; + } + else + { + _messageCredit += messageCreditChange; + } + } + + + _bytesCreditLimit = bytesCreditLimit; + _messageCreditLimit = messageCreditLimit; + + setSuspended(!hasCredit()); + + } + + + public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) + { + final long messageCreditLimit = _messageCreditLimit; + boolean notifyIncrease = true; + if(messageCreditLimit != 0L) + { + notifyIncrease = (_messageCredit != 0); + long newCredit = _messageCredit + messageCredit; + _messageCredit = newCredit > messageCreditLimit ? messageCreditLimit : newCredit; + } + + + final long bytesCreditLimit = _bytesCreditLimit; + if(bytesCreditLimit != 0L) + { + long newCredit = _bytesCredit + bytesCredit; + _bytesCredit = newCredit > bytesCreditLimit ? bytesCreditLimit : newCredit; + if(notifyIncrease && bytesCredit>0) + { + notifyIncreaseBytesCredit(); + } + } + + + + setSuspended(!hasCredit()); + + } + + public synchronized boolean hasCredit() + { + return (_bytesCreditLimit == 0L || _bytesCredit > 0) + && (_messageCreditLimit == 0L || _messageCredit > 0) + && !_protocolEngine.isTransportBlockedForWriting(); + } + + public synchronized boolean useCreditForMessage(final long msgSize) + { + if (_protocolEngine.isTransportBlockedForWriting()) + { + setSuspended(true); + return false; + } + else if(_messageCreditLimit != 0L) + { + if(_messageCredit != 0L) + { + if(_bytesCreditLimit == 0L) + { + _messageCredit--; + + return true; + } + else + { + if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit)) + { + _messageCredit--; + _bytesCredit -= msgSize; + + return true; + } + else + { + return false; + } + } + } + else + { + setSuspended(true); + return false; + } + } + else + { + if(_bytesCreditLimit == 0L) + { + + return true; + } + else + { + if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit)) + { + _bytesCredit -= msgSize; + + return true; + } + else + { + return false; + } + } + + } + + } +} diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java index 0058fe86a9..e8cf028069 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java index 7253111114..8817e79aff 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java index e72cc4d058..af37b17d85 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java index b616aab126..4a84ccad37 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java @@ -42,6 +42,7 @@ import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.util.GZIPUtils; public class ProtocolOutputConverterImpl implements ProtocolOutputConverter @@ -255,6 +256,15 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter } } + @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + ByteBuffer buf = _message.getContent(_offset, _length); + long size = buf.remaining(); + sender.send(buf.duplicate()); + return size; + } + public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException { throw new UnsupportedOperationException(); @@ -346,6 +356,15 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter _underlyingBody.writePayload(buffer); } + public long writePayload(ByteBufferSender sender) throws IOException + { + if(_underlyingBody == null) + { + _underlyingBody = createAMQBody(); + } + return _underlyingBody.writePayload(sender); + } + public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException { @@ -449,6 +468,18 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter } @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender); + + size += (new AMQFrame(_channel, _headerBody)).writePayload(sender); + + size += (new AMQFrame(_channel, _contentBody)).writePayload(sender); + + return size; + } + + @Override public String toString() { StringBuilder builder = new StringBuilder(); @@ -490,6 +521,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter } @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender); + size += (new AMQFrame(_channel, _headerBody)).writePayload(sender); + return size; + } + + @Override public String toString() { StringBuilder builder = new StringBuilder(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index 9326f16703..55fc865850 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -31,8 +31,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.consumer.ConsumerImpl; -import org.apache.qpid.server.flow.LimitlessCreditManager; -import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.StoredMessage; @@ -328,7 +326,7 @@ public class AckTest extends QpidTestCase public void testMessageDequeueRestoresCreditTest() throws Exception { // Send 10 messages - Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1); + Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1, _protocolEngine); _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index 6c6b746cf2..3a759cd772 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -36,11 +36,11 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.MessagePublishInfo; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageContentSource; @@ -50,7 +50,7 @@ import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter @@ -224,17 +224,6 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr // Then the AMQMinaProtocolSession can join on the returning future without a NPE. } - public void closeSession(AMQChannel session, AMQConstant cause, String message) - { - super.closeSession(session, cause, message); - - //Simulate the Client responding with a CloseOK - // should really update the StateManger but we don't have access here - // changeState(AMQState.CONNECTION_CLOSED); - ((AMQChannel)session).getConnection().closeSession(false); - - } - private class InternalWriteDeliverMethod implements ClientDeliveryMethod { private int _channelId; @@ -288,16 +277,12 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr private String _remoteHost = "127.0.0.1"; private String _localHost = "127.0.0.1"; private int _port = portNumber.incrementAndGet(); - private final Sender<ByteBuffer> _sender; + private final ByteBufferSender _sender; public TestNetworkConnection() { - _sender = new Sender<ByteBuffer>() + _sender = new ByteBufferSender() { - public void setIdleTimeout(int i) - { - } - public void send(ByteBuffer msg) { } @@ -358,7 +343,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr } @Override - public Sender<ByteBuffer> getSender() + public ByteBufferSender getSender() { return _sender; } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java new file mode 100644 index 0000000000..c4c89ac24a --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java @@ -0,0 +1,47 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.protocol.v0_8; + + +import org.apache.qpid.server.flow.AbstractFlowCreditManager; +import org.apache.qpid.server.flow.FlowCreditManager; + +public class LimitlessCreditManager extends AbstractFlowCreditManager implements FlowCreditManager +{ + + public void restoreCredit(long messageCredit, long bytesCredit) + { + } + + public void removeAllCredit() + { + } + + public boolean hasCredit() + { + return true; + } + + public boolean useCreditForMessage(long msgSize) + { + return true; + } +} diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 8e24d55da0..b515fda4a7 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -30,7 +30,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import javax.security.auth.Subject; @@ -51,6 +53,7 @@ import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; @@ -64,6 +67,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private final AmqpPort<?> _port; private final Broker<?> _broker; private final SubjectCreator _subjectCreator; + private final ProtocolEngine_1_0_0_SASL _protocolEngine; private VirtualHostImpl _vhost; private final Transport _transport; private final ConnectionEndpoint _conn; @@ -98,15 +102,24 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private List<Action<? super Connection_1_0>> _closeTasks = Collections.synchronizedList(new ArrayList<Action<? super Connection_1_0>>()); + + private final Queue<Action<? super Connection_1_0>> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + + private boolean _closedOnOpen; + public Connection_1_0(Broker<?> broker, ConnectionEndpoint conn, long connectionId, AmqpPort<?> port, - Transport transport, final SubjectCreator subjectCreator) + Transport transport, + final SubjectCreator subjectCreator, + final ProtocolEngine_1_0_0_SASL protocolEngine) { + _protocolEngine = protocolEngine; _broker = broker; _port = port; _transport = transport; @@ -207,6 +220,13 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod _closeTasks.add( task ); } + private void addAsyncTask(final Action<Connection_1_0> action) + { + _asyncTaskList.add(action); + notifyWork(); + } + + public void closeReceived() { Collection<Session_1_0> sessions = new ArrayList(_sessions); @@ -245,9 +265,19 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod @Override - public void close(AMQConstant cause, String message) + public void closeAsync(AMQConstant cause, String message) { - _conn.close(); + Action<Connection_1_0> action = new Action<Connection_1_0>() + { + @Override + public void performAction(final Connection_1_0 object) + { + _conn.close(); + + } + }; + addAsyncTask(action); + } @Override @@ -263,9 +293,16 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } @Override - public void closeSession(Session_1_0 session, AMQConstant cause, String message) + public void closeSessionAsync(final Session_1_0 session, final AMQConstant cause, final String message) { - session.close(cause, message); + addAsyncTask(new Action<Connection_1_0>() + { + @Override + public void performAction(final Connection_1_0 object) + { + session.close(cause, message); + } + }); } @Override @@ -363,6 +400,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod return _port; } + public ServerProtocolEngine getProtocolEngine() + { + return _protocolEngine; + } + @Override public Transport getTransport() { @@ -480,4 +522,38 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } + public void transportStateChanged() + { + for (Session_1_0 session : _sessions) + { + session.transportStateChanged(); + } + } + + @Override + public void notifyWork() + { + _protocolEngine.notifyWork(); + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return _protocolEngine.isMessageAssignmentSuspended(); + } + + public void processPending() + { + while(_asyncTaskList.peek() != null) + { + Action<? super Connection_1_0> asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } + + for (AMQSessionModel session : getSessionModels()) + { + session.processPending(); + } + + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index 3b9521866c..fa2e543f8d 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -40,6 +40,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; @@ -83,9 +84,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget return _link.getEndpoint(); } - public boolean isSuspended() + @Override + public boolean doIsSuspended() { - return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;// || !getEndpoint().hasCreditToSend(); + return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE; } @@ -113,22 +115,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } } - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) - { - // TODO - long size = entry.getMessage().getSize(); - send(entry); - return size; - } - - public void flushBatched() + public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch) { // TODO - } - - public void send(final MessageInstance queueEntry) - { - ServerMessage serverMessage = queueEntry.getMessage(); + ServerMessage serverMessage = entry.getMessage(); Message_1_0 message; if(serverMessage instanceof Message_1_0) { @@ -168,7 +158,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget payload.flip(); } - if(queueEntry.getDeliveryCount() != 0) + if(entry.getDeliveryCount() != 0) { payload = payload.duplicate(); ValueHandler valueHandler = new ValueHandler(_typeRegistry); @@ -200,7 +190,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget header.setPriority(oldHeader.getPriority()); header.setTtl(oldHeader.getTtl()); } - header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount())); + header.setDeliveryCount(UnsignedInteger.valueOf(entry.getDeliveryCount())); _sectionEncoder.reset(); _sectionEncoder.encodeObject(header); Binary encodedHeader = _sectionEncoder.getEncoding(); @@ -230,10 +220,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget else { UnsettledAction action = _acquires - ? new DispositionAction(tag, queueEntry) - : new DoNothingAction(tag, queueEntry); + ? new DispositionAction(tag, entry) + : new DoNothingAction(tag, entry); - _link.addUnsettled(tag, action, queueEntry); + _link.addUnsettled(tag, action, entry); } if(_transactionId != null) @@ -257,9 +247,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget public void onRollback() { - if(queueEntry.isAcquiredBy(getConsumer())) + if(entry.isAcquiredBy(getConsumer())) { - queueEntry.release(); + entry.release(); _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true); @@ -274,12 +264,17 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } else { - queueEntry.release(); + entry.release(); } } } + public void flushBatched() + { + // TODO + } + public void queueDeleted() { //TODO @@ -296,7 +291,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget { synchronized (_link.getLock()) { - final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend(); + + ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine(); + final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting(); if(!hasCredit && getState() == State.ACTIVE) { suspend(); @@ -336,7 +333,8 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget { synchronized(_link.getLock()) { - if(isSuspended() && getEndpoint() != null) + ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine(); + if(isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting()) { updateState(State.SUSPENDED, State.ACTIVE); _transactionId = _link.getTransactionId(); @@ -544,4 +542,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget return 0; } + @Override + protected void processClosed() + { + + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java index fa8134cb55..e72dc17b57 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index 740b01e459..a0f10eee65 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -29,6 +29,8 @@ import java.security.PrivilegedAction; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; import javax.security.sasl.SaslException; @@ -52,14 +54,18 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.UsernamePrincipal; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; @@ -79,6 +85,9 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private long _createTime = System.currentTimeMillis(); private ConnectionEndpoint _endpoint; private long _connectionId; + private final AtomicBoolean _stateChanged = new AtomicBoolean(); + private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>(); + private static final ByteBuffer HEADER = ByteBuffer.wrap(new byte[] @@ -116,8 +125,9 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private byte _revision; private PrintWriter _out; private NetworkConnection _network; - private Sender<ByteBuffer> _sender; + private ByteBufferSender _sender; private Connection_1_0 _connection; + private volatile boolean _transportBlockedForWriting; static enum State { @@ -134,6 +144,10 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private State _state = State.A; + private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>(); + + + public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker<?> broker, long id, AmqpPort<?> port, Transport transport) @@ -149,6 +163,31 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut } + @Override + public boolean isMessageAssignmentSuspended() + { + Thread lock = _messageAssignmentSuspended.get(); + return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread(); + } + + @Override + public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) + { + _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null); + + if(!messageAssignmentSuspended) + { + for(AMQSessionModel<?,?> session : _connection.getSessionModels()) + { + for(Consumer<?> consumer : session.getConsumers()) + { + ((ConsumerImpl)consumer).getTarget().notifyCurrentState(); + } + } + } + } + + public SocketAddress getRemoteAddress() { return _network.getRemoteAddress(); @@ -179,7 +218,12 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut //Todo } - public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender) + @Override + public void encryptedTransport() + { + } + + public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender) { _network = network; _sender = sender; @@ -211,7 +255,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut _endpoint.setProperties(serverProperties); _endpoint.setRemoteAddress(getRemoteAddress()); - _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator); + _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator, this); _endpoint.setConnectionEventListener(_connection); _endpoint.setFrameOutputHandler(this); @@ -524,6 +568,8 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut } + + public void close() { _sender.close(); @@ -554,4 +600,60 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut { return _lastWriteTime; } + + @Override + public boolean isTransportBlockedForWriting() + { + return _transportBlockedForWriting; + } + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _transportBlockedForWriting = blocked; + _connection.transportStateChanged(); + + } + + public void flushBatched() + { + _sender.flush(); + } + + @Override + public void processPending() + { + _connection.processPending(); + + } + + @Override + public boolean hasWork() + { + return _stateChanged.get(); + } + + @Override + public void notifyWork() + { + _stateChanged.set(true); + + final Action<ServerProtocolEngine> listener = _workListener.get(); + if(listener != null) + { + listener.performAction(this); + } + } + + @Override + public void clearWork() + { + _stateChanged.set(false); + } + + @Override + public void setWorkListener(final Action<ServerProtocolEngine> listener) + { + _workListener.set(listener); + } + } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index c952a3c585..fe36ba91cb 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -729,4 +729,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { return _consumer; } + + public ConsumerTarget_1_0 getConsumerTarget() + { + return _target; + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index b9ee0ad498..2a49e812f5 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -109,6 +109,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio private final Subject _subject = new Subject(); private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>(); + private final CopyOnWriteArrayList<SendingLink_1_0> _sendingLinks = new CopyOnWriteArrayList<>(); private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); private Session<?> _modelObject; @@ -211,7 +212,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio ); sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink)); - registerConsumer(sendingLink.getConsumer()); + registerConsumer(sendingLink); link = sendingLink; if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) @@ -412,12 +413,14 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio } } - private void registerConsumer(final ConsumerImpl consumer) + private void registerConsumer(final SendingLink_1_0 link) { + ConsumerImpl consumer = link.getConsumer(); if(consumer instanceof Consumer<?>) { Consumer<?> modelConsumer = (Consumer<?>) consumer; _consumers.add(modelConsumer); + _sendingLinks.add(link); modelConsumer.addChangeListener(_consumerClosedListener); consumerAdded(modelConsumer); } @@ -615,6 +618,20 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio } @Override + public void transportStateChanged() + { + for(SendingLink_1_0 link : _sendingLinks) + { + ConsumerTarget_1_0 target = link.getConsumerTarget(); + target.flowStateChanged(); + + + } + + + } + + @Override public LogSubject getLogSubject() { return this; @@ -883,6 +900,16 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio return 0L; } + @Override + public void processPending() + { + for(Consumer<?> consumer : getConsumers()) + { + + ((ConsumerImpl)consumer).getTarget().processPending(); + } + } + private void consumerAdded(Consumer<?> consumer) { for(ConsumerListener l : _consumerListeners) diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java index ce612ec0b6..63c60d7400 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java @@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.Transaction; public abstract class GenericAbstractJDBCMessageStore extends org.apache.qpid.server.store.AbstractJDBCMessageStore @@ -131,7 +131,7 @@ public abstract class GenericAbstractJDBCMessageStore extends org.apache.qpid.se } @Override - public StoreFuture commitTranAsync() + public FutureResult commitTranAsync() { try { diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java index 3f873a24ff..28d8a6c88c 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java @@ -24,6 +24,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.message.MessageSource; @@ -124,7 +127,6 @@ class ManagementNodeConsumer implements ConsumerImpl @Override public void close() { - } @Override @@ -164,6 +166,12 @@ class ManagementNodeConsumer implements ConsumerImpl } + @Override + public ConsumerTarget getTarget() + { + return _target; + } + ManagementNode getManagementNode() { return _managementNode; diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java index 69920ff488..1a85a24e0b 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java @@ -39,6 +39,8 @@ import javax.servlet.DispatcherType; import javax.servlet.MultipartConfigElement; import javax.servlet.http.HttpServletRequest; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; @@ -130,7 +132,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem } @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE) - private void doStart() + private ListenableFuture<Void> doStart() { getBroker().getEventLogger().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME)); @@ -148,6 +150,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem getBroker().getEventLogger().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME)); setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override @@ -206,7 +209,9 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem if(port.getState() != State.ACTIVE) { - port.start(); + + // TODO - RG + port.startAsync(); } Connector connector = null; diff --git a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java index 52d7ba33a3..4327292336 100644 --- a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java +++ b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.UUID; import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.configuration.updater.TaskExecutorImpl; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; @@ -58,7 +59,9 @@ public class HttpManagementTest extends QpidTestCase when(_broker.getModel()).thenReturn(objectFactory.getModel()); when(_broker.getCategoryClass()).thenReturn(Broker.class); when(_broker.getEventLogger()).thenReturn(mock(EventLogger.class)); - when(_broker.getTaskExecutor()).thenReturn(mock(TaskExecutor.class)); + TaskExecutor taskExecutor = new TaskExecutorImpl(); + taskExecutor.start(); + when(_broker.getTaskExecutor()).thenReturn(taskExecutor); Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, false); diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java index 6c962c2901..06558b9f9a 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java @@ -32,6 +32,8 @@ import java.util.Set; import javax.management.InstanceAlreadyExistsException; import javax.management.JMException; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -105,7 +107,7 @@ public class JMXManagementPluginImpl } @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE) - private void doStart() throws JMException, IOException + private ListenableFuture<Void> doStart() throws JMException, IOException { _allowPortActivation = true; Broker<?> broker = getBroker(); @@ -125,7 +127,8 @@ public class JMXManagementPluginImpl registryPort.setPortManager(this); if(port.getState() != State.ACTIVE) { - port.start(); + // TODO - RG + port.startAsync(); } } @@ -135,7 +138,7 @@ public class JMXManagementPluginImpl connectorPort.setPortManager(this); if(port.getState() != State.ACTIVE) { - port.start(); + port.startAsync(); } } @@ -175,6 +178,7 @@ public class JMXManagementPluginImpl _objectRegistry.start(); setState(State.ACTIVE); _allowPortActivation = false; + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java index a194ac70f9..896a7119f7 100644 --- a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java +++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java @@ -53,7 +53,7 @@ import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; import org.apache.qpid.server.transport.AcceptingTransport; import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.security.ssl.SSLUtil; @@ -81,9 +81,7 @@ class WebSocketProvider implements AcceptingTransport _supported = supported; _defaultSupportedProtocolReply = defaultSupportedProtocolReply; _factory = new MultiVersionProtocolEngineFactory( - _port.getParent(Broker.class), null, - _port.getWantClientAuth(), - _port.getNeedClientAuth(), + _port.getParent(Broker.class), _supported, _defaultSupportedProtocolReply, _port, @@ -242,7 +240,7 @@ class WebSocketProvider implements AcceptingTransport } } - private class ConnectionWrapper implements NetworkConnection, Sender<ByteBuffer> + private class ConnectionWrapper implements NetworkConnection, ByteBufferSender { private final WebSocket.Connection _connection; private final SocketAddress _localAddress; @@ -261,7 +259,7 @@ class WebSocketProvider implements AcceptingTransport } @Override - public Sender<ByteBuffer> getSender() + public ByteBufferSender getSender() { return this; } @@ -273,12 +271,6 @@ class WebSocketProvider implements AcceptingTransport } @Override - public void setIdleTimeout(final int i) - { - - } - - @Override public void send(final ByteBuffer msg) { try |