diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 152 |
1 files changed, 114 insertions, 38 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 2e3e417c95..dd9a00ce10 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -21,7 +21,6 @@ package org.apache.qpid.client; import java.io.Serializable; -import java.io.IOException; import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.ArrayList; @@ -60,6 +59,7 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import javax.jms.TransactionRolledBackException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; @@ -113,7 +113,6 @@ import org.slf4j.LoggerFactory; public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession { - public static final class IdToConsumerMap<C extends BasicMessageConsumer> { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; @@ -198,16 +197,32 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * The default value for immediate flag used by producers created by this session is false. That is, a consumer does * not need to be attached to a queue. */ - protected static final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); + protected final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); /** * The default value for mandatory flag used by producers created by this session is true. That is, server will not * silently drop messages where no queue is connected to the exchange for the message. */ - protected static final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); + protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); + + protected final boolean DEFAULT_WAIT_ON_SEND = Boolean.parseBoolean(System.getProperty("qpid.default_wait_on_send", "false")); + + /** + * The period to wait while flow controlled before sending a log message confirming that the session is still + * waiting on flow control being revoked + */ + protected final long FLOW_CONTROL_WAIT_PERIOD = Long.getLong("qpid.flow_control_wait_notify_period",5000L); + + /** + * The period to wait while flow controlled before declaring a failure + */ + public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L; + protected final long FLOW_CONTROL_WAIT_FAILURE = Long.getLong("qpid.flow_control_wait_failure", + DEFAULT_FLOW_CONTROL_WAIT_FAILURE); protected final boolean DECLARE_QUEUES = Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true")); + protected final boolean DECLARE_EXCHANGES = Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true")); @@ -244,10 +259,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private int _ticket; /** Holds the high mark for prefetched message, at which the session is suspended. */ - private int _defaultPrefetchHighMark; + private int _prefetchHighMark; /** Holds the low mark for prefetched messages, below which the session is resumed. */ - private int _defaultPrefetchLowMark; + private int _prefetchLowMark; /** Holds the message listener, if any, which is attached to this session. */ private MessageListener _messageListener = null; @@ -428,13 +443,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _channelId = channelId; _messageFactoryRegistry = messageFactoryRegistry; - _defaultPrefetchHighMark = defaultPrefetchHighMark; - _defaultPrefetchLowMark = defaultPrefetchLowMark; + _prefetchHighMark = defaultPrefetchHighMark; + _prefetchLowMark = defaultPrefetchLowMark; if (_acknowledgeMode == NO_ACKNOWLEDGE) { _queue = - new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, + new FlowControllingBlockingQueue(_prefetchHighMark, _prefetchLowMark, new FlowControllingBlockingQueue.ThresholdListener() { private final AtomicBoolean _suspendState = new AtomicBoolean(); @@ -442,7 +457,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void aboveThreshold(int currentValue) { _logger.debug( - "Above threshold(" + _defaultPrefetchHighMark + "Above threshold(" + _prefetchHighMark + ") so suspending channel. Current value is " + currentValue); _suspendState.set(true); new Thread(new SuspenderRunner(_suspendState)).start(); @@ -452,7 +467,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void underThreshold(int currentValue) { _logger.debug( - "Below threshold(" + _defaultPrefetchLowMark + "Below threshold(" + _prefetchLowMark + ") so unsuspending channel. Current value is " + currentValue); _suspendState.set(false); new Thread(new SuspenderRunner(_suspendState)).start(); @@ -462,7 +477,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } else { - _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null); + _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null); } } @@ -759,8 +774,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { + //Check that we are clean to commit. + if (_failedOverDirty) + { + rollback(); + + throw new TransactionRolledBackException("Connection failover has occured since last send. " + + "Forced rollback"); + } + - // TGM FIXME: what about failover? // Acknowledge all delivered messages while (true) { @@ -778,7 +801,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (AMQException e) { - throw new JMSAMQException("Failed to commit: " + e.getMessage(), e); + throw new JMSAMQException("Failed to commit: " + e.getMessage() + ":" + e.getCause(), e); } catch (FailoverException e) { @@ -870,7 +893,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, false, messageSelector, null, true, true); } @@ -878,7 +901,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic), null, null, + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null, false, false); } @@ -886,7 +909,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null, + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null, false, false); } @@ -894,7 +917,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic), + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), messageSelector, null, false, false); } @@ -903,7 +926,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, (destination instanceof Topic), + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic), messageSelector, null, false, false); } @@ -912,7 +935,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true, + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, true, messageSelector, null, false, false); } @@ -1336,17 +1359,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public int getDefaultPrefetch() { - return _defaultPrefetchHighMark; + return _prefetchHighMark; } public int getDefaultPrefetchHigh() { - return _defaultPrefetchHighMark; + return _prefetchHighMark; } public int getDefaultPrefetchLow() { - return _defaultPrefetchLowMark; + return _prefetchLowMark; } public AMQShortString getDefaultQueueExchangeName() @@ -1491,6 +1514,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic sendRecover(); + markClean(); + if (!isSuspended) { suspendChannel(false); @@ -1559,6 +1584,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic suspendChannel(true); } + // Let the dispatcher know that all the incomming messages + // should be rolled back(reject/release) + _rollbackMark.set(_highestDeliveryTag.get()); + + syncDispatchQueue(); + + _dispatcher.rollback(); + releaseForRollback(); sendRollback(); @@ -1851,26 +1884,58 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic void failoverPrep() { - startDispatcherIfNecessary(); syncDispatchQueue(); } void syncDispatchQueue() { - final CountDownLatch signal = new CountDownLatch(1); - _queue.add(new Dispatchable() { - public void dispatch(AMQSession ssn) + if (Thread.currentThread() == _dispatcherThread) + { + while (!_closed.get() && !_queue.isEmpty()) { - signal.countDown(); + Dispatchable disp; + try + { + disp = (Dispatchable) _queue.take(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + + // Check just in case _queue becomes empty, it shouldn't but + // better than an NPE. + if (disp == null) + { + _logger.debug("_queue became empty during sync."); + break; + } + + disp.dispatch(AMQSession.this); } - }); - try - { - signal.await(); } - catch (InterruptedException e) + else { - throw new RuntimeException(e); + startDispatcherIfNecessary(); + + final CountDownLatch signal = new CountDownLatch(1); + + _queue.add(new Dispatchable() + { + public void dispatch(AMQSession ssn) + { + signal.countDown(); + } + }); + + try + { + signal.await(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } } } @@ -2233,7 +2298,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate) throws JMSException { - return createProducerImpl(destination, mandatory, immediate, false); + return createProducerImpl(destination, mandatory, immediate, DEFAULT_WAIT_ON_SEND); } private P createProducerImpl(final Destination destination, final boolean mandatory, @@ -2704,15 +2769,26 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void setFlowControl(final boolean active) { _flowControl.setFlowControl(active); + _logger.warn("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced")); } - public void checkFlowControl() throws InterruptedException + public void checkFlowControl() throws InterruptedException, JMSException { + long expiryTime = 0L; synchronized (_flowControl) { - while (!_flowControl.getFlowControl()) + while (!_flowControl.getFlowControl() && + (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE) + : expiryTime) >= System.currentTimeMillis() ) + { + + _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD); + _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control"); + } + if(!_flowControl.getFlowControl()) { - _flowControl.wait(); + _logger.error("Message send failed due to timeout waiting on broker enforced flow control"); + throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control"); } } |