summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java35
-rw-r--r--qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java21
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java40
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java7
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java126
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java57
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java170
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java7
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java248
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java371
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java80
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java95
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java38
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java12
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/pom.xml4
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java79
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java287
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java49
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java73
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java56
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java190
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java39
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java25
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java47
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java86
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java55
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java2
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java112
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java5
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java31
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java4
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java10
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java9
-rw-r--r--qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java5
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java10
-rw-r--r--qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java16
41 files changed, 2061 insertions, 455 deletions
diff --git a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java
index 0904379ab4..99db75ac91 100644
--- a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java
+++ b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java
@@ -25,6 +25,9 @@ import java.util.Collections;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -149,7 +152,7 @@ public class ACLFileAccessControlProviderImpl
@StateTransition(currentState = {State.UNINITIALIZED, State.QUIESCED, State.ERRORED}, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
if(_broker.isManagementMode())
@@ -177,6 +180,7 @@ public class ACLFileAccessControlProviderImpl
}
}
}
+ return Futures.immediateFuture(null);
}
@Override
@@ -190,17 +194,36 @@ public class ACLFileAccessControlProviderImpl
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.QUIESCED)
- private void startQuiesced()
+ private ListenableFuture<Void> startQuiesced()
{
setState(State.QUIESCED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
- close();
- setState(State.DELETED);
- deleted();
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ closeAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+
+ setState(State.DELETED);
+ deleted();
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
}
public AccessControl getAccessControl()
diff --git a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java
index a34ac16e80..2a691b3652 100644
--- a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java
+++ b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java
@@ -30,6 +30,7 @@ import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.AccessControlProvider;
import org.apache.qpid.server.model.Broker;
@@ -54,7 +55,9 @@ public class ACLFileAccessControlProviderFactoryTest extends QpidTestCase
when(_broker.getObjectFactory()).thenReturn(_objectFactory);
when(_broker.getModel()).thenReturn(_objectFactory.getModel());
when(_broker.getCategoryClass()).thenReturn(Broker.class);
- when(_broker.getTaskExecutor()).thenReturn(mock(TaskExecutor.class));
+ TaskExecutor taskExecutor = new CurrentThreadTaskExecutor();
+ taskExecutor.start();
+ when(_broker.getTaskExecutor()).thenReturn(taskExecutor);
}
public void testCreateInstanceWhenAclFileIsNotPresent()
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 89d681111b..5affe3019c 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -104,7 +104,8 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
_name = name;
}
- public boolean isSuspended()
+ @Override
+ public boolean doIsSuspended()
{
return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
}
@@ -158,6 +159,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
return _name;
}
+ public void transportStateChanged()
+ {
+ _creditManager.restoreCredit(0, 0);
+ }
public static class AddMessageDispositionListenerAction implements Runnable
{
@@ -191,7 +196,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
private final AddMessageDispositionListenerAction _postIdSettingAction;
- public long send(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
+ public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
{
ServerMessage serverMsg = entry.getMessage();
@@ -342,7 +347,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
{
recordUnacknowledged(entry);
}
- return size;
}
void recordUnacknowledged(MessageInstance entry)
@@ -555,10 +559,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
switch(flowMode)
{
case CREDIT:
- _creditManager = new CreditCreditManager(0l,0l);
+ _creditManager = new CreditCreditManager(0l, 0l, _session.getConnection().getProtocolEngine());
break;
case WINDOW:
- _creditManager = new WindowCreditManager(0l,0l);
+ _creditManager = new WindowCreditManager(0l, 0l, _session.getConnection().getProtocolEngine());
break;
default:
// this should never happen, as 0-10 is finalised and so the enum should never change
@@ -628,7 +632,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
public void flushBatched()
{
- _session.getConnection().flush();
}
@@ -657,4 +660,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
{
return _unacknowledgedCount.longValue();
}
+
+ @Override
+ protected void processClosed()
+ {
+
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
index 8dddac9809..dd43ae7e11 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
@@ -21,48 +21,27 @@
package org.apache.qpid.server.protocol.v0_10;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.flow.AbstractFlowCreditManager;
public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
+ private final ServerProtocolEngine _serverProtocolEngine;
private volatile long _bytesCredit;
private volatile long _messageCredit;
- public CreditCreditManager(long bytesCredit, long messageCredit)
+ public CreditCreditManager(long bytesCredit, long messageCredit, final ServerProtocolEngine serverProtocolEngine)
{
+ _serverProtocolEngine = serverProtocolEngine;
_bytesCredit = bytesCredit;
_messageCredit = messageCredit;
setSuspended(!hasCredit());
}
-
- public synchronized void setCreditLimits(final long bytesCredit, final long messageCredit)
- {
- _bytesCredit = bytesCredit;
- _messageCredit = messageCredit;
-
- setSuspended(!hasCredit());
-
- }
-
-
- public long getMessageCredit()
- {
- return _messageCredit == -1L
- ? Long.MAX_VALUE
- : _messageCredit;
- }
-
- public long getBytesCredit()
- {
- return _bytesCredit == -1L
- ? Long.MAX_VALUE
- : _bytesCredit;
- }
-
public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
{
+ setSuspended(!hasCredit());
}
@@ -107,12 +86,17 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl
public synchronized boolean hasCredit()
{
// Note !=, if credit is < 0 that indicates infinite credit
- return (_bytesCredit != 0L && _messageCredit != 0L);
+ return (_bytesCredit != 0L && _messageCredit != 0L && !_serverProtocolEngine.isTransportBlockedForWriting());
}
public synchronized boolean useCreditForMessage(long msgSize)
{
- if(_messageCredit >= 0L)
+ if (_serverProtocolEngine.isTransportBlockedForWriting())
+ {
+ setSuspended(true);
+ return false;
+ }
+ else if(_messageCredit >= 0L)
{
if(_messageCredit > 0)
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
index 30aecdb2d2..4231045afd 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
@@ -23,7 +23,7 @@ package org.apache.qpid.server.protocol.v0_10;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
@@ -86,7 +86,10 @@ public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator
conn.setRemoteAddress(network.getRemoteAddress());
conn.setLocalAddress(network.getLocalAddress());
- return new ProtocolEngine_0_10( conn, network);
+ ProtocolEngine_0_10 protocolEngine = new ProtocolEngine_0_10(conn, network);
+ conn.setProtocolEngine(protocolEngine);
+
+ return protocolEngine;
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 854cd388b9..e391bd6771 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -24,18 +24,23 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.security.AccessController;
import java.security.PrivilegedAction;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import org.apache.log4j.Logger;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.Constant;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.Assembler;
-import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -52,13 +57,20 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
private ServerConnection _connection;
private long _createTime = System.currentTimeMillis();
- private long _lastReadTime;
- private long _lastWriteTime;
+ private long _lastReadTime = _createTime;
+ private long _lastWriteTime = _createTime;
+ private volatile boolean _transportBlockedForWriting;
+
+ private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
+
+ private final AtomicBoolean _stateChanged = new AtomicBoolean();
+ private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
+
public ProtocolEngine_0_10(ServerConnection conn,
NetworkConnection network)
{
- super(new Assembler(conn));
+ super(new ServerAssembler(conn));
_connection = conn;
if(network != null)
@@ -67,7 +79,33 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
}
}
- public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ Thread lock = _messageAssignmentSuspended.get();
+ return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread();
+ }
+
+ @Override
+ public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+ {
+ _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
+
+ if(!messageAssignmentSuspended)
+ {
+ for(AMQSessionModel<?,?> session : _connection.getSessionModels())
+ {
+ for(Consumer<?> consumer : session.getConsumers())
+ {
+ ((ConsumerImpl)consumer).getTarget().notifyCurrentState();
+ }
+ }
+ }
+ }
+
+
+
+ public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender)
{
if(!getSubject().equals(Subject.getSubject(AccessController.getContext())))
{
@@ -87,7 +125,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
_network = network;
_connection.setNetworkConnection(network);
- Disassembler disassembler = new Disassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE);
+ ServerDisassembler disassembler = new ServerDisassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE);
_connection.setSender(disassembler);
_connection.addFrameSizeObserver(disassembler);
// FIXME Two log messages to maintain compatibility with earlier protocol versions
@@ -96,23 +134,15 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
}
}
- private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender)
+ private ByteBufferSender wrapSender(final ByteBufferSender sender)
{
- return new Sender<ByteBuffer>()
+ return new ByteBufferSender()
{
@Override
- public void setIdleTimeout(int i)
- {
- sender.setIdleTimeout(i);
-
- }
-
- @Override
public void send(ByteBuffer msg)
{
_lastWriteTime = System.currentTimeMillis();
sender.send(msg);
-
}
@Override
@@ -190,6 +220,11 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
return _writtenBytes;
}
+ @Override
+ public void encryptedTransport()
+ {
+ }
+
public void writerIdle()
{
_connection.doHeartBeat();
@@ -215,11 +250,6 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
return getRemoteAddress().toString();
}
- public String getAuthId()
- {
- return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName();
- }
-
public boolean isDurable()
{
return false;
@@ -246,4 +276,54 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
{
return _connection.getAuthorizedSubject();
}
+
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return _transportBlockedForWriting;
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ _transportBlockedForWriting = blocked;
+ _connection.transportStateChanged();
+ }
+
+ @Override
+ public void processPending()
+ {
+ _connection.processPending();
+
+ }
+
+ @Override
+ public boolean hasWork()
+ {
+ return _stateChanged.get();
+ }
+
+ @Override
+ public void notifyWork()
+ {
+ _stateChanged.set(true);
+
+ final Action<ServerProtocolEngine> listener = _workListener.get();
+ if(listener != null)
+ {
+ listener.performAction(this);
+ }
+ }
+
+ @Override
+ public void clearWork()
+ {
+ _stateChanged.set(false);
+ }
+
+ @Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+ _workListener.set(listener);
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
new file mode 100644
index 0000000000..456c9d36d9
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.NetworkEvent;
+
+public class ServerAssembler extends Assembler
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ServerAssembler.class);
+
+
+ private final ServerConnection _connection;
+
+ public ServerAssembler(final ServerConnection connection)
+ {
+ super(connection);
+ _connection = connection;
+ }
+
+ @Override
+ public void received(final NetworkEvent event)
+ {
+ if (!_connection.isIgnoreFutureInput())
+ {
+ super.received(event);
+ }
+ else
+ {
+ LOGGER.debug("Ignored network event " + event + " as connection is ignoring further input ");
+ }
+ }
+
+
+}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 8567be37f0..2280377fca 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.v0_10;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
+import static org.apache.qpid.transport.Connection.State.CLOSING;
import java.net.SocketAddress;
import java.security.Principal;
@@ -30,6 +31,8 @@ import java.security.PrivilegedAction;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -37,6 +40,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
@@ -66,7 +70,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
private final Broker<?> _broker;
- private Runnable _onOpenTask;
private AtomicBoolean _logClosed = new AtomicBoolean(false);
private final Subject _authorizedSubject = new Subject();
@@ -75,20 +78,26 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
private final long _connectionId;
private final Object _reference = new Object();
private VirtualHostImpl<?,?,?> _virtualHost;
- private AmqpPort<?> _port;
- private AtomicLong _lastIoTime = new AtomicLong();
+ private final AmqpPort<?> _port;
+ private final AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
- private Transport _transport;
+ private final Transport _transport;
- private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList =
+ private final CopyOnWriteArrayList<Action<? super ServerConnection>> _connectionCloseTaskList =
new CopyOnWriteArrayList<Action<? super ServerConnection>>();
+ private final Queue<Action<? super ServerConnection>> _asyncTaskList =
+ new ConcurrentLinkedQueue<>();
+
private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners =
new CopyOnWriteArrayList<SessionModelListener>();
private volatile boolean _stopped;
private int _messageCompressionThreshold;
- private int _maxMessageSize;
+ private final int _maxMessageSize;
+
+ private ServerProtocolEngine _serverProtocolEngine;
+ private boolean _ignoreFutureInput;
public ServerConnection(final long connectionId,
Broker<?> broker,
@@ -140,10 +149,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
if (state == State.OPEN)
{
- if (_onOpenTask != null)
- {
- _onOpenTask.run();
- }
getEventLogger().message(ConnectionMessages.OPEN(getClientId(),
"0-10",
getClientVersion(),
@@ -189,6 +194,16 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
super.setConnectionDelegate(delegate);
}
+ public ServerProtocolEngine getProtocolEngine()
+ {
+ return _serverProtocolEngine;
+ }
+
+ public void setProtocolEngine(final ServerProtocolEngine serverProtocolEngine)
+ {
+ _serverProtocolEngine = serverProtocolEngine;
+ }
+
public VirtualHostImpl<?,?,?> getVirtualHost()
{
return _virtualHost;
@@ -237,28 +252,32 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
return _stopped;
}
- public void onOpen(final Runnable task)
- {
- _onOpenTask = task;
- }
-
- public void closeSession(ServerSession session, AMQConstant cause, String message)
+ public void closeSessionAsync(final ServerSession session, final AMQConstant cause, final String message)
{
- ExecutionException ex = new ExecutionException();
- ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR;
- try
- {
- code = ExecutionErrorCode.get(cause.getCode());
- }
- catch (IllegalArgumentException iae)
+ addAsyncTask(new Action<ServerConnection>()
{
- // Ignore, already set to INTERNAL_ERROR
- }
- ex.setErrorCode(code);
- ex.setDescription(message);
- session.invoke(ex);
- session.close(cause, message);
+ @Override
+ public void performAction(final ServerConnection conn)
+ {
+ ExecutionException ex = new ExecutionException();
+ ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR;
+ try
+ {
+ code = ExecutionErrorCode.get(cause.getCode());
+ }
+ catch (IllegalArgumentException iae)
+ {
+ // Ignore, already set to INTERNAL_ERROR
+ }
+ ex.setErrorCode(code);
+ ex.setDescription(message);
+ session.invoke(ex);
+
+ session.close(cause, message);
+ }
+ });
+
}
public LogSubject getLogSubject()
@@ -355,25 +374,35 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
}
}
- public void close(AMQConstant cause, String message)
+ public void closeAsync(final AMQConstant cause, final String message)
{
- closeSubscriptions();
- performDeleteTasks();
- ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
- try
- {
- replyCode = ConnectionCloseCode.get(cause.getCode());
- }
- catch (IllegalArgumentException iae)
+
+ addAsyncTask(new Action<ServerConnection>()
{
- // Ignore
- }
- close(replyCode, message);
+ @Override
+ public void performAction(final ServerConnection object)
+ {
+ closeSubscriptions();
+ performDeleteTasks();
+
+ setState(CLOSING);
+ ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
+ try
+ {
+ replyCode = ConnectionCloseCode.get(cause.getCode());
+ }
+ catch (IllegalArgumentException iae)
+ {
+ // Ignore
+ }
+ sendConnectionClose(replyCode, message);
+ }
+ });
}
protected void performDeleteTasks()
{
- for(Action<? super ServerConnection> task : _taskList)
+ for(Action<? super ServerConnection> task : _connectionCloseTaskList)
{
task.performAction(this);
}
@@ -646,13 +675,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
@Override
public void addDeleteTask(final Action<? super ServerConnection> task)
{
- _taskList.add(task);
+ _connectionCloseTaskList.add(task);
+ }
+
+ private void addAsyncTask(final Action<ServerConnection> action)
+ {
+ _asyncTaskList.add(action);
+ notifyWork();
}
@Override
public void removeDeleteTask(final Action<? super ServerConnection> task)
{
- _taskList.remove(task);
+ _connectionCloseTaskList.remove(task);
}
public int getMessageCompressionThreshold()
@@ -664,4 +699,51 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
return _maxMessageSize;
}
+
+ public void transportStateChanged()
+ {
+ for (AMQSessionModel ssn : getSessionModels())
+ {
+ ssn.transportStateChanged();
+ }
+ }
+
+ @Override
+ public void notifyWork()
+ {
+ _serverProtocolEngine.notifyWork();
+ }
+
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return _serverProtocolEngine.isMessageAssignmentSuspended();
+ }
+
+ public void processPending()
+ {
+ while(_asyncTaskList.peek() != null)
+ {
+ Action<? super ServerConnection> asyncAction = _asyncTaskList.poll();
+ asyncAction.performAction(this);
+ }
+
+ for (AMQSessionModel session : getSessionModels())
+ {
+ session.processPending();
+ }
+
+ }
+
+ public void closeAndIgnoreFutureInput()
+ {
+ _ignoreFutureInput = true;
+ getSender().close();
+ }
+
+ public boolean isIgnoreFutureInput()
+ {
+ return _ignoreFutureInput;
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index 6e2a6cac7d..7f646b43b4 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -250,7 +250,7 @@ public class ServerConnectionDelegate extends ServerDelegate
") above the server's offered limit (" + getChannelMax() +")");
//Due to the error we must forcefully close the connection without negotiation
- sconn.getSender().close();
+ sconn.closeAndIgnoreFutureInput();
return;
}
@@ -261,7 +261,8 @@ public class ServerConnectionDelegate extends ServerDelegate
") above the server's offered limit (" + getFrameMax() +")");
//Due to the error we must forcefully close the connection without negotiation
- sconn.getSender().close();
+ sconn.closeAndIgnoreFutureInput();
+
return;
}
else if(okMaxFrameSize > 0 && okMaxFrameSize < Constant.MIN_MAX_FRAME_SIZE)
@@ -271,7 +272,7 @@ public class ServerConnectionDelegate extends ServerDelegate
") below the minimum permitted size (" + Constant.MIN_MAX_FRAME_SIZE +")");
//Due to the error we must forcefully close the connection without negotiation
- sconn.getSender().close();
+ sconn.closeAndIgnoreFutureInput();
return;
}
else if(okMaxFrameSize == 0)
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
new file mode 100644
index 0000000000..a42238a40d
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
@@ -0,0 +1,248 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import static java.lang.Math.min;
+import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
+import static org.apache.qpid.transport.network.Frame.FIRST_SEG;
+import static org.apache.qpid.transport.network.Frame.HEADER_SIZE;
+import static org.apache.qpid.transport.network.Frame.LAST_FRAME;
+import static org.apache.qpid.transport.network.Frame.LAST_SEG;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.FrameSizeObserver;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.ProtocolDelegate;
+import org.apache.qpid.transport.ProtocolError;
+import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolEventSender;
+import org.apache.qpid.transport.ProtocolHeader;
+import org.apache.qpid.transport.SegmentType;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.codec.Encoder;
+import org.apache.qpid.transport.network.Frame;
+
+/**
+ * Disassembler
+ */
+public final class ServerDisassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver
+{
+ private final ByteBufferSender _sender;
+ private int _maxPayload;
+ private final Object _sendLock = new Object();
+ private final Encoder _encoder = new ServerEncoder();
+
+ public ServerDisassembler(ByteBufferSender sender, int maxFrame)
+ {
+ _sender = sender;
+ if (maxFrame <= HEADER_SIZE || maxFrame >= 64 * 1024)
+ {
+ throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
+ }
+ _maxPayload = maxFrame - HEADER_SIZE;
+ }
+
+ public void send(ProtocolEvent event)
+ {
+ synchronized (_sendLock)
+ {
+ event.delegate(null, this);
+ }
+ }
+
+ public void flush()
+ {
+ synchronized (_sendLock)
+ {
+ _sender.flush();
+ }
+ }
+
+ public void close()
+ {
+ synchronized (_sendLock)
+ {
+ _sender.close();
+ }
+ }
+
+ private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf)
+ {
+ ByteBuffer data = ByteBuffer.wrap(new byte[HEADER_SIZE]);
+
+ data.put(0, flags);
+ data.put(1, type);
+ data.putShort(2, (short) (size + HEADER_SIZE));
+ data.put(5, track);
+ data.putShort(6, (short) channel);
+
+
+ ByteBuffer dup = buf.duplicate();
+ dup.limit(dup.position() + size);
+ buf.position(buf.position() + size);
+ _sender.send(data);
+ _sender.send(dup);
+
+
+ }
+
+ private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buf)
+ {
+ byte typeb = (byte) type.getValue();
+ byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
+
+ int remaining = buf.remaining();
+ boolean first = true;
+ while (true)
+ {
+ int size = min(_maxPayload, remaining);
+ remaining -= size;
+
+ byte newflags = flags;
+ if (first)
+ {
+ newflags |= FIRST_FRAME;
+ first = false;
+ }
+ if (remaining == 0)
+ {
+ newflags |= LAST_FRAME;
+ }
+
+ frame(newflags, typeb, track, event.getChannel(), size, buf);
+
+ if (remaining == 0)
+ {
+ break;
+ }
+ }
+ }
+
+ public void init(Void v, ProtocolHeader header)
+ {
+ _sender.send(header.toByteBuffer());
+ _sender.flush();
+}
+
+ public void control(Void v, Method method)
+ {
+ method(method, SegmentType.CONTROL);
+ }
+
+ public void command(Void v, Method method)
+ {
+ method(method, SegmentType.COMMAND);
+ }
+
+ private void method(Method method, SegmentType type)
+ {
+ Encoder enc = _encoder;
+ enc.init();
+ enc.writeUint16(method.getEncodedType());
+ if (type == SegmentType.COMMAND)
+ {
+ if (method.isSync())
+ {
+ enc.writeUint16(0x0101);
+ }
+ else
+ {
+ enc.writeUint16(0x0100);
+ }
+ }
+ method.write(enc);
+ int methodLimit = enc.position();
+
+ byte flags = FIRST_SEG;
+
+ boolean payload = method.hasPayload();
+ if (!payload)
+ {
+ flags |= LAST_SEG;
+ }
+
+ int headerLimit = -1;
+ if (payload)
+ {
+ final Header hdr = method.getHeader();
+ if (hdr != null)
+ {
+ if (hdr.getDeliveryProperties() != null)
+ {
+ enc.writeStruct32(hdr.getDeliveryProperties());
+ }
+ if (hdr.getMessageProperties() != null)
+ {
+ enc.writeStruct32(hdr.getMessageProperties());
+ }
+ if (hdr.getNonStandardProperties() != null)
+ {
+ for (Struct st : hdr.getNonStandardProperties())
+ {
+ enc.writeStruct32(st);
+ }
+ }
+ }
+
+ headerLimit = enc.position();
+ }
+ synchronized (_sendLock)
+ {
+ ByteBuffer buf = enc.underlyingBuffer();
+ buf.position(0);
+ buf.limit(methodLimit);
+
+ fragment(flags, type, method, buf.duplicate());
+ if (payload)
+ {
+ ByteBuffer body = method.getBody();
+ buf.limit(headerLimit);
+ buf.position(methodLimit);
+
+ fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, buf.duplicate());
+ if (body != null)
+ {
+ fragment(LAST_SEG, SegmentType.BODY, method, body.duplicate());
+ }
+
+ }
+ }
+ }
+
+ public void error(Void v, ProtocolError error)
+ {
+ throw new IllegalArgumentException(String.valueOf(error));
+ }
+
+ @Override
+ public void setMaxFrameSize(final int maxFrame)
+ {
+ if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
+ {
+ throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
+ }
+ this._maxPayload = maxFrame - HEADER_SIZE;
+
+ }
+}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
new file mode 100644
index 0000000000..6437015208
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
@@ -0,0 +1,371 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.apache.qpid.transport.codec.AbstractEncoder;
+
+
+public final class ServerEncoder extends AbstractEncoder
+{
+ public static final int DEFAULT_CAPACITY = 8192;
+ private final int _threshold;
+ private ByteBuffer _out;
+ private int _segment;
+ private int _initialCapacity;
+
+ public ServerEncoder()
+ {
+ this(DEFAULT_CAPACITY);
+ }
+
+ public ServerEncoder(int capacity)
+ {
+ _initialCapacity = capacity;
+ _threshold = capacity/16;
+ _out = ByteBuffer.allocate(capacity);
+ _segment = 0;
+ }
+
+ public void init()
+ {
+ _out.position(_out.limit());
+ _out.limit(_out.capacity());
+ _out = _out.slice();
+ if(_out.remaining() < _threshold)
+ {
+ _out = ByteBuffer.allocate(_initialCapacity);
+ }
+ _segment = 0;
+ }
+
+ public ByteBuffer buffer()
+ {
+ int pos = _out.position();
+ _out.position(_segment);
+ ByteBuffer slice = _out.slice();
+ slice.limit(pos - _segment);
+ _out.position(pos);
+ return slice;
+ }
+
+ public int position()
+ {
+ return _out.position();
+ }
+
+ public ByteBuffer underlyingBuffer()
+ {
+ return _out;
+ }
+
+ private void grow(int size)
+ {
+ ByteBuffer old = _out;
+ int capacity = old.capacity();
+ _out = ByteBuffer.allocate(Math.max(Math.max(capacity + size, 2*capacity), _initialCapacity));
+ old.flip();
+ _out.put(old);
+ }
+
+ protected void doPut(byte b)
+ {
+ try
+ {
+ _out.put(b);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(1);
+ _out.put(b);
+ }
+ }
+
+ protected void doPut(ByteBuffer src)
+ {
+ try
+ {
+ _out.put(src);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(src.remaining());
+ _out.put(src);
+ }
+ }
+
+ protected void put(byte[] bytes)
+ {
+ try
+ {
+ _out.put(bytes);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(bytes.length);
+ _out.put(bytes);
+ }
+ }
+
+ public void writeUint8(short b)
+ {
+ assert b < 0x100;
+
+ try
+ {
+ _out.put((byte) b);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(1);
+ _out.put((byte) b);
+ }
+ }
+
+ public void writeUint16(int s)
+ {
+ assert s < 0x10000;
+
+ try
+ {
+ _out.putShort((short) s);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(2);
+ _out.putShort((short) s);
+ }
+ }
+
+ public void writeUint32(long i)
+ {
+ assert i < 0x100000000L;
+
+ try
+ {
+ _out.putInt((int) i);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(4);
+ _out.putInt((int) i);
+ }
+ }
+
+ public void writeUint64(long l)
+ {
+ try
+ {
+ _out.putLong(l);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(8);
+ _out.putLong(l);
+ }
+ }
+
+ public int beginSize8()
+ {
+ int pos = _out.position();
+ try
+ {
+ _out.put((byte) 0);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(1);
+ _out.put((byte) 0);
+ }
+ return pos;
+ }
+
+ public void endSize8(int pos)
+ {
+ int cur = _out.position();
+ _out.put(pos, (byte) (cur - pos - 1));
+ }
+
+ public int beginSize16()
+ {
+ int pos = _out.position();
+ try
+ {
+ _out.putShort((short) 0);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(2);
+ _out.putShort((short) 0);
+ }
+ return pos;
+ }
+
+ public void endSize16(int pos)
+ {
+ int cur = _out.position();
+ _out.putShort(pos, (short) (cur - pos - 2));
+ }
+
+ public int beginSize32()
+ {
+ int pos = _out.position();
+ try
+ {
+ _out.putInt(0);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(4);
+ _out.putInt(0);
+ }
+ return pos;
+
+ }
+
+ public void endSize32(int pos)
+ {
+ int cur = _out.position();
+ _out.putInt(pos, (cur - pos - 4));
+
+ }
+
+ public void writeDouble(double aDouble)
+ {
+ try
+ {
+ _out.putDouble(aDouble);
+ }
+ catch(BufferOverflowException exception)
+ {
+ grow(8);
+ _out.putDouble(aDouble);
+ }
+ }
+
+ public void writeInt16(short aShort)
+ {
+ try
+ {
+ _out.putShort(aShort);
+ }
+ catch(BufferOverflowException exception)
+ {
+ grow(2);
+ _out.putShort(aShort);
+ }
+ }
+
+ public void writeInt32(int anInt)
+ {
+ try
+ {
+ _out.putInt(anInt);
+ }
+ catch(BufferOverflowException exception)
+ {
+ grow(4);
+ _out.putInt(anInt);
+ }
+ }
+
+ public void writeInt64(long aLong)
+ {
+ try
+ {
+ _out.putLong(aLong);
+ }
+ catch(BufferOverflowException exception)
+ {
+ grow(8);
+ _out.putLong(aLong);
+ }
+ }
+
+ public void writeInt8(byte aByte)
+ {
+ try
+ {
+ _out.put(aByte);
+ }
+ catch(BufferOverflowException exception)
+ {
+ grow(1);
+ _out.put(aByte);
+ }
+ }
+
+ public void writeBin128(byte[] byteArray)
+ {
+ byteArray = (byteArray != null) ? byteArray : new byte [16];
+
+ assert byteArray.length == 16;
+
+ try
+ {
+ _out.put(byteArray);
+ }
+ catch(BufferOverflowException exception)
+ {
+ grow(16);
+ _out.put(byteArray);
+ }
+ }
+
+ public void writeBin128(UUID id)
+ {
+ byte[] data = new byte[16];
+
+ long msb = id.getMostSignificantBits();
+ long lsb = id.getLeastSignificantBits();
+
+ assert data.length == 16;
+ for (int i=7; i>=0; i--)
+ {
+ data[i] = (byte)(msb & 0xff);
+ msb = msb >> 8;
+ }
+
+ for (int i=15; i>=8; i--)
+ {
+ data[i] = (byte)(lsb & 0xff);
+ lsb = (lsb >> 8);
+ }
+ writeBin128(data);
+ }
+
+ public void writeFloat(float aFloat)
+ {
+ try
+ {
+ _out.putFloat(aFloat);
+ }
+ catch(BufferOverflowException exception)
+ {
+ grow(4);
+ _out.putFloat(aFloat);
+ }
+ }
+
+}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 223de4f84e..67204427fb 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -56,6 +56,7 @@ import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
@@ -74,7 +75,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
@@ -136,6 +137,7 @@ public class ServerSession extends Session
private org.apache.qpid.server.model.Session<?> _modelObject;
private long _blockTime;
private long _blockingTimeout;
+ private boolean _wireBlockingState;
public static interface MessageDispositionChangeListener
{
@@ -188,7 +190,7 @@ public class ServerSession extends Session
@Override
public void doTimeoutAction(String reason)
{
- getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
+ getConnectionModel().closeSessionAsync(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
}
}, getVirtualHost());
@@ -207,10 +209,6 @@ public class ServerSession extends Session
if (state == State.OPEN)
{
getVirtualHost().getEventLogger().message(ChannelMessages.CREATE());
- if(_blocking.get())
- {
- invokeBlock();
- }
}
}
else
@@ -244,6 +242,17 @@ public class ServerSession extends Session
invoke(new MessageStop(""));
}
+ private void invokeUnblock()
+ {
+ MessageFlow mf = new MessageFlow();
+ mf.setUnit(MessageCreditUnit.MESSAGE);
+ mf.setDestination("");
+ _outstandingCredit.set(Integer.MAX_VALUE);
+ mf.setValue(Integer.MAX_VALUE);
+ invoke(mf);
+ }
+
+
@Override
protected boolean isFull(int id)
{
@@ -823,12 +832,11 @@ public class ServerSession extends Session
if(_blocking.compareAndSet(false,true))
{
+ getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
if(getState() == State.OPEN)
{
- invokeBlock();
+ getConnection().notifyWork();
}
- _blockTime = System.currentTimeMillis();
- getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
}
@@ -852,28 +860,30 @@ public class ServerSession extends Session
{
if(_blocking.compareAndSet(true,false) && !isClosing())
{
- _blockTime = 0l;
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
- MessageFlow mf = new MessageFlow();
- mf.setUnit(MessageCreditUnit.MESSAGE);
- mf.setDestination("");
- _outstandingCredit.set(Integer.MAX_VALUE);
- mf.setValue(Integer.MAX_VALUE);
- invoke(mf);
-
-
+ getConnection().notifyWork();
}
}
}
+
boolean blockingTimeoutExceeded()
{
long blockTime = _blockTime;
- boolean b = _blocking.get() && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout;
+ boolean b = _wireBlockingState && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout;
return b;
}
@Override
+ public void transportStateChanged()
+ {
+ for(ConsumerTarget_0_10 consumerTarget : getSubscriptions())
+ {
+ consumerTarget.transportStateChanged();
+ }
+ }
+
+ @Override
public Object getConnectionReference()
{
return getConnection().getReference();
@@ -1002,17 +1012,17 @@ public class ServerSession extends Session
return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast();
}
- public void recordFuture(final StoreFuture future, final ServerTransaction.Action action)
+ public void recordFuture(final FutureResult future, final ServerTransaction.Action action)
{
_unfinishedCommandsQueue.add(new AsyncCommand(future, action));
}
private static class AsyncCommand
{
- private final StoreFuture _future;
+ private final FutureResult _future;
private ServerTransaction.Action _action;
- public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action)
+ public AsyncCommand(final FutureResult future, final ServerTransaction.Action action)
{
_future = future;
_action = action;
@@ -1125,6 +1135,32 @@ public class ServerSession extends Session
}
}
+ @Override
+ public void processPending()
+ {
+ boolean desiredBlockingState = _blocking.get();
+ if (desiredBlockingState != _wireBlockingState)
+ {
+ _wireBlockingState = desiredBlockingState;
+
+ if (desiredBlockingState)
+ {
+ invokeBlock();
+ }
+ else
+ {
+ invokeUnblock();
+ }
+ _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
+ }
+
+
+ for(ConsumerTarget target : getSubscriptions())
+ {
+ target.processPending();
+ }
+ }
+
public final long getMaxUncommittedInMemorySize()
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 8632d04048..dd634c36ff 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -36,8 +36,10 @@ import org.apache.log4j.Logger;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.ArrivalTimeFilter;
import org.apache.qpid.server.filter.FilterManager;
@@ -58,7 +60,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.DtxNotSelectedException;
@@ -133,7 +135,7 @@ public class ServerSessionDelegate extends SessionDelegate
serverSession.accept(method.getTransfers());
if(!serverSession.isTransactional())
{
- serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE,
+ serverSession.recordFuture(FutureResult.IMMEDIATE_FUTURE,
new CommandProcessedAction(serverSession, method));
}
}
@@ -246,8 +248,8 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
-
- FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
+ ServerProtocolEngine serverProtocolEngine = getServerConnection(session).getProtocolEngine();
+ FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L, serverProtocolEngine);
FilterManager filterManager = null;
try
@@ -421,58 +423,69 @@ public class ServerSessionDelegate extends SessionDelegate
new MessageTransferMessage(storeMessage, serverSession.getReference());
MessageReference<MessageTransferMessage> reference = message.newReference();
- final InstanceProperties instanceProperties = new InstanceProperties()
+ try
{
- @Override
- public Object getProperty(final Property prop)
+ final InstanceProperties instanceProperties = new InstanceProperties()
{
- switch (prop)
+ @Override
+ public Object getProperty(final Property prop)
{
- case EXPIRATION:
- return message.getExpiration();
- case IMMEDIATE:
- return message.isImmediate();
- case MANDATORY:
- return (delvProps == null || !delvProps.getDiscardUnroutable())
- && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
- case PERSISTENT:
- return message.isPersistent();
- case REDELIVERED:
- return delvProps.getRedelivered();
+ switch (prop)
+ {
+ case EXPIRATION:
+ return message.getExpiration();
+ case IMMEDIATE:
+ return message.isImmediate();
+ case MANDATORY:
+ return (delvProps == null || !delvProps.getDiscardUnroutable())
+ && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
+ case PERSISTENT:
+ return message.isPersistent();
+ case REDELIVERED:
+ return delvProps.getRedelivered();
+ }
+ return null;
}
- return null;
- }
- };
+ };
- int enqueues = serverSession.enqueue(message, instanceProperties, destination);
+ int enqueues = serverSession.enqueue(message, instanceProperties, destination);
- if (enqueues == 0)
- {
- if ((delvProps == null || !delvProps.getDiscardUnroutable())
- && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+ if (enqueues == 0)
{
- RangeSet rejects = RangeSetFactory.createRangeSet();
- rejects.add(xfr.getId());
- MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
- ssn.invoke(reject);
+ if ((delvProps == null || !delvProps.getDiscardUnroutable())
+ && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+ {
+ RangeSet rejects = RangeSetFactory.createRangeSet();
+ rejects.add(xfr.getId());
+ MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
+ ssn.invoke(reject);
+ }
+ else
+ {
+ virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(destination.getName(),
+ messageMetaData.getRoutingKey()));
+ }
+ }
+
+ if (serverSession.isTransactional())
+ {
+ serverSession.processed(xfr);
}
else
{
- virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(destination.getName(),
- messageMetaData.getRoutingKey()));
+ serverSession.recordFuture(FutureResult.IMMEDIATE_FUTURE,
+ new CommandProcessedAction(serverSession, xfr));
}
}
-
- if (serverSession.isTransactional())
+ catch (VirtualHostUnavailableException e)
{
- serverSession.processed(xfr);
+ getServerConnection(serverSession).closeAsync(AMQConstant.CONNECTION_FORCED, e.getMessage());
}
- else
+ finally
{
- serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE,
- new CommandProcessedAction(serverSession, xfr));
+ reference.release();
}
- reference.release();
+
}
}
@@ -589,7 +602,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
try
{
- ((ServerSession)session).endDtx(method.getXid(), method.getFail(), method.getSuspend());
+ ((ServerSession) session).endDtx(method.getXid(), method.getFail(), method.getSuspend());
}
catch (TimeoutDtxException e)
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
index 8e48741b91..a7b08e3f83 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
@@ -21,11 +21,14 @@
package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.flow.AbstractFlowCreditManager;
public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
private static final Logger LOGGER = Logger.getLogger(WindowCreditManager.class);
+ private final ServerProtocolEngine _serverProtocolEngine;
private volatile long _bytesCreditLimit;
private volatile long _messageCreditLimit;
@@ -33,39 +36,22 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
private volatile long _bytesUsed;
private volatile long _messageUsed;
- public WindowCreditManager()
- {
- this(0L, 0L);
- }
-
- public WindowCreditManager(long bytesCreditLimit, long messageCreditLimit)
+ public WindowCreditManager(long bytesCreditLimit,
+ long messageCreditLimit,
+ ServerProtocolEngine serverProtocolEngine)
{
+ _serverProtocolEngine = serverProtocolEngine;
_bytesCreditLimit = bytesCreditLimit;
_messageCreditLimit = messageCreditLimit;
setSuspended(!hasCredit());
}
- public long getBytesCreditLimit()
- {
- return _bytesCreditLimit;
- }
-
public long getMessageCreditLimit()
{
return _messageCreditLimit;
}
- public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
- {
- _bytesCreditLimit = bytesCreditLimit;
- _messageCreditLimit = messageCreditLimit;
-
- setSuspended(!hasCredit());
-
- }
-
-
public long getMessageCredit()
{
return _messageCreditLimit == -1L
@@ -121,12 +107,18 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
public synchronized boolean hasCredit()
{
return (_bytesCreditLimit < 0L || _bytesCreditLimit > _bytesUsed)
- && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed);
+ && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed)
+ && !_serverProtocolEngine.isTransportBlockedForWriting();
}
public synchronized boolean useCreditForMessage(final long msgSize)
{
- if(_messageCreditLimit >= 0L)
+ if (_serverProtocolEngine.isTransportBlockedForWriting())
+ {
+ setSuspended(true);
+ return false;
+ }
+ else if(_messageCreditLimit >= 0L)
{
if(_messageUsed < _messageCreditLimit)
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
index 1c4a694be6..b9f013d253 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
@@ -20,17 +20,25 @@
*/
package org.apache.qpid.server.protocol.v0_10;
-import org.apache.qpid.server.protocol.v0_10.WindowCreditManager;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.test.utils.QpidTestCase;
public class WindowCreditManagerTest extends QpidTestCase
{
private WindowCreditManager _creditManager;
+ private ServerProtocolEngine _protocolEngine;
protected void setUp() throws Exception
{
super.setUp();
- _creditManager = new WindowCreditManager();
+
+ _protocolEngine = mock(ServerProtocolEngine.class);
+ when(_protocolEngine.isTransportBlockedForWriting()).thenReturn(false);
+
+ _creditManager = new WindowCreditManager(0l, 0l, _protocolEngine);
}
/**
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/pom.xml b/qpid/java/broker-plugins/amqp-0-8-protocol/pom.xml
index 0c5d20a0a6..e09a3ba922 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/pom.xml
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/pom.xml
@@ -29,6 +29,10 @@
<name>Qpid AMQP 0-8 Protocol Broker Plug-in</name>
<description>AMQP 0-8, 0-9 and 0-9-1 protocol broker plug-in</description>
+ <properties>
+ <qpid.home>${basedir}/../</qpid.home> <!-- override for broker tests -->
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.qpid</groupId>
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 9afa7c393f..2a1fbe6881 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -40,7 +40,6 @@ import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
import javax.security.auth.Subject;
@@ -66,8 +65,6 @@ import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.flow.MessageOnlyCreditManager;
-import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
@@ -99,7 +96,6 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
@@ -108,6 +104,7 @@ import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.QueueExistsException;
@@ -133,7 +130,8 @@ public class AMQChannel
private final int _channelId;
- private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l);
+ private final Pre0_10CreditManager _creditManager;
+ private final FlowCreditManager _noAckCreditManager;
/**
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
@@ -211,8 +209,13 @@ public class AMQChannel
private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<>();
private long _maxUncommittedInMemorySize;
+ private boolean _wireBlockingState;
+
public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
{
+ _creditManager = new Pre0_10CreditManager(0l,0l, connection);
+ _noAckCreditManager = new NoAckCreditManager(connection);
+
_connection = connection;
_channelId = channelId;
@@ -699,7 +702,7 @@ public class AMQChannel
if(arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue())))
{
- target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _creditManager);
+ target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _noAckCreditManager);
}
else if(acks)
{
@@ -709,7 +712,7 @@ public class AMQChannel
}
else
{
- target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _creditManager);
+ target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _noAckCreditManager);
options.add(ConsumerImpl.Option.ACQUIRES);
options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
@@ -1274,7 +1277,8 @@ public class AMQChannel
// stop all subscriptions
_rollingBack = true;
- boolean requiresSuspend = _suspended.compareAndSet(false,true);
+ boolean requiresSuspend = _suspended.compareAndSet(false,true); // TODO This is probably superfluous owing to the
+ // message assignment suspended logic in NBC.
// ensure all subscriptions have seen the change to the channel state
for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
@@ -1653,12 +1657,14 @@ public class AMQChannel
{
if(_blockingEntities.add(this))
{
+
if(_blocking.compareAndSet(false,true))
{
getVirtualHost().getEventLogger().message(_logSubject,
ChannelMessages.FLOW_ENFORCED("** All Queues **"));
- flow(false);
- _blockTime = System.currentTimeMillis();
+
+
+ getConnection().notifyWork();
}
}
}
@@ -1670,12 +1676,12 @@ public class AMQChannel
if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false))
{
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
-
- flow(true);
+ getConnection().notifyWork();
}
}
}
+
public synchronized void block(AMQQueue queue)
{
if(_blockingEntities.add(queue))
@@ -1684,8 +1690,7 @@ public class AMQChannel
if(_blocking.compareAndSet(false,true))
{
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName()));
- flow(false);
- _blockTime = System.currentTimeMillis();
+ getConnection().notifyWork();
}
}
@@ -1698,12 +1703,19 @@ public class AMQChannel
if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing())
{
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
- flow(true);
+ getConnection().notifyWork();
}
}
}
@Override
+ public void transportStateChanged()
+ {
+ _creditManager.restoreCredit(0, 0);
+ _noAckCreditManager.restoreCredit(0, 0);
+ }
+
+ @Override
public Object getConnectionReference()
{
return getConnection().getReference();
@@ -1743,16 +1755,7 @@ public class AMQChannel
*/
private void closeConnection(String reason) throws AMQException
{
- Lock receivedLock = _connection.getReceivedLock();
- receivedLock.lock();
- try
- {
- _connection.close(AMQConstant.RESOURCE_ERROR, reason);
- }
- finally
- {
- receivedLock.unlock();
- }
+ _connection.closeAsync(AMQConstant.RESOURCE_ERROR, reason);
}
public void deadLetter(long deliveryTag)
@@ -1815,7 +1818,7 @@ public class AMQChannel
}
}
- public void recordFuture(final StoreFuture future, final ServerTransaction.Action action)
+ public void recordFuture(final FutureResult future, final ServerTransaction.Action action)
{
_unfinishedCommandsQueue.add(new AsyncCommand(future, action));
}
@@ -1841,10 +1844,10 @@ public class AMQChannel
private static class AsyncCommand
{
- private final StoreFuture _future;
+ private final FutureResult _future;
private ServerTransaction.Action _action;
- public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action)
+ public AsyncCommand(final FutureResult future, final ServerTransaction.Action action)
{
_future = future;
_action = action;
@@ -2305,7 +2308,7 @@ public class AMQChannel
private boolean blockingTimeoutExceeded()
{
- return _blocking.get() && (System.currentTimeMillis() - _blockTime) > _blockingTimeout;
+ return _wireBlockingState && (System.currentTimeMillis() - _blockTime) > _blockingTimeout;
}
@Override
@@ -3639,4 +3642,22 @@ public class AMQChannel
}
}
}
+
+ @Override
+ public void processPending()
+ {
+
+ boolean desiredBlockingState = _blocking.get();
+ if (desiredBlockingState != _wireBlockingState)
+ {
+ _wireBlockingState = desiredBlockingState;
+ flow(!desiredBlockingState);
+ _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
+ }
+
+ for(ConsumerTarget target : _tag2SubscriptionTargetMap.values())
+ {
+ target.processPending();
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index cb145aac88..d7b5b00b26 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -36,13 +36,14 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import javax.security.sasl.SaslException;
@@ -58,7 +59,7 @@ import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.*;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
@@ -69,6 +70,7 @@ import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Transport;
@@ -85,7 +87,7 @@ import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
@@ -96,6 +98,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
AMQConnectionModel<AMQProtocolEngine, AMQChannel>,
ServerMethodProcessor<ServerChannelMethodProcessor>
{
+
+
+
enum ConnectionState
{
INIT,
@@ -117,6 +122,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private static final long AWAIT_CLOSED_TIMEOUT = 60000;
private final AmqpPort<?> _port;
private final long _creationTime;
+ private final AtomicBoolean _stateChanged = new AtomicBoolean();
+ private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
private AMQShortString _contextKey;
@@ -139,11 +146,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
* The channels that the latest call to {@link #received(ByteBuffer)} applied to.
* Used so we know which channels we need to call {@link AMQChannel#receivedComplete()}
* on after handling the frames.
- *
- * Thread-safety: guarded by {@link #_receivedLock}.
*/
- private final Set<AMQChannel> _channelsForCurrentMessage =
- new HashSet<>();
+ private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<>();
private AMQDecoder _decoder;
@@ -157,9 +161,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
/* AMQP Version for this session */
private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion);
- private final List<Action<? super AMQProtocolEngine>> _taskList =
+ private final List<Action<? super AMQProtocolEngine>> _connectionCloseTaskList =
new CopyOnWriteArrayList<>();
+ private final Queue<Action<? super AMQProtocolEngine>> _asyncTaskList =
+ new ConcurrentLinkedQueue<>();
+
private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<>();
private ProtocolOutputConverter _protocolOutputConverter;
private final Subject _authorizedSubject = new Subject();
@@ -179,13 +186,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
+ private ByteBufferSender _sender;
private volatile boolean _deferFlush;
- private long _lastReceivedTime;
+ private long _lastReceivedTime = System.currentTimeMillis(); // TODO consider if this is what we want?
private boolean _blocking;
- private final ReentrantLock _receivedLock;
private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis());
private final Broker<?> _broker;
private final Transport _transport;
@@ -200,6 +206,34 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private int _currentMethodId;
private int _binaryDataLimit;
private long _maxMessageSize;
+ private volatile boolean _transportBlockedForWriting;
+
+ private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
+
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ Thread lock = _messageAssignmentSuspended.get();
+ return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread();
+ }
+
+ @Override
+ public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+ {
+ _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
+ if(!messageAssignmentSuspended)
+ {
+ for(AMQSessionModel<?,?> session : getSessionModels())
+ {
+ for(Consumer<?> consumer : session.getConsumers())
+ {
+ ((ConsumerImpl)consumer).getTarget().notifyCurrentState();
+ }
+ }
+ }
+ }
+
public AMQProtocolEngine(Broker<?> broker,
final NetworkConnection network,
@@ -211,7 +245,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_port = port;
_transport = transport;
_maxNoOfChannels = broker.getConnection_sessionCountLimit();
- _receivedLock = new ReentrantLock();
_decoder = new BrokerDecoder(this);
_connectionID = connectionId;
_logSubject = new ConnectionLogSubject(this);
@@ -262,12 +295,28 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
return _authorizedSubject;
}
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return _transportBlockedForWriting;
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ _transportBlockedForWriting = blocked;
+ for(AMQChannel channel : _channelMap.values())
+ {
+ channel.transportStateChanged();
+ }
+ }
+
public void setNetworkConnection(NetworkConnection network)
{
setNetworkConnection(network, network.getSender());
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
_network = network;
_sender = sender;
@@ -294,10 +343,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
return _closing.get();
}
- public synchronized void flushBatched()
- {
- _sender.flush();
- }
public ClientDeliveryMethod createDeliveryMethod(int channelId)
@@ -314,9 +359,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
{
final long arrivalTime = System.currentTimeMillis();
- if(!_authenticated &&
- (arrivalTime - _creationTime) > _port.getContextValue(Long.class,
- Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))
+ if (!_authenticated &&
+ (arrivalTime - _creationTime) > _port.getContextValue(Long.class,
+ Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))
{
_logger.warn("Connection has taken more than "
+ _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)
@@ -328,7 +373,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_lastIoTime = arrivalTime;
_readBytes += msg.remaining();
- _receivedLock.lock();
try
{
_decoder.decodeBuffer(msg);
@@ -371,7 +415,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
}
catch (StoreException e)
{
- if(_virtualHost.getState() == State.ACTIVE)
+ if (_virtualHost.getState() == State.ACTIVE)
{
throw e;
}
@@ -380,10 +424,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_logger.error("Store Exception ignored as virtual host no longer active", e);
}
}
- finally
- {
- _receivedLock.unlock();
- }
return null;
}
});
@@ -484,64 +524,22 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
writeFrame(responseBody.generateFrame(0));
_state = ConnectionState.AWAIT_START_OK;
+ _sender.flush();
+
}
catch (AMQException e)
{
_logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
+ _sender.flush();
}
}
private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY];
- private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes);
private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
- private ByteBuffer asByteBuffer(AMQDataBlock block)
- {
- final int size = (int) block.getSize();
-
- final byte[] data;
-
-
- if(size > REUSABLE_BYTE_BUFFER_CAPACITY)
- {
- data= new byte[size];
- }
- else
- {
-
- data = _reusableBytes;
- }
- _reusableDataOutput.setBuffer(data);
-
- try
- {
- block.writePayload(_reusableDataOutput);
- }
- catch (IOException e)
- {
- throw new ServerScopedRuntimeException(e);
- }
-
- final ByteBuffer buf;
-
- if(size <= REUSABLE_BYTE_BUFFER_CAPACITY)
- {
- buf = _reusableByteBuffer;
- buf.position(0);
- }
- else
- {
- buf = ByteBuffer.wrap(data);
- }
- buf.limit(_reusableDataOutput.length());
-
- return buf;
- }
-
-
/**
* Convenience method that writes a frame to the protocol session. Equivalent to calling
* getProtocolSession().write().
@@ -550,16 +548,21 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
*/
public synchronized void writeFrame(AMQDataBlock frame)
{
-
- final ByteBuffer buf = asByteBuffer(frame);
- _writtenBytes += buf.remaining();
-
if(_logger.isDebugEnabled())
{
_logger.debug("SEND: " + frame);
}
- _sender.send(buf);
+ try
+ {
+ _writtenBytes += frame.writePayload(_sender);
+ }
+ catch (IOException e)
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
+
+
final long time = System.currentTimeMillis();
_lastIoTime = time;
_lastWriteTime.set(time);
@@ -796,14 +799,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
if(_closing.compareAndSet(false,true))
{
// force sync of outstanding async work
- _receivedLock.lock();
try
{
receivedComplete();
}
finally
{
- _receivedLock.unlock();
+
finishClose(connectionDropped);
}
@@ -845,7 +847,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
{
try
{
- for (Action<? super AMQProtocolEngine> task : _taskList)
+ for (Action<? super AMQProtocolEngine> task : _connectionCloseTaskList)
{
task.performAction(this);
}
@@ -867,17 +869,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
{
synchronized(this)
{
- final boolean lockHeld = _receivedLock.isHeldByCurrentThread();
final long endTime = System.currentTimeMillis() + AWAIT_CLOSED_TIMEOUT;
while(!_closed && endTime > System.currentTimeMillis())
{
try
{
- if(lockHeld)
- {
- _receivedLock.unlock();
- }
wait(1000);
}
catch (InterruptedException e)
@@ -885,13 +882,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
Thread.currentThread().interrupt();
break;
}
- finally
- {
- if(lockHeld)
- {
- _receivedLock.lock();
- }
- }
}
if (!_closed)
@@ -1088,12 +1078,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
public void addDeleteTask(Action<? super AMQProtocolEngine> task)
{
- _taskList.add(task);
+ _connectionCloseTaskList.add(task);
}
public void removeDeleteTask(Action<? super AMQProtocolEngine> task)
{
- _taskList.remove(task);
+ _connectionCloseTaskList.remove(task);
}
public ProtocolOutputConverter getProtocolOutputConverter()
@@ -1171,6 +1161,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
}
}
+ @Override
+ public void encryptedTransport()
+ {
+ }
+
public void readerIdle()
{
Subject.doAs(_authorizedSubject, new PrivilegedAction<Object>()
@@ -1323,26 +1318,50 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
return String.valueOf(getRemoteAddress());
}
- public void closeSession(AMQChannel session, AMQConstant cause, String message)
+ public void closeSessionAsync(final AMQChannel session, final AMQConstant cause, final String message)
{
- int channelId = session.getChannelId();
- closeChannel(channelId, cause, message);
+ addAsyncTask(new Action<AMQProtocolEngine>()
+ {
- MethodRegistry methodRegistry = getMethodRegistry();
- ChannelCloseBody responseBody =
- methodRegistry.createChannelCloseBody(
- cause.getCode(),
- AMQShortString.validValueOf(message),
- 0, 0);
+ @Override
+ public void performAction(final AMQProtocolEngine object)
+ {
+ int channelId = session.getChannelId();
+ closeChannel(channelId, cause, message);
+
+ MethodRegistry methodRegistry = getMethodRegistry();
+ ChannelCloseBody responseBody =
+ methodRegistry.createChannelCloseBody(
+ cause.getCode(),
+ AMQShortString.validValueOf(message),
+ 0, 0);
+
+ writeFrame(responseBody.generateFrame(channelId));
+ }
+ });
- writeFrame(responseBody.generateFrame(channelId));
}
- public void close(AMQConstant cause, String message)
+ public void closeAsync(final AMQConstant cause, final String message)
{
- closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
- getMethodRegistry(),
- null));
+ Action<AMQProtocolEngine> action = new Action<AMQProtocolEngine>()
+ {
+ @Override
+ public void performAction(final AMQProtocolEngine object)
+ {
+ closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
+ getMethodRegistry(),
+ null));
+
+ }
+ };
+ addAsyncTask(action);
+ }
+
+ private void addAsyncTask(final Action<AMQProtocolEngine> action)
+ {
+ _asyncTaskList.add(action);
+ notifyWork();
}
public void block()
@@ -1922,11 +1941,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
return _reference;
}
- public Lock getReceivedLock()
- {
- return _receivedLock;
- }
-
@Override
public long getLastReadTime()
{
@@ -2045,4 +2059,51 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
return _closing.get();
}
+ @Override
+ public void processPending()
+ {
+
+
+ while(_asyncTaskList.peek() != null)
+ {
+ Action<? super AMQProtocolEngine> asyncAction = _asyncTaskList.poll();
+ asyncAction.performAction(this);
+ }
+
+ for (AMQSessionModel session : getSessionModels())
+ {
+ session.processPending();
+ }
+ }
+
+ @Override
+ public boolean hasWork()
+ {
+ return _stateChanged.get();
+ }
+
+ @Override
+ public void notifyWork()
+ {
+ _stateChanged.set(true);
+
+ final Action<ServerProtocolEngine> listener = _workListener.get();
+ if(listener != null)
+ {
+
+ listener.performAction(this);
+ }
+ }
+
+ @Override
+ public void clearWork()
+ {
+ _stateChanged.set(false);
+ }
+
+ @Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+ _workListener.set(listener);
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 43982db2fd..a2113de8ea 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -75,6 +75,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
+ private final AtomicBoolean _needToClose = new AtomicBoolean();
public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
@@ -99,6 +100,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
return _consumers;
}
+
static final class BrowserConsumer extends ConsumerTarget_0_8
{
public BrowserConsumer(AMQChannel channel,
@@ -123,7 +125,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* @throws org.apache.qpid.AMQException
*/
@Override
- public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
@@ -131,17 +133,11 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
synchronized (getChannel())
{
long deliveryTag = getChannel().getNextDeliveryTag();
- return sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
}
}
- @Override
- public boolean allocateCredit(ServerMessage msg)
- {
- return true;
- }
-
}
public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
@@ -184,7 +180,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* @param batch
*/
@Override
- public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
@@ -211,14 +207,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
}
ref.release();
- return size;
-
- }
- @Override
- public boolean allocateCredit(ServerMessage msg)
- {
- return true;
}
private static final ServerTransaction.Action NOOP =
@@ -250,11 +239,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
}
- public boolean allocateCredit(ServerMessage msg)
- {
- return getCreditManager().useCreditForMessage(msg.getSize());
- }
-
}
@@ -295,9 +279,10 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* @param batch
*/
@Override
- public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
+ // put queue entry on a list and then notify the connection to read list.
synchronized (getChannel())
{
@@ -309,12 +294,15 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
entry.addStateChangeListener(getReleasedStateChangeListener());
long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
entry.incrementDeliveryCount();
- return size;
}
+
+
}
+
+
}
@@ -399,7 +387,8 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
return subscriber + "]";
}
- public boolean isSuspended()
+ @Override
+ public boolean doIsSuspended()
{
return getState()!=State.ACTIVE || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped();
}
@@ -525,6 +514,16 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
{
if (isAutoClose())
{
+ _needToClose.set(true);
+ getChannel().getConnection().notifyWork();
+ }
+ }
+
+ @Override
+ protected void processClosed()
+ {
+ if (_needToClose.get() && getState() != State.CLOSED)
+ {
close();
confirmAutoClose();
}
@@ -533,8 +532,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
public void flushBatched()
{
_channel.getConnection().setDeferFlush(false);
-
- _channel.getConnection().flushBatched();
}
protected void addUnacknowledgedMessage(MessageInstance entry)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java
new file mode 100644
index 0000000000..af54c911dc
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java
@@ -0,0 +1,73 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.protocol.v0_8;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager;
+
+public class MessageOnlyCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
+{
+ private final AtomicLong _messageCredit;
+
+ public MessageOnlyCreditManager(final long initialCredit)
+ {
+ _messageCredit = new AtomicLong(initialCredit);
+ }
+
+ public void restoreCredit(long messageCredit, long bytesCredit)
+ {
+ _messageCredit.addAndGet(messageCredit);
+ setSuspended(false);
+
+ }
+
+ public boolean hasCredit()
+ {
+ return _messageCredit.get() > 0L;
+ }
+
+ public boolean useCreditForMessage(long msgSize)
+ {
+ if(hasCredit())
+ {
+ if(_messageCredit.addAndGet(-1L) >= 0)
+ {
+ setSuspended(false);
+ return true;
+ }
+ else
+ {
+ _messageCredit.addAndGet(1L);
+ setSuspended(true);
+ return false;
+ }
+ }
+ else
+ {
+ setSuspended(true);
+ return false;
+ }
+
+ }
+
+}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
new file mode 100644
index 0000000000..6e5aab2dd5
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+
+public class NoAckCreditManager extends AbstractFlowCreditManager
+{
+ private final ServerProtocolEngine _serverProtocolEngine;
+
+ public NoAckCreditManager(ServerProtocolEngine serverProtocolEngine)
+ {
+ _serverProtocolEngine = serverProtocolEngine;
+ }
+
+ @Override
+ public void restoreCredit(final long messageCredit, final long bytesCredit)
+ {
+ setSuspended(!hasCredit());
+ }
+
+ @Override
+ public boolean hasCredit()
+ {
+ return !_serverProtocolEngine.isTransportBlockedForWriting();
+ }
+
+ @Override
+ public boolean useCreditForMessage(final long msgSize)
+ {
+ if (!hasCredit())
+ {
+ setSuspended(true);
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
new file mode 100644
index 0000000000..a869a707e1
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
@@ -0,0 +1,190 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.protocol.v0_8;
+
+
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager;
+
+public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager
+{
+
+ private final ServerProtocolEngine _protocolEngine;
+ private volatile long _bytesCreditLimit;
+ private volatile long _messageCreditLimit;
+
+ private volatile long _bytesCredit;
+ private volatile long _messageCredit;
+
+ public Pre0_10CreditManager(long bytesCreditLimit,
+ long messageCreditLimit,
+ ServerProtocolEngine protocolEngine)
+ {
+ _protocolEngine = protocolEngine;
+ _bytesCreditLimit = bytesCreditLimit;
+ _messageCreditLimit = messageCreditLimit;
+ _bytesCredit = bytesCreditLimit;
+ _messageCredit = messageCreditLimit;
+ }
+
+
+
+ public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
+ {
+ long bytesCreditChange = bytesCreditLimit - _bytesCreditLimit;
+ long messageCreditChange = messageCreditLimit - _messageCreditLimit;
+
+
+
+ if(bytesCreditChange != 0L)
+ {
+ if(bytesCreditLimit == 0L)
+ {
+ _bytesCredit = 0;
+ }
+ else
+ {
+ _bytesCredit += bytesCreditChange;
+ }
+ }
+
+
+ if(messageCreditChange != 0L)
+ {
+ if(messageCreditLimit == 0L)
+ {
+ _messageCredit = 0;
+ }
+ else
+ {
+ _messageCredit += messageCreditChange;
+ }
+ }
+
+
+ _bytesCreditLimit = bytesCreditLimit;
+ _messageCreditLimit = messageCreditLimit;
+
+ setSuspended(!hasCredit());
+
+ }
+
+
+ public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
+ {
+ final long messageCreditLimit = _messageCreditLimit;
+ boolean notifyIncrease = true;
+ if(messageCreditLimit != 0L)
+ {
+ notifyIncrease = (_messageCredit != 0);
+ long newCredit = _messageCredit + messageCredit;
+ _messageCredit = newCredit > messageCreditLimit ? messageCreditLimit : newCredit;
+ }
+
+
+ final long bytesCreditLimit = _bytesCreditLimit;
+ if(bytesCreditLimit != 0L)
+ {
+ long newCredit = _bytesCredit + bytesCredit;
+ _bytesCredit = newCredit > bytesCreditLimit ? bytesCreditLimit : newCredit;
+ if(notifyIncrease && bytesCredit>0)
+ {
+ notifyIncreaseBytesCredit();
+ }
+ }
+
+
+
+ setSuspended(!hasCredit());
+
+ }
+
+ public synchronized boolean hasCredit()
+ {
+ return (_bytesCreditLimit == 0L || _bytesCredit > 0)
+ && (_messageCreditLimit == 0L || _messageCredit > 0)
+ && !_protocolEngine.isTransportBlockedForWriting();
+ }
+
+ public synchronized boolean useCreditForMessage(final long msgSize)
+ {
+ if (_protocolEngine.isTransportBlockedForWriting())
+ {
+ setSuspended(true);
+ return false;
+ }
+ else if(_messageCreditLimit != 0L)
+ {
+ if(_messageCredit != 0L)
+ {
+ if(_bytesCreditLimit == 0L)
+ {
+ _messageCredit--;
+
+ return true;
+ }
+ else
+ {
+ if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit))
+ {
+ _messageCredit--;
+ _bytesCredit -= msgSize;
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+ else
+ {
+ setSuspended(true);
+ return false;
+ }
+ }
+ else
+ {
+ if(_bytesCreditLimit == 0L)
+ {
+
+ return true;
+ }
+ else
+ {
+ if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit))
+ {
+ _bytesCredit -= msgSize;
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ }
+
+ }
+}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
index 0058fe86a9..e8cf028069 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
index 7253111114..8817e79aff 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
index e72cc4d058..af37b17d85 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
index b616aab126..4a84ccad37 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
@@ -42,6 +42,7 @@ import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.util.GZIPUtils;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
@@ -255,6 +256,15 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ ByteBuffer buf = _message.getContent(_offset, _length);
+ long size = buf.remaining();
+ sender.send(buf.duplicate());
+ return size;
+ }
+
public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
{
throw new UnsupportedOperationException();
@@ -346,6 +356,15 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
_underlyingBody.writePayload(buffer);
}
+ public long writePayload(ByteBufferSender sender) throws IOException
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.writePayload(sender);
+ }
+
public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
throws AMQException
{
@@ -449,6 +468,18 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
@Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender);
+
+ size += (new AMQFrame(_channel, _headerBody)).writePayload(sender);
+
+ size += (new AMQFrame(_channel, _contentBody)).writePayload(sender);
+
+ return size;
+ }
+
+ @Override
public String toString()
{
StringBuilder builder = new StringBuilder();
@@ -490,6 +521,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
@Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender);
+ size += (new AMQFrame(_channel, _headerBody)).writePayload(sender);
+ return size;
+ }
+
+ @Override
public String toString()
{
StringBuilder builder = new StringBuilder();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index 9326f16703..55fc865850 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -31,8 +31,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.flow.LimitlessCreditManager;
-import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.StoredMessage;
@@ -328,7 +326,7 @@ public class AckTest extends QpidTestCase
public void testMessageDequeueRestoresCreditTest() throws Exception
{
// Send 10 messages
- Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
+ Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1, _protocolEngine);
_subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
index 6c6b746cf2..3a759cd772 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
@@ -36,11 +36,11 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MessagePublishInfo;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContentSource;
@@ -50,7 +50,7 @@ import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
@@ -224,17 +224,6 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
// Then the AMQMinaProtocolSession can join on the returning future without a NPE.
}
- public void closeSession(AMQChannel session, AMQConstant cause, String message)
- {
- super.closeSession(session, cause, message);
-
- //Simulate the Client responding with a CloseOK
- // should really update the StateManger but we don't have access here
- // changeState(AMQState.CONNECTION_CLOSED);
- ((AMQChannel)session).getConnection().closeSession(false);
-
- }
-
private class InternalWriteDeliverMethod implements ClientDeliveryMethod
{
private int _channelId;
@@ -288,16 +277,12 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
private String _remoteHost = "127.0.0.1";
private String _localHost = "127.0.0.1";
private int _port = portNumber.incrementAndGet();
- private final Sender<ByteBuffer> _sender;
+ private final ByteBufferSender _sender;
public TestNetworkConnection()
{
- _sender = new Sender<ByteBuffer>()
+ _sender = new ByteBufferSender()
{
- public void setIdleTimeout(int i)
- {
- }
-
public void send(ByteBuffer msg)
{
}
@@ -358,7 +343,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
}
@Override
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return _sender;
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java
new file mode 100644
index 0000000000..c4c89ac24a
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java
@@ -0,0 +1,47 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.protocol.v0_8;
+
+
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager;
+
+public class LimitlessCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
+{
+
+ public void restoreCredit(long messageCredit, long bytesCredit)
+ {
+ }
+
+ public void removeAllCredit()
+ {
+ }
+
+ public boolean hasCredit()
+ {
+ return true;
+ }
+
+ public boolean useCreditForMessage(long msgSize)
+ {
+ return true;
+ }
+}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 8e24d55da0..b515fda4a7 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -30,7 +30,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.security.auth.Subject;
@@ -51,6 +53,7 @@ import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
@@ -64,6 +67,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
private final AmqpPort<?> _port;
private final Broker<?> _broker;
private final SubjectCreator _subjectCreator;
+ private final ProtocolEngine_1_0_0_SASL _protocolEngine;
private VirtualHostImpl _vhost;
private final Transport _transport;
private final ConnectionEndpoint _conn;
@@ -98,15 +102,24 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
private List<Action<? super Connection_1_0>> _closeTasks =
Collections.synchronizedList(new ArrayList<Action<? super Connection_1_0>>());
+
+ private final Queue<Action<? super Connection_1_0>> _asyncTaskList =
+ new ConcurrentLinkedQueue<>();
+
+
private boolean _closedOnOpen;
+
public Connection_1_0(Broker<?> broker,
ConnectionEndpoint conn,
long connectionId,
AmqpPort<?> port,
- Transport transport, final SubjectCreator subjectCreator)
+ Transport transport,
+ final SubjectCreator subjectCreator,
+ final ProtocolEngine_1_0_0_SASL protocolEngine)
{
+ _protocolEngine = protocolEngine;
_broker = broker;
_port = port;
_transport = transport;
@@ -207,6 +220,13 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
_closeTasks.add( task );
}
+ private void addAsyncTask(final Action<Connection_1_0> action)
+ {
+ _asyncTaskList.add(action);
+ notifyWork();
+ }
+
+
public void closeReceived()
{
Collection<Session_1_0> sessions = new ArrayList(_sessions);
@@ -245,9 +265,19 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
@Override
- public void close(AMQConstant cause, String message)
+ public void closeAsync(AMQConstant cause, String message)
{
- _conn.close();
+ Action<Connection_1_0> action = new Action<Connection_1_0>()
+ {
+ @Override
+ public void performAction(final Connection_1_0 object)
+ {
+ _conn.close();
+
+ }
+ };
+ addAsyncTask(action);
+
}
@Override
@@ -263,9 +293,16 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
}
@Override
- public void closeSession(Session_1_0 session, AMQConstant cause, String message)
+ public void closeSessionAsync(final Session_1_0 session, final AMQConstant cause, final String message)
{
- session.close(cause, message);
+ addAsyncTask(new Action<Connection_1_0>()
+ {
+ @Override
+ public void performAction(final Connection_1_0 object)
+ {
+ session.close(cause, message);
+ }
+ });
}
@Override
@@ -363,6 +400,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
return _port;
}
+ public ServerProtocolEngine getProtocolEngine()
+ {
+ return _protocolEngine;
+ }
+
@Override
public Transport getTransport()
{
@@ -480,4 +522,38 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
}
+ public void transportStateChanged()
+ {
+ for (Session_1_0 session : _sessions)
+ {
+ session.transportStateChanged();
+ }
+ }
+
+ @Override
+ public void notifyWork()
+ {
+ _protocolEngine.notifyWork();
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return _protocolEngine.isMessageAssignmentSuspended();
+ }
+
+ public void processPending()
+ {
+ while(_asyncTaskList.peek() != null)
+ {
+ Action<? super Connection_1_0> asyncAction = _asyncTaskList.poll();
+ asyncAction.performAction(this);
+ }
+
+ for (AMQSessionModel session : getSessionModels())
+ {
+ session.processPending();
+ }
+
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 3b9521866c..fa2e543f8d 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -40,6 +40,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
@@ -83,9 +84,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
return _link.getEndpoint();
}
- public boolean isSuspended()
+ @Override
+ public boolean doIsSuspended()
{
- return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;// || !getEndpoint().hasCreditToSend();
+ return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;
}
@@ -113,22 +115,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
}
}
- public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
- {
- // TODO
- long size = entry.getMessage().getSize();
- send(entry);
- return size;
- }
-
- public void flushBatched()
+ public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
{
// TODO
- }
-
- public void send(final MessageInstance queueEntry)
- {
- ServerMessage serverMessage = queueEntry.getMessage();
+ ServerMessage serverMessage = entry.getMessage();
Message_1_0 message;
if(serverMessage instanceof Message_1_0)
{
@@ -168,7 +158,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
payload.flip();
}
- if(queueEntry.getDeliveryCount() != 0)
+ if(entry.getDeliveryCount() != 0)
{
payload = payload.duplicate();
ValueHandler valueHandler = new ValueHandler(_typeRegistry);
@@ -200,7 +190,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
header.setPriority(oldHeader.getPriority());
header.setTtl(oldHeader.getTtl());
}
- header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount()));
+ header.setDeliveryCount(UnsignedInteger.valueOf(entry.getDeliveryCount()));
_sectionEncoder.reset();
_sectionEncoder.encodeObject(header);
Binary encodedHeader = _sectionEncoder.getEncoding();
@@ -230,10 +220,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
else
{
UnsettledAction action = _acquires
- ? new DispositionAction(tag, queueEntry)
- : new DoNothingAction(tag, queueEntry);
+ ? new DispositionAction(tag, entry)
+ : new DoNothingAction(tag, entry);
- _link.addUnsettled(tag, action, queueEntry);
+ _link.addUnsettled(tag, action, entry);
}
if(_transactionId != null)
@@ -257,9 +247,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
public void onRollback()
{
- if(queueEntry.isAcquiredBy(getConsumer()))
+ if(entry.isAcquiredBy(getConsumer()))
{
- queueEntry.release();
+ entry.release();
_link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
@@ -274,12 +264,17 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
}
else
{
- queueEntry.release();
+ entry.release();
}
}
}
+ public void flushBatched()
+ {
+ // TODO
+ }
+
public void queueDeleted()
{
//TODO
@@ -296,7 +291,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
{
synchronized (_link.getLock())
{
- final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend();
+
+ ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
+ final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting();
if(!hasCredit && getState() == State.ACTIVE)
{
suspend();
@@ -336,7 +333,8 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
{
synchronized(_link.getLock())
{
- if(isSuspended() && getEndpoint() != null)
+ ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
+ if(isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
{
updateState(State.SUSPENDED, State.ACTIVE);
_transactionId = _link.getTransactionId();
@@ -544,4 +542,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
return 0;
}
+ @Override
+ protected void processClosed()
+ {
+
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
index fa8134cb55..e72dc17b57 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index 740b01e459..a0f10eee65 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -29,6 +29,8 @@ import java.security.PrivilegedAction;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import javax.security.sasl.SaslException;
@@ -52,14 +54,18 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -79,6 +85,9 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private long _createTime = System.currentTimeMillis();
private ConnectionEndpoint _endpoint;
private long _connectionId;
+ private final AtomicBoolean _stateChanged = new AtomicBoolean();
+ private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
+
private static final ByteBuffer HEADER =
ByteBuffer.wrap(new byte[]
@@ -116,8 +125,9 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private byte _revision;
private PrintWriter _out;
private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
+ private ByteBufferSender _sender;
private Connection_1_0 _connection;
+ private volatile boolean _transportBlockedForWriting;
static enum State {
@@ -134,6 +144,10 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private State _state = State.A;
+ private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
+
+
+
public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker<?> broker,
long id, AmqpPort<?> port, Transport transport)
@@ -149,6 +163,31 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
}
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ Thread lock = _messageAssignmentSuspended.get();
+ return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread();
+ }
+
+ @Override
+ public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+ {
+ _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
+
+ if(!messageAssignmentSuspended)
+ {
+ for(AMQSessionModel<?,?> session : _connection.getSessionModels())
+ {
+ for(Consumer<?> consumer : session.getConsumers())
+ {
+ ((ConsumerImpl)consumer).getTarget().notifyCurrentState();
+ }
+ }
+ }
+ }
+
+
public SocketAddress getRemoteAddress()
{
return _network.getRemoteAddress();
@@ -179,7 +218,12 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
//Todo
}
- public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
+ @Override
+ public void encryptedTransport()
+ {
+ }
+
+ public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender)
{
_network = network;
_sender = sender;
@@ -211,7 +255,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
_endpoint.setProperties(serverProperties);
_endpoint.setRemoteAddress(getRemoteAddress());
- _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator);
+ _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator, this);
_endpoint.setConnectionEventListener(_connection);
_endpoint.setFrameOutputHandler(this);
@@ -524,6 +568,8 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
}
+
+
public void close()
{
_sender.close();
@@ -554,4 +600,60 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
{
return _lastWriteTime;
}
+
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return _transportBlockedForWriting;
+ }
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ _transportBlockedForWriting = blocked;
+ _connection.transportStateChanged();
+
+ }
+
+ public void flushBatched()
+ {
+ _sender.flush();
+ }
+
+ @Override
+ public void processPending()
+ {
+ _connection.processPending();
+
+ }
+
+ @Override
+ public boolean hasWork()
+ {
+ return _stateChanged.get();
+ }
+
+ @Override
+ public void notifyWork()
+ {
+ _stateChanged.set(true);
+
+ final Action<ServerProtocolEngine> listener = _workListener.get();
+ if(listener != null)
+ {
+ listener.performAction(this);
+ }
+ }
+
+ @Override
+ public void clearWork()
+ {
+ _stateChanged.set(false);
+ }
+
+ @Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+ _workListener.set(listener);
+ }
+
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index c952a3c585..fe36ba91cb 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -729,4 +729,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
return _consumer;
}
+
+ public ConsumerTarget_1_0 getConsumerTarget()
+ {
+ return _target;
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index b9ee0ad498..2a49e812f5 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -109,6 +109,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
private final Subject _subject = new Subject();
private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
+ private final CopyOnWriteArrayList<SendingLink_1_0> _sendingLinks = new CopyOnWriteArrayList<>();
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private Session<?> _modelObject;
@@ -211,7 +212,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
);
sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink));
- registerConsumer(sendingLink.getConsumer());
+ registerConsumer(sendingLink);
link = sendingLink;
if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable()))
@@ -412,12 +413,14 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
}
}
- private void registerConsumer(final ConsumerImpl consumer)
+ private void registerConsumer(final SendingLink_1_0 link)
{
+ ConsumerImpl consumer = link.getConsumer();
if(consumer instanceof Consumer<?>)
{
Consumer<?> modelConsumer = (Consumer<?>) consumer;
_consumers.add(modelConsumer);
+ _sendingLinks.add(link);
modelConsumer.addChangeListener(_consumerClosedListener);
consumerAdded(modelConsumer);
}
@@ -615,6 +618,20 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
}
@Override
+ public void transportStateChanged()
+ {
+ for(SendingLink_1_0 link : _sendingLinks)
+ {
+ ConsumerTarget_1_0 target = link.getConsumerTarget();
+ target.flowStateChanged();
+
+
+ }
+
+
+ }
+
+ @Override
public LogSubject getLogSubject()
{
return this;
@@ -883,6 +900,16 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
return 0L;
}
+ @Override
+ public void processPending()
+ {
+ for(Consumer<?> consumer : getConsumers())
+ {
+
+ ((ConsumerImpl)consumer).getTarget().processPending();
+ }
+ }
+
private void consumerAdded(Consumer<?> consumer)
{
for(ConsumerListener l : _consumerListeners)
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
index ce612ec0b6..63c60d7400 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.Transaction;
public abstract class GenericAbstractJDBCMessageStore extends org.apache.qpid.server.store.AbstractJDBCMessageStore
@@ -131,7 +131,7 @@ public abstract class GenericAbstractJDBCMessageStore extends org.apache.qpid.se
}
@Override
- public StoreFuture commitTranAsync()
+ public FutureResult commitTranAsync()
{
try
{
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index 3f873a24ff..28d8a6c88c 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -24,6 +24,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageSource;
@@ -124,7 +127,6 @@ class ManagementNodeConsumer implements ConsumerImpl
@Override
public void close()
{
-
}
@Override
@@ -164,6 +166,12 @@ class ManagementNodeConsumer implements ConsumerImpl
}
+ @Override
+ public ConsumerTarget getTarget()
+ {
+ return _target;
+ }
+
ManagementNode getManagementNode()
{
return _managementNode;
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
index 69920ff488..1a85a24e0b 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
@@ -39,6 +39,8 @@ import javax.servlet.DispatcherType;
import javax.servlet.MultipartConfigElement;
import javax.servlet.http.HttpServletRequest;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
@@ -130,7 +132,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
}
@StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
- private void doStart()
+ private ListenableFuture<Void> doStart()
{
getBroker().getEventLogger().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME));
@@ -148,6 +150,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
getBroker().getEventLogger().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME));
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
@@ -206,7 +209,9 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
if(port.getState() != State.ACTIVE)
{
- port.start();
+
+ // TODO - RG
+ port.startAsync();
}
Connector connector = null;
diff --git a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java
index 52d7ba33a3..4327292336 100644
--- a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java
+++ b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
@@ -58,7 +59,9 @@ public class HttpManagementTest extends QpidTestCase
when(_broker.getModel()).thenReturn(objectFactory.getModel());
when(_broker.getCategoryClass()).thenReturn(Broker.class);
when(_broker.getEventLogger()).thenReturn(mock(EventLogger.class));
- when(_broker.getTaskExecutor()).thenReturn(mock(TaskExecutor.class));
+ TaskExecutor taskExecutor = new TaskExecutorImpl();
+ taskExecutor.start();
+ when(_broker.getTaskExecutor()).thenReturn(taskExecutor);
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, false);
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
index 6c962c2901..06558b9f9a 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
@@ -32,6 +32,8 @@ import java.util.Set;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -105,7 +107,7 @@ public class JMXManagementPluginImpl
}
@StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
- private void doStart() throws JMException, IOException
+ private ListenableFuture<Void> doStart() throws JMException, IOException
{
_allowPortActivation = true;
Broker<?> broker = getBroker();
@@ -125,7 +127,8 @@ public class JMXManagementPluginImpl
registryPort.setPortManager(this);
if(port.getState() != State.ACTIVE)
{
- port.start();
+ // TODO - RG
+ port.startAsync();
}
}
@@ -135,7 +138,7 @@ public class JMXManagementPluginImpl
connectorPort.setPortManager(this);
if(port.getState() != State.ACTIVE)
{
- port.start();
+ port.startAsync();
}
}
@@ -175,6 +178,7 @@ public class JMXManagementPluginImpl
_objectRegistry.start();
setState(State.ACTIVE);
_allowPortActivation = false;
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
index a194ac70f9..896a7119f7 100644
--- a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
+++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -53,7 +53,7 @@ import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.transport.AcceptingTransport;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
@@ -81,9 +81,7 @@ class WebSocketProvider implements AcceptingTransport
_supported = supported;
_defaultSupportedProtocolReply = defaultSupportedProtocolReply;
_factory = new MultiVersionProtocolEngineFactory(
- _port.getParent(Broker.class), null,
- _port.getWantClientAuth(),
- _port.getNeedClientAuth(),
+ _port.getParent(Broker.class),
_supported,
_defaultSupportedProtocolReply,
_port,
@@ -242,7 +240,7 @@ class WebSocketProvider implements AcceptingTransport
}
}
- private class ConnectionWrapper implements NetworkConnection, Sender<ByteBuffer>
+ private class ConnectionWrapper implements NetworkConnection, ByteBufferSender
{
private final WebSocket.Connection _connection;
private final SocketAddress _localAddress;
@@ -261,7 +259,7 @@ class WebSocketProvider implements AcceptingTransport
}
@Override
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return this;
}
@@ -273,12 +271,6 @@ class WebSocketProvider implements AcceptingTransport
}
@Override
- public void setIdleTimeout(final int i)
- {
-
- }
-
- @Override
public void send(final ByteBuffer msg)
{
try