diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 113 |
1 files changed, 42 insertions, 71 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index fee6690538..a0b79b135d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.AMQUndeliveredException; +import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -112,22 +113,20 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** - * * <p/><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations * <tr><td> * </table> * * @todo Different FailoverSupport implementation are needed on the same method call, in different situations. For - * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second - * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of - * the fail-over process, the retry handler could be used to automatically retry the operation once the connection - * has been reestablished. All fail-over protected operations should be placed in private methods, with - * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the - * fail-over process sets a nowait flag and uses an async method call instead. - * + * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second + * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of + * the fail-over process, the retry handler could be used to automatically retry the operation once the connection + * has been reestablished. All fail-over protected operations should be placed in private methods, with + * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the + * fail-over process sets a nowait flag and uses an async method call instead. * @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this, - * after looking at worse bottlenecks first. + * after looking at worse bottlenecks first. */ public class AMQSession extends Closeable implements Session, QueueSession, TopicSession { @@ -222,9 +221,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private final FlowControllingBlockingQueue _queue; - /** - * Holds the highest received delivery tag. - */ + /** Holds the highest received delivery tag. */ private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); /** Holds the dispatcher thread for this session. */ @@ -236,9 +233,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** Holds all of the producers created by this session, keyed by their unique identifiers. */ private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>(); - /** - * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume methods. - */ + /** Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume methods. */ private int _nextTag = 1; /** @@ -374,12 +369,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Creates a new session on a connection with the default message factory factory. * - * @param con The connection on which to create the session. - * @param channelId The unique identifier for the session. - * @param transacted Indicates whether or not the session is transactional. - * @param acknowledgeMode The acknoledgement mode for the session. - * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. - * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. + * @param con The connection on which to create the session. + * @param channelId The unique identifier for the session. + * @param transacted Indicates whether or not the session is transactional. + * @param acknowledgeMode The acknoledgement mode for the session. + * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. + * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. */ AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) @@ -402,12 +397,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public BytesMessage createBytesMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) - { - checkNotClosed(); - - return new JMSBytesMessage(); - } + checkNotClosed(); + return new JMSBytesMessage(); } /** @@ -462,9 +453,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param exchangeName The exchange to bind the queue on. * * @throws AMQException If the queue cannot be bound for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. - * * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges? */ public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, @@ -501,14 +490,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker. * * @throws JMSException If the JMS provider fails to close the session due to some internal error. - * * @todo Be aware of possible changes to parameter order as versions change. - * * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be - * re-opened. May need to examine this more carefully. - * + * re-opened. May need to examine this more carefully. * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover, - * because the failover process sends the failover event before acquiring the mutex itself. + * because the failover process sends the failover event before acquiring the mutex itself. */ public void close(long timeout) throws JMSException { @@ -579,6 +565,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { synchronized (_connection.getFailoverMutex()) { + if (e instanceof AMQDisconnectedException) + { + if (_dispatcher != null) + { + // Failover failed and ain't coming back. Knife the dispatcher. + _dispatcher.interrupt(); + } + } synchronized (_messageDeliveryLock) { // An AMQException has an error code and message already and will be passed in when closure occurs as a @@ -610,7 +604,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does * not mean that the commit is known to have failed, merely that it is not known whether it * failed or not. - * * @todo Be aware of possible changes to parameter order as versions change. */ public void commit() throws JMSException @@ -861,12 +854,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MapMessage createMapMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) - { - checkNotClosed(); - - return new JMSMapMessage(); - } + checkNotClosed(); + return new JMSMapMessage(); } public javax.jms.Message createMessage() throws JMSException @@ -876,12 +865,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public ObjectMessage createObjectMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) - { - checkNotClosed(); - - return (ObjectMessage) new JMSObjectMessage(); - } + checkNotClosed(); + return (ObjectMessage) new JMSObjectMessage(); } public ObjectMessage createObjectMessage(Serializable object) throws JMSException @@ -955,7 +940,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param exclusive Flag to indicate that the queue is exclusive to this client. * * @throws AMQException If the queue cannot be declared for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, @@ -1301,7 +1285,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * <li>Stop message delivery.</li> * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered". * <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered. - * Redelivered messages do not have to be delivered in exactly their original delivery order.</li> + * Redelivered messages do not have to be delivered in exactly their original delivery order.</li> * </ul> * * <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and @@ -1311,7 +1295,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error. * Not that this does not necessarily mean that the recovery has failed, but simply that it * is not possible to tell if it has or not. - * * @todo Be aware of possible changes to parameter order as versions change. */ public void recover() throws JMSException @@ -1402,7 +1385,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting delivery tag:" + deliveryTag); + _logger.debug("Rejecting delivery tag:" + deliveryTag + ":SessionHC:" + this.hashCode()); } AMQFrame basicRejectBody = @@ -1423,7 +1406,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does * not mean that the rollback is known to have failed, merely that it is not known whether it * failed or not. - * * @todo Be aware of possible changes to parameter order as versions change. */ public void rollback() throws JMSException @@ -1695,7 +1677,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not. * * @throws JMSException If the query fails for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) @@ -1768,10 +1749,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Starts the session, which ensures that it is not suspended and that its event dispatcher is running. * * @throws AMQException If the session cannot be started for any reason. - * * @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the - * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages - * for each subsequent call to flow.. only need to do this if we have called stop. + * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages + * for each subsequent call to flow.. only need to do this if we have called stop. */ void start() throws AMQException { @@ -2138,7 +2118,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param nowait * * @throws AMQException If the exchange cannot be declared for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ private void declareExchange(final AMQShortString name, final AMQShortString type, @@ -2183,9 +2162,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * the client. * * @throws AMQException If the queue cannot be declared for any reason. - * * @todo Verify the destiation is valid or throw an exception. - * * @todo Be aware of possible changes to parameter order as versions change. */ private AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) @@ -2229,7 +2206,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param queueName The name of the queue to delete. * * @throws JMSException If the queue could not be deleted for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ private void deleteQueue(final AMQShortString queueName) throws JMSException @@ -2404,11 +2380,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _producers.put(new Long(producerId), producer); } - private void rejectAllMessages(boolean requeue) - { - rejectMessagesForConsumerTag(null, requeue); - } - /** * @param consumerTag The consumerTag to prune from queue or all if null * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) @@ -2528,7 +2499,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * should be unsuspended. * * @throws AMQException If the session cannot be suspended for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ private void suspendChannel(boolean suspend) throws AMQException // , FailoverException @@ -2571,6 +2541,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private final Object _lock = new Object(); private final AtomicLong _rollbackMark = new AtomicLong(-1); + private String dispatcherID = "" + System.identityHashCode(this); public Dispatcher() { @@ -2606,6 +2577,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // Reject messages on pre-dispatch queue rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); + //Let the dispatcher deal with this when it gets to them. // closeConsumer consumer.markClosed(); @@ -2639,8 +2611,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } else { - // should perhaps clear the _SQ here. - // consumer._synchronousQueue.clear(); + // cClear the _SQ here. consumer.clearReceiveQueue(); } @@ -2756,14 +2727,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (consumer == null) { - _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" + _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "[" + message.getDeliverBody().deliveryTag + "] from queue " + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)..."); } else { - _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliverBody().deliveryTag + "] from queue " + " consumer(" + _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ") [" + + message.getDeliverBody().deliveryTag + "] from queue consumer(" + consumer.debugIdentity() + ") is closed rejecting(requeue)..."); } } |