summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-11 17:35:49 +0000
committerRobert Greig <rgreig@apache.org>2007-01-11 17:35:49 +0000
commitd7d88effebf7f17394e62d2a51b252ca6af74c47 (patch)
treeeccc7917f4654b1c808c8fd1855420bdcaf10977
parenta61a0955121a73f6020e943ea5c890fa3898a832 (diff)
downloadqpid-python-d7d88effebf7f17394e62d2a51b252ca6af74c47.tar.gz
QPID-32: transaction fixes
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@495304 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java97
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java3
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java6
10 files changed, 157 insertions, 32 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 799b085fb2..470789bb1a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -37,7 +37,6 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.TxnBuffer;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -126,7 +125,7 @@ public class AMQChannel
*/
public void setLocalTransactional()
{
- _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, new TxnBuffer(), _returnMessages);
+ _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, _returnMessages);
}
public boolean isTransactional()
@@ -190,6 +189,10 @@ public class AMQChannel
}
else
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Content header received on channel " + _channelId);
+ }
_currentMessage.setContentHeaderBody(contentHeaderBody);
routeCurrentMessage();
_currentMessage.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
@@ -212,6 +215,10 @@ public class AMQChannel
// returns true iff the message was delivered (i.e. if all data was
// received
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Content body received on channel " + _channelId);
+ }
try
{
if (_currentMessage.addContentBodyFrame(_storeContext, contentBody))
@@ -484,6 +491,10 @@ public class AMQChannel
public void commit() throws AMQException
{
+ if (!isTransactional())
+ {
+ throw new AMQException("Fatal error: commit called on non-transactional channel");
+ }
_txnContext.commit();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
index b2b7a21296..f7644f3bad 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,9 +29,12 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.AMQChannel;
+import org.apache.log4j.Logger;
public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckBody>
-{
+{
+ private static final Logger _log = Logger.getLogger(BasicAckMethodHandler.class);
+
private static final BasicAckMethodHandler _instance = new BasicAckMethodHandler();
public static BasicAckMethodHandler getInstance()
@@ -47,6 +50,10 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB
ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
AMQMethodEvent<BasicAckBody> evt) throws AMQException
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Ack received on channel " + evt.getChannelId());
+ }
BasicAckBody body = evt.getMethod();
final AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
// this method throws an AMQException if the delivery tag is not known
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
index 181409c255..9c25759cd3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,11 +34,14 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
public class BasicPublishMethodHandler implements StateAwareMethodListener<BasicPublishBody>
{
+ private static final Logger _log = Logger.getLogger(BasicPublishMethodHandler.class);
+
private static final BasicPublishMethodHandler _instance = new BasicPublishMethodHandler();
-
+
private static final AMQShortString UNKNOWN_EXCHANGE_NAME = new AMQShortString("Unknown exchange name");
public static BasicPublishMethodHandler getInstance()
@@ -56,6 +59,11 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
{
final BasicPublishBody body = evt.getMethod();
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Publish received on channel " + evt.getChannelId());
+ }
+
// TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
if (body.exchange == null)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
index c67b86af09..14dc01656b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,9 +30,12 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
{
+ private static final Logger _log = Logger.getLogger(TxCommitHandler.class);
+
private static TxCommitHandler _instance = new TxCommitHandler();
public static TxCommitHandler getInstance()
@@ -51,6 +54,10 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
try
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Commit received on channel " + evt.getChannelId());
+ }
AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
channel.commit();
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index b56523f904..c55d24d507 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
@@ -27,14 +28,13 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.message.MessageDecorator;
-import org.apache.qpid.server.message.jms.JMSMessage;
-import org.apache.log4j.Logger;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Combines the information that make up a deliverable message into a more manageable form.
@@ -166,7 +166,7 @@ public class AMQMessage
_messageId = messageId;
_txnContext = txnContext;
_transientMessageData.setPublishBody(publishBody);
-
+
_taken = new AtomicBoolean(false);
if (_log.isDebugEnabled())
{
@@ -468,7 +468,7 @@ public class AMQMessage
if (pb.immediate && !_deliveredToConsumer)
{
throw new NoConsumersException(this);
- }
+ }
}
public BasicPublishBody getPublishBody() throws AMQException
@@ -509,6 +509,10 @@ public class AMQMessage
// we get a reference to the destination queues now so that we can clear the
// transient message data as quickly as possible
List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues();
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Delivering message " + _messageId);
+ }
try
{
// first we allow the handle to know that the message has been fully received. This is useful if it is
@@ -555,7 +559,7 @@ public class AMQMessage
//
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
+ //
ContentBody cb = _messageHandle.getContentBody(_messageId, 0);
AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
index 3934943665..4dff514ff4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
@@ -52,7 +52,7 @@ public class DeliverMessageOperation implements TxnOp
{
}
- public void commit(StoreContext context)
+ public void commit(StoreContext context) throws AMQException
{
//do the memeory part of the record()
_msg.incrementReference();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index f430a44ba0..d87554524f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -17,16 +17,18 @@
*/
package org.apache.qpid.server.txn;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import java.util.LinkedList;
import java.util.List;
/**
@@ -34,7 +36,11 @@ import java.util.List;
*/
public class LocalTransactionalContext implements TransactionalContext
{
- private final TxnBuffer _txnBuffer;
+ private static final Logger _log = Logger.getLogger(LocalTransactionalContext.class);
+
+ private final TxnBuffer _txnBuffer = new TxnBuffer();
+
+ private final List<DeliveryDetails> _postCommitDeliveryList = new LinkedList<DeliveryDetails>();
/**
* We keep hold of the ack operation so that we can consolidate acks, i.e. multiple acks within a txn are
@@ -50,14 +56,34 @@ public class LocalTransactionalContext implements TransactionalContext
private boolean _inTran = false;
+ private boolean _messageDelivered = false;
+
+ private static class DeliveryDetails
+ {
+ public AMQMessage message;
+ public AMQQueue queue;
+
+
+ public DeliveryDetails(AMQMessage message, AMQQueue queue)
+ {
+ this.message = message;
+ this.queue = queue;
+ }
+ }
+
public LocalTransactionalContext(MessageStore messageStore, StoreContext storeContext,
- TxnBuffer txnBuffer, List<RequiredDeliveryException> returnMessages)
+ List<RequiredDeliveryException> returnMessages)
{
_messageStore = messageStore;
_storeContext = storeContext;
- _txnBuffer = txnBuffer;
_returnMessages = returnMessages;
- _txnBuffer.enlist(new StoreMessageOperation(messageStore));
+ //_txnBuffer.enlist(new StoreMessageOperation(messageStore));
+ }
+
+
+ public StoreContext getStoreContext()
+ {
+ return _storeContext;
}
public void rollback() throws AMQException
@@ -73,9 +99,19 @@ public class LocalTransactionalContext implements TransactionalContext
// be added for every queue onto which the message is
// enqueued. Finally a cleanup op will be added to decrement
// the reference associated with the routing.
-
- _txnBuffer.enlist(new DeliverMessageOperation(message, queue));
+ message.incrementReference();
+ _postCommitDeliveryList.add(new DeliveryDetails(message, queue));
+ _messageDelivered = true;
+ /*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Incrementing ref count on message and enlisting cleanup operation - id " +
+ message.getMessageId());
+ }
+ message.incrementReference();
+ _messageDelivered = true;
_txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages));
+ */
}
private void checkAck(long deliveryTag, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
@@ -129,6 +165,10 @@ public class LocalTransactionalContext implements TransactionalContext
{
if (!_inTran)
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Starting transaction on message store");
+ }
_messageStore.beginTran(_storeContext);
_inTran = true;
}
@@ -136,6 +176,10 @@ public class LocalTransactionalContext implements TransactionalContext
public void commit() throws AMQException
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Committing transactional context");
+ }
if (_ackOp != null)
{
_ackOp.consolidate();
@@ -143,13 +187,48 @@ public class LocalTransactionalContext implements TransactionalContext
_ackOp = null;
}
+ if (_messageDelivered)
+ {
+ _txnBuffer.enlist(new StoreMessageOperation(_messageStore));
+ }
try
{
_txnBuffer.commit(_storeContext);
}
finally
{
- _inTran = false;
+ _messageDelivered = false;
+ _inTran = _messageStore.inTran(_storeContext);
}
+
+ try
+ {
+ postCommitDelivery();
+ }
+ catch (AMQException e)
+ {
+ // OK so what do we do now...?
+ _log.error("Failed to deliver messages following txn commit: " + e, e);
+ }
+ }
+
+ private void postCommitDelivery() throws AMQException
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Performing post commit delivery");
+ }
+ try
+ {
+ for (DeliveryDetails dd : _postCommitDeliveryList)
+ {
+ dd.queue.process(_storeContext, dd.message);
+ }
+ }
+ finally
+ {
+ _postCommitDeliveryList.clear();
+ }
+
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index f09d811b50..f3e6c69cc5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -72,6 +72,12 @@ public class NonTransactionalContext implements TransactionalContext
_browsedAcks = browsedAcks;
}
+
+ public StoreContext getStoreContext()
+ {
+ return _storeContext;
+ }
+
public void beginTranIfNecessary() throws AMQException
{
if (!_inTran)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
index bba4b98de4..e241ed4874 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
@@ -25,6 +25,7 @@ import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.StoreContext;
/**
* @author Robert Greig (robert.j.greig@jpmorgan.com)
@@ -45,4 +46,6 @@ public interface TransactionalContext
void messageFullyReceived(boolean persistent) throws AMQException;
void messageProcessed(AMQProtocolSession protocolSession) throws AMQException;
+
+ StoreContext getStoreContext();
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index 98de7deb5c..204631371a 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -59,15 +59,15 @@ public class SkeletonMessageStore implements MessageStore
{
}
- public void removeQueue(String name) throws AMQException
+ public void removeQueue(AMQShortString name) throws AMQException
{
}
- public void enqueueMessage(StoreContext s, String name, long messageId) throws AMQException
+ public void enqueueMessage(StoreContext s, AMQShortString name, long messageId) throws AMQException
{
}
- public void dequeueMessage(StoreContext s, String name, long messageId) throws AMQException
+ public void dequeueMessage(StoreContext s, AMQShortString name, long messageId) throws AMQException
{
}