diff options
-rw-r--r-- | dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs | 16 | ||||
-rw-r--r-- | dotnet/Qpid.Client/Client/AmqChannel.cs | 23 |
2 files changed, 22 insertions, 17 deletions
diff --git a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs index 15be584031..f4facf80ad 100644 --- a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs +++ b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs @@ -34,7 +34,8 @@ namespace Qpid.Client.Tests.failover private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTxTest)); const int NUM_ITERATIONS = 10; - const int NUM_MESSAGES = 10; + const int NUM_COMMITED_MESSAGES = 10; + const int NUM_ROLLEDBACK_MESSAGES = 5; const int SLEEP_MILLIS = 500; AMQConnection _connection; @@ -92,9 +93,18 @@ namespace Qpid.Client.Tests.failover for (int i = 1; i <= NUM_ITERATIONS; ++i) { - for (int j = 1; j <= NUM_MESSAGES; ++j) + for (int j = 1; j <= NUM_ROLLEDBACK_MESSAGES; ++j) { - ITextMessage msg = publishingChannel.CreateTextMessage("Tx=" + i + " msg=" + j); + ITextMessage msg = publishingChannel.CreateTextMessage("Tx=" + i + " rolledBackMsg=" + j); + _log.Info("sending message = " + msg.Text); + publisher.Send(msg); + Thread.Sleep(SLEEP_MILLIS); + } + if (transacted) publishingChannel.Rollback(); + + for (int j = 1; j <= NUM_COMMITED_MESSAGES; ++j) + { + ITextMessage msg = publishingChannel.CreateTextMessage("Tx=" + i + " commitedMsg=" + j); _log.Info("sending message = " + msg.Text); publisher.Send(msg); Thread.Sleep(SLEEP_MILLIS); diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index b7c8b1857e..48d87d8f90 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -300,20 +300,15 @@ namespace Qpid.Client CheckNotClosed(); CheckTransacted(); // throws IllegalOperationException if not a transacted session - /*Channel.Rollback frame = new Channel.Rollback(); - frame.channelId = _channelId; - frame.confirmTag = 1;*/ - - // try - // { - // _connection.getProtocolHandler().writeCommandFrameAndWaitForReply(frame, new ChannelReplyListener(_channelId)); - // } - // catch (AMQException e) - // { - // throw new JMSException("Error rolling back session: " + e); - // } - throw new NotImplementedException(); - //_logger.Info("Transaction rolled back on channel " + _channelId); + try + { + _connection.ConvenientProtocolWriter.SyncWrite( + TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody)); + } + catch (AMQException e) + { + throw new QpidException("Failed to rollback", e); + } } public override void Close() |