summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-04-05 16:37:40 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-04-05 16:37:40 +0000
commita153c3431ab54bc7f25e9129b70c73d7220a41d0 (patch)
treed2f5d87d9f1e67b81eb2330f64cee25cc5672f14
parent70036a3ac7d3380d559a95921b1092c81e2c23cd (diff)
downloadqpid-python-a153c3431ab54bc7f25e9129b70c73d7220a41d0.tar.gz
QPID-443 : Fix to transactionality of message publishing
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@525862 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/Event.java23
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java12
3 files changed, 42 insertions, 1 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 451ff14adf..74cd2a3fc1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -89,6 +89,11 @@ public class LocalTransactionalContext implements TransactionalContext
public void rollback() throws AMQException
{
_txnBuffer.rollback(_storeContext);
+ // Hack to deal with uncommitted non-transactional writes
+ if(_messageStore.inTran(_storeContext))
+ {
+ _messageStore.abortTran(_storeContext);
+ }
_postCommitDeliveryList.clear();
}
@@ -103,6 +108,7 @@ public class LocalTransactionalContext implements TransactionalContext
// message.incrementReference();
_postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst));
_messageDelivered = true;
+ _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages));
/*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
if (_log.isDebugEnabled())
{
@@ -111,7 +117,7 @@ public class LocalTransactionalContext implements TransactionalContext
}
message.incrementReference();
_messageDelivered = true;
- _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages));
+
*/
}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/Event.java b/java/common/src/main/java/org/apache/qpid/pool/Event.java
index 09890a103d..7300ec8c3f 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/Event.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/Event.java
@@ -85,4 +85,27 @@ abstract public class Event
}
+
+ public static final class CloseEvent extends Event
+ {
+ private final IoFilter.NextFilter _nextFilter;
+
+ public CloseEvent(final IoFilter.NextFilter nextFilter)
+ {
+ super();
+ _nextFilter = nextFilter;
+ }
+
+
+ public void process(IoSession session)
+ {
+ _nextFilter.sessionClosed(session);
+ }
+
+ public IoFilter.NextFilter getNextFilter()
+ {
+ return _nextFilter;
+ }
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
index 8126ca4bc8..552ecf6b66 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.pool;
+import org.apache.qpid.pool.Event.CloseEvent;
+
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -206,6 +208,10 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH
fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message));
}
+ public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception
+ {
+ fireAsynchEvent(session, new CloseEvent(nextFilter));
+ }
}
@@ -224,6 +230,12 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH
fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest));
}
+ public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception
+ {
+ fireAsynchEvent(session, new CloseEvent(nextFilter));
+ }
+
+
}
public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool,String name)