diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-11 17:35:49 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-11 17:35:49 +0000 |
commit | d7d88effebf7f17394e62d2a51b252ca6af74c47 (patch) | |
tree | eccc7917f4654b1c808c8fd1855420bdcaf10977 | |
parent | a61a0955121a73f6020e943ea5c890fa3898a832 (diff) | |
download | qpid-python-d7d88effebf7f17394e62d2a51b252ca6af74c47.tar.gz |
QPID-32: transaction fixes
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@495304 13f79535-47bb-0310-9956-ffa450edef68
10 files changed, 157 insertions, 32 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 799b085fb2..470789bb1a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -37,7 +37,6 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.LocalTransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.txn.TxnBuffer; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -126,7 +125,7 @@ public class AMQChannel */ public void setLocalTransactional() { - _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, new TxnBuffer(), _returnMessages); + _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, _returnMessages); } public boolean isTransactional() @@ -190,6 +189,10 @@ public class AMQChannel } else { + if (_log.isDebugEnabled()) + { + _log.debug("Content header received on channel " + _channelId); + } _currentMessage.setContentHeaderBody(contentHeaderBody); routeCurrentMessage(); _currentMessage.routingComplete(_messageStore, _storeContext, _messageHandleFactory); @@ -212,6 +215,10 @@ public class AMQChannel // returns true iff the message was delivered (i.e. if all data was // received + if (_log.isDebugEnabled()) + { + _log.debug("Content body received on channel " + _channelId); + } try { if (_currentMessage.addContentBodyFrame(_storeContext, contentBody)) @@ -484,6 +491,10 @@ public class AMQChannel public void commit() throws AMQException { + if (!isTransactional()) + { + throw new AMQException("Fatal error: commit called on non-transactional channel"); + } _txnContext.commit(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java index b2b7a21296..f7644f3bad 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -29,9 +29,12 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.AMQChannel; +import org.apache.log4j.Logger; public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckBody> -{ +{ + private static final Logger _log = Logger.getLogger(BasicAckMethodHandler.class); + private static final BasicAckMethodHandler _instance = new BasicAckMethodHandler(); public static BasicAckMethodHandler getInstance() @@ -47,6 +50,10 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, AMQMethodEvent<BasicAckBody> evt) throws AMQException { + if (_log.isDebugEnabled()) + { + _log.debug("Ack received on channel " + evt.getChannelId()); + } BasicAckBody body = evt.getMethod(); final AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); // this method throws an AMQException if the delivery tag is not known diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java index 181409c255..9c25759cd3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,11 +34,14 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.log4j.Logger; public class BasicPublishMethodHandler implements StateAwareMethodListener<BasicPublishBody> { + private static final Logger _log = Logger.getLogger(BasicPublishMethodHandler.class); + private static final BasicPublishMethodHandler _instance = new BasicPublishMethodHandler(); - + private static final AMQShortString UNKNOWN_EXCHANGE_NAME = new AMQShortString("Unknown exchange name"); public static BasicPublishMethodHandler getInstance() @@ -56,6 +59,11 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi { final BasicPublishBody body = evt.getMethod(); + if (_log.isDebugEnabled()) + { + _log.debug("Publish received on channel " + evt.getChannelId()); + } + // TODO: check the delivery tag field details - is it unique across the broker or per subscriber? if (body.exchange == null) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java index c67b86af09..14dc01656b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -30,9 +30,12 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.log4j.Logger; public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> { + private static final Logger _log = Logger.getLogger(TxCommitHandler.class); + private static TxCommitHandler _instance = new TxCommitHandler(); public static TxCommitHandler getInstance() @@ -51,6 +54,10 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> try { + if (_log.isDebugEnabled()) + { + _log.debug("Commit received on channel " + evt.getChannelId()); + } AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); channel.commit(); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index b56523f904..c55d24d507 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.queue; +import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; @@ -27,14 +28,13 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.message.MessageDecorator; -import org.apache.qpid.server.message.jms.JMSMessage; -import org.apache.log4j.Logger; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; /** * Combines the information that make up a deliverable message into a more manageable form. @@ -166,7 +166,7 @@ public class AMQMessage _messageId = messageId; _txnContext = txnContext; _transientMessageData.setPublishBody(publishBody); - + _taken = new AtomicBoolean(false); if (_log.isDebugEnabled()) { @@ -468,7 +468,7 @@ public class AMQMessage if (pb.immediate && !_deliveredToConsumer) { throw new NoConsumersException(this); - } + } } public BasicPublishBody getPublishBody() throws AMQException @@ -509,6 +509,10 @@ public class AMQMessage // we get a reference to the destination queues now so that we can clear the // transient message data as quickly as possible List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues(); + if (_log.isDebugEnabled()) + { + _log.debug("Delivering message " + _messageId); + } try { // first we allow the handle to know that the message has been fully received. This is useful if it is @@ -555,7 +559,7 @@ public class AMQMessage // // Optimise the case where we have a single content body. In that case we create a composite block // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // + // ContentBody cb = _messageHandle.getContentBody(_messageId, 0); AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb); diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java index 3934943665..4dff514ff4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java @@ -52,7 +52,7 @@ public class DeliverMessageOperation implements TxnOp { } - public void commit(StoreContext context) + public void commit(StoreContext context) throws AMQException { //do the memeory part of the record() _msg.incrementReference(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index f430a44ba0..d87554524f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -17,16 +17,18 @@ */ package org.apache.qpid.server.txn; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.ack.TxAck; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; +import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; +import java.util.LinkedList; import java.util.List; /** @@ -34,7 +36,11 @@ import java.util.List; */ public class LocalTransactionalContext implements TransactionalContext { - private final TxnBuffer _txnBuffer; + private static final Logger _log = Logger.getLogger(LocalTransactionalContext.class); + + private final TxnBuffer _txnBuffer = new TxnBuffer(); + + private final List<DeliveryDetails> _postCommitDeliveryList = new LinkedList<DeliveryDetails>(); /** * We keep hold of the ack operation so that we can consolidate acks, i.e. multiple acks within a txn are @@ -50,14 +56,34 @@ public class LocalTransactionalContext implements TransactionalContext private boolean _inTran = false; + private boolean _messageDelivered = false; + + private static class DeliveryDetails + { + public AMQMessage message; + public AMQQueue queue; + + + public DeliveryDetails(AMQMessage message, AMQQueue queue) + { + this.message = message; + this.queue = queue; + } + } + public LocalTransactionalContext(MessageStore messageStore, StoreContext storeContext, - TxnBuffer txnBuffer, List<RequiredDeliveryException> returnMessages) + List<RequiredDeliveryException> returnMessages) { _messageStore = messageStore; _storeContext = storeContext; - _txnBuffer = txnBuffer; _returnMessages = returnMessages; - _txnBuffer.enlist(new StoreMessageOperation(messageStore)); + //_txnBuffer.enlist(new StoreMessageOperation(messageStore)); + } + + + public StoreContext getStoreContext() + { + return _storeContext; } public void rollback() throws AMQException @@ -73,9 +99,19 @@ public class LocalTransactionalContext implements TransactionalContext // be added for every queue onto which the message is // enqueued. Finally a cleanup op will be added to decrement // the reference associated with the routing. - - _txnBuffer.enlist(new DeliverMessageOperation(message, queue)); + message.incrementReference(); + _postCommitDeliveryList.add(new DeliveryDetails(message, queue)); + _messageDelivered = true; + /*_txnBuffer.enlist(new DeliverMessageOperation(message, queue)); + if (_log.isDebugEnabled()) + { + _log.debug("Incrementing ref count on message and enlisting cleanup operation - id " + + message.getMessageId()); + } + message.incrementReference(); + _messageDelivered = true; _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages)); + */ } private void checkAck(long deliveryTag, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException @@ -129,6 +165,10 @@ public class LocalTransactionalContext implements TransactionalContext { if (!_inTran) { + if (_log.isDebugEnabled()) + { + _log.debug("Starting transaction on message store"); + } _messageStore.beginTran(_storeContext); _inTran = true; } @@ -136,6 +176,10 @@ public class LocalTransactionalContext implements TransactionalContext public void commit() throws AMQException { + if (_log.isDebugEnabled()) + { + _log.debug("Committing transactional context"); + } if (_ackOp != null) { _ackOp.consolidate(); @@ -143,13 +187,48 @@ public class LocalTransactionalContext implements TransactionalContext _ackOp = null; } + if (_messageDelivered) + { + _txnBuffer.enlist(new StoreMessageOperation(_messageStore)); + } try { _txnBuffer.commit(_storeContext); } finally { - _inTran = false; + _messageDelivered = false; + _inTran = _messageStore.inTran(_storeContext); } + + try + { + postCommitDelivery(); + } + catch (AMQException e) + { + // OK so what do we do now...? + _log.error("Failed to deliver messages following txn commit: " + e, e); + } + } + + private void postCommitDelivery() throws AMQException + { + if (_log.isDebugEnabled()) + { + _log.debug("Performing post commit delivery"); + } + try + { + for (DeliveryDetails dd : _postCommitDeliveryList) + { + dd.queue.process(_storeContext, dd.message); + } + } + finally + { + _postCommitDeliveryList.clear(); + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index f09d811b50..f3e6c69cc5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -72,6 +72,12 @@ public class NonTransactionalContext implements TransactionalContext _browsedAcks = browsedAcks; } + + public StoreContext getStoreContext() + { + return _storeContext; + } + public void beginTranIfNecessary() throws AMQException { if (!_inTran) diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java index bba4b98de4..e241ed4874 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java @@ -25,6 +25,7 @@ import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.store.StoreContext; /** * @author Robert Greig (robert.j.greig@jpmorgan.com) @@ -45,4 +46,6 @@ public interface TransactionalContext void messageFullyReceived(boolean persistent) throws AMQException; void messageProcessed(AMQProtocolSession protocolSession) throws AMQException; + + StoreContext getStoreContext(); } diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 98de7deb5c..204631371a 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -59,15 +59,15 @@ public class SkeletonMessageStore implements MessageStore { } - public void removeQueue(String name) throws AMQException + public void removeQueue(AMQShortString name) throws AMQException { } - public void enqueueMessage(StoreContext s, String name, long messageId) throws AMQException + public void enqueueMessage(StoreContext s, AMQShortString name, long messageId) throws AMQException { } - public void dequeueMessage(StoreContext s, String name, long messageId) throws AMQException + public void dequeueMessage(StoreContext s, AMQShortString name, long messageId) throws AMQException { } |