summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-1-0-protocol/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-plugins/amqp-1-0-protocol/src')
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java86
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java55
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java2
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java112
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java5
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java31
6 files changed, 252 insertions, 39 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 8e24d55da0..b515fda4a7 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -30,7 +30,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.security.auth.Subject;
@@ -51,6 +53,7 @@ import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
@@ -64,6 +67,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
private final AmqpPort<?> _port;
private final Broker<?> _broker;
private final SubjectCreator _subjectCreator;
+ private final ProtocolEngine_1_0_0_SASL _protocolEngine;
private VirtualHostImpl _vhost;
private final Transport _transport;
private final ConnectionEndpoint _conn;
@@ -98,15 +102,24 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
private List<Action<? super Connection_1_0>> _closeTasks =
Collections.synchronizedList(new ArrayList<Action<? super Connection_1_0>>());
+
+ private final Queue<Action<? super Connection_1_0>> _asyncTaskList =
+ new ConcurrentLinkedQueue<>();
+
+
private boolean _closedOnOpen;
+
public Connection_1_0(Broker<?> broker,
ConnectionEndpoint conn,
long connectionId,
AmqpPort<?> port,
- Transport transport, final SubjectCreator subjectCreator)
+ Transport transport,
+ final SubjectCreator subjectCreator,
+ final ProtocolEngine_1_0_0_SASL protocolEngine)
{
+ _protocolEngine = protocolEngine;
_broker = broker;
_port = port;
_transport = transport;
@@ -207,6 +220,13 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
_closeTasks.add( task );
}
+ private void addAsyncTask(final Action<Connection_1_0> action)
+ {
+ _asyncTaskList.add(action);
+ notifyWork();
+ }
+
+
public void closeReceived()
{
Collection<Session_1_0> sessions = new ArrayList(_sessions);
@@ -245,9 +265,19 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
@Override
- public void close(AMQConstant cause, String message)
+ public void closeAsync(AMQConstant cause, String message)
{
- _conn.close();
+ Action<Connection_1_0> action = new Action<Connection_1_0>()
+ {
+ @Override
+ public void performAction(final Connection_1_0 object)
+ {
+ _conn.close();
+
+ }
+ };
+ addAsyncTask(action);
+
}
@Override
@@ -263,9 +293,16 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
}
@Override
- public void closeSession(Session_1_0 session, AMQConstant cause, String message)
+ public void closeSessionAsync(final Session_1_0 session, final AMQConstant cause, final String message)
{
- session.close(cause, message);
+ addAsyncTask(new Action<Connection_1_0>()
+ {
+ @Override
+ public void performAction(final Connection_1_0 object)
+ {
+ session.close(cause, message);
+ }
+ });
}
@Override
@@ -363,6 +400,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
return _port;
}
+ public ServerProtocolEngine getProtocolEngine()
+ {
+ return _protocolEngine;
+ }
+
@Override
public Transport getTransport()
{
@@ -480,4 +522,38 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
}
+ public void transportStateChanged()
+ {
+ for (Session_1_0 session : _sessions)
+ {
+ session.transportStateChanged();
+ }
+ }
+
+ @Override
+ public void notifyWork()
+ {
+ _protocolEngine.notifyWork();
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return _protocolEngine.isMessageAssignmentSuspended();
+ }
+
+ public void processPending()
+ {
+ while(_asyncTaskList.peek() != null)
+ {
+ Action<? super Connection_1_0> asyncAction = _asyncTaskList.poll();
+ asyncAction.performAction(this);
+ }
+
+ for (AMQSessionModel session : getSessionModels())
+ {
+ session.processPending();
+ }
+
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 3b9521866c..fa2e543f8d 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -40,6 +40,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
@@ -83,9 +84,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
return _link.getEndpoint();
}
- public boolean isSuspended()
+ @Override
+ public boolean doIsSuspended()
{
- return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;// || !getEndpoint().hasCreditToSend();
+ return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;
}
@@ -113,22 +115,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
}
}
- public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
- {
- // TODO
- long size = entry.getMessage().getSize();
- send(entry);
- return size;
- }
-
- public void flushBatched()
+ public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
{
// TODO
- }
-
- public void send(final MessageInstance queueEntry)
- {
- ServerMessage serverMessage = queueEntry.getMessage();
+ ServerMessage serverMessage = entry.getMessage();
Message_1_0 message;
if(serverMessage instanceof Message_1_0)
{
@@ -168,7 +158,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
payload.flip();
}
- if(queueEntry.getDeliveryCount() != 0)
+ if(entry.getDeliveryCount() != 0)
{
payload = payload.duplicate();
ValueHandler valueHandler = new ValueHandler(_typeRegistry);
@@ -200,7 +190,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
header.setPriority(oldHeader.getPriority());
header.setTtl(oldHeader.getTtl());
}
- header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount()));
+ header.setDeliveryCount(UnsignedInteger.valueOf(entry.getDeliveryCount()));
_sectionEncoder.reset();
_sectionEncoder.encodeObject(header);
Binary encodedHeader = _sectionEncoder.getEncoding();
@@ -230,10 +220,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
else
{
UnsettledAction action = _acquires
- ? new DispositionAction(tag, queueEntry)
- : new DoNothingAction(tag, queueEntry);
+ ? new DispositionAction(tag, entry)
+ : new DoNothingAction(tag, entry);
- _link.addUnsettled(tag, action, queueEntry);
+ _link.addUnsettled(tag, action, entry);
}
if(_transactionId != null)
@@ -257,9 +247,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
public void onRollback()
{
- if(queueEntry.isAcquiredBy(getConsumer()))
+ if(entry.isAcquiredBy(getConsumer()))
{
- queueEntry.release();
+ entry.release();
_link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
@@ -274,12 +264,17 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
}
else
{
- queueEntry.release();
+ entry.release();
}
}
}
+ public void flushBatched()
+ {
+ // TODO
+ }
+
public void queueDeleted()
{
//TODO
@@ -296,7 +291,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
{
synchronized (_link.getLock())
{
- final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend();
+
+ ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
+ final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting();
if(!hasCredit && getState() == State.ACTIVE)
{
suspend();
@@ -336,7 +333,8 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
{
synchronized(_link.getLock())
{
- if(isSuspended() && getEndpoint() != null)
+ ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
+ if(isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
{
updateState(State.SUSPENDED, State.ACTIVE);
_transactionId = _link.getTransactionId();
@@ -544,4 +542,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
return 0;
}
+ @Override
+ protected void processClosed()
+ {
+
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
index fa8134cb55..e72dc17b57 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index 740b01e459..a0f10eee65 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -29,6 +29,8 @@ import java.security.PrivilegedAction;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import javax.security.sasl.SaslException;
@@ -52,14 +54,18 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -79,6 +85,9 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private long _createTime = System.currentTimeMillis();
private ConnectionEndpoint _endpoint;
private long _connectionId;
+ private final AtomicBoolean _stateChanged = new AtomicBoolean();
+ private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
+
private static final ByteBuffer HEADER =
ByteBuffer.wrap(new byte[]
@@ -116,8 +125,9 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private byte _revision;
private PrintWriter _out;
private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
+ private ByteBufferSender _sender;
private Connection_1_0 _connection;
+ private volatile boolean _transportBlockedForWriting;
static enum State {
@@ -134,6 +144,10 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private State _state = State.A;
+ private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
+
+
+
public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker<?> broker,
long id, AmqpPort<?> port, Transport transport)
@@ -149,6 +163,31 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
}
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ Thread lock = _messageAssignmentSuspended.get();
+ return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread();
+ }
+
+ @Override
+ public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+ {
+ _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
+
+ if(!messageAssignmentSuspended)
+ {
+ for(AMQSessionModel<?,?> session : _connection.getSessionModels())
+ {
+ for(Consumer<?> consumer : session.getConsumers())
+ {
+ ((ConsumerImpl)consumer).getTarget().notifyCurrentState();
+ }
+ }
+ }
+ }
+
+
public SocketAddress getRemoteAddress()
{
return _network.getRemoteAddress();
@@ -179,7 +218,12 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
//Todo
}
- public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
+ @Override
+ public void encryptedTransport()
+ {
+ }
+
+ public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender)
{
_network = network;
_sender = sender;
@@ -211,7 +255,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
_endpoint.setProperties(serverProperties);
_endpoint.setRemoteAddress(getRemoteAddress());
- _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator);
+ _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator, this);
_endpoint.setConnectionEventListener(_connection);
_endpoint.setFrameOutputHandler(this);
@@ -524,6 +568,8 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
}
+
+
public void close()
{
_sender.close();
@@ -554,4 +600,60 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
{
return _lastWriteTime;
}
+
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return _transportBlockedForWriting;
+ }
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ _transportBlockedForWriting = blocked;
+ _connection.transportStateChanged();
+
+ }
+
+ public void flushBatched()
+ {
+ _sender.flush();
+ }
+
+ @Override
+ public void processPending()
+ {
+ _connection.processPending();
+
+ }
+
+ @Override
+ public boolean hasWork()
+ {
+ return _stateChanged.get();
+ }
+
+ @Override
+ public void notifyWork()
+ {
+ _stateChanged.set(true);
+
+ final Action<ServerProtocolEngine> listener = _workListener.get();
+ if(listener != null)
+ {
+ listener.performAction(this);
+ }
+ }
+
+ @Override
+ public void clearWork()
+ {
+ _stateChanged.set(false);
+ }
+
+ @Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+ _workListener.set(listener);
+ }
+
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index c952a3c585..fe36ba91cb 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -729,4 +729,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
return _consumer;
}
+
+ public ConsumerTarget_1_0 getConsumerTarget()
+ {
+ return _target;
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index b9ee0ad498..2a49e812f5 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -109,6 +109,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
private final Subject _subject = new Subject();
private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
+ private final CopyOnWriteArrayList<SendingLink_1_0> _sendingLinks = new CopyOnWriteArrayList<>();
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private Session<?> _modelObject;
@@ -211,7 +212,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
);
sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink));
- registerConsumer(sendingLink.getConsumer());
+ registerConsumer(sendingLink);
link = sendingLink;
if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable()))
@@ -412,12 +413,14 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
}
}
- private void registerConsumer(final ConsumerImpl consumer)
+ private void registerConsumer(final SendingLink_1_0 link)
{
+ ConsumerImpl consumer = link.getConsumer();
if(consumer instanceof Consumer<?>)
{
Consumer<?> modelConsumer = (Consumer<?>) consumer;
_consumers.add(modelConsumer);
+ _sendingLinks.add(link);
modelConsumer.addChangeListener(_consumerClosedListener);
consumerAdded(modelConsumer);
}
@@ -615,6 +618,20 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
}
@Override
+ public void transportStateChanged()
+ {
+ for(SendingLink_1_0 link : _sendingLinks)
+ {
+ ConsumerTarget_1_0 target = link.getConsumerTarget();
+ target.flowStateChanged();
+
+
+ }
+
+
+ }
+
+ @Override
public LogSubject getLogSubject()
{
return this;
@@ -883,6 +900,16 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
return 0L;
}
+ @Override
+ public void processPending()
+ {
+ for(Consumer<?> consumer : getConsumers())
+ {
+
+ ((ConsumerImpl)consumer).getTarget().processPending();
+ }
+ }
+
private void consumerAdded(Consumer<?> consumer)
{
for(ConsumerListener l : _consumerListeners)