summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-02-10 18:10:16 +0000
committerKeith Wall <kwall@apache.org>2015-02-10 18:10:16 +0000
commitf5ee46517eb096030a6c44b14b801eb2aaeb9392 (patch)
tree25544486642cc770061489663dba650d85769404 /qpid/java
parent085486ebe5ff21133b9caf1c31625ac6ea356568 (diff)
downloadqpid-python-f5ee46517eb096030a6c44b14b801eb2aaeb9392.tar.gz
Refactoring: make the queue no longer be responsible for pushing messages onto the wire
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658773 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java57
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java52
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java2
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java54
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java1
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java36
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java6
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java25
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java13
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java9
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java10
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java34
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java21
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java16
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java40
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java31
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java10
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java25
-rw-r--r--qpid/java/test-profiles/JavaExcludes2
26 files changed, 417 insertions, 59 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 0421a66abf..cad7b71fdd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -23,12 +23,14 @@ package org.apache.qpid.server.consumer;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.util.StateChangeListener;
public abstract class AbstractConsumerTarget implements ConsumerTarget
@@ -41,6 +43,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
private final Lock _stateChangeLock = new ReentrantLock();
private final AtomicInteger _stateActivates = new AtomicInteger();
+ private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue();
protected AbstractConsumerTarget(final State initialState)
@@ -48,6 +51,22 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
_state = new AtomicReference<State>(initialState);
}
+ @Override
+ public void processPendingMessages()
+ {
+ while(hasMessagesToSend())
+ {
+ sendNextMessage();
+ }
+ }
+
+ @Override
+ public final boolean isSuspended()
+ {
+ return getSessionModel().getConnectionModel().isMessageAssignmentSuspended() || doIsSuspended();
+ }
+
+ protected abstract boolean doIsSuspended();
public final State getState()
{
@@ -136,4 +155,42 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
_stateChangeLock.unlock();
}
+ @Override
+ public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ {
+ _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch));
+
+ getSessionModel().getConnectionModel().flushBatched();
+ return entry.getMessage().getSize();
+ }
+
+ protected abstract void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
+
+ @Override
+ public boolean hasMessagesToSend()
+ {
+ return !_queue.isEmpty();
+ }
+
+ @Override
+ public void sendNextMessage()
+ {
+
+ ConsumerMessageInstancePair consumerMessage = _queue.peek();
+ if (consumerMessage != null)
+ {
+ _queue.poll();
+
+ ConsumerImpl consumer = consumerMessage.getConsumer();
+ MessageInstance entry = consumerMessage.getEntry();
+ boolean batch = consumerMessage.isBatch();
+ doSend(consumer, entry, batch);
+
+ if (consumer.acquires())
+ {
+ entry.unlockAcquisition();
+ }
+ }
+
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
index b15b01ede5..3b196df902 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
@@ -31,6 +31,8 @@ public interface ConsumerImpl
void externalStateChange();
+ ConsumerTarget getTarget();
+
enum Option
{
ACQUIRES,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
new file mode 100644
index 0000000000..aa5e419ce2
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.consumer;
+
+import org.apache.qpid.server.message.MessageInstance;
+
+public class ConsumerMessageInstancePair
+{
+ private final ConsumerImpl _consumer;
+ private final MessageInstance _entry;
+ private final boolean _batch;
+
+ public ConsumerMessageInstancePair(final ConsumerImpl consumer, final MessageInstance entry, final boolean batch)
+ {
+ _consumer = consumer;
+ _entry = entry;
+ _batch = batch;
+
+ }
+
+ public ConsumerImpl getConsumer()
+ {
+ return _consumer;
+ }
+
+ public MessageInstance getEntry()
+ {
+ return _entry;
+ }
+
+ public boolean isBatch()
+ {
+ return _batch;
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index 5aef922da5..b2e8cec315 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -33,6 +33,8 @@ public interface ConsumerTarget
void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
+ void processPendingMessages();
+
enum State
{
ACTIVE, SUSPENDED, CLOSED
@@ -54,6 +56,10 @@ public interface ConsumerTarget
long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
+ boolean hasMessagesToSend();
+
+ void sendNextMessage();
+
void flushBatched();
void queueDeleted();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index 26e8271d14..96900d9a5a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -107,4 +107,7 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends
void removeSessionListener(SessionModelListener listener);
+ void flushBatched();
+
+ boolean isMessageAssignmentSuspended();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index 40aa1bbafd..d488ccc138 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -115,4 +115,6 @@ public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQCo
long getTransactionUpdateTime();
void transportStateChanged();
+
+ void processPendingMessages();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index eb7599c5cc..946992cbb6 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -84,6 +84,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_onCloseTask = onCloseTask;
}
+ @Override
+ public void setMessageAssignmentSuspended(final boolean value)
+ {
+ _delegate.setMessageAssignmentSuspended(value);
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return _delegate.isMessageAssignmentSuspended();
+ }
+
public SocketAddress getRemoteAddress()
{
return _delegate.getRemoteAddress();
@@ -198,10 +210,33 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
return _delegate.getLastWriteTime();
}
-
+ @Override
+ public void processPendingMessages()
+ {
+ _delegate.processPendingMessages();
+ }
private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
{
+
+ @Override
+ public void setMessageAssignmentSuspended(final boolean value)
+ {
+
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return false;
+ }
+
+ @Override
+ public void processPendingMessages()
+ {
+
+ }
+
public SocketAddress getRemoteAddress()
{
return _network.getRemoteAddress();
@@ -318,6 +353,23 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
return 0;
}
+ @Override
+ public void setMessageAssignmentSuspended(final boolean value)
+ {
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return false;
+ }
+
+ @Override
+ public void processPendingMessages()
+ {
+
+ }
+
public void received(ByteBuffer msg)
{
_lastReadTime = System.currentTimeMillis();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 26cbf379a0..a545ce6e10 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1148,10 +1148,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
else
{
deliverMessage(sub, entry, false);
- if(sub.acquires())
- {
- entry.unlockAcquisition();
- }
}
}
}
@@ -1978,10 +1974,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
else
{
deliverMessage(sub, node, batch);
- if(sub.acquires())
- {
- node.unlockAcquisition();
- }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
index a2c275e797..c459737c46 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
@@ -52,5 +52,4 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl,
QueueContext getQueueContext();
- ConsumerTarget getTarget();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index c85e4058a1..4fb89575aa 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -507,6 +507,7 @@ class QueueConsumerImpl
return _selector;
}
+
@Override
public String toLogString()
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 47ed224133..8b424d2c9e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -178,6 +178,18 @@ public class MockConsumer implements ConsumerTarget
return size;
}
+ @Override
+ public boolean hasMessagesToSend()
+ {
+ return false;
+ }
+
+ @Override
+ public void sendNextMessage()
+ {
+
+ }
+
public void flushBatched()
{
@@ -230,6 +242,12 @@ public class MockConsumer implements ConsumerTarget
}
}
+ @Override
+ public void processPendingMessages()
+ {
+
+ }
+
public ArrayList<MessageInstance> getMessages()
{
return messages;
@@ -462,6 +480,12 @@ public class MockConsumer implements ConsumerTarget
{
}
+
+ @Override
+ public void processPendingMessages()
+ {
+
+ }
}
private static class MockConnectionModel implements AMQConnectionModel
@@ -594,6 +618,18 @@ public class MockConsumer implements ConsumerTarget
}
@Override
+ public void flushBatched()
+ {
+
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return false;
+ }
+
+ @Override
public String getClientVersion()
{
return null;
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index afa4fb8bc0..209f6663ec 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -104,7 +104,8 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
_name = name;
}
- public boolean isSuspended()
+ @Override
+ public boolean doIsSuspended()
{
return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
}
@@ -195,7 +196,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
private final AddMessageDispositionListenerAction _postIdSettingAction;
- public long send(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
+ public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
{
ServerMessage serverMsg = entry.getMessage();
@@ -346,7 +347,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
{
recordUnacknowledged(entry);
}
- return size;
}
void recordUnacknowledged(MessageInstance entry)
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 401c6fc939..3e8ba7cfab 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -32,6 +32,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.Constant;
import org.apache.qpid.transport.network.Assembler;
@@ -55,6 +56,8 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
private long _lastWriteTime = _createTime;
private volatile boolean _transportBlockedForWriting;
+ private volatile boolean _messageAssignmentSuspended;
+
public ProtocolEngine_0_10(ServerConnection conn,
NetworkConnection network)
{
@@ -67,6 +70,20 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
}
}
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return _messageAssignmentSuspended;
+ }
+
+ @Override
+ public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+ {
+ _messageAssignmentSuspended = messageAssignmentSuspended;
+ }
+
+
+
public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender)
{
if(!getSubject().equals(Subject.getSubject(AccessController.getContext())))
@@ -252,4 +269,12 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
_connection.transportStateChanged();
}
+ @Override
+ public void processPendingMessages()
+ {
+ for (AMQSessionModel session : _connection.getSessionModels())
+ {
+ session.processPendingMessages();
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index cbd569d036..d9b4495d6e 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -685,4 +685,17 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
ssn.transportStateChanged();
}
}
+
+ @Override
+ public void flushBatched()
+ {
+ getSender().flush();
+ }
+
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return _serverProtocolEngine.isMessageAssignmentSuspended();
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 1d8676edd6..3659d6ce01 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -1135,6 +1135,15 @@ public class ServerSession extends Session
}
}
+ @Override
+ public void processPendingMessages()
+ {
+ for(ConsumerTarget target : getSubscriptions())
+ {
+ target.processPendingMessages();
+ }
+ }
+
public final long getMaxUncommittedInMemorySize()
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index a149214455..be28024d13 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -3606,4 +3606,14 @@ public class AMQChannel
}
}
}
+
+ @Override
+ public void processPendingMessages()
+ {
+
+ for(ConsumerTarget target : _tag2SubscriptionTargetMap.values())
+ {
+ target.processPendingMessages();
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 233f68aeb6..1b69edb50e 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -202,6 +202,22 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private long _maxMessageSize;
private volatile boolean _transportBlockedForWriting;
+ private volatile boolean _messageAssignmentSuspended;
+
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return _messageAssignmentSuspended;
+ }
+
+ @Override
+ public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+ {
+ _messageAssignmentSuspended = messageAssignmentSuspended;
+ }
+
+
public AMQProtocolEngine(Broker<?> broker,
final NetworkConnection network,
final long connectionId,
@@ -331,9 +347,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
{
final long arrivalTime = System.currentTimeMillis();
- if(!_authenticated &&
- (arrivalTime - _creationTime) > _port.getContextValue(Long.class,
- Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))
+ if (!_authenticated &&
+ (arrivalTime - _creationTime) > _port.getContextValue(Long.class,
+ Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))
{
_logger.warn("Connection has taken more than "
+ _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)
@@ -388,7 +404,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
}
catch (StoreException e)
{
- if(_virtualHost.getState() == State.ACTIVE)
+ if (_virtualHost.getState() == State.ACTIVE)
{
throw e;
}
@@ -1362,7 +1378,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
{
closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
getMethodRegistry(),
- null));
+ null));
}
public void block()
@@ -2049,4 +2065,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
return _closing.get();
}
+ @Override
+ public void processPendingMessages()
+ {
+ for (AMQSessionModel session : getSessionModels())
+ {
+ session.processPendingMessages();
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index d6642aef2e..d33a4aafd8 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_8;
import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -33,6 +34,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerMessageInstancePair;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
@@ -99,6 +101,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
return _consumers;
}
+
static final class BrowserConsumer extends ConsumerTarget_0_8
{
public BrowserConsumer(AMQChannel channel,
@@ -123,7 +126,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* @throws org.apache.qpid.AMQException
*/
@Override
- public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
@@ -131,7 +134,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
synchronized (getChannel())
{
long deliveryTag = getChannel().getNextDeliveryTag();
- return sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
}
}
@@ -178,7 +181,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* @param batch
*/
@Override
- public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
@@ -205,7 +208,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
}
ref.release();
- return size;
}
@@ -278,9 +280,10 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* @param batch
*/
@Override
- public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
+ // put queue entry on a list and then notify the connection to read list.
synchronized (getChannel())
{
@@ -292,12 +295,15 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
entry.addStateChangeListener(getReleasedStateChangeListener());
long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
entry.incrementDeliveryCount();
- return size;
}
+
+
}
+
+
}
@@ -382,7 +388,8 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
return subscriber + "]";
}
- public boolean isSuspended()
+ @Override
+ public boolean doIsSuspended()
{
return getState()!=State.ACTIVE || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped();
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index b55bd03a91..b6c23dff7a 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -65,7 +65,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
private final AmqpPort<?> _port;
private final Broker<?> _broker;
private final SubjectCreator _subjectCreator;
- private final ServerProtocolEngine _protocolEngine;
+ private final ProtocolEngine_1_0_0_SASL _protocolEngine;
private VirtualHostImpl _vhost;
private final Transport _transport;
private final ConnectionEndpoint _conn;
@@ -110,7 +110,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
AmqpPort<?> port,
Transport transport,
final SubjectCreator subjectCreator,
- final ServerProtocolEngine protocolEngine)
+ final ProtocolEngine_1_0_0_SASL protocolEngine)
{
_protocolEngine = protocolEngine;
_broker = broker;
@@ -498,4 +498,16 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
session.transportStateChanged();
}
}
+
+ @Override
+ public void flushBatched()
+ {
+ _protocolEngine.flushBatched();
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return _protocolEngine.isMessageAssignmentSuspended();
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index a44768ffdc..589bd0ec04 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -83,7 +83,8 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
return _link.getEndpoint();
}
- public boolean isSuspended()
+ @Override
+ public boolean doIsSuspended()
{
return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;
@@ -113,22 +114,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
}
}
- public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
{
// TODO
- long size = entry.getMessage().getSize();
- send(entry);
- return size;
- }
-
- public void flushBatched()
- {
- // TODO
- }
-
- public void send(final MessageInstance queueEntry)
- {
- ServerMessage serverMessage = queueEntry.getMessage();
+ ServerMessage serverMessage = entry.getMessage();
Message_1_0 message;
if(serverMessage instanceof Message_1_0)
{
@@ -168,7 +157,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
payload.flip();
}
- if(queueEntry.getDeliveryCount() != 0)
+ if(entry.getDeliveryCount() != 0)
{
payload = payload.duplicate();
ValueHandler valueHandler = new ValueHandler(_typeRegistry);
@@ -200,7 +189,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
header.setPriority(oldHeader.getPriority());
header.setTtl(oldHeader.getTtl());
}
- header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount()));
+ header.setDeliveryCount(UnsignedInteger.valueOf(entry.getDeliveryCount()));
_sectionEncoder.reset();
_sectionEncoder.encodeObject(header);
Binary encodedHeader = _sectionEncoder.getEncoding();
@@ -230,10 +219,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
else
{
UnsettledAction action = _acquires
- ? new DispositionAction(tag, queueEntry)
- : new DoNothingAction(tag, queueEntry);
+ ? new DispositionAction(tag, entry)
+ : new DoNothingAction(tag, entry);
- _link.addUnsettled(tag, action, queueEntry);
+ _link.addUnsettled(tag, action, entry);
}
if(_transactionId != null)
@@ -257,9 +246,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
public void onRollback()
{
- if(queueEntry.isAcquiredBy(getConsumer()))
+ if(entry.isAcquiredBy(getConsumer()))
{
- queueEntry.release();
+ entry.release();
_link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
@@ -274,12 +263,17 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
}
else
{
- queueEntry.release();
+ entry.release();
}
}
}
+ public void flushBatched()
+ {
+ // TODO
+ }
+
public void queueDeleted()
{
//TODO
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index 147ccd4edd..57f070804a 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -56,6 +56,7 @@ import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -135,6 +136,10 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private State _state = State.A;
+ private volatile boolean _messageAssignmentSuspended;
+
+
+
public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker<?> broker,
long id, AmqpPort<?> port, Transport transport)
@@ -150,6 +155,19 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
}
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return _messageAssignmentSuspended;
+ }
+
+ @Override
+ public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+ {
+ _messageAssignmentSuspended = messageAssignmentSuspended;
+ }
+
+
public SocketAddress getRemoteAddress()
{
return _network.getRemoteAddress();
@@ -576,4 +594,17 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
}
+ public void flushBatched()
+ {
+ _sender.flush();
+ }
+
+ @Override
+ public void processPendingMessages()
+ {
+ for (AMQSessionModel session : _connection.getSessionModels())
+ {
+ session.processPendingMessages();
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 01c11b9eca..dd03469d0f 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -899,6 +899,16 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
return 0L;
}
+ @Override
+ public void processPendingMessages()
+ {
+ for(Consumer<?> consumer : getConsumers())
+ {
+
+ ((ConsumerImpl)consumer).getTarget().processPendingMessages();
+ }
+ }
+
private void consumerAdded(Consumer<?> consumer)
{
for(ConsumerListener l : _consumerListeners)
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index 3f873a24ff..c03dc4e1be 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -164,6 +164,12 @@ class ManagementNodeConsumer implements ConsumerImpl
}
+ @Override
+ public ConsumerTarget getTarget()
+ {
+ return _target;
+ }
+
ManagementNode getManagementNode()
{
return _managementNode;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
index 35d262cdb3..df4d7c7721 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
@@ -34,4 +34,10 @@ public interface ServerProtocolEngine extends ProtocolEngine
boolean isTransportBlockedForWriting();
void setTransportBlockedForWriting(boolean blocked);
+
+ void setMessageAssignmentSuspended(boolean value);
+
+ boolean isMessageAssignmentSuspended();
+
+ void processPendingMessages();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
index 6599f4443c..c17ee500b0 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
@@ -61,7 +61,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
private final String _remoteSocketAddress;
private final AtomicBoolean _closed = new AtomicBoolean(false);
- private final ServerProtocolEngine _receiver;
+ private final ServerProtocolEngine _protocolEngine;
private final int _receiveBufSize;
private final Ticker _ticker;
private final Set<TransportEncryption> _encryptionSet;
@@ -81,7 +81,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
public NonBlockingSenderReceiver(final NonBlockingConnection connection,
- ServerProtocolEngine receiver,
+ ServerProtocolEngine protocolEngine,
int receiveBufSize,
Ticker ticker,
final Set<TransportEncryption> encryptionSet,
@@ -94,7 +94,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
{
_connection = connection;
_socketChannel = connection.getSocketChannel();
- _receiver = receiver;
+ _protocolEngine = protocolEngine;
_receiveBufSize = receiveBufSize;
_ticker = ticker;
_encryptionSet = encryptionSet;
@@ -170,15 +170,22 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
_ticker.tick(currentTime);
}
- _receiver.setTransportBlockedForWriting(!doWrite());
+ _protocolEngine.setMessageAssignmentSuspended(true);
+
+ _protocolEngine.processPendingMessages();
+
+ _protocolEngine.setTransportBlockedForWriting(!doWrite());
boolean dataRead = doRead();
_fullyWritten = doWrite();
- _receiver.setTransportBlockedForWriting(!_fullyWritten);
+ _protocolEngine.setTransportBlockedForWriting(!_fullyWritten);
if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0))
{
_stateChanged.set(true);
}
+
+ // tell all consumer targets that it is okay to accept more
+ _protocolEngine.setMessageAssignmentSuspended(false);
}
catch (IOException e)
{
@@ -213,7 +220,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
}
LOGGER.debug("Closing receiver");
- _receiver.closed();
+ _protocolEngine.closed();
try
{
@@ -373,7 +380,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
ByteBuffer dup = _currentBuffer.duplicate();
dup.flip();
_currentBuffer = _currentBuffer.slice();
- _receiver.received(dup);
+ _protocolEngine.received(dup);
}
}
else if(_transportEncryption == TransportEncryption.TLS)
@@ -414,7 +421,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
{
readData = true;
}
- _receiver.received(appInputBuffer);
+ _protocolEngine.received(appInputBuffer);
}
while(unwrapped > 0 || tasksRun);
@@ -451,7 +458,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
if (_transportEncryption == TransportEncryption.NONE)
{
- _receiver.received(_netInputBuffer);
+ _protocolEngine.received(_netInputBuffer);
}
else
{
diff --git a/qpid/java/test-profiles/JavaExcludes b/qpid/java/test-profiles/JavaExcludes
index b52987f369..9f88879ff2 100644
--- a/qpid/java/test-profiles/JavaExcludes
+++ b/qpid/java/test-profiles/JavaExcludes
@@ -27,3 +27,5 @@ org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy
//QPID-4153 Messages causing a runtime selector error should be dead-lettered (or something similar)
org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError
+
+org.apache.qpid.server.protocol.v0_8.AckTest