diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 322 |
1 files changed, 189 insertions, 133 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 25562cfff7..44c4e8987a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -70,7 +70,6 @@ import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQInvalidRoutingKeyException; -import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; @@ -88,8 +87,6 @@ import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; @@ -97,7 +94,10 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -213,8 +213,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); - protected final boolean DEFAULT_WAIT_ON_SEND = Boolean.parseBoolean(System.getProperty("qpid.default_wait_on_send", "false")); - /** * The period to wait while flow controlled before sending a log message confirming that the session is still * waiting on flow control being revoked @@ -364,7 +362,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * Set when recover is called. This is to handle the case where recover() is called by application code during * onMessage() processing to ensure that an auto ack is not sent. */ - private boolean _inRecovery; + private volatile boolean _sessionInRecovery; + + /** + * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of + * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover(). + */ + private volatile boolean _usingDispatcherForCleanup; /** Used to indicates that the connection to which this session belongs, has been stopped. */ private boolean _connectionStopped; @@ -606,29 +610,36 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * Acknowledges all unacknowledged messages on the session, for all message consumers on the session. * * @throws IllegalStateException If the session is closed. + * @throws JMSException if there is a problem during acknowledge process. */ - public void acknowledge() throws IllegalStateException + public void acknowledge() throws IllegalStateException, JMSException { if (isClosed()) { throw new IllegalStateException("Session is already closed"); } - else if (hasFailedOver()) + else if (hasFailedOverDirty()) { + //perform an implicit recover in this scenario + recover(); + + //notify the consumer throw new IllegalStateException("has failed over"); } - while (true) + try { - Long tag = _unacknowledgedMessageTags.poll(); - if (tag == null) - { - break; - } - acknowledgeMessage(tag, false); + acknowledgeImpl(); + markClean(); + } + catch (TransportException e) + { + throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e); } } + protected abstract void acknowledgeImpl() throws JMSException; + /** * Acknowledge one or many messages. * @@ -763,6 +774,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _logger.debug( "Got FailoverException during channel close, ignored as channel already marked as closed."); } + catch (TransportException e) + { + throw toJMSException("Error closing session:" + e.getMessage(), e); + } finally { _connection.deregisterSession(_channelId); @@ -833,51 +848,40 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does * not mean that the commit is known to have failed, merely that it is not known whether it * failed or not. - * @todo Be aware of possible changes to parameter order as versions change. */ public void commit() throws JMSException { checkTransacted(); - try + //Check that we are clean to commit. + if (_failedOverDirty) { - //Check that we are clean to commit. - if (_failedOverDirty) - { - rollback(); + rollback(); - throw new TransactionRolledBackException("Connection failover has occured since last send. " + - "Forced rollback"); - } - - - // Acknowledge all delivered messages - while (true) - { - Long tag = _deliveredMessageTags.poll(); - if (tag == null) - { - break; - } + throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." + + "The session transaction was rolled back."); + } - acknowledgeMessage(tag, false); - } - // Commits outstanding messages and acknowledgments - sendCommit(); + try + { + commitImpl(); markClean(); } catch (AMQException e) { - throw new JMSAMQException("Failed to commit: " + e.getMessage() + ":" + e.getCause(), e); + throw new JMSAMQException("Exception during commit: " + e.getMessage() + ":" + e.getCause(), e); } catch (FailoverException e) { throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); } + catch(TransportException e) + { + throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e); + } } - public abstract void sendCommit() throws AMQException, FailoverException; - + protected abstract void commitImpl() throws AMQException, FailoverException, TransportException; public void confirmConsumerCancelled(int consumerTag) { @@ -955,7 +959,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector); } - public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal) + protected MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { checkValidDestination(destination); @@ -969,15 +973,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkValidDestination(destination); return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null, - ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); - } - - public C createExclusiveConsumer(Destination destination) throws JMSException - { - checkValidDestination(destination); - - return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null, - ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); + isBrowseOnlyDestination(destination), false); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException @@ -985,7 +981,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkValidDestination(destination); return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), - messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); + messageSelector, null, isBrowseOnlyDestination(destination), false); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) @@ -994,16 +990,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkValidDestination(destination); return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic), - messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); - } - - public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal) - throws JMSException - { - checkValidDestination(destination); - - return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, true, - messageSelector, null, false, false); + messageSelector, null, isBrowseOnlyDestination(destination), false); } public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, @@ -1011,23 +998,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); + return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false); } public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, - boolean exclusive, String selector) throws JMSException + boolean exclusive, String selector) throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); - } - - public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, - String selector, FieldTable rawSelector) throws JMSException - { - checkValidDestination(destination); - - return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false); } public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, @@ -1035,7 +1014,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, isBrowseOnlyDestination(destination), false); } @@ -1071,6 +1050,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic ex.setLinkedException(e); throw ex; } + catch(TransportException e) + { + throw toJMSException("Error when verifying destination", e); + } } String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector; @@ -1156,6 +1139,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return subscriber; } + catch (TransportException e) + { + throw toJMSException("Exception while creating durable subscriber:" + e.getMessage(), e); + } finally { _subscriberDetails.unlock(); @@ -1216,12 +1203,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return createProducerImpl(destination, mandatory, immediate); } - public P createProducer(Destination destination, boolean mandatory, boolean immediate, - boolean waitUntilSent) throws JMSException - { - return createProducerImpl(destination, mandatory, immediate, waitUntilSent); - } - public TopicPublisher createPublisher(Topic topic) throws JMSException { checkNotClosed(); @@ -1405,7 +1386,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkNotClosed(); - // return (QueueSender) createProducer(queue); return new QueueSenderAdapter(createProducer(queue), queue); } @@ -1440,10 +1420,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public TopicSubscriber createSubscriber(Topic topic) throws JMSException { checkNotClosed(); - Topic dest = checkValidTopic(topic); + checkValidTopic(topic); - // AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest)); + return new TopicSubscriberAdaptor<C>(topic, + createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, false, true, null, null, false, false)); } /** @@ -1460,10 +1440,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { checkNotClosed(); - Topic dest = checkValidTopic(topic); + checkValidTopic(topic); - // AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal)); + return new TopicSubscriberAdaptor<C>(topic, + createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, noLocal, + true, messageSelector, null, false, false)); } public TemporaryQueue createTemporaryQueue() throws JMSException @@ -1565,10 +1546,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic abstract public void sync() throws AMQException; - public int getAcknowledgeMode() throws JMSException + public int getAcknowledgeMode() { - checkNotClosed(); - return _acknowledgeMode; } @@ -1628,10 +1607,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return _ticket; } - public boolean getTransacted() throws JMSException + public boolean getTransacted() { - checkNotClosed(); - return _transacted; } @@ -1727,13 +1704,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // Ensure that the session is not transacted. checkNotTransacted(); - // flush any acks we are holding in the buffer. - flushAcknowledgments(); - - // this is set only here, and the before the consumer's onMessage is called it is set to false - _inRecovery = true; + try { + // flush any acks we are holding in the buffer. + flushAcknowledgments(); + + // this is only set true here, and only set false when the consumers preDeliver method is called + _sessionInRecovery = true; boolean isSuspended = isSuspended(); @@ -1741,9 +1719,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { suspendChannel(true); } - + + // Set to true to short circuit delivery of anything currently + //in the pre-dispatch queue. + _usingDispatcherForCleanup = true; + syncDispatchQueue(); - + + // Set to false before sending the recover as 0-8/9/9-1 will + //send messages back before the recover completes, and we + //probably shouldn't clean those! ;-) + _usingDispatcherForCleanup = false; + if (_dispatcher != null) { _dispatcher.recover(); @@ -1752,10 +1739,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic sendRecover(); markClean(); - - // Set inRecovery to false before you start message flow again again. - _inRecovery = false; - + if (!isSuspended) { suspendChannel(false); @@ -1769,7 +1753,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e); } - + catch(TransportException e) + { + throw toJMSException("Recover failed: " + e.getMessage(), e); + } } protected abstract void sendRecover() throws AMQException, FailoverException; @@ -1854,6 +1841,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { throw new JMSAMQException("Fail-over interrupted rollback. Status of the rollback is uncertain.", e); } + catch (TransportException e) + { + throw toJMSException("Failure to rollback:" + e.getMessage(), e); + } } } @@ -1900,7 +1891,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ public void unsubscribe(String name) throws JMSException { - unsubscribe(name, false); + try + { + unsubscribe(name, false); + } + catch (TransportException e) + { + throw toJMSException("Exception while unsubscribing:" + e.getMessage(), e); + } } /** @@ -1977,6 +1975,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkTemporaryDestination(destination); + if(!noConsume && isBrowseOnlyDestination(destination)) + { + throw new InvalidDestinationException("The consumer being created is not 'noConsume'," + + "but a 'browseOnly' Destination has been supplied."); + } + final String messageSelector; if (_strictAMQP && !((selector == null) || selector.equals(""))) @@ -2021,8 +2025,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // argument, as specifying null for the arguments when querying means they should not be checked at all ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector); - C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, - noLocal, exclusive, messageSelector, ft, noConsume, autoClose); + C consumer; + try + { + consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, + noLocal, exclusive, messageSelector, ft, noConsume, autoClose); + } + catch(TransportException e) + { + throw toJMSException("Exception while creating consumer: " + e.getMessage(), e); + } if (_messageListener != null) { @@ -2059,7 +2071,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic ex.initCause(e); throw ex; } - + catch (TransportException e) + { + throw toJMSException("Exception while registering consumer:" + e.getMessage(), e); + } return consumer; } }, _connection).execute(); @@ -2124,7 +2139,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic boolean isInRecovery() { - return _inRecovery; + return _sessionInRecovery; } boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException @@ -2246,7 +2261,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic void setInRecovery(boolean inRecovery) { - _inRecovery = inRecovery; + _sessionInRecovery = inRecovery; } boolean isStarted() @@ -2585,15 +2600,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public abstract void sendConsume(C consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException; - private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate) + private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate) throws JMSException { - return createProducerImpl(destination, mandatory, immediate, DEFAULT_WAIT_ON_SEND); - } - - private P createProducerImpl(final Destination destination, final boolean mandatory, - final boolean immediate, final boolean waitUntilSent) throws JMSException - { return new FailoverRetrySupport<P, JMSException>( new FailoverProtectedOperation<P, JMSException>() { @@ -2601,8 +2610,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkNotClosed(); long producerId = getNextProducerId(); - P producer = createMessageProducer(destination, mandatory, - immediate, waitUntilSent, producerId); + + P producer; + try + { + producer = createMessageProducer(destination, mandatory, + immediate, producerId); + } + catch (TransportException e) + { + throw toJMSException("Exception while creating producer:" + e.getMessage(), e); + } + registerProducer(producerId, producer); return producer; @@ -2611,7 +2630,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } public abstract P createMessageProducer(final Destination destination, final boolean mandatory, - final boolean immediate, final boolean waitUntilSent, long producerId) throws JMSException; + final boolean immediate, final long producerId) throws JMSException; private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException { @@ -2754,6 +2773,21 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + /** + * Undeclares the specified temporary queue/topic. + * + * <p/>Note that this operation automatically retries in the event of fail-over. + * + * @param amqQueue The name of the temporary destination to delete. + * + * @throws JMSException If the queue could not be deleted for any reason. + * @todo Be aware of possible changes to parameter order as versions change. + */ + protected void deleteTemporaryDestination(final TemporaryDestination amqQueue) throws JMSException + { + deleteQueue(amqQueue.getAMQQueueName()); + } + public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException; private long getNextProducerId() @@ -3009,6 +3043,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { throw new AMQException(null, "Fail-over interrupted suspend/unsuspend channel.", e); } + catch (TransportException e) + { + throw new AMQException(AMQConstant.getConstant(getErrorCode(e)), e.getMessage(), e); + } } } @@ -3047,21 +3085,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * * @return boolean true if failover has occured. */ - public boolean hasFailedOver() + public boolean hasFailedOverDirty() { return _failedOverDirty; } - /** - * Check to see if any message have been sent in this transaction and have not been commited. - * - * @return boolean true if a message has been sent but not commited - */ - public boolean isDirty() - { - return _dirty; - } - public void setTicket(int ticket) { _ticket = ticket; @@ -3325,7 +3353,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { rejectMessage(message, true); } - else if (isInRecovery()) + else if (_usingDispatcherForCleanup) { _unacknowledgedMessageTags.add(deliveryTag); } @@ -3486,4 +3514,32 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { return DECLARE_EXCHANGES; } + + JMSException toJMSException(String message, TransportException e) + { + int code = getErrorCode(e); + JMSException jmse = new JMSException(message, Integer.toString(code)); + jmse.setLinkedException(e); + jmse.initCause(e); + return jmse; + } + + private int getErrorCode(TransportException e) + { + int code = AMQConstant.INTERNAL_ERROR.getCode(); + if (e instanceof SessionException) + { + SessionException se = (SessionException) e; + if(se.getException() != null && se.getException().getErrorCode() != null) + { + code = se.getException().getErrorCode().getValue(); + } + } + return code; + } + + private boolean isBrowseOnlyDestination(Destination destination) + { + return ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()); + } } |