diff options
7 files changed, 538 insertions, 172 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index a0b79b135d..804c846572 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.client; +import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.AMQUndeliveredException; -import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -100,6 +100,7 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import javax.jms.TransactionRolledBackException; import java.io.Serializable; import java.text.MessageFormat; import java.util.ArrayList; @@ -293,6 +294,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private final boolean _strictAMQPFATAL; private final Object _messageDeliveryLock = new Object(); + /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */ + private boolean _dirty; + /** Has failover occured on this session */ + private boolean _failedOver; + /** * Creates a new session on a connection. * @@ -610,30 +616,65 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkTransacted(); - try + new FailoverNoopSupport<Object, JMSException>(new FailoverProtectedOperation<Object, JMSException>() { - // Acknowledge up to message last delivered (if any) for each consumer. - // need to send ack for messages delivered to consumers so far - for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) + public Object execute() throws JMSException, FailoverException { - // Sends acknowledgement to server - i.next().acknowledgeLastDelivered(); - } + //Check that we are clean to commit. + if (_failedOver && _dirty) + { + rollback(); - // Commits outstanding messages sent and outstanding acknowledgements. - final AMQProtocolHandler handler = getProtocolHandler(); + throw new TransactionRolledBackException("Connection failover has occured since last send. " + + "Forced rollback"); + } - handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), - TxCommitOkBody.class); - } - catch (AMQException e) - { - throw new JMSAMQException("Failed to commit: " + e.getMessage(), e); - } - catch (FailoverException e) - { - throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); - } + try + { + // Acknowledge up to message last delivered (if any) on this session. + // We only need to find the highest value and ack that as commit is session level. + Long lastTag = -1L; + + for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) + { +// i.next().acknowledgeLastDelivered(); +// } + + // get next acknowledgement to server + Long next = i.next().getLastDelivered(); + if (next != null && next > lastTag) + { + lastTag = next; + } + } + + if (lastTag != -1) + { + acknowledgeMessage(lastTag, true); + } + + // Commits outstanding messages sent and outstanding acknowledgements. + final AMQProtocolHandler handler = getProtocolHandler(); + + handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), + TxCommitOkBody.class); + + markClean(); + } + + catch (AMQException e) + { + throw new JMSAMQException("Failed to commit: " + e.getMessage(), e); + } + + catch (FailoverException e) + { + throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); + } + + return null; + } + }, _connection).execute(); } public void confirmConsumerCancelled(AMQShortString consumerTag) @@ -1431,6 +1472,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); + markClean(); + if (!isSuspended) { suspendChannel(false); @@ -1731,6 +1774,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ void resubscribe() throws AMQException { + _failedOver = true; resubscribeProducers(); resubscribeConsumers(); } @@ -2532,6 +2576,41 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return _messageDeliveryLock; } + /** + * Signifies that the session has pending sends to commit. + */ + public void markDirty() + { + _dirty = true; + } + + /** + * Signifies that the session has no pending sends to commit. + */ + public void markClean() + { + _dirty = false; + _failedOver = false; + } + + /** + * Check to see if failover has occured since the last call to markClean(commit or rollback). + * @return boolean true if failover has occured. + */ + public boolean hasFailedOver() + { + return _failedOver; + } + + /** + * Check to see if any message have been sent in this transaction and have not been commited. + * @return boolean true if a message has been sent but not commited + */ + public boolean isDirty() + { + return _dirty; + } + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ private class Dispatcher extends Thread { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java b/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java new file mode 100644 index 0000000000..6723ef3fdd --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; + +/** + * AMQSessionDirtyException represents all failures to send data on a transacted session that is + * no longer in a state that the client expects. i.e. failover has occured so previously sent messages + * will not be part of the transaction. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represent attempt to perform additional sends on a dirty session. + * </table> + */ +public class AMQSessionDirtyException extends AMQException +{ + public AMQSessionDirtyException(String msg) + { + super(AMQConstant.RESOURCE_ERROR, msg); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index ddaf0cfd93..4f8a3e5557 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -754,6 +754,30 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + /** + * Acknowledge up to last message delivered (if any). Used when commiting. + * + * @return the lastDeliveryTag to acknowledge + */ + Long getLastDelivered() + { + if (!_receivedDeliveryTags.isEmpty()) + { + Long lastDeliveryTag = _receivedDeliveryTags.poll(); + + while (!_receivedDeliveryTags.isEmpty()) + { + lastDeliveryTag = _receivedDeliveryTags.poll(); + } + + assert _receivedDeliveryTags.isEmpty(); + + return lastDeliveryTag; + } + + return null; + } + /** Acknowledge up to last message delivered (if any). Used when commiting. */ void acknowledgeLastDelivered() { @@ -772,6 +796,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + void notifyError(Throwable cause) { // synchronized (_closed) @@ -783,7 +808,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_closedStack != null) { _logger.trace(_consumerTag + " notifyError():" - + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); + + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); _logger.trace(_consumerTag + " previously" + _closedStack.toString()); } else @@ -904,7 +929,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_logger.isDebugEnabled()) { _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)" - + "for consumer with tag:" + _consumerTag); + + "for consumer with tag:" + _consumerTag); } Long tag = _receivedDeliveryTags.poll(); @@ -934,7 +959,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_logger.isDebugEnabled()) { _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)" - + "for consumer with tag:" + _consumerTag); + + "for consumer with tag:" + _consumerTag); } Iterator iterator = _synchronousQueue.iterator(); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 0ee4882ec2..fb6e4aa9fd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -60,46 +60,30 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private AMQConnection _connection; - /** - * If true, messages will not get a timestamp. - */ + /** If true, messages will not get a timestamp. */ private boolean _disableTimestamps; - /** - * Priority of messages created by this producer. - */ + /** Priority of messages created by this producer. */ private int _messagePriority; - /** - * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. - */ + /** Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. */ private long _timeToLive; - /** - * Delivery mode used for this producer. - */ + /** Delivery mode used for this producer. */ private int _deliveryMode = DeliveryMode.PERSISTENT; - /** - * The Destination used for this consumer, if specified upon creation. - */ + /** The Destination used for this consumer, if specified upon creation. */ protected AMQDestination _destination; - /** - * Default encoding used for messages produced by this producer. - */ + /** Default encoding used for messages produced by this producer. */ private String _encoding; - /** - * Default encoding used for message produced by this producer. - */ + /** Default encoding used for message produced by this producer. */ private String _mimeType; private AMQProtocolHandler _protocolHandler; - /** - * True if this producer was created from a transacted session - */ + /** True if this producer was created from a transacted session */ private boolean _transacted; private int _channelId; @@ -112,9 +96,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j */ private long _producerId; - /** - * The session used to create this producer - */ + /** The session used to create this producer */ private AMQSession _session; private final boolean _immediate; @@ -128,8 +110,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, - AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory, - boolean waitUntilSent) + AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory, + boolean waitUntilSent) { _connection = connection; _destination = destination; @@ -162,16 +144,16 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j // Note that the durable and internal arguments are ignored since passive is set to false // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame declare = - ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), null, // arguments - false, // autoDelete - false, // durable - destination.getExchangeName(), // exchange - false, // internal - true, // nowait - false, // passive - _session.getTicket(), // ticket - destination.getExchangeClass()); // type + ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), null, // arguments + false, // autoDelete + false, // durable + destination.getExchangeName(), // exchange + false, // internal + true, // nowait + false, // passive + _session.getTicket(), // ticket + destination.getExchangeClass()); // type _protocolHandler.writeFrame(declare); } @@ -208,7 +190,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j if ((i != DeliveryMode.NON_PERSISTENT) && (i != DeliveryMode.PERSISTENT)) { throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i - + " is illegal"); + + " is illegal"); } _deliveryMode = i; @@ -320,12 +302,12 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { validateDestination(destination); sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, - _immediate); + _immediate); } } public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) - throws JMSException + throws JMSException { checkPreConditions(); checkDestination(destination); @@ -337,7 +319,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, - boolean mandatory) throws JMSException + boolean mandatory) throws JMSException { checkPreConditions(); checkDestination(destination); @@ -349,7 +331,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, - boolean mandatory, boolean immediate) throws JMSException + boolean mandatory, boolean immediate) throws JMSException { checkPreConditions(); checkDestination(destination); @@ -361,7 +343,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, - boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException + boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException { checkPreConditions(); checkDestination(destination); @@ -369,7 +351,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { validateDestination(destination); sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, - waitUntilSent); + waitUntilSent); } } @@ -415,7 +397,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j else { throw new JMSException("Unable to send message, due to class conversion error: " - + message.getClass().getName()); + + message.getClass().getName()); } } } @@ -425,14 +407,14 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j if (!(destination instanceof AMQDestination)) { throw new JMSException("Unsupported destination class: " - + ((destination != null) ? destination.getClass() : null)); + + ((destination != null) ? destination.getClass() : null)); } declareDestination((AMQDestination) destination); } protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, - boolean mandatory, boolean immediate) throws JMSException + boolean mandatory, boolean immediate) throws JMSException { sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent); } @@ -447,16 +429,27 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j * @param timeToLive * @param mandatory * @param immediate + * * @throws JMSException */ protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive, - boolean mandatory, boolean immediate, boolean wait) throws JMSException + boolean mandatory, boolean immediate, boolean wait) throws JMSException { checkTemporaryDestination(destination); origMessage.setJMSDestination(destination); AbstractJMSMessage message = convertToNativeMessage(origMessage); + if (_transacted) + { + if (_session.hasFailedOver() && _session.isDirty()) + { + throw new JMSAMQException("Failover has occurred and session is dirty so unable to send.", + new AMQSessionDirtyException("Failover has occurred and session is dirty " + + "so unable to send.")); + } + } + if (_disableMessageId) { message.setJMSMessageID(null); @@ -489,12 +482,12 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame publishFrame = - BasicPublishBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), destination.getExchangeName(), // exchange - immediate, // immediate - mandatory, // mandatory - destination.getRoutingKey(), // routingKey - _session.getTicket()); // ticket + BasicPublishBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), destination.getExchangeName(), // exchange + immediate, // immediate + mandatory, // mandatory + destination.getRoutingKey(), // routingKey + _session.getTicket()); // ticket message.prepareForSending(); ByteBuffer payload = message.getData(); @@ -536,9 +529,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. AMQFrame contentHeaderFrame = - ContentHeaderBody.createAMQFrame(_channelId, - BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion()), 0, contentHeaderProperties, size); + ContentHeaderBody.createAMQFrame(_channelId, + BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion()), 0, contentHeaderProperties, size); if (_logger.isDebugEnabled()) { _logger.debug("Sending content header frame to " + destination); @@ -558,6 +551,11 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j origMessage.setJMSExpiration(message.getJMSExpiration()); origMessage.setJMSMessageID(message.getJMSMessageID()); } + + if (_transacted) + { + _session.markDirty(); + } } private void checkTemporaryDestination(AMQDestination destination) throws JMSException @@ -669,7 +667,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j if ((_destination != null) && (suppliedDestination != null)) { throw new UnsupportedOperationException( - "This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); + "This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); } if (suppliedDestination == null) diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index e7ff5afceb..19142067cb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -104,23 +104,22 @@ import java.util.concurrent.CountDownLatch; * <p/><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations * <tr><td> Create the filter chain to filter this handlers events. - * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}. + * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}. * * <tr><td> Maintain fail-over state. * <tr><td> * </table> * * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the - * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing - * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec - * filter before it mean not doing the read/write asynchronously but in the main filter thread? - * + * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing + * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec + * filter before it mean not doing the read/write asynchronously but in the main filter thread? * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including - * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of - * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could - * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data - * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so - * that lifecycles of the fields match lifecycles of their containing objects. + * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of + * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could + * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data + * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so + * that lifecycles of the fields match lifecycles of their containing objects. */ public class AMQProtocolHandler extends IoHandlerAdapter { @@ -200,7 +199,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter { SSLConfiguration sslConfig = _connection.getSSLConfiguration(); SSLContextFactory sslFactory = - new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); + new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext()); sslFilter.setUseClientMode(true); session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); @@ -235,7 +234,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter * @param session The MINA session. * * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and - * not otherwise? The above comment doesn't make that clear. + * not otherwise? The above comment doesn't make that clear. */ public void sessionClosed(IoSession session) { @@ -413,74 +412,74 @@ public class AMQProtocolHandler extends IoHandlerAdapter switch (bodyFrame.getFrameType()) { - case AMQMethodBody.TYPE: + case AMQMethodBody.TYPE: - if (debug) - { - _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); - } + if (debug) + { + _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); + } - final AMQMethodEvent<AMQMethodBody> evt = - new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame); + final AMQMethodEvent<AMQMethodBody> evt = + new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame); - try - { - - boolean wasAnyoneInterested = getStateManager().methodReceived(evt); - if (!_frameListeners.isEmpty()) + try { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) + + boolean wasAnyoneInterested = getStateManager().methodReceived(evt); + if (!_frameListeners.isEmpty()) { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + } } - } - if (!wasAnyoneInterested) - { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" - + _frameListeners); + if (!wasAnyoneInterested) + { + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + + _frameListeners); + } } - } - catch (AMQException e) - { - getStateManager().error(e); - if (!_frameListeners.isEmpty()) + catch (AMQException e) { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) + getStateManager().error(e); + if (!_frameListeners.isEmpty()) { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - listener.error(e); + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + listener.error(e); + } } - } - exceptionCaught(session, e); - } + exceptionCaught(session, e); + } - break; + break; - case ContentHeaderBody.TYPE: + case ContentHeaderBody.TYPE: - _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame); - break; + _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame); + break; - case ContentBody.TYPE: + case ContentBody.TYPE: - _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame); - break; + _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame); + break; - case HeartbeatBody.TYPE: + case HeartbeatBody.TYPE: - if (debug) - { - _logger.debug("Received heartbeat"); - } + if (debug) + { + _logger.debug("Received heartbeat"); + } - break; + break; - default: + default: } @@ -491,6 +490,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void messageSent(IoSession session, Object message) throws Exception { +// System.err.println("Sent PS:" + System.identityHashCode(_protocolSession) + ":" + message); + final long sentMessages = _messagesOut++; final boolean debug = _logger.isDebugEnabled(); @@ -547,7 +548,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter * @param listener the blocking listener. Note the calling thread will block. */ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener) - throws AMQException, FailoverException + throws AMQException, FailoverException { return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT); } @@ -560,7 +561,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter * @param listener the blocking listener. Note the calling thread will block. */ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener, - long timeout) throws AMQException, FailoverException + long timeout) throws AMQException, FailoverException { try { @@ -570,8 +571,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter AMQMethodEvent e = listener.blockForFrame(timeout); return e; - // When control resumes before this line, a reply will have been received - // that matches the criteria defined in the blocking listener + // When control resumes before this line, a reply will have been received + // that matches the criteria defined in the blocking listener } catch (AMQException e) { @@ -595,7 +596,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException, FailoverException { return writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass), - timeout); + timeout); } public void closeSession(AMQSession session) throws AMQException @@ -621,12 +622,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. final AMQFrame frame = - ConnectionCloseBody.createAMQFrame(0, _protocolSession.getProtocolMajorVersion(), - _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client is closing the connection.")); // replyText + ConnectionCloseBody.createAMQFrame(0, _protocolSession.getProtocolMajorVersion(), + _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor) + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client is closing the connection.")); // replyText try { diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 227f23b540..72ff3844ca 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -255,9 +255,9 @@ public class AMQStateManager implements AMQMethodListener if (_currentState != s) { _logger.warn("State not achieved within permitted time. Current state " + _currentState - + ", desired state: " + s); + + ", desired state: " + s); throw new AMQException("State not achieved within permitted time. Current state " + _currentState - + ", desired state: " + s); + + ", desired state: " + s); } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java index 2957dda869..75b6fbaedd 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java @@ -22,23 +22,26 @@ package org.apache.qpid.server.txn;
import junit.framework.TestCase;
-import junit.framework.Assert;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQSessionDirtyException;
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
-import org.apache.log4j.Logger;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
-import javax.jms.Session;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
-import javax.jms.ConnectionFactory;
-import javax.jms.Connection;
-import javax.jms.Message;
+import javax.jms.Session;
import javax.jms.TextMessage;
-import javax.jms.MessageListener;
-import javax.naming.spi.InitialContextFactory;
+import javax.jms.TransactionRolledBackException;
import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
import java.util.Hashtable;
import java.util.concurrent.CountDownLatch;
@@ -49,7 +52,8 @@ public class TxnTest extends TestCase implements MessageListener private static final Logger _logger = Logger.getLogger(TxnTest.class);
- protected final String BROKER = "vm://:1";//"localhost";
+ //Set retries quite high to ensure that it continues to retry whilst the InVM broker is restarted.
+ protected final String BROKER = "vm://:1?retries='1000'";
protected final String VHOST = "/test";
protected final String QUEUE = "TxnTestQueue";
@@ -75,7 +79,11 @@ public class TxnTest extends TestCase implements MessageListener Hashtable<String, String> env = new Hashtable<String, String>();
env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'");
- env.put("queue.queue", QUEUE);
+
+ // Ensure that the queue is unique for each test run.
+ // There appears to be other old sesssion/consumers when looping the tests this means that sometimes a message
+ // will disappear. When it has actually gone to the old client.
+ env.put("queue.queue", QUEUE + "-" + System.currentTimeMillis());
_context = factory.getInitialContext(env);
@@ -109,7 +117,7 @@ public class TxnTest extends TestCase implements MessageListener {
_producerConnection.close();
}
-
+
super.tearDown();
if (BROKER.startsWith("vm://"))
@@ -124,10 +132,8 @@ public class TxnTest extends TestCase implements MessageListener _consumer.setMessageListener(this);
_clientConnection.start();
- //Set TTL
_producer.send(_producerSession.createTextMessage("TxtTestML"));
-
try
{
//Wait for message to arrive
@@ -150,7 +156,6 @@ public class TxnTest extends TestCase implements MessageListener public void onMessage(Message message)
{
-
try
{
assertEquals("Incorrect Message Received.", "TxtTestML", ((TextMessage) message).getText());
@@ -170,19 +175,235 @@ public class TxnTest extends TestCase implements MessageListener {
_clientConnection.start();
- //Set TTL
_producer.send(_producerSession.createTextMessage("TxtTestReceive"));
//Receive Message
Message received = _consumer.receive(1000);
+ _clientSession.commit();
+
assertEquals("Incorrect Message Received.", "TxtTestReceive", ((TextMessage) received).getText());
- //Receive Message
+ //Receive Message
received = _consumer.receive(1000);
assertNull("More messages received", received);
_consumer.close();
}
+
+ /**
+ * Test that after the connection has failed over that a sent message is still correctly receieved.
+ * Using Auto-Ack consumer.
+ *
+ * @throws JMSException
+ */
+ public void testReceiveAfterFailover() throws JMSException
+ {
+// System.err.println("testReceiveAfterFailover");
+ _clientConnection.close();
+
+ MessageConsumer consumer = _producerSession.createConsumer(_queue);
+
+ failServer();
+
+// System.err.println("Server restarted");
+
+ String MESSAGE_TXT = "TxtTestReceiveAfterFailoverTX";
+
+// System.err.println("Prod Session:" + _producerSession + ":" + ((AMQSession) _producerSession).isClosed());
+
+ Message sent = _producerSession.createTextMessage(MESSAGE_TXT);
+// System.err.println("Created message");
+
+ _producer.send(sent);
+// System.err.println("Sent message");
+
+ //Verify correct message received
+ Message received = consumer.receive(10000);
+// System.err.println("Message Receieved:" + received);
+
+ assertNotNull("Message should be received.", received);
+ assertEquals("Incorrect Message Received.", MESSAGE_TXT, ((TextMessage) received).getText());
+
+ //Check no more messages are received
+ received = consumer.receive(1000);
+ System.err.println("Second receive completed.");
+
+ assertNull("More messages received", received);
+
+ _producer.close();
+// System.err.println("Close producer");
+
+ consumer.close();
+// System.err.println("Close consumer");
+
+ _producerConnection.close();
+ }
+
+ /**
+ * Test that after the connection has failed over the dirty transaction is notified when calling commit
+ *
+ * @throws JMSException
+ */
+ public void testSendBeforeFailoverThenCommitTx() throws JMSException
+ {
+// System.err.println("testSendBeforeFailoverThenCommitTx");
+ _clientConnection.start();
+
+ //Create a transacted producer.
+ MessageProducer txProducer = _clientSession.createProducer(_queue);
+
+ String MESSAGE_TXT = "testSendBeforeFailoverThenCommitTx";
+
+ //Send the first message
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+
+ failServer();
+
+ //Check that the message isn't received.
+ Message received = _consumer.receive(1000);
+ assertNull("Message received after failover to clean broker!", received);
+
+ //Attempt to commit session
+ try
+ {
+ _clientSession.commit();
+ fail("TransactionRolledBackException not thrown");
+ }
+ catch (JMSException jmse)
+ {
+ if (!(jmse instanceof TransactionRolledBackException))
+ {
+ fail(jmse.toString());
+ }
+ }
+
+ //Close consumer & producer
+ _consumer.close();
+ txProducer.close();
+ }
+
+ /**
+ * Test that after the connection has failed over the dirty transaction is fast failed by throwing an
+ * Exception on the next send.
+ *
+ * @throws JMSException
+ */
+ public void testSendBeforeFailoverThenSendTx() throws JMSException
+ {
+// System.err.println("testSendBeforeFailoverThenSendTx");
+
+ _clientConnection.start();
+ MessageProducer txProducer = _clientSession.createProducer(_queue);
+
+ String MESSAGE_TXT = "TxtTestSendBeforeFailoverTx";
+
+ //Send the first message
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+
+ failServer();
+
+ //Check that the message isn't received.
+ Message received = _consumer.receive(1000);
+ assertNull("Message received after failover to clean broker!", received);
+
+ //Attempt to send another message on the session, here we should fast fail.
+ try
+ {
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+ fail("JMSException not thrown");
+ }
+ catch (JMSException jmse)
+ {
+ if (!(jmse.getLinkedException() instanceof AMQSessionDirtyException))
+ {
+ fail(jmse.toString());
+ }
+ }
+
+
+ _consumer.close();
+ }
+
+ public void testSendBeforeFailoverThenSend2Tx() throws JMSException
+ {
+// System.err.println("testSendBeforeFailoverThenSendTx");
+
+ _clientConnection.start();
+ MessageProducer txProducer = _clientSession.createProducer(_queue);
+
+ String MESSAGE_TXT = "TxtTestSendBeforeFailoverTx";
+
+ //Send the first message
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+
+ failServer();
+
+ //Check that the message isn't received.
+ Message received = _consumer.receive(1000);
+ assertNull("Message received after failover to clean broker!", received);
+
+ _clientSession.rollback();
+
+ //Attempt to send another message on the session, here we should fast fail.
+ try
+ {
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+ }
+ catch (JMSException jmse)
+ {
+ if (jmse.getLinkedException() instanceof AMQSessionDirtyException)
+ {
+ fail(jmse.toString());
+ }
+ }
+
+
+ _consumer.close();
+ }
+
+
+ private void failServer()
+ {
+ if (BROKER.startsWith("vm://"))
+ {
+ //Work around for MessageStore not being initialise and the send not fully completing before the failover occurs.
+ try
+ {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+
+ TransportConnection.killAllVMBrokers();
+ ApplicationRegistry.remove(1);
+ try
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ catch (AMQVMBrokerCreationException e)
+ {
+ _logger.error("Unable to restart broker due to :" + e);
+ }
+
+ //Work around for receive not being failover aware.. because it is the first receive it trys to
+ // unsuspend the channel but in this case the ChannelFlow command goes on the old session and the response on the
+ // new one ... though I thought the statemanager recorded the listeners so should be ok.???
+ try
+ {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+
+ }
+
+ }
+
}
|