diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2007-04-05 16:37:40 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2007-04-05 16:37:40 +0000 |
commit | a153c3431ab54bc7f25e9129b70c73d7220a41d0 (patch) | |
tree | d2f5d87d9f1e67b81eb2330f64cee25cc5672f14 | |
parent | 70036a3ac7d3380d559a95921b1092c81e2c23cd (diff) | |
download | qpid-python-a153c3431ab54bc7f25e9129b70c73d7220a41d0.tar.gz |
QPID-443 : Fix to transactionality of message publishing
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@525862 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 42 insertions, 1 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 451ff14adf..74cd2a3fc1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -89,6 +89,11 @@ public class LocalTransactionalContext implements TransactionalContext public void rollback() throws AMQException { _txnBuffer.rollback(_storeContext); + // Hack to deal with uncommitted non-transactional writes + if(_messageStore.inTran(_storeContext)) + { + _messageStore.abortTran(_storeContext); + } _postCommitDeliveryList.clear(); } @@ -103,6 +108,7 @@ public class LocalTransactionalContext implements TransactionalContext // message.incrementReference(); _postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst)); _messageDelivered = true; + _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages)); /*_txnBuffer.enlist(new DeliverMessageOperation(message, queue)); if (_log.isDebugEnabled()) { @@ -111,7 +117,7 @@ public class LocalTransactionalContext implements TransactionalContext } message.incrementReference(); _messageDelivered = true; - _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages)); + */ } diff --git a/java/common/src/main/java/org/apache/qpid/pool/Event.java b/java/common/src/main/java/org/apache/qpid/pool/Event.java index 09890a103d..7300ec8c3f 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/Event.java +++ b/java/common/src/main/java/org/apache/qpid/pool/Event.java @@ -85,4 +85,27 @@ abstract public class Event } + + public static final class CloseEvent extends Event + { + private final IoFilter.NextFilter _nextFilter; + + public CloseEvent(final IoFilter.NextFilter nextFilter) + { + super(); + _nextFilter = nextFilter; + } + + + public void process(IoSession session) + { + _nextFilter.sessionClosed(session); + } + + public IoFilter.NextFilter getNextFilter() + { + return _nextFilter; + } + } + } diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index 8126ca4bc8..552ecf6b66 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.pool; +import org.apache.qpid.pool.Event.CloseEvent; + import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -206,6 +208,10 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message)); } + public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception + { + fireAsynchEvent(session, new CloseEvent(nextFilter)); + } } @@ -224,6 +230,12 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest)); } + public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception + { + fireAsynchEvent(session, new CloseEvent(nextFilter)); + } + + } public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool,String name) |