diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java')
6 files changed, 252 insertions, 39 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 8e24d55da0..b515fda4a7 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -30,7 +30,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import javax.security.auth.Subject; @@ -51,6 +53,7 @@ import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; @@ -64,6 +67,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private final AmqpPort<?> _port; private final Broker<?> _broker; private final SubjectCreator _subjectCreator; + private final ProtocolEngine_1_0_0_SASL _protocolEngine; private VirtualHostImpl _vhost; private final Transport _transport; private final ConnectionEndpoint _conn; @@ -98,15 +102,24 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private List<Action<? super Connection_1_0>> _closeTasks = Collections.synchronizedList(new ArrayList<Action<? super Connection_1_0>>()); + + private final Queue<Action<? super Connection_1_0>> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + + private boolean _closedOnOpen; + public Connection_1_0(Broker<?> broker, ConnectionEndpoint conn, long connectionId, AmqpPort<?> port, - Transport transport, final SubjectCreator subjectCreator) + Transport transport, + final SubjectCreator subjectCreator, + final ProtocolEngine_1_0_0_SASL protocolEngine) { + _protocolEngine = protocolEngine; _broker = broker; _port = port; _transport = transport; @@ -207,6 +220,13 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod _closeTasks.add( task ); } + private void addAsyncTask(final Action<Connection_1_0> action) + { + _asyncTaskList.add(action); + notifyWork(); + } + + public void closeReceived() { Collection<Session_1_0> sessions = new ArrayList(_sessions); @@ -245,9 +265,19 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod @Override - public void close(AMQConstant cause, String message) + public void closeAsync(AMQConstant cause, String message) { - _conn.close(); + Action<Connection_1_0> action = new Action<Connection_1_0>() + { + @Override + public void performAction(final Connection_1_0 object) + { + _conn.close(); + + } + }; + addAsyncTask(action); + } @Override @@ -263,9 +293,16 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } @Override - public void closeSession(Session_1_0 session, AMQConstant cause, String message) + public void closeSessionAsync(final Session_1_0 session, final AMQConstant cause, final String message) { - session.close(cause, message); + addAsyncTask(new Action<Connection_1_0>() + { + @Override + public void performAction(final Connection_1_0 object) + { + session.close(cause, message); + } + }); } @Override @@ -363,6 +400,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod return _port; } + public ServerProtocolEngine getProtocolEngine() + { + return _protocolEngine; + } + @Override public Transport getTransport() { @@ -480,4 +522,38 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } + public void transportStateChanged() + { + for (Session_1_0 session : _sessions) + { + session.transportStateChanged(); + } + } + + @Override + public void notifyWork() + { + _protocolEngine.notifyWork(); + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return _protocolEngine.isMessageAssignmentSuspended(); + } + + public void processPending() + { + while(_asyncTaskList.peek() != null) + { + Action<? super Connection_1_0> asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } + + for (AMQSessionModel session : getSessionModels()) + { + session.processPending(); + } + + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index 3b9521866c..fa2e543f8d 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -40,6 +40,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; @@ -83,9 +84,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget return _link.getEndpoint(); } - public boolean isSuspended() + @Override + public boolean doIsSuspended() { - return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;// || !getEndpoint().hasCreditToSend(); + return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE; } @@ -113,22 +115,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } } - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) - { - // TODO - long size = entry.getMessage().getSize(); - send(entry); - return size; - } - - public void flushBatched() + public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch) { // TODO - } - - public void send(final MessageInstance queueEntry) - { - ServerMessage serverMessage = queueEntry.getMessage(); + ServerMessage serverMessage = entry.getMessage(); Message_1_0 message; if(serverMessage instanceof Message_1_0) { @@ -168,7 +158,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget payload.flip(); } - if(queueEntry.getDeliveryCount() != 0) + if(entry.getDeliveryCount() != 0) { payload = payload.duplicate(); ValueHandler valueHandler = new ValueHandler(_typeRegistry); @@ -200,7 +190,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget header.setPriority(oldHeader.getPriority()); header.setTtl(oldHeader.getTtl()); } - header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount())); + header.setDeliveryCount(UnsignedInteger.valueOf(entry.getDeliveryCount())); _sectionEncoder.reset(); _sectionEncoder.encodeObject(header); Binary encodedHeader = _sectionEncoder.getEncoding(); @@ -230,10 +220,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget else { UnsettledAction action = _acquires - ? new DispositionAction(tag, queueEntry) - : new DoNothingAction(tag, queueEntry); + ? new DispositionAction(tag, entry) + : new DoNothingAction(tag, entry); - _link.addUnsettled(tag, action, queueEntry); + _link.addUnsettled(tag, action, entry); } if(_transactionId != null) @@ -257,9 +247,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget public void onRollback() { - if(queueEntry.isAcquiredBy(getConsumer())) + if(entry.isAcquiredBy(getConsumer())) { - queueEntry.release(); + entry.release(); _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true); @@ -274,12 +264,17 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } else { - queueEntry.release(); + entry.release(); } } } + public void flushBatched() + { + // TODO + } + public void queueDeleted() { //TODO @@ -296,7 +291,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget { synchronized (_link.getLock()) { - final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend(); + + ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine(); + final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting(); if(!hasCredit && getState() == State.ACTIVE) { suspend(); @@ -336,7 +333,8 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget { synchronized(_link.getLock()) { - if(isSuspended() && getEndpoint() != null) + ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine(); + if(isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting()) { updateState(State.SUSPENDED, State.ACTIVE); _transactionId = _link.getTransactionId(); @@ -544,4 +542,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget return 0; } + @Override + protected void processClosed() + { + + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java index fa8134cb55..e72dc17b57 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index 740b01e459..a0f10eee65 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -29,6 +29,8 @@ import java.security.PrivilegedAction; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; import javax.security.sasl.SaslException; @@ -52,14 +54,18 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.UsernamePrincipal; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; @@ -79,6 +85,9 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private long _createTime = System.currentTimeMillis(); private ConnectionEndpoint _endpoint; private long _connectionId; + private final AtomicBoolean _stateChanged = new AtomicBoolean(); + private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>(); + private static final ByteBuffer HEADER = ByteBuffer.wrap(new byte[] @@ -116,8 +125,9 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private byte _revision; private PrintWriter _out; private NetworkConnection _network; - private Sender<ByteBuffer> _sender; + private ByteBufferSender _sender; private Connection_1_0 _connection; + private volatile boolean _transportBlockedForWriting; static enum State { @@ -134,6 +144,10 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private State _state = State.A; + private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>(); + + + public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker<?> broker, long id, AmqpPort<?> port, Transport transport) @@ -149,6 +163,31 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut } + @Override + public boolean isMessageAssignmentSuspended() + { + Thread lock = _messageAssignmentSuspended.get(); + return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread(); + } + + @Override + public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) + { + _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null); + + if(!messageAssignmentSuspended) + { + for(AMQSessionModel<?,?> session : _connection.getSessionModels()) + { + for(Consumer<?> consumer : session.getConsumers()) + { + ((ConsumerImpl)consumer).getTarget().notifyCurrentState(); + } + } + } + } + + public SocketAddress getRemoteAddress() { return _network.getRemoteAddress(); @@ -179,7 +218,12 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut //Todo } - public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender) + @Override + public void encryptedTransport() + { + } + + public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender) { _network = network; _sender = sender; @@ -211,7 +255,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut _endpoint.setProperties(serverProperties); _endpoint.setRemoteAddress(getRemoteAddress()); - _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator); + _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator, this); _endpoint.setConnectionEventListener(_connection); _endpoint.setFrameOutputHandler(this); @@ -524,6 +568,8 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut } + + public void close() { _sender.close(); @@ -554,4 +600,60 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut { return _lastWriteTime; } + + @Override + public boolean isTransportBlockedForWriting() + { + return _transportBlockedForWriting; + } + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _transportBlockedForWriting = blocked; + _connection.transportStateChanged(); + + } + + public void flushBatched() + { + _sender.flush(); + } + + @Override + public void processPending() + { + _connection.processPending(); + + } + + @Override + public boolean hasWork() + { + return _stateChanged.get(); + } + + @Override + public void notifyWork() + { + _stateChanged.set(true); + + final Action<ServerProtocolEngine> listener = _workListener.get(); + if(listener != null) + { + listener.performAction(this); + } + } + + @Override + public void clearWork() + { + _stateChanged.set(false); + } + + @Override + public void setWorkListener(final Action<ServerProtocolEngine> listener) + { + _workListener.set(listener); + } + } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index c952a3c585..fe36ba91cb 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -729,4 +729,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { return _consumer; } + + public ConsumerTarget_1_0 getConsumerTarget() + { + return _target; + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index b9ee0ad498..2a49e812f5 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -109,6 +109,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio private final Subject _subject = new Subject(); private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>(); + private final CopyOnWriteArrayList<SendingLink_1_0> _sendingLinks = new CopyOnWriteArrayList<>(); private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); private Session<?> _modelObject; @@ -211,7 +212,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio ); sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink)); - registerConsumer(sendingLink.getConsumer()); + registerConsumer(sendingLink); link = sendingLink; if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) @@ -412,12 +413,14 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio } } - private void registerConsumer(final ConsumerImpl consumer) + private void registerConsumer(final SendingLink_1_0 link) { + ConsumerImpl consumer = link.getConsumer(); if(consumer instanceof Consumer<?>) { Consumer<?> modelConsumer = (Consumer<?>) consumer; _consumers.add(modelConsumer); + _sendingLinks.add(link); modelConsumer.addChangeListener(_consumerClosedListener); consumerAdded(modelConsumer); } @@ -615,6 +618,20 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio } @Override + public void transportStateChanged() + { + for(SendingLink_1_0 link : _sendingLinks) + { + ConsumerTarget_1_0 target = link.getConsumerTarget(); + target.flowStateChanged(); + + + } + + + } + + @Override public LogSubject getLogSubject() { return this; @@ -883,6 +900,16 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio return 0L; } + @Override + public void processPending() + { + for(Consumer<?> consumer : getConsumers()) + { + + ((ConsumerImpl)consumer).getTarget().processPending(); + } + } + private void consumerAdded(Consumer<?> consumer) { for(ConsumerListener l : _consumerListeners) |