summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java')
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java287
1 files changed, 174 insertions, 113 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index cb145aac88..d7b5b00b26 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -36,13 +36,14 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import javax.security.sasl.SaslException;
@@ -58,7 +59,7 @@ import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.*;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
@@ -69,6 +70,7 @@ import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Transport;
@@ -85,7 +87,7 @@ import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
@@ -96,6 +98,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
AMQConnectionModel<AMQProtocolEngine, AMQChannel>,
ServerMethodProcessor<ServerChannelMethodProcessor>
{
+
+
+
enum ConnectionState
{
INIT,
@@ -117,6 +122,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private static final long AWAIT_CLOSED_TIMEOUT = 60000;
private final AmqpPort<?> _port;
private final long _creationTime;
+ private final AtomicBoolean _stateChanged = new AtomicBoolean();
+ private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
private AMQShortString _contextKey;
@@ -139,11 +146,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
* The channels that the latest call to {@link #received(ByteBuffer)} applied to.
* Used so we know which channels we need to call {@link AMQChannel#receivedComplete()}
* on after handling the frames.
- *
- * Thread-safety: guarded by {@link #_receivedLock}.
*/
- private final Set<AMQChannel> _channelsForCurrentMessage =
- new HashSet<>();
+ private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<>();
private AMQDecoder _decoder;
@@ -157,9 +161,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
/* AMQP Version for this session */
private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion);
- private final List<Action<? super AMQProtocolEngine>> _taskList =
+ private final List<Action<? super AMQProtocolEngine>> _connectionCloseTaskList =
new CopyOnWriteArrayList<>();
+ private final Queue<Action<? super AMQProtocolEngine>> _asyncTaskList =
+ new ConcurrentLinkedQueue<>();
+
private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<>();
private ProtocolOutputConverter _protocolOutputConverter;
private final Subject _authorizedSubject = new Subject();
@@ -179,13 +186,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
+ private ByteBufferSender _sender;
private volatile boolean _deferFlush;
- private long _lastReceivedTime;
+ private long _lastReceivedTime = System.currentTimeMillis(); // TODO consider if this is what we want?
private boolean _blocking;
- private final ReentrantLock _receivedLock;
private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis());
private final Broker<?> _broker;
private final Transport _transport;
@@ -200,6 +206,34 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private int _currentMethodId;
private int _binaryDataLimit;
private long _maxMessageSize;
+ private volatile boolean _transportBlockedForWriting;
+
+ private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
+
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ Thread lock = _messageAssignmentSuspended.get();
+ return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread();
+ }
+
+ @Override
+ public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+ {
+ _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
+ if(!messageAssignmentSuspended)
+ {
+ for(AMQSessionModel<?,?> session : getSessionModels())
+ {
+ for(Consumer<?> consumer : session.getConsumers())
+ {
+ ((ConsumerImpl)consumer).getTarget().notifyCurrentState();
+ }
+ }
+ }
+ }
+
public AMQProtocolEngine(Broker<?> broker,
final NetworkConnection network,
@@ -211,7 +245,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_port = port;
_transport = transport;
_maxNoOfChannels = broker.getConnection_sessionCountLimit();
- _receivedLock = new ReentrantLock();
_decoder = new BrokerDecoder(this);
_connectionID = connectionId;
_logSubject = new ConnectionLogSubject(this);
@@ -262,12 +295,28 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
return _authorizedSubject;
}
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return _transportBlockedForWriting;
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ _transportBlockedForWriting = blocked;
+ for(AMQChannel channel : _channelMap.values())
+ {
+ channel.transportStateChanged();
+ }
+ }
+
public void setNetworkConnection(NetworkConnection network)
{
setNetworkConnection(network, network.getSender());
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
_network = network;
_sender = sender;
@@ -294,10 +343,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
return _closing.get();
}
- public synchronized void flushBatched()
- {
- _sender.flush();
- }
public ClientDeliveryMethod createDeliveryMethod(int channelId)
@@ -314,9 +359,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
{
final long arrivalTime = System.currentTimeMillis();
- if(!_authenticated &&
- (arrivalTime - _creationTime) > _port.getContextValue(Long.class,
- Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))
+ if (!_authenticated &&
+ (arrivalTime - _creationTime) > _port.getContextValue(Long.class,
+ Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))
{
_logger.warn("Connection has taken more than "
+ _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)
@@ -328,7 +373,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_lastIoTime = arrivalTime;
_readBytes += msg.remaining();
- _receivedLock.lock();
try
{
_decoder.decodeBuffer(msg);
@@ -371,7 +415,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
}
catch (StoreException e)
{
- if(_virtualHost.getState() == State.ACTIVE)
+ if (_virtualHost.getState() == State.ACTIVE)
{
throw e;
}
@@ -380,10 +424,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_logger.error("Store Exception ignored as virtual host no longer active", e);
}
}
- finally
- {
- _receivedLock.unlock();
- }
return null;
}
});
@@ -484,64 +524,22 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
writeFrame(responseBody.generateFrame(0));
_state = ConnectionState.AWAIT_START_OK;
+ _sender.flush();
+
}
catch (AMQException e)
{
_logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
+ _sender.flush();
}
}
private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY];
- private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes);
private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
- private ByteBuffer asByteBuffer(AMQDataBlock block)
- {
- final int size = (int) block.getSize();
-
- final byte[] data;
-
-
- if(size > REUSABLE_BYTE_BUFFER_CAPACITY)
- {
- data= new byte[size];
- }
- else
- {
-
- data = _reusableBytes;
- }
- _reusableDataOutput.setBuffer(data);
-
- try
- {
- block.writePayload(_reusableDataOutput);
- }
- catch (IOException e)
- {
- throw new ServerScopedRuntimeException(e);
- }
-
- final ByteBuffer buf;
-
- if(size <= REUSABLE_BYTE_BUFFER_CAPACITY)
- {
- buf = _reusableByteBuffer;
- buf.position(0);
- }
- else
- {
- buf = ByteBuffer.wrap(data);
- }
- buf.limit(_reusableDataOutput.length());
-
- return buf;
- }
-
-
/**
* Convenience method that writes a frame to the protocol session. Equivalent to calling
* getProtocolSession().write().
@@ -550,16 +548,21 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
*/
public synchronized void writeFrame(AMQDataBlock frame)
{
-
- final ByteBuffer buf = asByteBuffer(frame);
- _writtenBytes += buf.remaining();
-
if(_logger.isDebugEnabled())
{
_logger.debug("SEND: " + frame);
}
- _sender.send(buf);
+ try
+ {
+ _writtenBytes += frame.writePayload(_sender);
+ }
+ catch (IOException e)
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
+
+
final long time = System.currentTimeMillis();
_lastIoTime = time;
_lastWriteTime.set(time);
@@ -796,14 +799,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
if(_closing.compareAndSet(false,true))
{
// force sync of outstanding async work
- _receivedLock.lock();
try
{
receivedComplete();
}
finally
{
- _receivedLock.unlock();
+
finishClose(connectionDropped);
}
@@ -845,7 +847,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
{
try
{
- for (Action<? super AMQProtocolEngine> task : _taskList)
+ for (Action<? super AMQProtocolEngine> task : _connectionCloseTaskList)
{
task.performAction(this);
}
@@ -867,17 +869,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
{
synchronized(this)
{
- final boolean lockHeld = _receivedLock.isHeldByCurrentThread();
final long endTime = System.currentTimeMillis() + AWAIT_CLOSED_TIMEOUT;
while(!_closed && endTime > System.currentTimeMillis())
{
try
{
- if(lockHeld)
- {
- _receivedLock.unlock();
- }
wait(1000);
}
catch (InterruptedException e)
@@ -885,13 +882,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
Thread.currentThread().interrupt();
break;
}
- finally
- {
- if(lockHeld)
- {
- _receivedLock.lock();
- }
- }
}
if (!_closed)
@@ -1088,12 +1078,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
public void addDeleteTask(Action<? super AMQProtocolEngine> task)
{
- _taskList.add(task);
+ _connectionCloseTaskList.add(task);
}
public void removeDeleteTask(Action<? super AMQProtocolEngine> task)
{
- _taskList.remove(task);
+ _connectionCloseTaskList.remove(task);
}
public ProtocolOutputConverter getProtocolOutputConverter()
@@ -1171,6 +1161,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
}
}
+ @Override
+ public void encryptedTransport()
+ {
+ }
+
public void readerIdle()
{
Subject.doAs(_authorizedSubject, new PrivilegedAction<Object>()
@@ -1323,26 +1318,50 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
return String.valueOf(getRemoteAddress());
}
- public void closeSession(AMQChannel session, AMQConstant cause, String message)
+ public void closeSessionAsync(final AMQChannel session, final AMQConstant cause, final String message)
{
- int channelId = session.getChannelId();
- closeChannel(channelId, cause, message);
+ addAsyncTask(new Action<AMQProtocolEngine>()
+ {
- MethodRegistry methodRegistry = getMethodRegistry();
- ChannelCloseBody responseBody =
- methodRegistry.createChannelCloseBody(
- cause.getCode(),
- AMQShortString.validValueOf(message),
- 0, 0);
+ @Override
+ public void performAction(final AMQProtocolEngine object)
+ {
+ int channelId = session.getChannelId();
+ closeChannel(channelId, cause, message);
+
+ MethodRegistry methodRegistry = getMethodRegistry();
+ ChannelCloseBody responseBody =
+ methodRegistry.createChannelCloseBody(
+ cause.getCode(),
+ AMQShortString.validValueOf(message),
+ 0, 0);
+
+ writeFrame(responseBody.generateFrame(channelId));
+ }
+ });
- writeFrame(responseBody.generateFrame(channelId));
}
- public void close(AMQConstant cause, String message)
+ public void closeAsync(final AMQConstant cause, final String message)
{
- closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
- getMethodRegistry(),
- null));
+ Action<AMQProtocolEngine> action = new Action<AMQProtocolEngine>()
+ {
+ @Override
+ public void performAction(final AMQProtocolEngine object)
+ {
+ closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
+ getMethodRegistry(),
+ null));
+
+ }
+ };
+ addAsyncTask(action);
+ }
+
+ private void addAsyncTask(final Action<AMQProtocolEngine> action)
+ {
+ _asyncTaskList.add(action);
+ notifyWork();
}
public void block()
@@ -1922,11 +1941,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
return _reference;
}
- public Lock getReceivedLock()
- {
- return _receivedLock;
- }
-
@Override
public long getLastReadTime()
{
@@ -2045,4 +2059,51 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
return _closing.get();
}
+ @Override
+ public void processPending()
+ {
+
+
+ while(_asyncTaskList.peek() != null)
+ {
+ Action<? super AMQProtocolEngine> asyncAction = _asyncTaskList.poll();
+ asyncAction.performAction(this);
+ }
+
+ for (AMQSessionModel session : getSessionModels())
+ {
+ session.processPending();
+ }
+ }
+
+ @Override
+ public boolean hasWork()
+ {
+ return _stateChanged.get();
+ }
+
+ @Override
+ public void notifyWork()
+ {
+ _stateChanged.set(true);
+
+ final Action<ServerProtocolEngine> listener = _workListener.get();
+ if(listener != null)
+ {
+
+ listener.performAction(this);
+ }
+ }
+
+ @Override
+ public void clearWork()
+ {
+ _stateChanged.set(false);
+ }
+
+ @Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+ _workListener.set(listener);
+ }
}