diff options
author | Keith Wall <kwall@apache.org> | 2015-02-10 18:10:16 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2015-02-10 18:10:16 +0000 |
commit | f5ee46517eb096030a6c44b14b801eb2aaeb9392 (patch) | |
tree | 25544486642cc770061489663dba650d85769404 /qpid/java | |
parent | 085486ebe5ff21133b9caf1c31625ac6ea356568 (diff) | |
download | qpid-python-f5ee46517eb096030a6c44b14b801eb2aaeb9392.tar.gz |
Refactoring: make the queue no longer be responsible for pushing messages onto the wire
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658773 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
26 files changed, 417 insertions, 59 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java index 0421a66abf..cad7b71fdd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java @@ -23,12 +23,14 @@ package org.apache.qpid.server.consumer; import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.util.StateChangeListener; public abstract class AbstractConsumerTarget implements ConsumerTarget @@ -41,6 +43,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget private final Lock _stateChangeLock = new ReentrantLock(); private final AtomicInteger _stateActivates = new AtomicInteger(); + private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue(); protected AbstractConsumerTarget(final State initialState) @@ -48,6 +51,22 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget _state = new AtomicReference<State>(initialState); } + @Override + public void processPendingMessages() + { + while(hasMessagesToSend()) + { + sendNextMessage(); + } + } + + @Override + public final boolean isSuspended() + { + return getSessionModel().getConnectionModel().isMessageAssignmentSuspended() || doIsSuspended(); + } + + protected abstract boolean doIsSuspended(); public final State getState() { @@ -136,4 +155,42 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget _stateChangeLock.unlock(); } + @Override + public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + { + _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch)); + + getSessionModel().getConnectionModel().flushBatched(); + return entry.getMessage().getSize(); + } + + protected abstract void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch); + + @Override + public boolean hasMessagesToSend() + { + return !_queue.isEmpty(); + } + + @Override + public void sendNextMessage() + { + + ConsumerMessageInstancePair consumerMessage = _queue.peek(); + if (consumerMessage != null) + { + _queue.poll(); + + ConsumerImpl consumer = consumerMessage.getConsumer(); + MessageInstance entry = consumerMessage.getEntry(); + boolean batch = consumerMessage.isBatch(); + doSend(consumer, entry, batch); + + if (consumer.acquires()) + { + entry.unlockAcquisition(); + } + } + + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java index b15b01ede5..3b196df902 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java @@ -31,6 +31,8 @@ public interface ConsumerImpl void externalStateChange(); + ConsumerTarget getTarget(); + enum Option { ACQUIRES, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java new file mode 100644 index 0000000000..aa5e419ce2 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.consumer; + +import org.apache.qpid.server.message.MessageInstance; + +public class ConsumerMessageInstancePair +{ + private final ConsumerImpl _consumer; + private final MessageInstance _entry; + private final boolean _batch; + + public ConsumerMessageInstancePair(final ConsumerImpl consumer, final MessageInstance entry, final boolean batch) + { + _consumer = consumer; + _entry = entry; + _batch = batch; + + } + + public ConsumerImpl getConsumer() + { + return _consumer; + } + + public MessageInstance getEntry() + { + return _entry; + } + + public boolean isBatch() + { + return _batch; + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java index 5aef922da5..b2e8cec315 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java @@ -33,6 +33,8 @@ public interface ConsumerTarget void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener); + void processPendingMessages(); + enum State { ACTIVE, SUSPENDED, CLOSED @@ -54,6 +56,10 @@ public interface ConsumerTarget long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch); + boolean hasMessagesToSend(); + + void sendNextMessage(); + void flushBatched(); void queueDeleted(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index 26e8271d14..96900d9a5a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -107,4 +107,7 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends void removeSessionListener(SessionModelListener listener); + void flushBatched(); + + boolean isMessageAssignmentSuspended(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index 40aa1bbafd..d488ccc138 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -115,4 +115,6 @@ public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQCo long getTransactionUpdateTime(); void transportStateChanged(); + + void processPendingMessages(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index eb7599c5cc..946992cbb6 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -84,6 +84,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _onCloseTask = onCloseTask; } + @Override + public void setMessageAssignmentSuspended(final boolean value) + { + _delegate.setMessageAssignmentSuspended(value); + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return _delegate.isMessageAssignmentSuspended(); + } + public SocketAddress getRemoteAddress() { return _delegate.getRemoteAddress(); @@ -198,10 +210,33 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return _delegate.getLastWriteTime(); } - + @Override + public void processPendingMessages() + { + _delegate.processPendingMessages(); + } private class ClosedDelegateProtocolEngine implements ServerProtocolEngine { + + @Override + public void setMessageAssignmentSuspended(final boolean value) + { + + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return false; + } + + @Override + public void processPendingMessages() + { + + } + public SocketAddress getRemoteAddress() { return _network.getRemoteAddress(); @@ -318,6 +353,23 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return 0; } + @Override + public void setMessageAssignmentSuspended(final boolean value) + { + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return false; + } + + @Override + public void processPendingMessages() + { + + } + public void received(ByteBuffer msg) { _lastReadTime = System.currentTimeMillis(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 26cbf379a0..a545ce6e10 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -1148,10 +1148,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> else { deliverMessage(sub, entry, false); - if(sub.acquires()) - { - entry.unlockAcquisition(); - } } } } @@ -1978,10 +1974,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> else { deliverMessage(sub, node, batch); - if(sub.acquires()) - { - node.unlockAcquisition(); - } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index a2c275e797..c459737c46 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -52,5 +52,4 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl, QueueContext getQueueContext(); - ConsumerTarget getTarget(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index c85e4058a1..4fb89575aa 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -507,6 +507,7 @@ class QueueConsumerImpl return _selector; } + @Override public String toLogString() { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 47ed224133..8b424d2c9e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -178,6 +178,18 @@ public class MockConsumer implements ConsumerTarget return size; } + @Override + public boolean hasMessagesToSend() + { + return false; + } + + @Override + public void sendNextMessage() + { + + } + public void flushBatched() { @@ -230,6 +242,12 @@ public class MockConsumer implements ConsumerTarget } } + @Override + public void processPendingMessages() + { + + } + public ArrayList<MessageInstance> getMessages() { return messages; @@ -462,6 +480,12 @@ public class MockConsumer implements ConsumerTarget { } + + @Override + public void processPendingMessages() + { + + } } private static class MockConnectionModel implements AMQConnectionModel @@ -594,6 +618,18 @@ public class MockConsumer implements ConsumerTarget } @Override + public void flushBatched() + { + + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return false; + } + + @Override public String getClientVersion() { return null; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index afa4fb8bc0..209f6663ec 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -104,7 +104,8 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC _name = name; } - public boolean isSuspended() + @Override + public boolean doIsSuspended() { return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension } @@ -195,7 +196,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC private final AddMessageDispositionListenerAction _postIdSettingAction; - public long send(final ConsumerImpl consumer, final MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch) { ServerMessage serverMsg = entry.getMessage(); @@ -346,7 +347,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC { recordUnacknowledged(entry); } - return size; } void recordUnacknowledged(MessageInstance entry) diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 401c6fc939..3e8ba7cfab 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -32,6 +32,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.Constant; import org.apache.qpid.transport.network.Assembler; @@ -55,6 +56,8 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol private long _lastWriteTime = _createTime; private volatile boolean _transportBlockedForWriting; + private volatile boolean _messageAssignmentSuspended; + public ProtocolEngine_0_10(ServerConnection conn, NetworkConnection network) { @@ -67,6 +70,20 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } } + @Override + public boolean isMessageAssignmentSuspended() + { + return _messageAssignmentSuspended; + } + + @Override + public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) + { + _messageAssignmentSuspended = messageAssignmentSuspended; + } + + + public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender) { if(!getSubject().equals(Subject.getSubject(AccessController.getContext()))) @@ -252,4 +269,12 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol _connection.transportStateChanged(); } + @Override + public void processPendingMessages() + { + for (AMQSessionModel session : _connection.getSessionModels()) + { + session.processPendingMessages(); + } + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index cbd569d036..d9b4495d6e 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -685,4 +685,17 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S ssn.transportStateChanged(); } } + + @Override + public void flushBatched() + { + getSender().flush(); + } + + + @Override + public boolean isMessageAssignmentSuspended() + { + return _serverProtocolEngine.isMessageAssignmentSuspended(); + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 1d8676edd6..3659d6ce01 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -1135,6 +1135,15 @@ public class ServerSession extends Session } } + @Override + public void processPendingMessages() + { + for(ConsumerTarget target : getSubscriptions()) + { + target.processPendingMessages(); + } + } + public final long getMaxUncommittedInMemorySize() { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index a149214455..be28024d13 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -3606,4 +3606,14 @@ public class AMQChannel } } } + + @Override + public void processPendingMessages() + { + + for(ConsumerTarget target : _tag2SubscriptionTargetMap.values()) + { + target.processPendingMessages(); + } + } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 233f68aeb6..1b69edb50e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -202,6 +202,22 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private long _maxMessageSize; private volatile boolean _transportBlockedForWriting; + private volatile boolean _messageAssignmentSuspended; + + + @Override + public boolean isMessageAssignmentSuspended() + { + return _messageAssignmentSuspended; + } + + @Override + public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) + { + _messageAssignmentSuspended = messageAssignmentSuspended; + } + + public AMQProtocolEngine(Broker<?> broker, final NetworkConnection network, final long connectionId, @@ -331,9 +347,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { final long arrivalTime = System.currentTimeMillis(); - if(!_authenticated && - (arrivalTime - _creationTime) > _port.getContextValue(Long.class, - Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)) + if (!_authenticated && + (arrivalTime - _creationTime) > _port.getContextValue(Long.class, + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)) { _logger.warn("Connection has taken more than " + _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) @@ -388,7 +404,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } catch (StoreException e) { - if(_virtualHost.getState() == State.ACTIVE) + if (_virtualHost.getState() == State.ACTIVE) { throw e; } @@ -1362,7 +1378,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { closeConnection(0, new AMQConnectionException(cause, message, 0, 0, getMethodRegistry(), - null)); + null)); } public void block() @@ -2049,4 +2065,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _closing.get(); } + @Override + public void processPendingMessages() + { + for (AMQSessionModel session : getSessionModels()) + { + session.processPendingMessages(); + } + } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index d6642aef2e..d33a4aafd8 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v0_8; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -33,6 +34,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.consumer.ConsumerMessageInstancePair; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; @@ -99,6 +101,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen return _consumers; } + static final class BrowserConsumer extends ConsumerTarget_0_8 { public BrowserConsumer(AMQChannel channel, @@ -123,7 +126,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @throws org.apache.qpid.AMQException */ @Override - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. @@ -131,7 +134,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen synchronized (getChannel()) { long deliveryTag = getChannel().getNextDeliveryTag(); - return sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag); } } @@ -178,7 +181,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @param batch */ @Override - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. @@ -205,7 +208,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } ref.release(); - return size; } @@ -278,9 +280,10 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @param batch */ @Override - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { + // put queue entry on a list and then notify the connection to read list. synchronized (getChannel()) { @@ -292,12 +295,15 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen entry.addStateChangeListener(getReleasedStateChangeListener()); long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag); entry.incrementDeliveryCount(); - return size; } + + } + + } @@ -382,7 +388,8 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen return subscriber + "]"; } - public boolean isSuspended() + @Override + public boolean doIsSuspended() { return getState()!=State.ACTIVE || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped(); } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index b55bd03a91..b6c23dff7a 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -65,7 +65,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private final AmqpPort<?> _port; private final Broker<?> _broker; private final SubjectCreator _subjectCreator; - private final ServerProtocolEngine _protocolEngine; + private final ProtocolEngine_1_0_0_SASL _protocolEngine; private VirtualHostImpl _vhost; private final Transport _transport; private final ConnectionEndpoint _conn; @@ -110,7 +110,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod AmqpPort<?> port, Transport transport, final SubjectCreator subjectCreator, - final ServerProtocolEngine protocolEngine) + final ProtocolEngine_1_0_0_SASL protocolEngine) { _protocolEngine = protocolEngine; _broker = broker; @@ -498,4 +498,16 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod session.transportStateChanged(); } } + + @Override + public void flushBatched() + { + _protocolEngine.flushBatched(); + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return _protocolEngine.isMessageAssignmentSuspended(); + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index a44768ffdc..589bd0ec04 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -83,7 +83,8 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget return _link.getEndpoint(); } - public boolean isSuspended() + @Override + public boolean doIsSuspended() { return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE; @@ -113,22 +114,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } } - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch) { // TODO - long size = entry.getMessage().getSize(); - send(entry); - return size; - } - - public void flushBatched() - { - // TODO - } - - public void send(final MessageInstance queueEntry) - { - ServerMessage serverMessage = queueEntry.getMessage(); + ServerMessage serverMessage = entry.getMessage(); Message_1_0 message; if(serverMessage instanceof Message_1_0) { @@ -168,7 +157,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget payload.flip(); } - if(queueEntry.getDeliveryCount() != 0) + if(entry.getDeliveryCount() != 0) { payload = payload.duplicate(); ValueHandler valueHandler = new ValueHandler(_typeRegistry); @@ -200,7 +189,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget header.setPriority(oldHeader.getPriority()); header.setTtl(oldHeader.getTtl()); } - header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount())); + header.setDeliveryCount(UnsignedInteger.valueOf(entry.getDeliveryCount())); _sectionEncoder.reset(); _sectionEncoder.encodeObject(header); Binary encodedHeader = _sectionEncoder.getEncoding(); @@ -230,10 +219,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget else { UnsettledAction action = _acquires - ? new DispositionAction(tag, queueEntry) - : new DoNothingAction(tag, queueEntry); + ? new DispositionAction(tag, entry) + : new DoNothingAction(tag, entry); - _link.addUnsettled(tag, action, queueEntry); + _link.addUnsettled(tag, action, entry); } if(_transactionId != null) @@ -257,9 +246,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget public void onRollback() { - if(queueEntry.isAcquiredBy(getConsumer())) + if(entry.isAcquiredBy(getConsumer())) { - queueEntry.release(); + entry.release(); _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true); @@ -274,12 +263,17 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } else { - queueEntry.release(); + entry.release(); } } } + public void flushBatched() + { + // TODO + } + public void queueDeleted() { //TODO diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index 147ccd4edd..57f070804a 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -56,6 +56,7 @@ import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.util.ServerScopedRuntimeException; @@ -135,6 +136,10 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private State _state = State.A; + private volatile boolean _messageAssignmentSuspended; + + + public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker<?> broker, long id, AmqpPort<?> port, Transport transport) @@ -150,6 +155,19 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut } + @Override + public boolean isMessageAssignmentSuspended() + { + return _messageAssignmentSuspended; + } + + @Override + public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) + { + _messageAssignmentSuspended = messageAssignmentSuspended; + } + + public SocketAddress getRemoteAddress() { return _network.getRemoteAddress(); @@ -576,4 +594,17 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut } + public void flushBatched() + { + _sender.flush(); + } + + @Override + public void processPendingMessages() + { + for (AMQSessionModel session : _connection.getSessionModels()) + { + session.processPendingMessages(); + } + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 01c11b9eca..dd03469d0f 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -899,6 +899,16 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio return 0L; } + @Override + public void processPendingMessages() + { + for(Consumer<?> consumer : getConsumers()) + { + + ((ConsumerImpl)consumer).getTarget().processPendingMessages(); + } + } + private void consumerAdded(Consumer<?> consumer) { for(ConsumerListener l : _consumerListeners) diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java index 3f873a24ff..c03dc4e1be 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java @@ -164,6 +164,12 @@ class ManagementNodeConsumer implements ConsumerImpl } + @Override + public ConsumerTarget getTarget() + { + return _target; + } + ManagementNode getManagementNode() { return _managementNode; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java index 35d262cdb3..df4d7c7721 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java @@ -34,4 +34,10 @@ public interface ServerProtocolEngine extends ProtocolEngine boolean isTransportBlockedForWriting(); void setTransportBlockedForWriting(boolean blocked); + + void setMessageAssignmentSuspended(boolean value); + + boolean isMessageAssignmentSuspended(); + + void processPendingMessages(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java index 6599f4443c..c17ee500b0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java @@ -61,7 +61,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender private final String _remoteSocketAddress; private final AtomicBoolean _closed = new AtomicBoolean(false); - private final ServerProtocolEngine _receiver; + private final ServerProtocolEngine _protocolEngine; private final int _receiveBufSize; private final Ticker _ticker; private final Set<TransportEncryption> _encryptionSet; @@ -81,7 +81,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender public NonBlockingSenderReceiver(final NonBlockingConnection connection, - ServerProtocolEngine receiver, + ServerProtocolEngine protocolEngine, int receiveBufSize, Ticker ticker, final Set<TransportEncryption> encryptionSet, @@ -94,7 +94,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender { _connection = connection; _socketChannel = connection.getSocketChannel(); - _receiver = receiver; + _protocolEngine = protocolEngine; _receiveBufSize = receiveBufSize; _ticker = ticker; _encryptionSet = encryptionSet; @@ -170,15 +170,22 @@ public class NonBlockingSenderReceiver implements ByteBufferSender _ticker.tick(currentTime); } - _receiver.setTransportBlockedForWriting(!doWrite()); + _protocolEngine.setMessageAssignmentSuspended(true); + + _protocolEngine.processPendingMessages(); + + _protocolEngine.setTransportBlockedForWriting(!doWrite()); boolean dataRead = doRead(); _fullyWritten = doWrite(); - _receiver.setTransportBlockedForWriting(!_fullyWritten); + _protocolEngine.setTransportBlockedForWriting(!_fullyWritten); if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0)) { _stateChanged.set(true); } + + // tell all consumer targets that it is okay to accept more + _protocolEngine.setMessageAssignmentSuspended(false); } catch (IOException e) { @@ -213,7 +220,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender } LOGGER.debug("Closing receiver"); - _receiver.closed(); + _protocolEngine.closed(); try { @@ -373,7 +380,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender ByteBuffer dup = _currentBuffer.duplicate(); dup.flip(); _currentBuffer = _currentBuffer.slice(); - _receiver.received(dup); + _protocolEngine.received(dup); } } else if(_transportEncryption == TransportEncryption.TLS) @@ -414,7 +421,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender { readData = true; } - _receiver.received(appInputBuffer); + _protocolEngine.received(appInputBuffer); } while(unwrapped > 0 || tasksRun); @@ -451,7 +458,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender if (_transportEncryption == TransportEncryption.NONE) { - _receiver.received(_netInputBuffer); + _protocolEngine.received(_netInputBuffer); } else { diff --git a/qpid/java/test-profiles/JavaExcludes b/qpid/java/test-profiles/JavaExcludes index b52987f369..9f88879ff2 100644 --- a/qpid/java/test-profiles/JavaExcludes +++ b/qpid/java/test-profiles/JavaExcludes @@ -27,3 +27,5 @@ org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy //QPID-4153 Messages causing a runtime selector error should be dead-lettered (or something similar) org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError + +org.apache.qpid.server.protocol.v0_8.AckTest |