summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java121
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java42
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java118
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java141
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java4
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java253
7 files changed, 538 insertions, 172 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index a0b79b135d..804c846572 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -100,6 +100,7 @@ import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -293,6 +294,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private final boolean _strictAMQPFATAL;
private final Object _messageDeliveryLock = new Object();
+ /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
+ private boolean _dirty;
+ /** Has failover occured on this session */
+ private boolean _failedOver;
+
/**
* Creates a new session on a connection.
*
@@ -610,30 +616,65 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkTransacted();
- try
+ new FailoverNoopSupport<Object, JMSException>(new FailoverProtectedOperation<Object, JMSException>()
{
- // Acknowledge up to message last delivered (if any) for each consumer.
- // need to send ack for messages delivered to consumers so far
- for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+ public Object execute() throws JMSException, FailoverException
{
- // Sends acknowledgement to server
- i.next().acknowledgeLastDelivered();
- }
+ //Check that we are clean to commit.
+ if (_failedOver && _dirty)
+ {
+ rollback();
- // Commits outstanding messages sent and outstanding acknowledgements.
- final AMQProtocolHandler handler = getProtocolHandler();
+ throw new TransactionRolledBackException("Connection failover has occured since last send. " +
+ "Forced rollback");
+ }
- handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()),
- TxCommitOkBody.class);
- }
- catch (AMQException e)
- {
- throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
- }
- catch (FailoverException e)
- {
- throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
- }
+ try
+ {
+ // Acknowledge up to message last delivered (if any) on this session.
+ // We only need to find the highest value and ack that as commit is session level.
+ Long lastTag = -1L;
+
+ for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+ {
+// i.next().acknowledgeLastDelivered();
+// }
+
+ // get next acknowledgement to server
+ Long next = i.next().getLastDelivered();
+ if (next != null && next > lastTag)
+ {
+ lastTag = next;
+ }
+ }
+
+ if (lastTag != -1)
+ {
+ acknowledgeMessage(lastTag, true);
+ }
+
+ // Commits outstanding messages sent and outstanding acknowledgements.
+ final AMQProtocolHandler handler = getProtocolHandler();
+
+ handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()),
+ TxCommitOkBody.class);
+
+ markClean();
+ }
+
+ catch (AMQException e)
+ {
+ throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
+ }
+
+ catch (FailoverException e)
+ {
+ throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
+ }
+
+ return null;
+ }
+ }, _connection).execute();
}
public void confirmConsumerCancelled(AMQShortString consumerTag)
@@ -1431,6 +1472,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+ markClean();
+
if (!isSuspended)
{
suspendChannel(false);
@@ -1731,6 +1774,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
void resubscribe() throws AMQException
{
+ _failedOver = true;
resubscribeProducers();
resubscribeConsumers();
}
@@ -2532,6 +2576,41 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
return _messageDeliveryLock;
}
+ /**
+ * Signifies that the session has pending sends to commit.
+ */
+ public void markDirty()
+ {
+ _dirty = true;
+ }
+
+ /**
+ * Signifies that the session has no pending sends to commit.
+ */
+ public void markClean()
+ {
+ _dirty = false;
+ _failedOver = false;
+ }
+
+ /**
+ * Check to see if failover has occured since the last call to markClean(commit or rollback).
+ * @return boolean true if failover has occured.
+ */
+ public boolean hasFailedOver()
+ {
+ return _failedOver;
+ }
+
+ /**
+ * Check to see if any message have been sent in this transaction and have not been commited.
+ * @return boolean true if a message has been sent but not commited
+ */
+ public boolean isDirty()
+ {
+ return _dirty;
+ }
+
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java b/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
new file mode 100644
index 0000000000..6723ef3fdd
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+
+/**
+ * AMQSessionDirtyException represents all failures to send data on a transacted session that is
+ * no longer in a state that the client expects. i.e. failover has occured so previously sent messages
+ * will not be part of the transaction.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent attempt to perform additional sends on a dirty session.
+ * </table>
+ */
+public class AMQSessionDirtyException extends AMQException
+{
+ public AMQSessionDirtyException(String msg)
+ {
+ super(AMQConstant.RESOURCE_ERROR, msg);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index ddaf0cfd93..4f8a3e5557 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -754,6 +754,30 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
+ /**
+ * Acknowledge up to last message delivered (if any). Used when commiting.
+ *
+ * @return the lastDeliveryTag to acknowledge
+ */
+ Long getLastDelivered()
+ {
+ if (!_receivedDeliveryTags.isEmpty())
+ {
+ Long lastDeliveryTag = _receivedDeliveryTags.poll();
+
+ while (!_receivedDeliveryTags.isEmpty())
+ {
+ lastDeliveryTag = _receivedDeliveryTags.poll();
+ }
+
+ assert _receivedDeliveryTags.isEmpty();
+
+ return lastDeliveryTag;
+ }
+
+ return null;
+ }
+
/** Acknowledge up to last message delivered (if any). Used when commiting. */
void acknowledgeLastDelivered()
{
@@ -772,6 +796,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
+
void notifyError(Throwable cause)
{
// synchronized (_closed)
@@ -783,7 +808,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_closedStack != null)
{
_logger.trace(_consumerTag + " notifyError():"
- + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
+ + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
_logger.trace(_consumerTag + " previously" + _closedStack.toString());
}
else
@@ -904,7 +929,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_logger.isDebugEnabled())
{
_logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)"
- + "for consumer with tag:" + _consumerTag);
+ + "for consumer with tag:" + _consumerTag);
}
Long tag = _receivedDeliveryTags.poll();
@@ -934,7 +959,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_logger.isDebugEnabled())
{
_logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)"
- + "for consumer with tag:" + _consumerTag);
+ + "for consumer with tag:" + _consumerTag);
}
Iterator iterator = _synchronousQueue.iterator();
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 0ee4882ec2..fb6e4aa9fd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -60,46 +60,30 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
private AMQConnection _connection;
- /**
- * If true, messages will not get a timestamp.
- */
+ /** If true, messages will not get a timestamp. */
private boolean _disableTimestamps;
- /**
- * Priority of messages created by this producer.
- */
+ /** Priority of messages created by this producer. */
private int _messagePriority;
- /**
- * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
- */
+ /** Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. */
private long _timeToLive;
- /**
- * Delivery mode used for this producer.
- */
+ /** Delivery mode used for this producer. */
private int _deliveryMode = DeliveryMode.PERSISTENT;
- /**
- * The Destination used for this consumer, if specified upon creation.
- */
+ /** The Destination used for this consumer, if specified upon creation. */
protected AMQDestination _destination;
- /**
- * Default encoding used for messages produced by this producer.
- */
+ /** Default encoding used for messages produced by this producer. */
private String _encoding;
- /**
- * Default encoding used for message produced by this producer.
- */
+ /** Default encoding used for message produced by this producer. */
private String _mimeType;
private AMQProtocolHandler _protocolHandler;
- /**
- * True if this producer was created from a transacted session
- */
+ /** True if this producer was created from a transacted session */
private boolean _transacted;
private int _channelId;
@@ -112,9 +96,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
*/
private long _producerId;
- /**
- * The session used to create this producer
- */
+ /** The session used to create this producer */
private AMQSession _session;
private final boolean _immediate;
@@ -128,8 +110,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
- boolean waitUntilSent)
+ AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
+ boolean waitUntilSent)
{
_connection = connection;
_destination = destination;
@@ -162,16 +144,16 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
// Note that the durable and internal arguments are ignored since passive is set to false
// TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame declare =
- ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), null, // arguments
- false, // autoDelete
- false, // durable
- destination.getExchangeName(), // exchange
- false, // internal
- true, // nowait
- false, // passive
- _session.getTicket(), // ticket
- destination.getExchangeClass()); // type
+ ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), null, // arguments
+ false, // autoDelete
+ false, // durable
+ destination.getExchangeName(), // exchange
+ false, // internal
+ true, // nowait
+ false, // passive
+ _session.getTicket(), // ticket
+ destination.getExchangeClass()); // type
_protocolHandler.writeFrame(declare);
}
@@ -208,7 +190,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
if ((i != DeliveryMode.NON_PERSISTENT) && (i != DeliveryMode.PERSISTENT))
{
throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i
- + " is illegal");
+ + " is illegal");
}
_deliveryMode = i;
@@ -320,12 +302,12 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
{
validateDestination(destination);
sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory,
- _immediate);
+ _immediate);
}
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
- throws JMSException
+ throws JMSException
{
checkPreConditions();
checkDestination(destination);
@@ -337,7 +319,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
- boolean mandatory) throws JMSException
+ boolean mandatory) throws JMSException
{
checkPreConditions();
checkDestination(destination);
@@ -349,7 +331,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate) throws JMSException
+ boolean mandatory, boolean immediate) throws JMSException
{
checkPreConditions();
checkDestination(destination);
@@ -361,7 +343,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException
+ boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException
{
checkPreConditions();
checkDestination(destination);
@@ -369,7 +351,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
{
validateDestination(destination);
sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate,
- waitUntilSent);
+ waitUntilSent);
}
}
@@ -415,7 +397,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
else
{
throw new JMSException("Unable to send message, due to class conversion error: "
- + message.getClass().getName());
+ + message.getClass().getName());
}
}
}
@@ -425,14 +407,14 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
if (!(destination instanceof AMQDestination))
{
throw new JMSException("Unsupported destination class: "
- + ((destination != null) ? destination.getClass() : null));
+ + ((destination != null) ? destination.getClass() : null));
}
declareDestination((AMQDestination) destination);
}
protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate) throws JMSException
+ boolean mandatory, boolean immediate) throws JMSException
{
sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
}
@@ -447,16 +429,27 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
* @param timeToLive
* @param mandatory
* @param immediate
+ *
* @throws JMSException
*/
protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate, boolean wait) throws JMSException
+ boolean mandatory, boolean immediate, boolean wait) throws JMSException
{
checkTemporaryDestination(destination);
origMessage.setJMSDestination(destination);
AbstractJMSMessage message = convertToNativeMessage(origMessage);
+ if (_transacted)
+ {
+ if (_session.hasFailedOver() && _session.isDirty())
+ {
+ throw new JMSAMQException("Failover has occurred and session is dirty so unable to send.",
+ new AMQSessionDirtyException("Failover has occurred and session is dirty " +
+ "so unable to send."));
+ }
+ }
+
if (_disableMessageId)
{
message.setJMSMessageID(null);
@@ -489,12 +482,12 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame publishFrame =
- BasicPublishBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), destination.getExchangeName(), // exchange
- immediate, // immediate
- mandatory, // mandatory
- destination.getRoutingKey(), // routingKey
- _session.getTicket()); // ticket
+ BasicPublishBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), destination.getExchangeName(), // exchange
+ immediate, // immediate
+ mandatory, // mandatory
+ destination.getRoutingKey(), // routingKey
+ _session.getTicket()); // ticket
message.prepareForSending();
ByteBuffer payload = message.getData();
@@ -536,9 +529,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
AMQFrame contentHeaderFrame =
- ContentHeaderBody.createAMQFrame(_channelId,
- BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion()), 0, contentHeaderProperties, size);
+ ContentHeaderBody.createAMQFrame(_channelId,
+ BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion()), 0, contentHeaderProperties, size);
if (_logger.isDebugEnabled())
{
_logger.debug("Sending content header frame to " + destination);
@@ -558,6 +551,11 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
origMessage.setJMSExpiration(message.getJMSExpiration());
origMessage.setJMSMessageID(message.getJMSMessageID());
}
+
+ if (_transacted)
+ {
+ _session.markDirty();
+ }
}
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
@@ -669,7 +667,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
if ((_destination != null) && (suppliedDestination != null))
{
throw new UnsupportedOperationException(
- "This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
+ "This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
}
if (suppliedDestination == null)
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index e7ff5afceb..19142067cb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -104,23 +104,22 @@ import java.util.concurrent.CountDownLatch;
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Create the filter chain to filter this handlers events.
- * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
+ * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
*
* <tr><td> Maintain fail-over state.
* <tr><td>
* </table>
*
* @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the
- * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
- * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
- * filter before it mean not doing the read/write asynchronously but in the main filter thread?
- *
+ * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
+ * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
+ * filter before it mean not doing the read/write asynchronously but in the main filter thread?
* @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
- * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
- * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
- * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data
- * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
- * that lifecycles of the fields match lifecycles of their containing objects.
+ * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
+ * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
+ * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data
+ * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
+ * that lifecycles of the fields match lifecycles of their containing objects.
*/
public class AMQProtocolHandler extends IoHandlerAdapter
{
@@ -200,7 +199,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
SSLConfiguration sslConfig = _connection.getSSLConfiguration();
SSLContextFactory sslFactory =
- new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
+ new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext());
sslFilter.setUseClientMode(true);
session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
@@ -235,7 +234,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* @param session The MINA session.
*
* @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
- * not otherwise? The above comment doesn't make that clear.
+ * not otherwise? The above comment doesn't make that clear.
*/
public void sessionClosed(IoSession session)
{
@@ -413,74 +412,74 @@ public class AMQProtocolHandler extends IoHandlerAdapter
switch (bodyFrame.getFrameType())
{
- case AMQMethodBody.TYPE:
+ case AMQMethodBody.TYPE:
- if (debug)
- {
- _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
- }
+ if (debug)
+ {
+ _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
+ }
- final AMQMethodEvent<AMQMethodBody> evt =
- new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
+ final AMQMethodEvent<AMQMethodBody> evt =
+ new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
- try
- {
-
- boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
- if (!_frameListeners.isEmpty())
+ try
{
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
+
+ boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
+ if (!_frameListeners.isEmpty())
{
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ }
}
- }
- if (!wasAnyoneInterested)
- {
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
- + _frameListeners);
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
+ + _frameListeners);
+ }
}
- }
- catch (AMQException e)
- {
- getStateManager().error(e);
- if (!_frameListeners.isEmpty())
+ catch (AMQException e)
{
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
+ getStateManager().error(e);
+ if (!_frameListeners.isEmpty())
{
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- listener.error(e);
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ listener.error(e);
+ }
}
- }
- exceptionCaught(session, e);
- }
+ exceptionCaught(session, e);
+ }
- break;
+ break;
- case ContentHeaderBody.TYPE:
+ case ContentHeaderBody.TYPE:
- _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
- break;
+ _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
+ break;
- case ContentBody.TYPE:
+ case ContentBody.TYPE:
- _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
- break;
+ _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
+ break;
- case HeartbeatBody.TYPE:
+ case HeartbeatBody.TYPE:
- if (debug)
- {
- _logger.debug("Received heartbeat");
- }
+ if (debug)
+ {
+ _logger.debug("Received heartbeat");
+ }
- break;
+ break;
- default:
+ default:
}
@@ -491,6 +490,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void messageSent(IoSession session, Object message) throws Exception
{
+// System.err.println("Sent PS:" + System.identityHashCode(_protocolSession) + ":" + message);
+
final long sentMessages = _messagesOut++;
final boolean debug = _logger.isDebugEnabled();
@@ -547,7 +548,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* @param listener the blocking listener. Note the calling thread will block.
*/
public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener)
- throws AMQException, FailoverException
+ throws AMQException, FailoverException
{
return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT);
}
@@ -560,7 +561,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* @param listener the blocking listener. Note the calling thread will block.
*/
public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener,
- long timeout) throws AMQException, FailoverException
+ long timeout) throws AMQException, FailoverException
{
try
{
@@ -570,8 +571,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
AMQMethodEvent e = listener.blockForFrame(timeout);
return e;
- // When control resumes before this line, a reply will have been received
- // that matches the criteria defined in the blocking listener
+ // When control resumes before this line, a reply will have been received
+ // that matches the criteria defined in the blocking listener
}
catch (AMQException e)
{
@@ -595,7 +596,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException, FailoverException
{
return writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass),
- timeout);
+ timeout);
}
public void closeSession(AMQSession session) throws AMQException
@@ -621,12 +622,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
final AMQFrame frame =
- ConnectionCloseBody.createAMQFrame(0, _protocolSession.getProtocolMajorVersion(),
- _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection.")); // replyText
+ ConnectionCloseBody.createAMQFrame(0, _protocolSession.getProtocolMajorVersion(),
+ _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ new AMQShortString("JMS client is closing the connection.")); // replyText
try
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 227f23b540..72ff3844ca 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -255,9 +255,9 @@ public class AMQStateManager implements AMQMethodListener
if (_currentState != s)
{
_logger.warn("State not achieved within permitted time. Current state " + _currentState
- + ", desired state: " + s);
+ + ", desired state: " + s);
throw new AMQException("State not achieved within permitted time. Current state " + _currentState
- + ", desired state: " + s);
+ + ", desired state: " + s);
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java
index 2957dda869..75b6fbaedd 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java
@@ -22,23 +22,26 @@
package org.apache.qpid.server.txn;
import junit.framework.TestCase;
-import junit.framework.Assert;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQSessionDirtyException;
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
-import org.apache.log4j.Logger;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
-import javax.jms.Session;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
-import javax.jms.ConnectionFactory;
-import javax.jms.Connection;
-import javax.jms.Message;
+import javax.jms.Session;
import javax.jms.TextMessage;
-import javax.jms.MessageListener;
-import javax.naming.spi.InitialContextFactory;
+import javax.jms.TransactionRolledBackException;
import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
import java.util.Hashtable;
import java.util.concurrent.CountDownLatch;
@@ -49,7 +52,8 @@ public class TxnTest extends TestCase implements MessageListener
private static final Logger _logger = Logger.getLogger(TxnTest.class);
- protected final String BROKER = "vm://:1";//"localhost";
+ //Set retries quite high to ensure that it continues to retry whilst the InVM broker is restarted.
+ protected final String BROKER = "vm://:1?retries='1000'";
protected final String VHOST = "/test";
protected final String QUEUE = "TxnTestQueue";
@@ -75,7 +79,11 @@ public class TxnTest extends TestCase implements MessageListener
Hashtable<String, String> env = new Hashtable<String, String>();
env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'");
- env.put("queue.queue", QUEUE);
+
+ // Ensure that the queue is unique for each test run.
+ // There appears to be other old sesssion/consumers when looping the tests this means that sometimes a message
+ // will disappear. When it has actually gone to the old client.
+ env.put("queue.queue", QUEUE + "-" + System.currentTimeMillis());
_context = factory.getInitialContext(env);
@@ -109,7 +117,7 @@ public class TxnTest extends TestCase implements MessageListener
{
_producerConnection.close();
}
-
+
super.tearDown();
if (BROKER.startsWith("vm://"))
@@ -124,10 +132,8 @@ public class TxnTest extends TestCase implements MessageListener
_consumer.setMessageListener(this);
_clientConnection.start();
- //Set TTL
_producer.send(_producerSession.createTextMessage("TxtTestML"));
-
try
{
//Wait for message to arrive
@@ -150,7 +156,6 @@ public class TxnTest extends TestCase implements MessageListener
public void onMessage(Message message)
{
-
try
{
assertEquals("Incorrect Message Received.", "TxtTestML", ((TextMessage) message).getText());
@@ -170,19 +175,235 @@ public class TxnTest extends TestCase implements MessageListener
{
_clientConnection.start();
- //Set TTL
_producer.send(_producerSession.createTextMessage("TxtTestReceive"));
//Receive Message
Message received = _consumer.receive(1000);
+ _clientSession.commit();
+
assertEquals("Incorrect Message Received.", "TxtTestReceive", ((TextMessage) received).getText());
- //Receive Message
+ //Receive Message
received = _consumer.receive(1000);
assertNull("More messages received", received);
_consumer.close();
}
+
+ /**
+ * Test that after the connection has failed over that a sent message is still correctly receieved.
+ * Using Auto-Ack consumer.
+ *
+ * @throws JMSException
+ */
+ public void testReceiveAfterFailover() throws JMSException
+ {
+// System.err.println("testReceiveAfterFailover");
+ _clientConnection.close();
+
+ MessageConsumer consumer = _producerSession.createConsumer(_queue);
+
+ failServer();
+
+// System.err.println("Server restarted");
+
+ String MESSAGE_TXT = "TxtTestReceiveAfterFailoverTX";
+
+// System.err.println("Prod Session:" + _producerSession + ":" + ((AMQSession) _producerSession).isClosed());
+
+ Message sent = _producerSession.createTextMessage(MESSAGE_TXT);
+// System.err.println("Created message");
+
+ _producer.send(sent);
+// System.err.println("Sent message");
+
+ //Verify correct message received
+ Message received = consumer.receive(10000);
+// System.err.println("Message Receieved:" + received);
+
+ assertNotNull("Message should be received.", received);
+ assertEquals("Incorrect Message Received.", MESSAGE_TXT, ((TextMessage) received).getText());
+
+ //Check no more messages are received
+ received = consumer.receive(1000);
+ System.err.println("Second receive completed.");
+
+ assertNull("More messages received", received);
+
+ _producer.close();
+// System.err.println("Close producer");
+
+ consumer.close();
+// System.err.println("Close consumer");
+
+ _producerConnection.close();
+ }
+
+ /**
+ * Test that after the connection has failed over the dirty transaction is notified when calling commit
+ *
+ * @throws JMSException
+ */
+ public void testSendBeforeFailoverThenCommitTx() throws JMSException
+ {
+// System.err.println("testSendBeforeFailoverThenCommitTx");
+ _clientConnection.start();
+
+ //Create a transacted producer.
+ MessageProducer txProducer = _clientSession.createProducer(_queue);
+
+ String MESSAGE_TXT = "testSendBeforeFailoverThenCommitTx";
+
+ //Send the first message
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+
+ failServer();
+
+ //Check that the message isn't received.
+ Message received = _consumer.receive(1000);
+ assertNull("Message received after failover to clean broker!", received);
+
+ //Attempt to commit session
+ try
+ {
+ _clientSession.commit();
+ fail("TransactionRolledBackException not thrown");
+ }
+ catch (JMSException jmse)
+ {
+ if (!(jmse instanceof TransactionRolledBackException))
+ {
+ fail(jmse.toString());
+ }
+ }
+
+ //Close consumer & producer
+ _consumer.close();
+ txProducer.close();
+ }
+
+ /**
+ * Test that after the connection has failed over the dirty transaction is fast failed by throwing an
+ * Exception on the next send.
+ *
+ * @throws JMSException
+ */
+ public void testSendBeforeFailoverThenSendTx() throws JMSException
+ {
+// System.err.println("testSendBeforeFailoverThenSendTx");
+
+ _clientConnection.start();
+ MessageProducer txProducer = _clientSession.createProducer(_queue);
+
+ String MESSAGE_TXT = "TxtTestSendBeforeFailoverTx";
+
+ //Send the first message
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+
+ failServer();
+
+ //Check that the message isn't received.
+ Message received = _consumer.receive(1000);
+ assertNull("Message received after failover to clean broker!", received);
+
+ //Attempt to send another message on the session, here we should fast fail.
+ try
+ {
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+ fail("JMSException not thrown");
+ }
+ catch (JMSException jmse)
+ {
+ if (!(jmse.getLinkedException() instanceof AMQSessionDirtyException))
+ {
+ fail(jmse.toString());
+ }
+ }
+
+
+ _consumer.close();
+ }
+
+ public void testSendBeforeFailoverThenSend2Tx() throws JMSException
+ {
+// System.err.println("testSendBeforeFailoverThenSendTx");
+
+ _clientConnection.start();
+ MessageProducer txProducer = _clientSession.createProducer(_queue);
+
+ String MESSAGE_TXT = "TxtTestSendBeforeFailoverTx";
+
+ //Send the first message
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+
+ failServer();
+
+ //Check that the message isn't received.
+ Message received = _consumer.receive(1000);
+ assertNull("Message received after failover to clean broker!", received);
+
+ _clientSession.rollback();
+
+ //Attempt to send another message on the session, here we should fast fail.
+ try
+ {
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+ }
+ catch (JMSException jmse)
+ {
+ if (jmse.getLinkedException() instanceof AMQSessionDirtyException)
+ {
+ fail(jmse.toString());
+ }
+ }
+
+
+ _consumer.close();
+ }
+
+
+ private void failServer()
+ {
+ if (BROKER.startsWith("vm://"))
+ {
+ //Work around for MessageStore not being initialise and the send not fully completing before the failover occurs.
+ try
+ {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+
+ TransportConnection.killAllVMBrokers();
+ ApplicationRegistry.remove(1);
+ try
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ catch (AMQVMBrokerCreationException e)
+ {
+ _logger.error("Unable to restart broker due to :" + e);
+ }
+
+ //Work around for receive not being failover aware.. because it is the first receive it trys to
+ // unsuspend the channel but in this case the ChannelFlow command goes on the old session and the response on the
+ // new one ... though I thought the statemanager recorded the listeners so should be ok.???
+ try
+ {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+
+ }
+
+ }
+
}