summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-02-23 22:15:29 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-02-23 22:15:29 +0000
commitafeb5065ec226e39bb0b6855db63952d9a1ba89c (patch)
tree2341895495043707530f839a0be87774f804553e
parent1b4a67ccc4b501b2b7872881609ee9f0d8c2e3eb (diff)
downloadqpid-python-afeb5065ec226e39bb0b6855db63952d9a1ba89c.tar.gz
AMQP-24 : [Java Broker] Implement distributed transactions for AMQP 0-10
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1292984 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java140
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/Xid.java52
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/PreparedTransaction.java46
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/PreparedTransactionTB.java122
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/XidTB.java58
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java363
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java10
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java128
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java239
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AlreadyKnownDtxException.java32
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java246
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java348
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxException.java44
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxNotSelectedException.java30
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java333
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/IncorrectDtxStateException.java32
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JoinAndResumeDtxException.java32
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NotAssociatedDtxException.java32
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/RollbackOnlyDtxException.java32
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/SuspendAndFailDtxException.java32
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TimeoutDtxException.java32
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/UnknownDtxBranchException.java32
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java11
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java208
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java21
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java8
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java8
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java8
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java17
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java8
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java11
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/QueueTest.java6
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/TopicTest.java72
-rw-r--r--qpid/java/test-profiles/JavaExcludes3
-rw-r--r--qpid/java/test-profiles/JavaPre010Excludes4
-rw-r--r--qpid/java/test-profiles/JavaTransientExcludes7
-rw-r--r--qpid/java/test-profiles/java-bdb-spawn.0-8.testprofile2
-rw-r--r--qpid/java/test-profiles/java-bdb-spawn.0-9-1.testprofile2
-rw-r--r--qpid/java/test-profiles/java-bdb-spawn.0-9.testprofile2
-rw-r--r--qpid/java/test-profiles/java-bdb.0-8.testprofile2
-rw-r--r--qpid/java/test-profiles/java-bdb.0-9-1.testprofile2
-rw-r--r--qpid/java/test-profiles/java-bdb.0-9.testprofile2
-rw-r--r--qpid/java/test-profiles/java-dby-spawn.0-8.testprofile2
-rw-r--r--qpid/java/test-profiles/java-dby-spawn.0-9-1.testprofile2
-rw-r--r--qpid/java/test-profiles/java-dby-spawn.0-9.testprofile2
-rw-r--r--qpid/java/test-profiles/java-dby.0-8.testprofile2
-rw-r--r--qpid/java/test-profiles/java-dby.0-9-1.testprofile2
-rw-r--r--qpid/java/test-profiles/java-dby.0-9.testprofile2
-rw-r--r--qpid/java/test-profiles/java-mms-spawn.0-8.testprofile2
-rw-r--r--qpid/java/test-profiles/java-mms-spawn.0-9-1.testprofile2
-rw-r--r--qpid/java/test-profiles/java-mms-spawn.0-9.testprofile2
-rw-r--r--qpid/java/test-profiles/java-mms.0-8.testprofile2
-rw-r--r--qpid/java/test-profiles/java-mms.0-9-1.testprofile2
-rw-r--r--qpid/java/test-profiles/java-mms.0-9.testprofile2
-rw-r--r--qpid/java/test-profiles/python_tests/Java010PythonExcludes10
-rw-r--r--qpid/java/test-profiles/testprofile.defaults2
63 files changed, 2784 insertions, 111 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 29f2a2f2fb..a91d8f359e 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -36,6 +36,7 @@ import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
@@ -68,14 +69,18 @@ import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5;
+import org.apache.qpid.server.store.berkeleydb.keys.Xid;
import org.apache.qpid.server.store.berkeleydb.records.BindingRecord;
import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
+import org.apache.qpid.server.store.berkeleydb.records.PreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_5;
import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.PreparedTransactionTB;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.XidTB;
import java.io.File;
import java.lang.ref.SoftReference;
@@ -120,6 +125,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private String QUEUEDB_NAME = "queueDb";
private String BRIDGEDB_NAME = "bridges";
private String LINKDB_NAME = "links";
+ private String XIDDB_NAME = "xids";
private Database _messageMetaDataDb;
private Database _messageContentDb;
@@ -129,6 +135,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private Database _queueDb;
private Database _bridgeDb;
private Database _linkDb;
+ private Database _xidDb;
/* =======
* Schema:
@@ -217,6 +224,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
LINKDB_NAME += "_v" + version;
BRIDGEDB_NAME += "_v" + version;
+
+ XIDDB_NAME += "_v" + version;
}
}
@@ -272,6 +281,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
recoverQueueEntries(recoveryHandler);
+
}
@@ -487,6 +497,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
_deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig);
_linkDb = openDatabase(LINKDB_NAME, dbConfig);
_bridgeDb = openDatabase(BRIDGEDB_NAME, dbConfig);
+ _xidDb = openDatabase(XIDDB_NAME, dbConfig);
}
@@ -564,6 +575,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
_linkDb.close();
}
+
+ if (_xidDb != null)
+ {
+ _log.info("Close xid database");
+ _xidDb.close();
+ }
+
closeEnvironment();
_state = State.CLOSED;
@@ -884,7 +902,52 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
- qerh.completeQueueEntryRecovery();
+
+
+ TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery();
+
+ cursor = null;
+ try
+ {
+ cursor = _xidDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ XidTB keyBinding = new XidTB();
+ PreparedTransactionTB valueBinding = new PreparedTransactionTB();
+ DatabaseEntry value = new DatabaseEntry();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ Xid xid = keyBinding.entryToObject(key);
+ PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
+ dtxrh.dtxRecord(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(),
+ preparedTransaction.getEnqueues(),preparedTransaction.getDequeues());
+ }
+
+ try
+ {
+ cursor.close();
+ }
+ finally
+ {
+ cursor = null;
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+ _log.error("Database Error: " + e.getMessage(), e);
+ throw e;
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+
+
+ dtxrh.completeDtxRecordRecovery();
}
/**
@@ -1481,6 +1544,69 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
+
+ private void recordXid(com.sleepycat.je.Transaction txn,
+ long format,
+ byte[] globalId,
+ byte[] branchId,
+ Transaction.Record[] enqueues,
+ Transaction.Record[] dequeues) throws AMQStoreException
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ Xid xid = new Xid(format, globalId, branchId);
+ XidTB keyBinding = new XidTB();
+ keyBinding.objectToEntry(xid,key);
+
+ DatabaseEntry value = new DatabaseEntry();
+ PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues);
+ PreparedTransactionTB valueBinding = new PreparedTransactionTB();
+ valueBinding.objectToEntry(preparedTransaction, value);
+
+ try
+ {
+ _xidDb.put(txn, key, value);
+ }
+ catch (DatabaseException e)
+ {
+ _log.error("Failed to write xid: " + e.getMessage(), e);
+ throw new AMQStoreException("Error writing xid to database", e);
+ }
+ }
+
+ private void removeXid(com.sleepycat.je.Transaction txn, long format, byte[] globalId, byte[] branchId)
+ throws AMQStoreException
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ Xid xid = new Xid(format, globalId, branchId);
+ XidTB keyBinding = new XidTB();
+
+ keyBinding.objectToEntry(xid, key);
+
+
+ try
+ {
+
+ OperationStatus status = _xidDb.delete(txn, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ throw new AMQStoreException("Unable to find xid");
+ }
+ else if (status != OperationStatus.SUCCESS)
+ {
+ throw new AMQStoreException("Unable to remove xid");
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+
+ _log.error("Failed to remove xid ", e);
+ _log.error(txn);
+
+ throw new AMQStoreException("Error accessing database while removing xid: " + e.getMessage(), e);
+ }
+ }
+
/**
* Commits all operations performed within a given transaction.
*
@@ -2385,6 +2511,18 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
BDBMessageStore.this.abortTran(_txn);
}
+
+ public void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException
+ {
+ BDBMessageStore.this.removeXid(_txn, format, globalId, branchId);
+ }
+
+ public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues,
+ Record[] dequeues) throws AMQStoreException
+ {
+ BDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues);
+ }
}
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/Xid.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/Xid.java
new file mode 100644
index 0000000000..f74d67b355
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/Xid.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.store.berkeleydb.keys;
+
+public class Xid
+{
+
+ private final long _format;
+ private final byte[] _globalId;
+ private final byte[] _branchId;
+
+ public Xid(long format, byte[] globalId, byte[] branchId)
+ {
+ _format = format;
+ _globalId = globalId;
+ _branchId = branchId;
+ }
+
+ public long getFormat()
+ {
+ return _format;
+ }
+
+ public byte[] getGlobalId()
+ {
+ return _globalId;
+ }
+
+ public byte[] getBranchId()
+ {
+ return _branchId;
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/PreparedTransaction.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/PreparedTransaction.java
new file mode 100644
index 0000000000..bfd72b9a1f
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/PreparedTransaction.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.store.berkeleydb.records;
+
+import org.apache.qpid.server.store.MessageStore;
+
+public class PreparedTransaction
+{
+ private final MessageStore.Transaction.Record[] _enqueues;
+ private final MessageStore.Transaction.Record[] _dequeues;
+
+ public PreparedTransaction(MessageStore.Transaction.Record[] enqueues, MessageStore.Transaction.Record[] dequeues)
+ {
+ _enqueues = enqueues;
+ _dequeues = dequeues;
+ }
+
+ public MessageStore.Transaction.Record[] getEnqueues()
+ {
+ return _enqueues;
+ }
+
+ public MessageStore.Transaction.Record[] getDequeues()
+ {
+ return _dequeues;
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/PreparedTransactionTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/PreparedTransactionTB.java
new file mode 100644
index 0000000000..3eb4cb69b5
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/PreparedTransactionTB.java
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.berkeleydb.records.PreparedTransaction;
+
+public class PreparedTransactionTB extends TupleBinding<PreparedTransaction>
+{
+ @Override
+ public PreparedTransaction entryToObject(TupleInput input)
+ {
+ MessageStore.Transaction.Record[] enqueues = readRecords(input);
+
+ MessageStore.Transaction.Record[] dequeues = readRecords(input);
+
+ return new PreparedTransaction(enqueues, dequeues);
+ }
+
+ private MessageStore.Transaction.Record[] readRecords(TupleInput input)
+ {
+ MessageStore.Transaction.Record[] records = new MessageStore.Transaction.Record[input.readInt()];
+ for(int i = 0; i < records.length; i++)
+ {
+ records[i] = new RecordImpl(input.readString(), input.readLong());
+ }
+ return records;
+ }
+
+ @Override
+ public void objectToEntry(PreparedTransaction preparedTransaction, TupleOutput output)
+ {
+ writeRecords(preparedTransaction.getEnqueues(), output);
+ writeRecords(preparedTransaction.getDequeues(), output);
+
+ }
+
+ private void writeRecords(MessageStore.Transaction.Record[] records, TupleOutput output)
+ {
+ if(records == null)
+ {
+ output.writeInt(0);
+ }
+ else
+ {
+ output.writeInt(records.length);
+ for(MessageStore.Transaction.Record record : records)
+ {
+ output.writeString(record.getQueue().getResourceName());
+ output.writeLong(record.getMessage().getMessageNumber());
+ }
+ }
+ }
+
+ private static class RecordImpl implements MessageStore.Transaction.Record, TransactionLogResource, EnqueableMessage
+ {
+
+ private final String _queueName;
+ private long _messageNumber;
+
+ public RecordImpl(String queueName, long messageNumber)
+ {
+ _queueName = queueName;
+ _messageNumber = messageNumber;
+ }
+
+ public TransactionLogResource getQueue()
+ {
+ return this;
+ }
+
+ public EnqueableMessage getMessage()
+ {
+ return this;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageNumber;
+ }
+
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ public StoredMessage getStoredMessage()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public String getResourceName()
+ {
+ return _queueName;
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/XidTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/XidTB.java
new file mode 100644
index 0000000000..3a5d61b2b6
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/XidTB.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.qpid.server.store.berkeleydb.keys.Xid;
+
+public class XidTB extends TupleBinding<Xid>
+{
+ @Override
+ public Xid entryToObject(TupleInput input)
+ {
+ long format = input.readLong();
+ byte[] globalId = new byte[input.readInt()];
+ input.readFast(globalId);
+ byte[] branchId = new byte[input.readInt()];
+ input.readFast(branchId);
+ return new Xid(format,globalId,branchId);
+ }
+
+ @Override
+ public void objectToEntry(Xid xid, TupleOutput output)
+ {
+ output.writeLong(xid.getFormat());
+ output.writeInt(xid.getGlobalId() == null ? 0 : xid.getGlobalId().length);
+ if(xid.getGlobalId() != null)
+ {
+ output.write(xid.getGlobalId());
+ }
+ output.writeInt(xid.getBranchId() == null ? 0 : xid.getBranchId().length);
+ if(xid.getBranchId() != null)
+ {
+ output.write(xid.getBranchId());
+ }
+
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index e5e755bd23..16a3036ea7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -1615,7 +1615,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
}
}
- @Override
public int compareTo(AMQSessionModel session)
{
return getId().toString().compareTo(session.getID().toString());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties
index fadc2e2098..9ef58df940 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties
@@ -31,3 +31,9 @@ RECOVERY_START = TXN-1004 : Recovery Start[ : {0}]
RECOVERED = TXN-1005 : Recovered {0,number} messages for queue {1}
# 0 - queue name
RECOVERY_COMPLETE = TXN-1006 : Recovery Complete[ : {0}]
+# 0 - xid
+# 1 - queue name
+XA_INCOMPLETE_QUEUE = TXN-1007 : XA transaction recover for xid {0} incomplete as it references a queue {1} which was not durably retained
+# 0 - xid format
+# 1 - message id
+XA_INCOMPLETE_MESSAGE = TXN-1008 : XA transaction recover for xid {0} incomplete as it references a message {1} which was not durably retained
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 2877e25645..2cc9a5423e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -92,7 +92,9 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
private static final String LINKS_TABLE_NAME = "QPID_LINKS";
private static final String BRIDGES_TABLE_NAME = "QPID_BRIDGES";
-
+ private static final String XID_TABLE_NAME = "QPID_XIDS";
+ private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS";
+
private static final int DB_VERSION = 3;
@@ -190,6 +192,31 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
+ "arguments )"
+ " values (?, ?, ?, ?, ?, ?)";
+ private static final String CREATE_XIDS_TABLE =
+ "CREATE TABLE "+XID_TABLE_NAME+" ( format bigint not null,"
+ + " global_id varchar(64) for bit data, branch_id varchar(64) for bit data, PRIMARY KEY ( format, " +
+ "global_id, branch_id ))";
+ private static final String INSERT_INTO_XIDS =
+ "INSERT INTO "+XID_TABLE_NAME+" ( format, global_id, branch_id ) values (?, ?, ?)";
+ private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME
+ + " WHERE format = ? and global_id = ? and branch_id = ?";
+ private static final String SELECT_ALL_FROM_XIDS = "SELECT format, global_id, branch_id FROM " + XID_TABLE_NAME;
+
+
+ private static final String CREATE_XID_ACTIONS_TABLE =
+ "CREATE TABLE "+XID_ACTIONS_TABLE_NAME+" ( format bigint not null,"
+ + " global_id varchar(64) for bit data not null, branch_id varchar(64) for bit data not null, " +
+ "action_type char not null, queue_name varchar(255) not null, message_id bigint not null" +
+ ", PRIMARY KEY ( " +
+ "format, global_id, branch_id, action_type, queue_name, message_id))";
+ private static final String INSERT_INTO_XID_ACTIONS =
+ "INSERT INTO "+XID_ACTIONS_TABLE_NAME+" ( format, global_id, branch_id, action_type, " +
+ "queue_name, message_id ) values (?,?,?,?,?,?) ";
+ private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME
+ + " WHERE format = ? and global_id = ? and branch_id = ?";
+ private static final String SELECT_ALL_FROM_XID_ACTIONS =
+ "SELECT action_type, queue_name, message_id FROM " + XID_ACTIONS_TABLE_NAME +
+ " WHERE format = ? and global_id = ? and branch_id = ?";
private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
@@ -295,7 +322,8 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
_configured = true;
}
- recoverQueueEntries(recoveryHandler);
+ TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(recoveryHandler);
+ recoverXids(dtxrh);
}
@@ -350,7 +378,8 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
createMessageContentTable(conn);
createLinkTable(conn);
createBridgeTable(conn);
-
+ createXidTable(conn);
+ createXidActionTable(conn);
conn.close();
}
@@ -519,8 +548,38 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
}
}
+ private void createXidTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(XID_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_XIDS_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+ private void createXidActionTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(XID_ACTIONS_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_XID_ACTIONS_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
private boolean tableExists(final String tableName, final Connection conn) throws SQLException
{
@@ -1726,6 +1785,126 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
}
+
+ private void removeXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId)
+ throws AMQStoreException
+ {
+ Connection conn = connWrapper.getConnection();
+
+
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_XIDS);
+ try
+ {
+ stmt.setLong(1,format);
+ stmt.setBytes(2,globalId);
+ stmt.setBytes(3,branchId);
+ int results = stmt.executeUpdate();
+
+
+
+ if(results != 1)
+ {
+ throw new AMQStoreException("Unable to find message with xid");
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ stmt = conn.prepareStatement(DELETE_FROM_XID_ACTIONS);
+ try
+ {
+ stmt.setLong(1,format);
+ stmt.setBytes(2,globalId);
+ stmt.setBytes(3,branchId);
+ int results = stmt.executeUpdate();
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ catch (SQLException e)
+ {
+ _logger.error("Failed to dequeue: " + e.getMessage(), e);
+ throw new AMQStoreException("Error deleting enqueued message with xid", e);
+ }
+
+ }
+
+
+ private void recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId,
+ Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws AMQStoreException
+ {
+ Connection conn = connWrapper.getConnection();
+
+
+ try
+ {
+
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_XIDS);
+ try
+ {
+ stmt.setLong(1,format);
+ stmt.setBytes(2, globalId);
+ stmt.setBytes(3, branchId);
+ stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ stmt = conn.prepareStatement(INSERT_INTO_XID_ACTIONS);
+
+ try
+ {
+ stmt.setLong(1,format);
+ stmt.setBytes(2, globalId);
+ stmt.setBytes(3, branchId);
+
+ if(enqueues != null)
+ {
+ stmt.setString(4, "E");
+ for(Transaction.Record record : enqueues)
+ {
+ stmt.setString(5, record.getQueue().getResourceName());
+ stmt.setLong(6, record.getMessage().getMessageNumber());
+ stmt.executeUpdate();
+ }
+ }
+
+ if(dequeues != null)
+ {
+ stmt.setString(4, "D");
+ for(Transaction.Record record : dequeues)
+ {
+ stmt.setString(5, record.getQueue().getResourceName());
+ stmt.setLong(6, record.getMessage().getMessageNumber());
+ stmt.executeUpdate();
+ }
+ }
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ catch (SQLException e)
+ {
+ _logger.error("Failed to enqueue: " + e.getMessage(), e);
+ throw new AMQStoreException("Error writing xid ", e);
+ }
+
+ }
+
private static final class ConnectionWrapper
{
private final Connection _connection;
@@ -1919,7 +2098,7 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
- private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
+ private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
@@ -1950,7 +2129,7 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
stmt.close();
}
- queueEntryHandler.completeQueueEntryRecovery();
+ return queueEntryHandler.completeQueueEntryRecovery();
}
finally
{
@@ -1958,6 +2137,165 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
}
}
+ private static final class Xid
+ {
+
+ private final long _format;
+ private final byte[] _globalId;
+ private final byte[] _branchId;
+
+ public Xid(long format, byte[] globalId, byte[] branchId)
+ {
+ _format = format;
+ _globalId = globalId;
+ _branchId = branchId;
+ }
+
+ public long getFormat()
+ {
+ return _format;
+ }
+
+ public byte[] getGlobalId()
+ {
+ return _globalId;
+ }
+
+ public byte[] getBranchId()
+ {
+ return _branchId;
+ }
+ }
+
+ private static class RecordImpl implements MessageStore.Transaction.Record, TransactionLogResource, EnqueableMessage
+ {
+
+ private final String _queueName;
+ private long _messageNumber;
+
+ public RecordImpl(String queueName, long messageNumber)
+ {
+ _queueName = queueName;
+ _messageNumber = messageNumber;
+ }
+
+ public TransactionLogResource getQueue()
+ {
+ return this;
+ }
+
+ public EnqueableMessage getMessage()
+ {
+ return this;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageNumber;
+ }
+
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ public StoredMessage getStoredMessage()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public String getResourceName()
+ {
+ return _queueName;
+ }
+ }
+
+ private void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ List<Xid> xids = new ArrayList<Xid>();
+
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
+ try
+ {
+ while(rs.next())
+ {
+
+ long format = rs.getLong(1);
+ byte[] globalId = rs.getBytes(2);
+ byte[] branchId = rs.getBytes(3);
+ xids.add(new Xid(format, globalId, branchId));
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+
+
+ for(Xid xid : xids)
+ {
+ List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
+ List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
+
+ PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
+
+ pstmt.setLong(1, xid.getFormat());
+ pstmt.setBytes(2, xid.getGlobalId());
+ pstmt.setBytes(3, xid.getBranchId());
+ try
+ {
+ ResultSet rs = pstmt.executeQuery();
+ try
+ {
+ while(rs.next())
+ {
+
+ String actionType = rs.getString(1);
+ String queueName = rs.getString(2);
+ long messageId = rs.getLong(3);
+
+ RecordImpl record = new RecordImpl(queueName, messageId);
+ List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
+ records.add(record);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ pstmt.close();
+ }
+
+ dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
+ enqueues.toArray(new RecordImpl[enqueues.size()]),
+ dequeues.toArray(new RecordImpl[dequeues.size()]));
+ }
+
+
+ dtxrh.completeDtxRecordRecovery();
+ }
+ finally
+ {
+ conn.close();
+ }
+
+ }
+
StorableMessageMetaData getMetaData(long messageId) throws SQLException
{
@@ -2175,8 +2513,21 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
{
DerbyMessageStore.this.abortTran(_connWrapper);
}
+
+ public void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException
+ {
+ DerbyMessageStore.this.removeXid(_connWrapper, format, globalId, branchId);
+ }
+
+ public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ throws AMQStoreException
+ {
+ DerbyMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues);
+ }
}
+
+
private class StoredDerbyMessage implements StoredMessage
{
@@ -2360,4 +2711,4 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
}
}
-}
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 546a81a050..b01e5aa954 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -78,6 +78,14 @@ public class MemoryMessageStore implements MessageStore, DurableConfigurationSto
{
}
+ public void removeXid(long format, byte[] globalId, byte[] branchId)
+ {
+ }
+
+ public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ {
+ }
+
};
public void configureConfigStore(String name, ConfigurationRecoveryHandler handler, Configuration configuration, LogSubject logSubject) throws Exception
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index 2fecd5d4be..00bb0449d6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.store;
import org.apache.commons.configuration.Configuration;
-
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.EnqueableMessage;
@@ -126,7 +125,16 @@ public interface MessageStore
void abortTran() throws AMQStoreException;
+ public static interface Record
+ {
+ TransactionLogResource getQueue();
+ EnqueableMessage getMessage();
+ }
+
+ void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException;
+ void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ throws AMQStoreException;
}
public void configureTransactionLog(String name,
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
index 802596ed1e..48ca72718b 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
@@ -28,6 +28,13 @@ public interface TransactionLogRecoveryHandler
{
void queueEntry(String queuename, long messageId);
- void completeQueueEntryRecovery();
+ DtxRecordRecoveryHandler completeQueueEntryRecovery();
+ }
+
+ public static interface DtxRecordRecoveryHandler
+ {
+ void dtxRecord(long format, byte[] globalId, byte[] branchId, MessageStore.Transaction.Record[] enqueues, MessageStore.Transaction.Record[] dequeues);
+
+ void completeDtxRecordRecovery();
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index c32f642eea..48e372f87e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -21,6 +21,8 @@
package org.apache.qpid.server.transport;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+import org.apache.qpid.server.txn.RollbackOnlyDtxException;
+import org.apache.qpid.server.txn.TimeoutDtxException;
import static org.apache.qpid.util.Serial.gt;
import java.security.Principal;
@@ -44,6 +46,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.configuration.ConfigStore;
@@ -67,24 +70,19 @@ import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
+import org.apache.qpid.server.txn.DistributedTransaction;
+import org.apache.qpid.server.txn.DtxNotSelectedException;
+import org.apache.qpid.server.txn.IncorrectDtxStateException;
+import org.apache.qpid.server.txn.JoinAndResumeDtxException;
import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.NotAssociatedDtxException;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.SuspendAndFailDtxException;
+import org.apache.qpid.server.txn.UnknownDtxBranchException;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.Binary;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageFlow;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageSetFlowMode;
-import org.apache.qpid.transport.MessageStop;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Method;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.RangeSetFactory;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionDelegate;
+import org.apache.qpid.transport.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,7 +107,6 @@ public class ServerSession extends Session
private ChannelLogSubject _logSubject;
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
-
public static interface MessageDispositionChangeListener
{
public void onAccept();
@@ -359,7 +356,15 @@ public class ServerSession extends Session
public void onClose()
{
- _transaction.rollback();
+ if(_transaction instanceof LocalTransaction)
+ {
+ _transaction.rollback();
+ }
+ else if(_transaction instanceof DistributedTransaction)
+ {
+ getVirtualHost().getDtxRegistry().endAssociations(this);
+ }
+
for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values())
{
listener.onRelease(true);
@@ -455,6 +460,95 @@ public class ServerSession extends Session
_txnStarts.incrementAndGet();
}
+ public void selectDtx()
+ {
+ _transaction = new DistributedTransaction(this, getMessageStore(), getVirtualHost());
+
+ }
+
+
+ public void startDtx(Xid xid, boolean join, boolean resume)
+ throws JoinAndResumeDtxException,
+ UnknownDtxBranchException,
+ AlreadyKnownDtxException,
+ DtxNotSelectedException
+ {
+ DistributedTransaction distributedTransaction = assertDtxTransaction();
+ distributedTransaction.start(xid, join, resume);
+ }
+
+
+ public void endDtx(Xid xid, boolean fail, boolean suspend)
+ throws NotAssociatedDtxException,
+ UnknownDtxBranchException,
+ DtxNotSelectedException,
+ SuspendAndFailDtxException, TimeoutDtxException
+ {
+ DistributedTransaction distributedTransaction = assertDtxTransaction();
+ distributedTransaction.end(xid, fail, suspend);
+ }
+
+
+ public long getTimeoutDtx(Xid xid)
+ throws UnknownDtxBranchException
+ {
+ return getVirtualHost().getDtxRegistry().getTimeout(xid);
+ }
+
+
+ public void setTimeoutDtx(Xid xid, long timeout)
+ throws UnknownDtxBranchException
+ {
+ getVirtualHost().getDtxRegistry().setTimeout(xid, timeout);
+ }
+
+
+ public void prepareDtx(Xid xid)
+ throws UnknownDtxBranchException,
+ IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException
+ {
+ getVirtualHost().getDtxRegistry().prepare(xid);
+ }
+
+ public void commitDtx(Xid xid, boolean onePhase)
+ throws UnknownDtxBranchException,
+ IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException
+ {
+ getVirtualHost().getDtxRegistry().commit(xid, onePhase);
+ }
+
+
+ public void rollbackDtx(Xid xid)
+ throws UnknownDtxBranchException,
+ IncorrectDtxStateException, AMQStoreException, TimeoutDtxException
+ {
+ getVirtualHost().getDtxRegistry().rollback(xid);
+ }
+
+
+ public void forgetDtx(Xid xid) throws UnknownDtxBranchException, IncorrectDtxStateException
+ {
+ getVirtualHost().getDtxRegistry().forget(xid);
+ }
+
+ public List<Xid> recoverDtx()
+ {
+ return getVirtualHost().getDtxRegistry().recover();
+ }
+
+ private DistributedTransaction assertDtxTransaction() throws DtxNotSelectedException
+ {
+ if(_transaction instanceof DistributedTransaction)
+ {
+ return (DistributedTransaction) _transaction;
+ }
+ else
+ {
+ throw new DtxNotSelectedException();
+ }
+ }
+
+
public void commit()
{
_transaction.commit();
@@ -910,13 +1004,11 @@ public class ServerSession extends Session
}
}
-
protected void setClose(boolean close)
{
super.setClose(close);
}
- @Override
public int compareTo(AMQSessionModel session)
{
return getId().toString().compareTo(session.getID().toString());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 4648a53b40..d18086808f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.transport;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.AMQUnknownExchangeType;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -50,7 +51,16 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.txn.AlreadyKnownDtxException;
+import org.apache.qpid.server.txn.DtxNotSelectedException;
+import org.apache.qpid.server.txn.IncorrectDtxStateException;
+import org.apache.qpid.server.txn.JoinAndResumeDtxException;
+import org.apache.qpid.server.txn.NotAssociatedDtxException;
+import org.apache.qpid.server.txn.RollbackOnlyDtxException;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.SuspendAndFailDtxException;
+import org.apache.qpid.server.txn.TimeoutDtxException;
+import org.apache.qpid.server.txn.UnknownDtxBranchException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
@@ -429,6 +439,235 @@ public class ServerSessionDelegate extends SessionDelegate
((ServerSession)session).rollback();
}
+ @Override
+ public void dtxSelect(Session session, DtxSelect method)
+ {
+ // TODO - check current tx mode
+ ((ServerSession)session).selectDtx();
+ }
+
+ @Override
+ public void dtxStart(Session session, DtxStart method)
+ {
+ XaResult result = new XaResult();
+ result.setStatus(DtxXaStatus.XA_OK);
+ try
+ {
+ ((ServerSession)session).startDtx(method.getXid(), method.getJoin(), method.getResume());
+ session.executionResult(method.getId(), result);
+ }
+ catch(JoinAndResumeDtxException e)
+ {
+ exception(session, method, ExecutionErrorCode.COMMAND_INVALID, e.getMessage());
+ }
+ catch(UnknownDtxBranchException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Unknown xid " + method.getXid());
+ }
+ catch(AlreadyKnownDtxException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Xid already started an neither join nor " +
+ "resume set" + method.getXid());
+ }
+ catch(DtxNotSelectedException e)
+ {
+ exception(session, method, ExecutionErrorCode.COMMAND_INVALID, e.getMessage());
+ }
+
+ }
+
+ @Override
+ public void dtxEnd(Session session, DtxEnd method)
+ {
+ XaResult result = new XaResult();
+ result.setStatus(DtxXaStatus.XA_OK);
+ try
+ {
+ try
+ {
+ ((ServerSession)session).endDtx(method.getXid(), method.getFail(), method.getSuspend());
+ }
+ catch (TimeoutDtxException e)
+ {
+ result.setStatus(DtxXaStatus.XA_RBTIMEOUT);
+ }
+ session.executionResult(method.getId(), result);
+ }
+ catch(UnknownDtxBranchException e)
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
+ }
+ catch(NotAssociatedDtxException e)
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
+ }
+ catch(DtxNotSelectedException e)
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
+ }
+ catch(SuspendAndFailDtxException e)
+ {
+ exception(session, method, ExecutionErrorCode.COMMAND_INVALID, e.getMessage());
+ }
+
+ }
+
+ @Override
+ public void dtxCommit(Session session, DtxCommit method)
+ {
+ XaResult result = new XaResult();
+ result.setStatus(DtxXaStatus.XA_OK);
+ try
+ {
+ try
+ {
+ ((ServerSession)session).commitDtx(method.getXid(), method.getOnePhase());
+ }
+ catch (RollbackOnlyDtxException e)
+ {
+ result.setStatus(DtxXaStatus.XA_RBROLLBACK);
+ }
+ catch (TimeoutDtxException e)
+ {
+ result.setStatus(DtxXaStatus.XA_RBTIMEOUT);
+ }
+ session.executionResult(method.getId(), result);
+ }
+ catch(UnknownDtxBranchException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage());
+ }
+ catch(IncorrectDtxStateException e)
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
+ }
+ catch(AMQStoreException e)
+ {
+ exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, e.getMessage());
+ }
+ }
+
+ @Override
+ public void dtxForget(Session session, DtxForget method)
+ {
+ try
+ {
+ ((ServerSession)session).forgetDtx(method.getXid());
+ }
+ catch(UnknownDtxBranchException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage());
+ }
+ catch(IncorrectDtxStateException e)
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
+ }
+
+ }
+
+ @Override
+ public void dtxGetTimeout(Session session, DtxGetTimeout method)
+ {
+ GetTimeoutResult result = new GetTimeoutResult();
+ try
+ {
+ result.setTimeout(((ServerSession) session).getTimeoutDtx(method.getXid()));
+ session.executionResult(method.getId(), result);
+ }
+ catch(UnknownDtxBranchException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage());
+ }
+ }
+
+ @Override
+ public void dtxPrepare(Session session, DtxPrepare method)
+ {
+ XaResult result = new XaResult();
+ result.setStatus(DtxXaStatus.XA_OK);
+ try
+ {
+ try
+ {
+ ((ServerSession)session).prepareDtx(method.getXid());
+ }
+ catch (RollbackOnlyDtxException e)
+ {
+ result.setStatus(DtxXaStatus.XA_RBROLLBACK);
+ }
+ catch (TimeoutDtxException e)
+ {
+ result.setStatus(DtxXaStatus.XA_RBTIMEOUT);
+ }
+ session.executionResult((int) method.getId(), result);
+ }
+ catch(UnknownDtxBranchException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage());
+ }
+ catch(IncorrectDtxStateException e)
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
+ }
+ catch(AMQStoreException e)
+ {
+ exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, e.getMessage());
+ }
+ }
+
+ @Override
+ public void dtxRecover(Session session, DtxRecover method)
+ {
+ RecoverResult result = new RecoverResult();
+ List inDoubt = ((ServerSession)session).recoverDtx();
+ result.setInDoubt(inDoubt);
+ session.executionResult(method.getId(), result);
+ }
+
+ @Override
+ public void dtxRollback(Session session, DtxRollback method)
+ {
+
+ XaResult result = new XaResult();
+ result.setStatus(DtxXaStatus.XA_OK);
+ try
+ {
+ try
+ {
+ ((ServerSession)session).rollbackDtx(method.getXid());
+ }
+ catch (TimeoutDtxException e)
+ {
+ result.setStatus(DtxXaStatus.XA_RBTIMEOUT);
+ }
+ session.executionResult(method.getId(), result);
+ }
+ catch(UnknownDtxBranchException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage());
+ }
+ catch(IncorrectDtxStateException e)
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
+ }
+ catch(AMQStoreException e)
+ {
+ exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, e.getMessage());
+ }
+ }
+
+ @Override
+ public void dtxSetTimeout(Session session, DtxSetTimeout method)
+ {
+ try
+ {
+ ((ServerSession)session).setTimeoutDtx(method.getXid(), method.getTimeout());
+ }
+ catch(UnknownDtxBranchException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage());
+ }
+ }
@Override
public void executionSync(final Session ssn, final ExecutionSync sync)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AlreadyKnownDtxException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AlreadyKnownDtxException.java
new file mode 100644
index 0000000000..faa4ec592f
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AlreadyKnownDtxException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.transport.Xid;
+
+public class AlreadyKnownDtxException extends DtxException
+{
+ public AlreadyKnownDtxException(Xid id)
+ {
+ super("Xid " + id + " cannot be started as it is already known");
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
new file mode 100644
index 0000000000..36f5f7b58f
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
@@ -0,0 +1,246 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.Xid;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class DistributedTransaction implements ServerTransaction
+{
+
+ private final AutoCommitTransaction _autoCommitTransaction;
+
+ private volatile MessageStore.Transaction _transaction;
+
+ private long _txnStartTime = 0L;
+
+ private DtxBranch _branch;
+ private AMQSessionModel _session;
+ private VirtualHost _vhost;
+
+
+ public DistributedTransaction(AMQSessionModel session, MessageStore store, VirtualHost vhost)
+ {
+ _session = session;
+ _vhost = vhost;
+ _autoCommitTransaction = new AutoCommitTransaction(vhost.getMessageStore());
+ }
+
+ public long getTransactionStartTime()
+ {
+ return _txnStartTime;
+ }
+
+ public void addPostTransactionAction(Action postTransactionAction)
+ {
+ if(_branch != null)
+ {
+ _branch.addPostTransactionAcion(postTransactionAction);
+ }
+ else
+ {
+ _autoCommitTransaction.addPostTransactionAction(postTransactionAction);
+ }
+ }
+
+ public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
+ {
+ if(_branch != null)
+ {
+ _branch.dequeue(queue, message);
+ _branch.addPostTransactionAcion(postTransactionAction);
+ }
+ else
+ {
+ _autoCommitTransaction.dequeue(queue, message, postTransactionAction);
+ }
+ }
+
+ public void dequeue(Collection<QueueEntry> messages, Action postTransactionAction)
+ {
+ if(_branch != null)
+ {
+ for(QueueEntry entry : messages)
+ {
+ _branch.dequeue(entry.getQueue(), entry.getMessage());
+ }
+ _branch.addPostTransactionAcion(postTransactionAction);
+ }
+ else
+ {
+ _autoCommitTransaction.dequeue(messages, postTransactionAction);
+ }
+ }
+
+ public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
+ {
+ if(_branch != null)
+ {
+ _branch.enqueue(queue, message);
+ _branch.addPostTransactionAcion(postTransactionAction);
+ enqueue(Collections.singletonList(queue), message, postTransactionAction, System.currentTimeMillis());
+ }
+ else
+ {
+ _autoCommitTransaction.enqueue(queue, message, postTransactionAction);
+ }
+ }
+
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message,
+ Action postTransactionAction, long currentTime)
+ {
+ if(_branch != null)
+ {
+ for(BaseQueue queue : queues)
+ {
+ _branch.enqueue(queue, message);
+ }
+ _branch.addPostTransactionAcion(postTransactionAction);
+ }
+ else
+ {
+ _autoCommitTransaction.enqueue(queues, message, postTransactionAction, currentTime);
+ }
+ }
+
+ public void commit()
+ {
+ throw new IllegalStateException("Cannot call tx.commit() on a distributed transaction");
+ }
+
+ public void commit(Runnable immediatePostTransactionAction)
+ {
+ throw new IllegalStateException("Cannot call tx.commit() on a distributed transaction");
+ }
+
+ public void rollback()
+ {
+ throw new IllegalStateException("Cannot call tx.rollback() on a distributed transaction");
+ }
+
+ public boolean isTransactional()
+ {
+ return _branch != null;
+ }
+
+ public void start(Xid id, boolean join, boolean resume)
+ throws UnknownDtxBranchException, AlreadyKnownDtxException, JoinAndResumeDtxException
+ {
+ if(join && resume)
+ {
+ throw new JoinAndResumeDtxException(id);
+ }
+
+ DtxBranch branch = _vhost.getDtxRegistry().getBranch(id);
+
+ if(branch == null)
+ {
+ if(join || resume)
+ {
+ throw new UnknownDtxBranchException(id);
+ }
+ branch = new DtxBranch(id,_vhost.getMessageStore(), _vhost);
+ if(_vhost.getDtxRegistry().registerBranch(branch))
+ {
+ _branch = branch;
+ branch.associateSession(_session);
+ }
+ else
+ {
+ throw new AlreadyKnownDtxException(id);
+ }
+ }
+ else
+ {
+ if(join)
+ {
+ branch.associateSession(_session);
+ }
+ else if(resume)
+ {
+ branch.resumeSession(_session);
+ }
+ else
+ {
+ throw new AlreadyKnownDtxException(id);
+ }
+ _branch = branch;
+ }
+ }
+
+ public void end(Xid id, boolean fail, boolean suspend)
+ throws UnknownDtxBranchException, NotAssociatedDtxException, SuspendAndFailDtxException, TimeoutDtxException
+ {
+ DtxBranch branch = _vhost.getDtxRegistry().getBranch(id);
+
+ if(suspend && fail)
+ {
+ branch.disassociateSession(_session);
+ _branch = null;
+ throw new SuspendAndFailDtxException(id);
+ }
+
+
+ if(branch == null)
+ {
+ throw new UnknownDtxBranchException(id);
+ }
+ else
+ {
+ if(!branch.isAssociated(_session))
+ {
+ throw new NotAssociatedDtxException(id);
+ }
+ if(branch.expired() || branch.getState() == DtxBranch.State.TIMEDOUT)
+ {
+ branch.disassociateSession(_session);
+ throw new TimeoutDtxException(id);
+ }
+
+ if(suspend)
+ {
+ branch.suspendSession(_session);
+ }
+ else
+ {
+ if(fail)
+ {
+ branch.setState(DtxBranch.State.ROLLBACK_ONLY);
+ }
+ branch.disassociateSession(_session);
+ }
+
+ _branch = null;
+
+ }
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
new file mode 100644
index 0000000000..99bb639261
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
@@ -0,0 +1,348 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.txn;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.Xid;
+
+public class DtxBranch
+{
+ private static final Logger _logger = Logger.getLogger(DtxBranch.class);
+
+ private final Xid _xid;
+ private final List<ServerTransaction.Action> _postTransactionActions = new ArrayList<ServerTransaction.Action>();
+ private State _state = State.ACTIVE;
+ private long _timeout;
+ private Map<AMQSessionModel, State> _associatedSessions = new HashMap<AMQSessionModel, State>();
+ private final List<Record> _enqueueRecords = new ArrayList<Record>();
+ private final List<Record> _dequeueRecords = new ArrayList<Record>();
+
+ private MessageStore.Transaction _transaction;
+ private long _expiration;
+ private VirtualHost _vhost;
+ private ScheduledFuture<?> _timeoutFuture;
+ private MessageStore _store;
+
+
+ public enum State
+ {
+ ACTIVE,
+ PREPARED,
+ TIMEDOUT,
+ SUSPENDED,
+ FORGOTTEN,
+ HEUR_COM,
+ HEUR_RB,
+ ROLLBACK_ONLY
+ }
+
+
+ public DtxBranch(Xid xid, MessageStore store, VirtualHost vhost)
+ {
+ _xid = xid;
+ _store = store;
+ _vhost = vhost;
+ }
+
+ public Xid getXid()
+ {
+ return _xid;
+ }
+
+ public State getState()
+ {
+ return _state;
+ }
+
+ public void setState(State state)
+ {
+ _state = state;
+ }
+
+ public long getTimeout()
+ {
+ return _timeout;
+ }
+
+ public void setTimeout(long timeout)
+ {
+ if(_timeoutFuture != null)
+ {
+ _timeoutFuture.cancel(false);
+ }
+ _timeout = timeout;
+ _expiration = timeout == 0 ? 0 : System.currentTimeMillis() + timeout;
+
+ if(_timeout == 0)
+ {
+ _timeoutFuture = null;
+ }
+ else
+ {
+ _timeoutFuture = _vhost.scheduleTask(_timeout, new Runnable()
+ {
+ public void run()
+ {
+ setState(State.TIMEDOUT);
+ try
+ {
+ rollback();
+ }
+ catch (AMQStoreException e)
+ {
+ _logger.error("Unexpected error when attempting to rollback XA transaction ("+
+ _xid + ") due to timeout", e);
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+ }
+
+ public boolean expired()
+ {
+ return _timeout != 0 && _expiration < System.currentTimeMillis();
+ }
+
+ public synchronized boolean isAssociated(AMQSessionModel session)
+ {
+ return _associatedSessions.containsKey(session);
+ }
+
+ public synchronized boolean hasAssociatedSessions()
+ {
+ return !_associatedSessions.isEmpty();
+ }
+
+
+ public synchronized boolean hasAssociatedActiveSessions()
+ {
+ if(hasAssociatedSessions())
+ {
+ for(State state : _associatedSessions.values())
+ {
+ if(state != State.SUSPENDED)
+ {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ public synchronized void clearAssociations()
+ {
+ _associatedSessions.clear();
+ }
+
+ synchronized boolean associateSession(AMQSessionModel associatedSession)
+ {
+ return _associatedSessions.put(associatedSession, State.ACTIVE) != null;
+ }
+
+ synchronized boolean disassociateSession(AMQSessionModel associatedSession)
+ {
+ return _associatedSessions.remove(associatedSession) != null;
+ }
+
+ public synchronized boolean resumeSession(AMQSessionModel session)
+ {
+ if(_associatedSessions.containsKey(session) && _associatedSessions.get(session) == State.SUSPENDED)
+ {
+ _associatedSessions.put(session, State.ACTIVE);
+ return true;
+ }
+ return false;
+ }
+
+ public synchronized boolean suspendSession(AMQSessionModel session)
+ {
+ if(_associatedSessions.containsKey(session) && _associatedSessions.get(session) == State.ACTIVE)
+ {
+ _associatedSessions.put(session, State.SUSPENDED);
+ return true;
+ }
+ return false;
+ }
+
+ public void prepare() throws AMQStoreException
+ {
+
+ MessageStore.Transaction txn = _store.newTransaction();
+ txn.recordXid(_xid.getFormat(),
+ _xid.getGlobalId(),
+ _xid.getBranchId(),
+ _enqueueRecords.toArray(new Record[_enqueueRecords.size()]),
+ _dequeueRecords.toArray(new Record[_dequeueRecords.size()]));
+ txn.commitTran();
+
+ prePrepareTransaction();
+ }
+
+ public synchronized void rollback() throws AMQStoreException
+ {
+ if(_timeoutFuture != null)
+ {
+ _timeoutFuture.cancel(false);
+ _timeoutFuture = null;
+ }
+
+
+ if(_transaction != null)
+ {
+ // prepare has previously been called
+
+ MessageStore.Transaction txn = _store.newTransaction();
+ txn.removeXid(_xid.getFormat(), _xid.getGlobalId(), _xid.getBranchId());
+ txn.commitTran();
+
+ _transaction.abortTran();
+ }
+
+ for(ServerTransaction.Action action : _postTransactionActions)
+ {
+ action.onRollback();
+ }
+ _postTransactionActions.clear();
+ }
+
+ public void commit() throws AMQStoreException
+ {
+ if(_timeoutFuture != null)
+ {
+ _timeoutFuture.cancel(false);
+ _timeoutFuture = null;
+ }
+
+ if(_transaction == null)
+ {
+ prePrepareTransaction();
+ }
+ else
+ {
+ _transaction.removeXid(_xid.getFormat(), _xid.getGlobalId(), _xid.getBranchId());
+ }
+ _transaction.commitTran();
+
+ for(ServerTransaction.Action action : _postTransactionActions)
+ {
+ action.postCommit();
+ }
+ _postTransactionActions.clear();
+ }
+
+ public void prePrepareTransaction() throws AMQStoreException
+ {
+ _transaction = _store.newTransaction();
+
+ for(Record enqueue : _enqueueRecords)
+ {
+ if(enqueue.isDurable())
+ {
+ _transaction.enqueueMessage(enqueue.getQueue(), enqueue.getMessage());
+ }
+ }
+
+
+ for(Record enqueue : _dequeueRecords)
+ {
+ if(enqueue.isDurable())
+ {
+ _transaction.dequeueMessage(enqueue.getQueue(), enqueue.getMessage());
+ }
+ }
+ }
+
+
+ public void addPostTransactionAcion(ServerTransaction.Action postTransactionAction)
+ {
+ _postTransactionActions.add(postTransactionAction);
+ }
+
+
+ public void dequeue(BaseQueue queue, EnqueableMessage message)
+ {
+ _dequeueRecords.add(new Record(queue, message));
+ }
+
+
+ public void enqueue(BaseQueue queue, EnqueableMessage message)
+ {
+ _enqueueRecords.add(new Record(queue, message));
+ }
+
+ private static final class Record implements MessageStore.Transaction.Record
+ {
+ private final BaseQueue _queue;
+ private final EnqueableMessage _message;
+
+ public Record(BaseQueue queue, EnqueableMessage message)
+ {
+ _queue = queue;
+ _message = message;
+ }
+
+ public BaseQueue getQueue()
+ {
+ return _queue;
+ }
+
+ public EnqueableMessage getMessage()
+ {
+ return _message;
+ }
+
+ public boolean isDurable()
+ {
+ return _message.isPersistent() && _queue.isDurable();
+ }
+ }
+
+
+ public void close()
+ {
+ if(_transaction != null)
+ {
+ try
+ {
+ _state = null;
+ _transaction.abortTran();
+ }
+ catch(AMQStoreException e)
+ {
+ _logger.error("Error while closing XA branch", e);
+ }
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxException.java
new file mode 100644
index 0000000000..d18d0cb68b
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxException.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.txn;
+
+public class DtxException extends Exception
+{
+ public DtxException()
+ {
+ }
+
+ public DtxException(String message)
+ {
+ super(message);
+ }
+
+ public DtxException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ public DtxException(Throwable cause)
+ {
+ super(cause);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxNotSelectedException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxNotSelectedException.java
new file mode 100644
index 0000000000..c1289b1fdd
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxNotSelectedException.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.txn;
+
+public class DtxNotSelectedException extends DtxException
+{
+ public DtxNotSelectedException()
+ {
+ super("Distribution transactions have not been selected on this session");
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java
new file mode 100644
index 0000000000..5c54c1164f
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java
@@ -0,0 +1,333 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.txn;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.transport.Xid;
+
+public class DtxRegistry
+{
+ private final Map<ComparableXid, DtxBranch> _branches = new HashMap<ComparableXid, DtxBranch>();
+
+
+ private static final class ComparableXid
+ {
+ private final Xid _xid;
+
+ private ComparableXid(Xid xid)
+ {
+ _xid = xid;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if(this == o)
+ {
+ return true;
+ }
+ if(o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ ComparableXid that = (ComparableXid) o;
+
+ return compareBytes(_xid.getBranchId(), that._xid.getBranchId())
+ && compareBytes(_xid.getGlobalId(), that._xid.getGlobalId());
+ }
+
+ private static boolean compareBytes(byte[] a, byte[] b)
+ {
+ if(a.length != b.length)
+ {
+ return false;
+ }
+ for(int i = 0; i < a.length; i++)
+ {
+ if(a[i] != b[i])
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+
+ @Override
+ public int hashCode()
+ {
+ int result = 0;
+ for(int i = 0; i < _xid.getGlobalId().length; i++)
+ {
+ result = 31 * result + (int) _xid.getGlobalId()[i];
+ }
+ for(int i = 0; i < _xid.getBranchId().length; i++)
+ {
+ result = 31 * result + (int) _xid.getBranchId()[i];
+ }
+
+ return result;
+ }
+ }
+
+ public synchronized DtxBranch getBranch(Xid xid)
+ {
+ return _branches.get(new ComparableXid(xid));
+ }
+
+ public synchronized boolean registerBranch(DtxBranch branch)
+ {
+ ComparableXid xid = new ComparableXid(branch.getXid());
+ if(!_branches.containsKey(xid))
+ {
+ _branches.put(xid, branch);
+ return true;
+ }
+ return false;
+ }
+
+ synchronized boolean unregisterBranch(DtxBranch branch)
+ {
+ return (_branches.remove(new ComparableXid(branch.getXid())) != null);
+ }
+
+ public void commit(Xid id, boolean onePhase)
+ throws IncorrectDtxStateException, UnknownDtxBranchException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException
+ {
+ DtxBranch branch = getBranch(id);
+ if(branch != null)
+ {
+ synchronized (branch)
+ {
+ if(!branch.hasAssociatedActiveSessions())
+ {
+ branch.clearAssociations();
+
+ if(branch.expired() || branch.getState() == DtxBranch.State.TIMEDOUT)
+ {
+ unregisterBranch(branch);
+ throw new TimeoutDtxException(id);
+ }
+ else if(branch.getState() == DtxBranch.State.ROLLBACK_ONLY)
+ {
+ throw new RollbackOnlyDtxException(id);
+ }
+ else if(onePhase && branch.getState() == DtxBranch.State.PREPARED)
+ {
+ throw new IncorrectDtxStateException("Cannot call one-phase commit on a prepared branch", id);
+ }
+ else if(!onePhase && branch.getState() != DtxBranch.State.PREPARED)
+ {
+ throw new IncorrectDtxStateException("Cannot call two-phase commit on a non-prepared branch",
+ id);
+ }
+ branch.commit();
+ branch.setState(DtxBranch.State.FORGOTTEN);
+ unregisterBranch(branch);
+ }
+ else
+ {
+ throw new IncorrectDtxStateException("Branch was still associated with a session", id);
+ }
+ }
+ }
+ else
+ {
+ throw new UnknownDtxBranchException(id);
+ }
+ }
+
+ public synchronized void prepare(Xid id)
+ throws UnknownDtxBranchException,
+ IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException
+ {
+ DtxBranch branch = getBranch(id);
+ if(branch != null)
+ {
+ synchronized (branch)
+ {
+ if(!branch.hasAssociatedActiveSessions())
+ {
+ branch.clearAssociations();
+
+ if(branch.expired() || branch.getState() == DtxBranch.State.TIMEDOUT)
+ {
+ unregisterBranch(branch);
+ throw new TimeoutDtxException(id);
+ }
+ else if(branch.getState() != DtxBranch.State.ACTIVE
+ && branch.getState() != DtxBranch.State.ROLLBACK_ONLY)
+ {
+ throw new IncorrectDtxStateException("Cannot prepare a transaction in state "
+ + branch.getState(), id);
+ }
+ else
+ {
+ branch.prepare();
+ branch.setState(DtxBranch.State.PREPARED);
+ }
+ }
+ else
+ {
+ throw new IncorrectDtxStateException("Branch still has associated sessions", id);
+ }
+ }
+ }
+ else
+ {
+ throw new UnknownDtxBranchException(id);
+ }
+ }
+
+ public void rollback(Xid id)
+ throws IncorrectDtxStateException,
+ UnknownDtxBranchException,
+ AMQStoreException, TimeoutDtxException
+ {
+
+ DtxBranch branch = getBranch(id);
+ if(branch != null)
+ {
+ synchronized (branch)
+ {
+ if(branch.expired() || branch.getState() == DtxBranch.State.TIMEDOUT)
+ {
+ unregisterBranch(branch);
+ throw new TimeoutDtxException(id);
+ }
+ if(!branch.hasAssociatedActiveSessions())
+ {
+ branch.clearAssociations();
+ branch.rollback();
+ branch.setState(DtxBranch.State.FORGOTTEN);
+ unregisterBranch(branch);
+ }
+ else
+ {
+ throw new IncorrectDtxStateException("Branch was still associated with a session", id);
+ }
+ }
+ }
+ else
+ {
+ throw new UnknownDtxBranchException(id);
+ }
+ }
+
+
+ public void forget(Xid id) throws UnknownDtxBranchException, IncorrectDtxStateException
+ {
+ DtxBranch branch = getBranch(id);
+ if(branch != null)
+ {
+ synchronized (branch)
+ {
+ if(!branch.hasAssociatedSessions())
+ {
+ if(branch.getState() != DtxBranch.State.HEUR_COM && branch.getState() != DtxBranch.State.HEUR_RB)
+ {
+ throw new IncorrectDtxStateException("Branch should not be forgotten - "
+ + "it is not heuristically complete", id);
+ }
+ branch.setState(DtxBranch.State.FORGOTTEN);
+ unregisterBranch(branch);
+ }
+ else
+ {
+ throw new IncorrectDtxStateException("Branch was still associated with a session", id);
+ }
+ }
+ }
+ else
+ {
+ throw new UnknownDtxBranchException(id);
+ }
+ }
+
+ public long getTimeout(Xid id) throws UnknownDtxBranchException
+ {
+ DtxBranch branch = getBranch(id);
+ if(branch != null)
+ {
+ return branch.getTimeout();
+ }
+ else
+ {
+ throw new UnknownDtxBranchException(id);
+ }
+ }
+
+ public void setTimeout(Xid id, long timeout) throws UnknownDtxBranchException
+ {
+ DtxBranch branch = getBranch(id);
+ if(branch != null)
+ {
+ branch.setTimeout(timeout);
+ }
+ else
+ {
+ throw new UnknownDtxBranchException(id);
+ }
+ }
+
+ public synchronized List<Xid> recover()
+ {
+ List<Xid> inDoubt = new ArrayList<Xid>();
+ for(DtxBranch branch : _branches.values())
+ {
+ if(branch.getState() == DtxBranch.State.PREPARED)
+ {
+ inDoubt.add(branch.getXid());
+ }
+ }
+ return inDoubt;
+ }
+
+ public synchronized void endAssociations(AMQSessionModel session)
+ {
+ for(DtxBranch branch : _branches.values())
+ {
+ if(branch.isAssociated(session))
+ {
+ branch.setState(DtxBranch.State.ROLLBACK_ONLY);
+ branch.disassociateSession(session);
+ }
+ }
+ }
+
+
+ public synchronized void close()
+ {
+ for(DtxBranch branch : _branches.values())
+ {
+ branch.close();
+ }
+ _branches.clear();
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/IncorrectDtxStateException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/IncorrectDtxStateException.java
new file mode 100644
index 0000000000..45f094e4b9
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/IncorrectDtxStateException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.transport.Xid;
+
+public class IncorrectDtxStateException extends DtxException
+{
+ public IncorrectDtxStateException(String message, Xid id)
+ {
+ super(message + " (xid: " + id + ")");
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JoinAndResumeDtxException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JoinAndResumeDtxException.java
new file mode 100644
index 0000000000..a25e5a4ed6
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JoinAndResumeDtxException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.transport.Xid;
+
+public class JoinAndResumeDtxException extends DtxException
+{
+ public JoinAndResumeDtxException(Xid id)
+ {
+ super("Cannot start a branch with both join and resume set " + id);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NotAssociatedDtxException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NotAssociatedDtxException.java
new file mode 100644
index 0000000000..de070546a7
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NotAssociatedDtxException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.transport.Xid;
+
+public class NotAssociatedDtxException extends DtxException
+{
+ public NotAssociatedDtxException(Xid id)
+ {
+ super("Xid " + id + " not associated with the current session");
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/RollbackOnlyDtxException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/RollbackOnlyDtxException.java
new file mode 100644
index 0000000000..6cf12d8631
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/RollbackOnlyDtxException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.transport.Xid;
+
+public class RollbackOnlyDtxException extends DtxException
+{
+ public RollbackOnlyDtxException(Xid id)
+ {
+ super("Transaction " + id + " may only be rolled back");
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/SuspendAndFailDtxException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/SuspendAndFailDtxException.java
new file mode 100644
index 0000000000..228844fd63
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/SuspendAndFailDtxException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.transport.Xid;
+
+public class SuspendAndFailDtxException extends DtxException
+{
+public SuspendAndFailDtxException(Xid id)
+{
+ super("Cannot end a branch with both suspend and fail set " + id);
+}
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TimeoutDtxException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TimeoutDtxException.java
new file mode 100644
index 0000000000..50f7708d8a
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TimeoutDtxException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.transport.Xid;
+
+public class TimeoutDtxException extends DtxException
+{
+ public TimeoutDtxException(Xid id)
+ {
+ super("Transaction " + id + " has timed-out and may only be rolled back");
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/UnknownDtxBranchException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/UnknownDtxBranchException.java
new file mode 100644
index 0000000000..c23e518365
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/UnknownDtxBranchException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.transport.Xid;
+
+public class UnknownDtxBranchException extends DtxException
+{
+ public UnknownDtxBranchException(Xid id)
+ {
+ super("Unknown xid " + id);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index afded3416e..cc573c0100 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.configuration.ConfigStore;
@@ -37,9 +40,7 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
-
-import java.util.Map;
-import java.util.UUID;
+import org.apache.qpid.server.txn.DtxRegistry;
public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer
{
@@ -96,5 +97,9 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo
ConfigStore getConfigStore();
+ DtxRegistry getDtxRegistry();
+
void removeBrokerConnection(BrokerLink brokerLink);
+
+ ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index 266d23af97..2202efb5e3 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -20,9 +20,20 @@
*/
package org.apache.qpid.server.virtualhost;
-import org.apache.log4j.Logger;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.BindingFactory;
@@ -32,29 +43,27 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.txn.DtxBranch;
+import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.Xid;
+import org.apache.qpid.transport.util.Functions;
import org.apache.qpid.util.ByteBufferInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler,
ConfigurationRecoveryHandler.QueueRecoveryHandler,
ConfigurationRecoveryHandler.ExchangeRecoveryHandler,
@@ -63,7 +72,8 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
MessageStoreRecoveryHandler,
MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
TransactionLogRecoveryHandler,
- TransactionLogRecoveryHandler.QueueEntryRecoveryHandler
+ TransactionLogRecoveryHandler.QueueEntryRecoveryHandler,
+ TransactionLogRecoveryHandler.DtxRecordRecoveryHandler
{
private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class);
@@ -76,7 +86,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
private MessageStore _store;
private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
- private Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
+ private Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>();
private Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
@@ -158,7 +168,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
public void message(StoredMessage message)
{
- ServerMessage serverMessage;
+ AbstractServerMessageImpl serverMessage;
switch(message.getMetaData().getType())
{
case META_DATA_0_8:
@@ -193,6 +203,164 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
{
}
+ public void dtxRecord(long format, byte[] globalId, byte[] branchId,
+ MessageStore.Transaction.Record[] enqueues,
+ MessageStore.Transaction.Record[] dequeues)
+ {
+ Xid id = new Xid(format, globalId, branchId);
+ DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry();
+ DtxBranch branch = dtxRegistry.getBranch(id);
+ if(branch == null)
+ {
+ branch = new DtxBranch(id, _store, _virtualHost);
+ dtxRegistry.registerBranch(branch);
+ }
+ for(MessageStore.Transaction.Record record : enqueues)
+ {
+ final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName());
+ if(queue != null)
+ {
+ final long messageId = record.getMessage().getMessageNumber();
+ final AbstractServerMessageImpl message = _recoveredMessages.get(messageId);
+ _unusedMessages.remove(messageId);
+
+ if(message != null)
+ {
+ message.incrementReference();
+
+ branch.enqueue(queue,message);
+
+ branch.addPostTransactionAcion(new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ try
+ {
+
+ queue.enqueue(message, true, null);
+ message.decrementReference();
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Unable to enqueue message " + message.getMessageNumber() + " into " +
+ "queue " + queue.getName() + " (from XA transaction)", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void onRollback()
+ {
+ message.decrementReference();
+ }
+ });
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ String messageNumberString = String.valueOf(message.getMessageNumber());
+ CurrentActor.get().message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
+ messageNumberString));
+
+ }
+
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ CurrentActor.get().message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
+ queue.getName()));
+
+ }
+ }
+ for(MessageStore.Transaction.Record record : dequeues)
+ {
+ final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName());
+ if(queue != null)
+ {
+ final long messageId = record.getMessage().getMessageNumber();
+ final AbstractServerMessageImpl message = _recoveredMessages.get(messageId);
+ _unusedMessages.remove(messageId);
+
+ if(message != null)
+ {
+ final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
+
+ entry.acquire();
+
+ branch.dequeue(queue, message);
+
+ branch.addPostTransactionAcion(new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ entry.discard();
+ }
+
+ public void onRollback()
+ {
+ entry.release();
+ }
+ });
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ String messageNumberString = String.valueOf(message.getMessageNumber());
+ CurrentActor.get().message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
+ messageNumberString));
+
+ }
+
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ CurrentActor.get().message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
+ queue.getName()));
+ }
+
+ }
+
+ try
+ {
+ branch.setState(DtxBranch.State.PREPARED);
+ branch.prePrepareTransaction();
+ }
+ catch (AMQStoreException e)
+ {
+ _logger.error("Unexpected database exception when attempting to prepare a recovered XA transaction " +
+ xidAsString(id), e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static StringBuilder xidAsString(Xid id)
+ {
+ return new StringBuilder("(")
+ .append(id.getFormat())
+ .append(',')
+ .append(Functions.str(id.getGlobalId()))
+ .append(',')
+ .append(Functions.str(id.getBranchId()))
+ .append(')');
+ }
+
+ public void completeDtxRecordRecovery()
+ {
+ for(StoredMessage m : _unusedMessages.values())
+ {
+ _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing....");
+ m.remove();
+ }
+ CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
+ }
+
private static final class ProcessAction
{
private final AMQQueue _queue;
@@ -349,15 +517,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
}
- public void completeQueueEntryRecovery()
+ public DtxRecordRecoveryHandler completeQueueEntryRecovery()
{
- for(StoredMessage m : _unusedMessages.values())
- {
- _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing....");
- m.remove();
- }
-
for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet())
{
CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey()));
@@ -365,7 +527,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true));
}
- CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
+
+
+ return this;
}
private static class DummyMessage implements EnqueableMessage
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index 51b60f7980..061f06f1d0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.concurrent.ScheduledFuture;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
@@ -66,6 +67,7 @@ import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
@@ -95,6 +97,8 @@ public class VirtualHostImpl implements VirtualHost
private MessageStore _messageStore;
+ private DtxRegistry _dtxRegistry;
+
private VirtualHostMBean _virtualHostMBean;
private AMQBrokerManagerMBean _brokerMBean;
@@ -118,6 +122,7 @@ public class VirtualHostImpl implements VirtualHost
private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
+
public IConnectionRegistry getConnectionRegistry()
{
return _connectionRegistry;
@@ -188,6 +193,7 @@ public class VirtualHostImpl implements VirtualHost
_broker = _appRegistry.getBroker();
_configuration = hostConfig;
_name = _configuration.getName();
+ _dtxRegistry = new DtxRegistry();
_id = _appRegistry.getConfigStore().createId();
@@ -351,6 +357,11 @@ public class VirtualHostImpl implements VirtualHost
TimeUnit.MILLISECONDS);
}
+ public ScheduledFuture<?> scheduleTask(long delay, Runnable task)
+ {
+ return _houseKeepingTasks.schedule(task, delay, TimeUnit.MILLISECONDS);
+ }
+
public long getHouseKeepingTaskCount()
{
return _houseKeepingTasks.getTaskCount();
@@ -617,6 +628,11 @@ public class VirtualHostImpl implements VirtualHost
}
}
+ if(_dtxRegistry != null)
+ {
+ _dtxRegistry.close();
+ }
+
//Close MessageStore
if (_messageStore != null)
{
@@ -784,6 +800,11 @@ public class VirtualHostImpl implements VirtualHost
return getApplicationRegistry().getConfigStore();
}
+ public DtxRegistry getDtxRegistry()
+ {
+ return _dtxRegistry;
+ }
+
/**
* Temporary Startup RT class to record the creation of persistent queues / exchanges.
*
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index 88b9acffe0..09d865cb05 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -146,6 +146,14 @@ public class SkeletonMessageStore implements MessageStore
{
}
+
+ public void removeXid(long format, byte[] globalId, byte[] branchId)
+ {
+ }
+
+ public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ {
+ }
};
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
index e409734a17..104e06d29a 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -100,6 +100,14 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
public void abortTran() throws AMQStoreException
{
}
+
+ public void removeXid(long format, byte[] globalId, byte[] branchId)
+ {
+ }
+
+ public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ {
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
index fa0bb5be8b..801549e561 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
@@ -112,6 +112,14 @@ class MockStoreTransaction implements Transaction
_state = TransactionState.ABORTED;
}
+ public void removeXid(long format, byte[] globalId, byte[] branchId)
+ {
+ }
+
+ public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ {
+ }
+
public static MessageStore createTestTransactionLog(final MockStoreTransaction storeTransaction)
{
return new MessageStore()
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index 5fe664acdd..af742532e2 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.configuration.BrokerConfig;
import org.apache.qpid.server.configuration.ConfigStore;
@@ -39,6 +40,7 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.DtxRegistry;
import java.util.Map;
import java.util.UUID;
@@ -94,6 +96,11 @@ public class MockVirtualHost implements VirtualHost
return null;
}
+ public DtxRegistry getDtxRegistry()
+ {
+ return null;
+ }
+
public VirtualHostConfiguration getConfiguration()
{
return null;
@@ -170,6 +177,11 @@ public class MockVirtualHost implements VirtualHost
}
+ public ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask)
+ {
+ return null;
+ }
+
public void scheduleHouseKeepingTask(long period, HouseKeepingTask task)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
index 128aa18d30..0bf0b012ff 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
@@ -308,13 +308,16 @@ public class XAResourceImpl implements XAResource
_xaSession.createSession();
convertExecutionErrorToXAErr( e.getException().getErrorCode());
}
- Xid[] result = new Xid[res.getInDoubt().size()];
- int i = 0;
- for (Object obj : res.getInDoubt())
- {
- org.apache.qpid.transport.Xid xid = (org.apache.qpid.transport.Xid) obj;
- result[i] = new XidImpl(xid.getBranchId(), (int) xid.getFormat(), xid.getGlobalId());
- i++;
+ Xid[] result = new Xid[res.getInDoubt() != null ? res.getInDoubt().size() : 0];
+ if(result.length != 0)
+ {
+ int i = 0;
+ for (Object obj : res.getInDoubt())
+ {
+ org.apache.qpid.transport.Xid xid = (org.apache.qpid.transport.Xid) obj;
+ result[i] = new XidImpl(xid.getBranchId(), (int) xid.getFormat(), xid.getGlobalId());
+ i++;
+ }
}
return result;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
index a38c83d4cb..2b93697bfc 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
@@ -25,6 +25,7 @@ import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.Type;
+import org.apache.qpid.transport.Xid;
import static org.apache.qpid.transport.util.Functions.lsb;
import java.io.UnsupportedEncodingException;
@@ -61,6 +62,7 @@ abstract class AbstractEncoder implements Encoder
ENCODINGS.put(Character.class, Type.CHAR);
ENCODINGS.put(byte[].class, Type.VBIN32);
ENCODINGS.put(UUID.class, Type.UUID);
+ ENCODINGS.put(Xid.class, Type.STRUCT32);
}
private final Map<String,byte[]> str8cache = new LinkedHashMap<String,byte[]>()
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java
index f337908134..c8ffe8571d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java
@@ -43,7 +43,7 @@ public class XAResourceTest extends QpidBrokerTestCase
public void testIsSameRMSingleCF() throws Exception
{
XAConnectionFactory factory = getConnectionFactory(FACTORY_NAME);
- XAConnection conn = factory.createXAConnection();
+ XAConnection conn = factory.createXAConnection("guest","guest");
XASession session = conn.createXASession();
XAResource xaResource1 = session.getXAResource();
XAResource xaResource2 = session.getXAResource();
@@ -68,9 +68,9 @@ public class XAResourceTest extends QpidBrokerTestCase
XAConnectionFactory factory2 = new AMQConnectionFactory(url);
XAConnectionFactory factory3 = getConnectionFactory(ALT_FACTORY_NAME);
- XAConnection conn = factory.createXAConnection();
- XAConnection conn2 = factory2.createXAConnection();
- XAConnection conn3 = factory3.createXAConnection();
+ XAConnection conn = factory.createXAConnection("guest","guest");
+ XAConnection conn2 = factory2.createXAConnection("guest","guest");
+ XAConnection conn3 = factory3.createXAConnection("guest","guest");
XASession session = conn.createXASession();
XASession session2 = conn2.createXASession();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index 54abb76b6d..8ffc09930e 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -313,6 +313,17 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
_underlying.abortTran();
doPostDelay("abortTran");
}
+
+ public void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException
+ {
+ _underlying.removeXid(format, globalId, branchId);
+ }
+
+ public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ throws AMQStoreException
+ {
+ _underlying.recordXid(format, globalId, branchId, enqueues, dequeues);
+ }
}
public void updateQueue(AMQQueue queue) throws AMQStoreException
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java
index 86ba5c2cb7..e940a73bbb 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java
@@ -118,7 +118,7 @@ public class FaultTest extends AbstractXATestCase
_queueFactory = getConnectionFactory();
_xaqueueConnection = _queueFactory.createXAQueueConnection("guest", "guest");
XAQueueSession session = _xaqueueConnection.createXAQueueSession();
- _queueConnection = _queueFactory.createQueueConnection();
+ _queueConnection = _queueFactory.createQueueConnection("guest","guest");
_nonXASession = _queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
init(session, _queue);
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/QueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/QueueTest.java
index eb19504dec..3fbe76323a 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/QueueTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/QueueTest.java
@@ -162,11 +162,12 @@ public class QueueTest extends AbstractXATestCase
// create a standard session
try
{
- _queueConnection = _queueFactory.createQueueConnection();
+ _queueConnection = _queueFactory.createQueueConnection("guest", "guest");
_nonXASession = _queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
}
catch (JMSException e)
{
+ e.printStackTrace();
fail("cannot create queue session: " + e.getMessage());
}
init(session, _queue);
@@ -638,7 +639,8 @@ public class QueueTest extends AbstractXATestCase
TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);
if (message1 != null)
{
- fail("The queue is not empty! ");
+
+ fail("The queue is not empty! " + message1.getLongProperty(_sequenceNumberPropertyName));
}
}
catch (JMSException e)
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/TopicTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/TopicTest.java
index 3a6d573205..d955979ad6 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/TopicTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/TopicTest.java
@@ -18,6 +18,7 @@
package org.apache.qpid.test.unit.xa;
import junit.framework.TestSuite;
+import org.apache.qpid.configuration.ClientProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,6 +26,8 @@ import javax.jms.*;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -103,7 +106,7 @@ public class TopicTest extends AbstractXATestCase
}
catch (Exception e)
{
- fail("Exception thrown when cleaning standard connection: " + e.getStackTrace());
+ fail("Exception thrown when cleaning standard connection: " + e);
}
}
super.tearDown();
@@ -116,6 +119,7 @@ public class TopicTest extends AbstractXATestCase
{
if (!isBroker08())
{
+ setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "1");
// lookup test queue
try
{
@@ -650,7 +654,12 @@ public class TopicTest extends AbstractXATestCase
{
message = (TextMessage) xaDurSub.receive(1000);
- _logger.debug(" received message: " + message.getLongProperty(_sequenceNumberPropertyName));
+
+ if(message != null)
+ {
+ _logger.debug(" received message: " + message.getLongProperty(_sequenceNumberPropertyName));
+ }
+
if (message == null)
{
fail("no message received! expected: " + i);
@@ -882,35 +891,40 @@ public class TopicTest extends AbstractXATestCase
// receive 3 message within tx1: 3, 4 and 7
_xaResource.start(xid1, XAResource.TMRESUME);
// receive messages 3, 4 and 7
+ Set<Long> expected = new HashSet<Long>();
+ expected.add(3L);
+ expected.add(4L);
+ expected.add(7L);
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
- fail("no message received! expected: " + 3);
+ fail("no message received! expected one of: " + expected);
}
- else if (message.getLongProperty(_sequenceNumberPropertyName) != 3)
+ else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName)))
{
fail("wrong sequence number: " + message
- .getLongProperty(_sequenceNumberPropertyName) + " 3 was expected");
+ .getLongProperty(_sequenceNumberPropertyName) + " expected one from " + expected);
}
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
- fail("no message received! expected: " + 4);
+ fail("no message received! expected one of: " + expected);
}
- else if (message.getLongProperty(_sequenceNumberPropertyName) != 4)
+ else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName)))
{
+
fail("wrong sequence number: " + message
- .getLongProperty(_sequenceNumberPropertyName) + " 4 was expected");
+ .getLongProperty(_sequenceNumberPropertyName) + " expected one from " + expected);
}
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
- fail("no message received! expected: " + 7);
+ fail("no message received! expected one of: " + expected);
}
- else if (message.getLongProperty(_sequenceNumberPropertyName) != 7)
+ else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName)))
{
fail("wrong sequence number: " + message
- .getLongProperty(_sequenceNumberPropertyName) + " 7 was expected");
+ .getLongProperty(_sequenceNumberPropertyName) + " expected one from " + expected);
}
}
catch (Exception e)
@@ -936,8 +950,18 @@ public class TopicTest extends AbstractXATestCase
try
{
- // consume messages 1 - 4
- //----- start xid1
+ // consume messages: could be any from (1 - 4, 7-10)
+ //----- start xid4
+ Set<Long> expected = new HashSet<Long>();
+ Set<Long> xid4msgs = new HashSet<Long>();
+ for(long l = 1; l <= 4l; l++)
+ {
+ expected.add(l);
+ }
+ for(long l = 7; l <= 10l; l++)
+ {
+ expected.add(l);
+ }
_xaResource.start(xid4, XAResource.TMNOFLAGS);
for (int i = 1; i <= 4; i++)
{
@@ -946,9 +970,14 @@ public class TopicTest extends AbstractXATestCase
{
fail("no message received! expected: " + i);
}
- else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
+
+ long seqNo = message.getLongProperty(_sequenceNumberPropertyName);
+ xid4msgs.add(seqNo);
+
+ if (!expected.remove(seqNo))
{
- fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
+ fail("wrong sequence number: " + seqNo +
+ " expected one from " + expected);
}
}
_xaResource.end(xid4, XAResource.TMSUSPEND);
@@ -961,15 +990,17 @@ public class TopicTest extends AbstractXATestCase
{
fail("no message received! expected: " + i);
}
- else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
+ else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName)))
{
- fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
+ fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)
+ + " expected one from " + expected);
}
}
_xaResource.end(xid5, XAResource.TMSUSPEND);
// abort tx4
_xaResource.prepare(xid4);
_xaResource.rollback(xid4);
+ expected.addAll(xid4msgs);
// consume messages 1-4 with tx5
_xaResource.start(xid5, XAResource.TMRESUME);
for (int i = 1; i <= 4; i++)
@@ -979,13 +1010,15 @@ public class TopicTest extends AbstractXATestCase
{
fail("no message received! expected: " + i);
}
- else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
+ else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName)))
{
- fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
+ fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)
+ + " expected one from " + expected);
}
}
_xaResource.end(xid5, XAResource.TMSUSPEND);
// commit tx5
+
_xaResource.prepare(xid5);
_xaResource.commit(xid5, false);
}
@@ -1602,6 +1635,7 @@ public class TopicTest extends AbstractXATestCase
}
_xaResource.end(xid2, XAResource.TMSUCCESS);
_xaResource.commit(xid2, true);
+ _session.close();
}
catch (Exception e)
{
diff --git a/qpid/java/test-profiles/JavaExcludes b/qpid/java/test-profiles/JavaExcludes
index e123d02918..ce0016fcff 100644
--- a/qpid/java/test-profiles/JavaExcludes
+++ b/qpid/java/test-profiles/JavaExcludes
@@ -28,9 +28,6 @@ org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy
//Moved from JavaStandaloneExcludes when it was removed
///////////////////////////////////////////////////////
-//XA functionality is not fully implemented yet
-org.apache.qpid.jms.xa.XAResourceTest#*
-
//The Java broker doesnt support client auth
org.apache.qpid.client.ssl.SSLTest#testMultipleCertsInSingleStore
diff --git a/qpid/java/test-profiles/JavaPre010Excludes b/qpid/java/test-profiles/JavaPre010Excludes
index ada22638be..c05f7b51f4 100644
--- a/qpid/java/test-profiles/JavaPre010Excludes
+++ b/qpid/java/test-profiles/JavaPre010Excludes
@@ -61,3 +61,7 @@ org.apache.qpid.test.unit.client.temporaryqueue.TemporaryQueueTest#testTemporary
// QPID-3604 This fix is applied only to the 0-10 code, hence this test does not work for pre 0-10.
org.apache.qpid.client.prefetch.PrefetchBehaviourTest#testConnectionStop
+
+//XA functionality is not implemented in pre 0-10
+org.apache.qpid.jms.xa.XAResourceTest#*
+
diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes
index 7cc541c8b9..cf11e99e89 100644
--- a/qpid/java/test-profiles/JavaTransientExcludes
+++ b/qpid/java/test-profiles/JavaTransientExcludes
@@ -24,6 +24,13 @@ org.apache.qpid.test.unit.ack.ClientAcknowledgeTest#testClientAckWithLargeFlushe
org.apache.qpid.test.unit.ct.DurableSubscriberTest#*
+org.apache.qpid.test.unit.xa.QueueTest#testRecover
+org.apache.qpid.test.unit.xa.QueueTest#testSendAndRecover
+org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash
+org.apache.qpid.test.unit.xa.TopicTest#testDurSubCrash
+org.apache.qpid.test.unit.xa.TopicTest#testRecover
+
+
org.apache.qpid.server.store.MessageStoreTest#testMessagePersistence
org.apache.qpid.server.store.MessageStoreTest#testMessageRemoval
org.apache.qpid.server.store.MessageStoreTest#testBindingPersistence
diff --git a/qpid/java/test-profiles/java-bdb-spawn.0-8.testprofile b/qpid/java/test-profiles/java-bdb-spawn.0-8.testprofile
index f9837cc4f3..011fd9fe91 100644
--- a/qpid/java/test-profiles/java-bdb-spawn.0-8.testprofile
+++ b/qpid/java/test-profiles/java-bdb-spawn.0-8.testprofile
@@ -24,7 +24,7 @@ broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-bdb.xml
messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore
-profile.excludes=JavaExcludes JavaPersistentExcludes JavaPre010Excludes JavaBDBExcludes
+profile.excludes=JavaExcludes JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes
broker.clean.between.tests=true
broker.persistent=true
broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-0-9-1 @PORT --exclude-0-9-1 @SSL_PORT --exclude-0-9 @PORT --exclude-0-9 @SSL_PORT
diff --git a/qpid/java/test-profiles/java-bdb-spawn.0-9-1.testprofile b/qpid/java/test-profiles/java-bdb-spawn.0-9-1.testprofile
index b04fea21b6..7d178d958f 100644
--- a/qpid/java/test-profiles/java-bdb-spawn.0-9-1.testprofile
+++ b/qpid/java/test-profiles/java-bdb-spawn.0-9-1.testprofile
@@ -24,7 +24,7 @@ broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-bdb.xml
messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore
-profile.excludes=JavaExcludes JavaPersistentExcludes JavaPre010Excludes JavaBDBExcludes
+profile.excludes=JavaExcludes JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes
broker.clean.between.tests=true
broker.persistent=true
broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT
diff --git a/qpid/java/test-profiles/java-bdb-spawn.0-9.testprofile b/qpid/java/test-profiles/java-bdb-spawn.0-9.testprofile
index 03082b6f19..f5c321dc02 100644
--- a/qpid/java/test-profiles/java-bdb-spawn.0-9.testprofile
+++ b/qpid/java/test-profiles/java-bdb-spawn.0-9.testprofile
@@ -24,7 +24,7 @@ broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-bdb.xml
messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore
-profile.excludes=JavaExcludes JavaPersistentExcludes JavaPre010Excludes JavaBDBExcludes
+profile.excludes=JavaExcludes JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes
broker.clean.between.tests=true
broker.persistent=true
broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-0-9-1 @PORT --exclude-0-9-1 @SSL_PORT
diff --git a/qpid/java/test-profiles/java-bdb.0-8.testprofile b/qpid/java/test-profiles/java-bdb.0-8.testprofile
index ba6cc0fa80..322168064f 100644
--- a/qpid/java/test-profiles/java-bdb.0-8.testprofile
+++ b/qpid/java/test-profiles/java-bdb.0-8.testprofile
@@ -25,7 +25,7 @@ broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-bdb.xml
messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore
-profile.excludes=JavaExcludes JavaPersistentExcludes JavaPre010Excludes JavaBDBExcludes
+profile.excludes=JavaExcludes JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes
broker.clean.between.tests=true
broker.persistent=true
broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-0-9-1 @PORT --exclude-0-9-1 @SSL_PORT --exclude-0-9 @PORT --exclude-0-9 @SSL_PORT
diff --git a/qpid/java/test-profiles/java-bdb.0-9-1.testprofile b/qpid/java/test-profiles/java-bdb.0-9-1.testprofile
index 101d38f4b9..f0b34c5f67 100644
--- a/qpid/java/test-profiles/java-bdb.0-9-1.testprofile
+++ b/qpid/java/test-profiles/java-bdb.0-9-1.testprofile
@@ -25,7 +25,7 @@ broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-bdb.xml
messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore
-profile.excludes=JavaExcludes JavaPersistentExcludes JavaPre010Excludes JavaBDBExcludes
+profile.excludes=JavaExcludes JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes
broker.clean.between.tests=true
broker.persistent=true
broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT
diff --git a/qpid/java/test-profiles/java-bdb.0-9.testprofile b/qpid/java/test-profiles/java-bdb.0-9.testprofile
index daa61253bb..1ac4ef74c0 100644
--- a/qpid/java/test-profiles/java-bdb.0-9.testprofile
+++ b/qpid/java/test-profiles/java-bdb.0-9.testprofile
@@ -25,7 +25,7 @@ broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-bdb.xml
messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore
-profile.excludes=JavaExcludes JavaPersistentExcludes JavaPre010Excludes JavaBDBExcludes
+profile.excludes=JavaExcludes JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes
broker.clean.between.tests=true
broker.persistent=true
broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-0-9-1 @PORT --exclude-0-9-1 @SSL_PORT
diff --git a/qpid/java/test-profiles/java-dby-spawn.0-8.testprofile b/qpid/java/test-profiles/java-dby-spawn.0-8.testprofile
index c186207f6a..3a6023e78f 100644
--- a/qpid/java/test-profiles/java-dby-spawn.0-8.testprofile
+++ b/qpid/java/test-profiles/java-dby-spawn.0-8.testprofile
@@ -25,7 +25,7 @@ broker.stopped=Exception
broker.config=build/etc/config-systests-derby.xml
broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-0-9-1 @PORT --exclude-0-9-1 @SSL_PORT --exclude-0-9 @PORT --exclude-0-9 @SSL_PORT
messagestore.class.name=org.apache.qpid.server.store.DerbyMessageStore
-profile.excludes=JavaPersistentExcludes JavaDerbyExcludes JavaPre010Excludes
+profile.excludes=JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes
broker.clean.between.tests=true
broker.persistent=true
#
diff --git a/qpid/java/test-profiles/java-dby-spawn.0-9-1.testprofile b/qpid/java/test-profiles/java-dby-spawn.0-9-1.testprofile
index e7212d30f8..0e910e8ce5 100644
--- a/qpid/java/test-profiles/java-dby-spawn.0-9-1.testprofile
+++ b/qpid/java/test-profiles/java-dby-spawn.0-9-1.testprofile
@@ -25,7 +25,7 @@ broker.stopped=Exception
broker.config=build/etc/config-systests-derby.xml
broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT
messagestore.class.name=org.apache.qpid.server.store.DerbyMessageStore
-profile.excludes=JavaPersistentExcludes JavaDerbyExcludes JavaPre010Excludes
+profile.excludes=JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes
broker.clean.between.tests=true
broker.persistent=true
#
diff --git a/qpid/java/test-profiles/java-dby-spawn.0-9.testprofile b/qpid/java/test-profiles/java-dby-spawn.0-9.testprofile
index bd5df9ee82..232586623c 100644
--- a/qpid/java/test-profiles/java-dby-spawn.0-9.testprofile
+++ b/qpid/java/test-profiles/java-dby-spawn.0-9.testprofile
@@ -25,7 +25,7 @@ broker.stopped=Exception
broker.config=build/etc/config-systests-derby.xml
broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-0-9-1 @PORT --exclude-0-9-1 @SSL_PORT
messagestore.class.name=org.apache.qpid.server.store.DerbyMessageStore
-profile.excludes=JavaPersistentExcludes JavaDerbyExcludes JavaPre010Excludes
+profile.excludes=JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes
broker.clean.between.tests=true
broker.persistent=true
#
diff --git a/qpid/java/test-profiles/java-dby.0-8.testprofile b/qpid/java/test-profiles/java-dby.0-8.testprofile
index e4e00c70c2..161c3dcfe2 100644
--- a/qpid/java/test-profiles/java-dby.0-8.testprofile
+++ b/qpid/java/test-profiles/java-dby.0-8.testprofile
@@ -26,7 +26,7 @@ broker.stopped=Exception
broker.config=build/etc/config-systests-derby.xml
broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-0-9-1 @PORT --exclude-0-9-1 @SSL_PORT --exclude-0-9 @PORT --exclude-0-9 @SSL_PORT
messagestore.class.name=org.apache.qpid.server.store.DerbyMessageStore
-profile.excludes=JavaPersistentExcludes JavaDerbyExcludes JavaPre010Excludes
+profile.excludes=JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes
broker.clean.between.tests=true
broker.persistent=true
#
diff --git a/qpid/java/test-profiles/java-dby.0-9-1.testprofile b/qpid/java/test-profiles/java-dby.0-9-1.testprofile
index 83f43d8dbf..b5561782a7 100644
--- a/qpid/java/test-profiles/java-dby.0-9-1.testprofile
+++ b/qpid/java/test-profiles/java-dby.0-9-1.testprofile
@@ -26,7 +26,7 @@ broker.stopped=Exception
broker.config=build/etc/config-systests-derby.xml
broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT
messagestore.class.name=org.apache.qpid.server.store.DerbyMessageStore
-profile.excludes=JavaPersistentExcludes JavaDerbyExcludes JavaPre010Excludes
+profile.excludes=JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes
broker.clean.between.tests=true
broker.persistent=true
#
diff --git a/qpid/java/test-profiles/java-dby.0-9.testprofile b/qpid/java/test-profiles/java-dby.0-9.testprofile
index b6d1840f9f..289554e618 100644
--- a/qpid/java/test-profiles/java-dby.0-9.testprofile
+++ b/qpid/java/test-profiles/java-dby.0-9.testprofile
@@ -26,7 +26,7 @@ broker.stopped=Exception
broker.config=build/etc/config-systests-derby.xml
broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-0-9-1 @PORT --exclude-0-9-1 @SSL_PORT
messagestore.class.name=org.apache.qpid.server.store.DerbyMessageStore
-profile.excludes=JavaPersistentExcludes JavaDerbyExcludes JavaPre010Excludes
+profile.excludes=JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes
broker.clean.between.tests=true
broker.persistent=true
#
diff --git a/qpid/java/test-profiles/java-mms-spawn.0-8.testprofile b/qpid/java/test-profiles/java-mms-spawn.0-8.testprofile
index e46823c3f1..b075890e58 100644
--- a/qpid/java/test-profiles/java-mms-spawn.0-8.testprofile
+++ b/qpid/java/test-profiles/java-mms-spawn.0-8.testprofile
@@ -27,4 +27,4 @@ broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude
# Do not enable. Allow client to attempt 0-10 and negotiate downwards
#
#qpid.amqp.version=0-8
-profile.excludes=JavaTransientExcludes JavaPre010Excludes
+profile.excludes=JavaTransientExcludes XAExcludes JavaPre010Excludes
diff --git a/qpid/java/test-profiles/java-mms-spawn.0-9-1.testprofile b/qpid/java/test-profiles/java-mms-spawn.0-9-1.testprofile
index 05b1f89452..7f80dc743a 100644
--- a/qpid/java/test-profiles/java-mms-spawn.0-9-1.testprofile
+++ b/qpid/java/test-profiles/java-mms-spawn.0-9-1.testprofile
@@ -27,4 +27,4 @@ broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT
# Do not enable. Allow client to attempt 0-10 and negotiate downwards
#
#qpid.amqp.version=0-91
-profile.excludes=JavaTransientExcludes JavaPre010Excludes
+profile.excludes=JavaTransientExcludes XAExcludes JavaPre010Excludes
diff --git a/qpid/java/test-profiles/java-mms-spawn.0-9.testprofile b/qpid/java/test-profiles/java-mms-spawn.0-9.testprofile
index d53a9f42ad..57e670ec31 100644
--- a/qpid/java/test-profiles/java-mms-spawn.0-9.testprofile
+++ b/qpid/java/test-profiles/java-mms-spawn.0-9.testprofile
@@ -27,4 +27,4 @@ broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude
# Do not enable. Allow client to attempt 0-10 and negotiate downwards
#
#qpid.amqp.version=0-9
-profile.excludes=JavaTransientExcludes JavaPre010Excludes
+profile.excludes=JavaTransientExcludes XAExcludes JavaPre010Excludes
diff --git a/qpid/java/test-profiles/java-mms.0-8.testprofile b/qpid/java/test-profiles/java-mms.0-8.testprofile
index 02723ab180..d7ef32ae3b 100644
--- a/qpid/java/test-profiles/java-mms.0-8.testprofile
+++ b/qpid/java/test-profiles/java-mms.0-8.testprofile
@@ -28,4 +28,4 @@ broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude
# Do not enable. Allow client to attempt 0-10 and negotiate downwards
#
#qpid.amqp.version=0-8
-profile.excludes=JavaTransientExcludes JavaPre010Excludes
+profile.excludes=JavaTransientExcludes XAExcludes JavaPre010Excludes
diff --git a/qpid/java/test-profiles/java-mms.0-9-1.testprofile b/qpid/java/test-profiles/java-mms.0-9-1.testprofile
index 37efa097bb..a2dc90bc63 100644
--- a/qpid/java/test-profiles/java-mms.0-9-1.testprofile
+++ b/qpid/java/test-profiles/java-mms.0-9-1.testprofile
@@ -28,4 +28,4 @@ broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT
# Do not enable. Allow client to attempt 0-10 and negotiate downwards
#
#qpid.amqp.version=0-91
-profile.excludes=JavaTransientExcludes JavaPre010Excludes
+profile.excludes=JavaTransientExcludes XAExcludes JavaPre010Excludes
diff --git a/qpid/java/test-profiles/java-mms.0-9.testprofile b/qpid/java/test-profiles/java-mms.0-9.testprofile
index 36ee470e09..398a9ac4d0 100644
--- a/qpid/java/test-profiles/java-mms.0-9.testprofile
+++ b/qpid/java/test-profiles/java-mms.0-9.testprofile
@@ -28,4 +28,4 @@ broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude
# Do not enable. Allow client to attempt 0-10 and negotiate downwards
#
#qpid.amqp.version=0-9
-profile.excludes=JavaTransientExcludes JavaPre010Excludes
+profile.excludes=JavaTransientExcludes XAExcludes JavaPre010Excludes
diff --git a/qpid/java/test-profiles/python_tests/Java010PythonExcludes b/qpid/java/test-profiles/python_tests/Java010PythonExcludes
index e6df44ab15..fd40ebd858 100644
--- a/qpid/java/test-profiles/python_tests/Java010PythonExcludes
+++ b/qpid/java/test-profiles/python_tests/Java010PythonExcludes
@@ -19,9 +19,6 @@
###### Feature not supported in Java Broker ######
-#The broker does not support DTX
-qpid_tests.broker_0_10.dtx.*
-
#The broker does not have the appropriate QMF support
qpid_tests.broker_0_10.management.*
qpid_tests.broker_0_10.stats.BrokerStatsTests.*
@@ -53,6 +50,13 @@ qpid_tests.broker_0_10.message.MessageTests.test_ack
qpid_tests.broker_0_10.message.MessageTests.test_acquire
qpid_tests.broker_0_10.message.MessageTests.test_acquire_with_no_accept_and_credit_flow
+qpid_tests.broker_0_10.dtx.DtxTests.test_forget_xid_on_completion
+qpid_tests.broker_0_10.dtx.DtxTests.test_implicit_end
+qpid_tests.broker_0_10.dtx.DtxTests.test_simple_commit
+qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_commit
+qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_rollback
+qpid_tests.broker_0_10.dtx.DtxTests.test_simple_rollback
+
###### Java Broker defects ######
diff --git a/qpid/java/test-profiles/testprofile.defaults b/qpid/java/test-profiles/testprofile.defaults
index bd2faa7915..b0c1aea661 100644
--- a/qpid/java/test-profiles/testprofile.defaults
+++ b/qpid/java/test-profiles/testprofile.defaults
@@ -47,7 +47,7 @@ test.port.alt.ssl=25671
test.exclude=true
profile.excludes=
-test.excludes=Excludes XAExcludes JavaExcludes ${profile}.excludes ${profile.excludes}
+test.excludes=Excludes JavaExcludes ${profile}.excludes ${profile.excludes}
test.mem=512M
test=*Test
haltonfailure=no