summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java152
1 files changed, 114 insertions, 38 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 2e3e417c95..dd9a00ce10 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -21,7 +21,6 @@
package org.apache.qpid.client;
import java.io.Serializable;
-import java.io.IOException;
import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -60,6 +59,7 @@ import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -113,7 +113,6 @@ import org.slf4j.LoggerFactory;
public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
{
-
public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
@@ -198,16 +197,32 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* The default value for immediate flag used by producers created by this session is false. That is, a consumer does
* not need to be attached to a queue.
*/
- protected static final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
+ protected final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
/**
* The default value for mandatory flag used by producers created by this session is true. That is, server will not
* silently drop messages where no queue is connected to the exchange for the message.
*/
- protected static final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+ protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+
+ protected final boolean DEFAULT_WAIT_ON_SEND = Boolean.parseBoolean(System.getProperty("qpid.default_wait_on_send", "false"));
+
+ /**
+ * The period to wait while flow controlled before sending a log message confirming that the session is still
+ * waiting on flow control being revoked
+ */
+ protected final long FLOW_CONTROL_WAIT_PERIOD = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
+
+ /**
+ * The period to wait while flow controlled before declaring a failure
+ */
+ public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
+ protected final long FLOW_CONTROL_WAIT_FAILURE = Long.getLong("qpid.flow_control_wait_failure",
+ DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
protected final boolean DECLARE_QUEUES =
Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
+
protected final boolean DECLARE_EXCHANGES =
Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
@@ -244,10 +259,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private int _ticket;
/** Holds the high mark for prefetched message, at which the session is suspended. */
- private int _defaultPrefetchHighMark;
+ private int _prefetchHighMark;
/** Holds the low mark for prefetched messages, below which the session is resumed. */
- private int _defaultPrefetchLowMark;
+ private int _prefetchLowMark;
/** Holds the message listener, if any, which is attached to this session. */
private MessageListener _messageListener = null;
@@ -428,13 +443,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_channelId = channelId;
_messageFactoryRegistry = messageFactoryRegistry;
- _defaultPrefetchHighMark = defaultPrefetchHighMark;
- _defaultPrefetchLowMark = defaultPrefetchLowMark;
+ _prefetchHighMark = defaultPrefetchHighMark;
+ _prefetchLowMark = defaultPrefetchLowMark;
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_queue =
- new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
+ new FlowControllingBlockingQueue(_prefetchHighMark, _prefetchLowMark,
new FlowControllingBlockingQueue.ThresholdListener()
{
private final AtomicBoolean _suspendState = new AtomicBoolean();
@@ -442,7 +457,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void aboveThreshold(int currentValue)
{
_logger.debug(
- "Above threshold(" + _defaultPrefetchHighMark
+ "Above threshold(" + _prefetchHighMark
+ ") so suspending channel. Current value is " + currentValue);
_suspendState.set(true);
new Thread(new SuspenderRunner(_suspendState)).start();
@@ -452,7 +467,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void underThreshold(int currentValue)
{
_logger.debug(
- "Below threshold(" + _defaultPrefetchLowMark
+ "Below threshold(" + _prefetchLowMark
+ ") so unsuspending channel. Current value is " + currentValue);
_suspendState.set(false);
new Thread(new SuspenderRunner(_suspendState)).start();
@@ -462,7 +477,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
else
{
- _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null);
+ _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
}
}
@@ -759,8 +774,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
+ //Check that we are clean to commit.
+ if (_failedOverDirty)
+ {
+ rollback();
+
+ throw new TransactionRolledBackException("Connection failover has occured since last send. " +
+ "Forced rollback");
+ }
+
- // TGM FIXME: what about failover?
// Acknowledge all delivered messages
while (true)
{
@@ -778,7 +801,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (AMQException e)
{
- throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
+ throw new JMSAMQException("Failed to commit: " + e.getMessage() + ":" + e.getCause(), e);
}
catch (FailoverException e)
{
@@ -870,7 +893,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, false,
messageSelector, null, true, true);
}
@@ -878,7 +901,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic), null, null,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
false, false);
}
@@ -886,7 +909,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null,
false, false);
}
@@ -894,7 +917,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic),
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
messageSelector, null, false, false);
}
@@ -903,7 +926,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, (destination instanceof Topic),
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
messageSelector, null, false, false);
}
@@ -912,7 +935,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, true,
messageSelector, null, false, false);
}
@@ -1336,17 +1359,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public int getDefaultPrefetch()
{
- return _defaultPrefetchHighMark;
+ return _prefetchHighMark;
}
public int getDefaultPrefetchHigh()
{
- return _defaultPrefetchHighMark;
+ return _prefetchHighMark;
}
public int getDefaultPrefetchLow()
{
- return _defaultPrefetchLowMark;
+ return _prefetchLowMark;
}
public AMQShortString getDefaultQueueExchangeName()
@@ -1491,6 +1514,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
sendRecover();
+ markClean();
+
if (!isSuspended)
{
suspendChannel(false);
@@ -1559,6 +1584,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
suspendChannel(true);
}
+ // Let the dispatcher know that all the incomming messages
+ // should be rolled back(reject/release)
+ _rollbackMark.set(_highestDeliveryTag.get());
+
+ syncDispatchQueue();
+
+ _dispatcher.rollback();
+
releaseForRollback();
sendRollback();
@@ -1851,26 +1884,58 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
void failoverPrep()
{
- startDispatcherIfNecessary();
syncDispatchQueue();
}
void syncDispatchQueue()
{
- final CountDownLatch signal = new CountDownLatch(1);
- _queue.add(new Dispatchable() {
- public void dispatch(AMQSession ssn)
+ if (Thread.currentThread() == _dispatcherThread)
+ {
+ while (!_closed.get() && !_queue.isEmpty())
{
- signal.countDown();
+ Dispatchable disp;
+ try
+ {
+ disp = (Dispatchable) _queue.take();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ // Check just in case _queue becomes empty, it shouldn't but
+ // better than an NPE.
+ if (disp == null)
+ {
+ _logger.debug("_queue became empty during sync.");
+ break;
+ }
+
+ disp.dispatch(AMQSession.this);
}
- });
- try
- {
- signal.await();
}
- catch (InterruptedException e)
+ else
{
- throw new RuntimeException(e);
+ startDispatcherIfNecessary();
+
+ final CountDownLatch signal = new CountDownLatch(1);
+
+ _queue.add(new Dispatchable()
+ {
+ public void dispatch(AMQSession ssn)
+ {
+ signal.countDown();
+ }
+ });
+
+ try
+ {
+ signal.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
@@ -2233,7 +2298,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
throws JMSException
{
- return createProducerImpl(destination, mandatory, immediate, false);
+ return createProducerImpl(destination, mandatory, immediate, DEFAULT_WAIT_ON_SEND);
}
private P createProducerImpl(final Destination destination, final boolean mandatory,
@@ -2704,15 +2769,26 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void setFlowControl(final boolean active)
{
_flowControl.setFlowControl(active);
+ _logger.warn("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
}
- public void checkFlowControl() throws InterruptedException
+ public void checkFlowControl() throws InterruptedException, JMSException
{
+ long expiryTime = 0L;
synchronized (_flowControl)
{
- while (!_flowControl.getFlowControl())
+ while (!_flowControl.getFlowControl() &&
+ (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE)
+ : expiryTime) >= System.currentTimeMillis() )
+ {
+
+ _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD);
+ _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control");
+ }
+ if(!_flowControl.getFlowControl())
{
- _flowControl.wait();
+ _logger.error("Message send failed due to timeout waiting on broker enforced flow control");
+ throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control");
}
}