summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src')
-rw-r--r--qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java197
-rw-r--r--qpid/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java7
-rw-r--r--qpid/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java67
-rw-r--r--qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java93
-rw-r--r--qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java49
-rw-r--r--qpid/java/broker/src/org/apache/qpid/server/queue/FailedDequeueException.java36
-rw-r--r--qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java7
-rw-r--r--qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java2
-rw-r--r--qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java22
-rw-r--r--qpid/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java76
-rw-r--r--qpid/java/broker/src/org/apache/qpid/server/txn/TxnOp.java27
-rw-r--r--qpid/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java116
12 files changed, 599 insertions, 100 deletions
diff --git a/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java
index 146a048b3f..78889e69b5 100644
--- a/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java
@@ -19,6 +19,7 @@ package org.apache.qpid.server;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -26,16 +27,20 @@ import org.apache.qpid.server.exchange.MessageRouter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.NoConsumersException;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.TxnBuffer;
import org.apache.qpid.server.txn.TxnOp;
+import org.apache.qpid.server.util.OrderedMapHelper;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -94,6 +99,8 @@ public class AMQChannel
private final TxnBuffer _txnBuffer;
+ private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
+
public static class UnacknowledgedMessage
{
public final AMQMessage message;
@@ -202,18 +209,53 @@ public class AMQChannel
//don't create a transaction unless needed
if(_currentMessage.isPersistent())
{
- _txnBuffer.setPersistentMessageRecevied();
+ _txnBuffer.containsPersistentChanges();
}
- //don't route this until commit
- _txnBuffer.enlist(new Publish(_currentMessage));
- _currentMessage = null;
+ //A publication will result in the enlisting of several
+ //TxnOps. The first is an op that will store the message.
+ //Following that (and ordering is important), an op will
+ //be added for every queue onto which the message is
+ //enqueued. Finally a cleanup op will be added to decrement
+ //the reference associated with the routing.
+ Store storeOp = new Store(_currentMessage);
+ _txnBuffer.enlist(storeOp);
+ _currentMessage.setTxnBuffer(_txnBuffer);
+ try
+ {
+ _exchanges.routeContent(_currentMessage);
+ _txnBuffer.enlist(new Cleanup(_currentMessage));
+ }
+ catch(RequiredDeliveryException e)
+ {
+ //Can only be due to the mandatory flag, as no attempt
+ //has yet been made to deliver the message. The
+ //message will thus not have been delivered to any
+ //queue so we can return the message (without killing
+ //the transaction) and for efficiency remove the store
+ //operation from the buffer.
+ _txnBuffer.cancel(storeOp);
+ throw e;
+ }
+ finally
+ {
+ _currentMessage = null;
+ }
}
else
{
- _exchanges.routeContent(_currentMessage);
- _currentMessage.decrementReference();
- _currentMessage = null;
+ try
+ {
+ _exchanges.routeContent(_currentMessage);
+ //following check implements the functionality
+ //required by the 'immediate' flag:
+ _currentMessage.checkDeliveredToConsumer();
+ }
+ finally
+ {
+ _currentMessage.decrementReference();
+ _currentMessage = null;
+ }
}
}
@@ -402,8 +444,14 @@ public class AMQChannel
{
if (_transactional)
{
- //don't handle this until commit
- _txnBuffer.enlist(new Ack(deliveryTag, multiple));
+ try
+ {
+ _txnBuffer.enlist(new Ack(getUnackedMessageFinder().getValues(deliveryTag, multiple)));
+ }
+ catch(NoSuchElementException e)
+ {
+ throw new AMQException("Received ack for unrecognised delivery tag: " + deliveryTag);
+ }
}
else
{
@@ -540,6 +588,7 @@ public class AMQChannel
public void commit() throws AMQException
{
_txnBuffer.commit();
+ //TODO: may need to return 'immediate' messages at this point
}
public void rollback() throws AMQException
@@ -578,20 +627,74 @@ public class AMQChannel
return _defaultQueue;
}
+ public void processReturns(AMQProtocolSession session)
+ {
+ for(AMQDataBlock block : _returns)
+ {
+ session.writeFrame(block);
+ }
+ _returns.clear();
+ }
+
+ private OrderedMapHelper<Long, UnacknowledgedMessage> getUnackedMessageFinder()
+ {
+ return new OrderedMapHelper<Long, UnacknowledgedMessage>(_unacknowledgedMessageMap, _unacknowledgedMessageMapLock, 0L);
+ }
+
+
private class Ack implements TxnOp
{
- private final long _msgId;
- private final boolean _multi;
+ private final Map<Long, UnacknowledgedMessage> _unacked;
- Ack(long msgId, boolean multi)
+ Ack(Map<Long, UnacknowledgedMessage> unacked) throws AMQException
{
- _msgId = msgId;
- _multi = multi;
+ _unacked = unacked;
+
+ //if any of the messages in unacked are persistent the txn
+ //buffer must be marked as persistent:
+ for(UnacknowledgedMessage msg : _unacked.values())
+ {
+ if(msg.message.isPersistent())
+ {
+ _txnBuffer.containsPersistentChanges();
+ break;
+ }
+ }
}
- public void commit() throws AMQException
+ public void prepare() throws AMQException
{
- handleAcknowledgement(_msgId, _multi);
+ //make persistent changes, i.e. dequeue and decrementReference
+ for(UnacknowledgedMessage msg : _unacked.values())
+ {
+ msg.discard();
+ }
+ }
+
+ public void undoPrepare()
+ {
+ //decrementReference is annoyingly untransactional (due to
+ //in memory counter) so if we failed in prepare for full
+ //txn, this op will have to compensate by fixing the count
+ //in memory (persistent changes will be rolled back by store)
+ for(UnacknowledgedMessage msg : _unacked.values())
+ {
+
+ msg.message.incrementReference();
+ }
+ }
+
+ public void commit()
+ {
+ //remove the unacked messages from the channels map
+ synchronized(_unacknowledgedMessageMapLock)
+ {
+ for(long tag : _unacked.keySet())
+ {
+ _unacknowledgedMessageMap.remove(tag);
+ }
+ }
+
}
public void rollback()
@@ -599,40 +702,76 @@ public class AMQChannel
}
}
- //TODO:
- //implement a scheme whereby messages can be stored on disk
- //until commit, then reloaded...
- private class Publish implements TxnOp
+ private class Store implements TxnOp
{
+ //just use this to do a store of the message during the
+ //prepare phase. Any enqueueing etc is done by TxnOps enlisted
+ //by the queues themselves.
private final AMQMessage _msg;
- Publish(AMQMessage msg)
+ Store(AMQMessage msg)
{
_msg = msg;
}
- public boolean isPersistent() throws AMQException
+ public void prepare() throws AMQException
{
- return _msg.isPersistent();
+ _msg.storeMessage();
+ //the routers reference can now be released
+ _msg.decrementReference();
}
- public void commit() throws AMQException
+ public void undoPrepare()
+ {
+ }
+
+ public void commit()
{
- _exchanges.routeContent(_msg);
- _msg.decrementReference();
}
public void rollback()
{
+ }
+ }
+
+ private class Cleanup implements TxnOp
+ {
+ private final AMQMessage _msg;
+
+ Cleanup(AMQMessage msg)
+ {
+ _msg = msg;
+ }
+
+ public void prepare() throws AMQException
+ {
+ //the routers reference can now be released
+ _msg.decrementReference();
try
{
- _msg.decrementReference();
+ _msg.checkDeliveredToConsumer();
}
- catch (AMQException e)
+ catch(NoConsumersException e)
{
- _log.error("Error rolling back a publish request: " + e, e);
+ //TODO: store this for delivery after the commit-ok
+ _returns.add(e.getReturnMessage(_channelId));
}
}
+
+ public void undoPrepare()
+ {
+ //don't need to do anything here, if the store's txn failed
+ //when processing prepare then the message was not stored
+ //or enqueued on any queues and can be discarded
+ }
+
+ public void commit()
+ {
+ }
+
+ public void rollback()
+ {
+ }
}
}
diff --git a/qpid/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java b/qpid/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java
index ce18c94c2b..7cd1612815 100644
--- a/qpid/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java
+++ b/qpid/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java
@@ -20,6 +20,7 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.TxCommitOkBody;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -46,8 +47,10 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
{
try{
- protocolSession.getChannel(evt.getChannelId()).commit();
- protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId()));
+ AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+ channel.commit();
+ protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId()));
+ channel.processReturns(protocolSession);
}catch(AMQException e){
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
}
diff --git a/qpid/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
index 481941a128..6573be782b 100644
--- a/qpid/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
@@ -19,8 +19,9 @@ package org.apache.qpid.server.queue;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.*;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.TxnBuffer;
import org.apache.qpid.AMQException;
import java.util.ArrayList;
@@ -61,17 +62,44 @@ public class AMQMessage
*/
private transient final MessageStore _store;
+ /**
+ * For non transactional publishes, a message can be stored as
+ * soon as it is complete. For transactional messages it doesnt
+ * need to be stored until the transaction is committed.
+ */
+ private boolean _storeWhenComplete;
+
+ /**
+ * TxnBuffer for transactionally published messages
+ */
+ private TxnBuffer _txnBuffer;
+
+ /**
+ * Flag to indicate whether message has been delivered to a
+ * consumer. Used in implementing return functionality for
+ * messages published with the 'immediate' flag.
+ */
+ private boolean _deliveredToConsumer;
+
+
public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody)
{
+ this(messageStore, publishBody, true);
+ }
+
+ public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody, boolean storeWhenComplete)
+ {
_messageId = messageStore.getNewMessageId();
_publishBody = publishBody;
_store = messageStore;
_contentBodies = new LinkedList<ContentBody>();
+ _storeWhenComplete = storeWhenComplete;
}
public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody,
ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
throws AMQException
+
{
_publishBody = publishBody;
_contentHeaderBody = contentHeaderBody;
@@ -93,7 +121,7 @@ public class AMQMessage
this(msg._store, msg._messageId, msg._publishBody, msg._contentHeaderBody, msg._contentBodies);
}
- private void storeMessage() throws AMQException
+ public void storeMessage() throws AMQException
{
if (isPersistent())
{
@@ -149,7 +177,7 @@ public class AMQMessage
public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
{
_contentHeaderBody = contentHeaderBody;
- if (isAllContentReceived())
+ if (_storeWhenComplete && isAllContentReceived())
{
storeMessage();
}
@@ -169,7 +197,7 @@ public class AMQMessage
{
_contentBodies.add(contentBody);
_bodyLengthReceived += contentBody.getSize();
- if (isAllContentReceived())
+ if (_storeWhenComplete && isAllContentReceived())
{
storeMessage();
}
@@ -294,4 +322,35 @@ public class AMQMessage
return _contentHeaderBody.properties instanceof BasicContentHeaderProperties
&&((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
}
+
+ public void setTxnBuffer(TxnBuffer buffer)
+ {
+ _txnBuffer = buffer;
+ }
+
+ public TxnBuffer getTxnBuffer()
+ {
+ return _txnBuffer;
+ }
+
+ /**
+ * Called to enforce the 'immediate' flag.
+ * @throws NoConsumersException if the message is marked for
+ * immediate delivery but has not been marked as delivered to a
+ * consumer
+ */
+ public void checkDeliveredToConsumer() throws NoConsumersException{
+ if(isImmediate() && !_deliveredToConsumer)
+ {
+ throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies);
+ }
+ }
+
+ /**
+ * Called when this message is delivered to a consumer. (used to
+ * implement the 'immediate' flag functionality).
+ */
+ public void setDeliveredToConsumer(){
+ _deliveredToConsumer = true;
+ }
}
diff --git a/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
index bf510d379e..886846c0a5 100644
--- a/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
@@ -27,6 +27,8 @@ import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.txn.TxnBuffer;
+import org.apache.qpid.server.txn.TxnOp;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -610,11 +612,55 @@ public class AMQQueue implements Managable
public void deliver(AMQMessage msg) throws AMQException
{
+ TxnBuffer buffer = msg.getTxnBuffer();
+ if(buffer == null)
+ {
+ //non-transactional
+ record(msg);
+ process(msg);
+ }
+ else
+ {
+ buffer.enlist(new Deliver(msg));
+ }
+ }
+
+ private void record(AMQMessage msg) throws AMQException
+ {
msg.enqueue(this);
+ msg.incrementReference();
+ }
+
+ private void process(AMQMessage msg) throws FailedDequeueException
+ {
_deliveryMgr.deliver(getName(), msg);
- updateReceivedMessageCount();
+ try
+ {
+ msg.checkDeliveredToConsumer();
+ updateReceivedMessageCount();
+ }
+ catch(NoConsumersException e)
+ {
+ // as this message will be returned, it should be removed
+ // from the queue:
+ dequeue(msg);
+ }
+
}
+ void dequeue(AMQMessage msg) throws FailedDequeueException
+ {
+ try
+ {
+ msg.decrementReference();
+ msg.dequeue(this);
+ }
+ catch(AMQException e)
+ {
+ throw new FailedDequeueException(_name, e);
+ }
+ }
+
public void deliverAsync()
{
_deliveryMgr.processAsync(_asyncDelivery);
@@ -664,4 +710,49 @@ public class AMQQueue implements Managable
_logger.debug(MessageFormat.format(msg, args));
}
}
+
+ private class Deliver implements TxnOp
+ {
+ private final AMQMessage _msg;
+
+ Deliver(AMQMessage msg)
+ {
+ _msg = msg;
+ }
+
+ public void prepare() throws AMQException
+ {
+ record(_msg);
+ }
+
+ public void undoPrepare()
+ {
+ }
+
+ public void commit()
+ {
+ try
+ {
+ process(_msg);
+ }
+ catch(FailedDequeueException e)
+ {
+ //TODO: is there anything else we can do here? I think not...
+ _logger.error("Error during commit of a queue delivery: " + e, e);
+ }
+ }
+
+ public void rollback()
+ {
+ try
+ {
+ _msg.decrementReference();
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error rolling back a queue delivery: " + e, e);
+ }
+ }
+ }
+
}
diff --git a/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java b/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
index fbd952073e..64a54fe803 100644
--- a/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
@@ -72,8 +72,16 @@ class DeliveryManager
{
if (_queueing)
{
- _messages.offer(msg);
- return true;
+ if(msg.isImmediate())
+ {
+ //can't enqueue messages for whom immediate delivery is required
+ return false;
+ }
+ else
+ {
+ _messages.offer(msg);
+ return true;
+ }
}
else
{
@@ -147,14 +155,7 @@ class DeliveryManager
//We don't synchronize access to subscribers so need to re-check
if (next != null)
{
- try
- {
- next.send(poll(), _queue);
- }
- catch (AMQException e)
- {
- _log.error("Unable to deliver message: " + e, e);
- }
+ next.send(poll(), _queue);
}
else
{
@@ -162,6 +163,10 @@ class DeliveryManager
}
}
}
+ catch (FailedDequeueException e)
+ {
+ _log.error("Unable to deliver message as dequeue failed: " + e, e);
+ }
finally
{
_processing.set(false);
@@ -211,10 +216,10 @@ class DeliveryManager
* @param msg the message to deliver
* @throws NoConsumersException if there are no active subscribers to deliver
* the message to
+ * @throws FailedDequeueException if the message could not be dequeued
*/
- void deliver(String name, AMQMessage msg) throws AMQException
+ void deliver(String name, AMQMessage msg) throws FailedDequeueException
{
- msg.incrementReference();
// first check whether we are queueing, and enqueue if we are
if (!enqueue(msg))
{
@@ -222,11 +227,7 @@ class DeliveryManager
Subscription s = _subscriptions.nextSubscriber(msg);
if (s == null)
{
- if (msg.isImmediate())
- {
- throw msg.getNoConsumersException(name);
- }
- else
+ if (!msg.isImmediate())
{
// no subscribers yet so enter 'queueing' mode and queue this message
startQueueing(msg);
@@ -235,19 +236,7 @@ class DeliveryManager
else
{
s.send(msg, _queue);
- }
- }
-
- else
- {
- if (msg.isImmediate())
- {
- //todo check with spec to see if enqueing for immediate client delivery is ok.
- Subscription s = _subscriptions.nextSubscriber(msg);
- if (s == null)
- {
- throw msg.getNoConsumersException(name);
- }
+ msg.setDeliveredToConsumer();
}
}
}
diff --git a/qpid/java/broker/src/org/apache/qpid/server/queue/FailedDequeueException.java b/qpid/java/broker/src/org/apache/qpid/server/queue/FailedDequeueException.java
new file mode 100644
index 0000000000..f27d935c25
--- /dev/null
+++ b/qpid/java/broker/src/org/apache/qpid/server/queue/FailedDequeueException.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * Signals that the dequeue of a message from a queue failed
+ */
+public class FailedDequeueException extends AMQException
+{
+ public FailedDequeueException(String queue)
+ {
+ super("Failed to dequeue message from " + queue);
+ }
+
+ public FailedDequeueException(String queue, AMQException e)
+ {
+ super("Failed to dequeue message from " + queue, e);
+ }
+}
diff --git a/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java b/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java
index 0616f75633..277d7e2793 100644
--- a/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java
+++ b/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java
@@ -40,6 +40,13 @@ public class NoConsumersException extends RequiredDeliveryException
super("Immediate delivery to " + queue + " is not possible.", publishBody, contentHeaderBody, contentBodies);
}
+ public NoConsumersException(BasicPublishBody publishBody,
+ ContentHeaderBody contentHeaderBody,
+ List<ContentBody> contentBodies)
+ {
+ super("Immediate delivery is not possible.", publishBody, contentHeaderBody, contentBodies);
+ }
+
public int getReplyCode()
{
return AMQConstant.NO_CONSUMERS.getCode();
diff --git a/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java b/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java
index d76e9ec105..2c073427da 100644
--- a/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java
+++ b/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java
@@ -21,7 +21,7 @@ import org.apache.qpid.AMQException;
public interface Subscription
{
- void send(AMQMessage msg, AMQQueue queue) throws AMQException;
+ void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException;
boolean isSuspended();
diff --git a/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
index 902e75bf48..b0855999a2 100644
--- a/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -122,10 +122,23 @@ public class SubscriptionImpl implements Subscription
* @param queue
* @throws AMQException
*/
- public void send(AMQMessage msg, AMQQueue queue) throws AMQException
+ public void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
{
if (msg != null)
{
+ // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
+
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
+
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ if (!_acks)
+ {
+ queue.dequeue(msg);
+ }
synchronized(channel)
{
long deliveryTag = channel.getNextDeliveryTag();
@@ -140,13 +153,6 @@ public class SubscriptionImpl implements Subscription
protocolSession.writeFrame(frame);
}
- // if we do not need to wait for client acknowledgements we can decrement
- // the reference count immediately
- if (!_acks)
- {
- msg.decrementReference();
- msg.dequeue(queue);
- }
}
else
{
diff --git a/qpid/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java b/qpid/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java
index a75ddf4b63..9c4d8558ef 100644
--- a/qpid/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java
+++ b/qpid/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java
@@ -24,9 +24,13 @@ import org.apache.qpid.server.store.MessageStore;
import java.util.ArrayList;
import java.util.List;
+/**
+ * Holds a list of TxnOp instance representing transactional
+ * operations.
+ */
public class TxnBuffer
{
- private boolean _persistentMessageRecevied = false;
+ private boolean _containsPersistentChanges = false;
private final MessageStore _store;
private final List<TxnOp> _ops = new ArrayList<TxnOp>();
private static final Logger _log = Logger.getLogger(TxnBuffer.class);
@@ -36,45 +40,64 @@ public class TxnBuffer
_store = store;
}
- public void setPersistentMessageRecevied()
+ public void containsPersistentChanges()
{
- _persistentMessageRecevied = true;
+ _containsPersistentChanges = true;
}
public void commit() throws AMQException
{
- if (_persistentMessageRecevied)
+ if (_containsPersistentChanges)
{
_log.debug("Begin Transaction.");
_store.beginTran();
- }
- boolean failed = true;
- try
- {
- for (TxnOp op : _ops)
+ if(prepare())
{
- op.commit();
+ _log.debug("Transaction Succeeded");
+ _store.commitTran();
+ for (TxnOp op : _ops)
+ {
+ op.commit();
+ }
}
- _ops.clear();
- failed = false;
- }
- finally
- {
- if (_persistentMessageRecevied)
+ else
+ {
+ _log.debug("Transaction Failed");
+ _store.abortTran();
+ }
+ }else{
+ if(prepare())
{
- if (failed)
+ for (TxnOp op : _ops)
{
- _log.debug("Transaction Failed");
- _store.abortTran();
+ op.commit();
}
- else
+ }
+ }
+ _ops.clear();
+ }
+
+ private boolean prepare()
+ {
+ for (int i = 0; i < _ops.size(); i++)
+ {
+ TxnOp op = _ops.get(i);
+ try
+ {
+ op.prepare();
+ }
+ catch(Exception e)
+ {
+ //compensate previously prepared ops
+ for(int j = 0; j < i; j++)
{
- _log.debug("Transaction Succeeded");
- _store.commitTran();
- }
+ _ops.get(j).undoPrepare();
+ }
+ return false;
}
}
- }
+ return true;
+ }
public void rollback() throws AMQException
{
@@ -89,4 +112,9 @@ public class TxnBuffer
{
_ops.add(op);
}
+
+ public void cancel(TxnOp op)
+ {
+ _ops.remove(op);
+ }
}
diff --git a/qpid/java/broker/src/org/apache/qpid/server/txn/TxnOp.java b/qpid/java/broker/src/org/apache/qpid/server/txn/TxnOp.java
index 402e75c64b..f3f6d2122f 100644
--- a/qpid/java/broker/src/org/apache/qpid/server/txn/TxnOp.java
+++ b/qpid/java/broker/src/org/apache/qpid/server/txn/TxnOp.java
@@ -19,8 +19,33 @@ package org.apache.qpid.server.txn;
import org.apache.qpid.AMQException;
+/**
+ * This provides the abstraction of an individual operation within a
+ * transaction. It is used by the TxnBuffer class.
+ */
public interface TxnOp
{
- public void commit() throws AMQException;
+ /**
+ * Do the part of the operation that updates persistent state
+ */
+ public void prepare() throws AMQException;
+ /**
+ * Complete the operation started by prepare. Can now update in
+ * memory state or make netork transfers.
+ */
+ public void commit();
+ /**
+ * This is not the same as rollback. Unfortunately the use of an
+ * in memory reference count as a locking mechanism and a test for
+ * whether a message should be deleted means that as things are,
+ * handling an acknowledgement unavoidably alters both memory and
+ * persistent state on prepare. This is needed to 'compensate' or
+ * undo the in-memory change if the peristent update of later ops
+ * fails.
+ */
+ public void undoPrepare();
+ /**
+ * Rolls back the operation.
+ */
public void rollback();
}
diff --git a/qpid/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java b/qpid/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java
new file mode 100644
index 0000000000..2c24944317
--- /dev/null
+++ b/qpid/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java
@@ -0,0 +1,116 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import java.util.List;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Utility class used by AMQChannel to retrieve unacknowledged
+ * messages. Made generic to avoid exposing the inner class in
+ * AMQChannel. Put in this package to keep ot out the way.
+ */
+public class OrderedMapHelper<K, V>
+{
+ private final Map<K, V> _map;
+ private final Object _lock;
+ private final K _wildcard;
+
+ public OrderedMapHelper(Map<K, V> map, Object lock, K wildcard)
+ {
+ _map = map;
+ _lock = lock;
+ _wildcard = wildcard;
+ }
+
+ /**
+ * Assumes the map passed in is ordered. Returns a copy of the
+ * map containing an individual key-value pair or a list of
+ * key-values upto and including the one matching the given
+ * key. If multiple == true and the key == the wildcard specified
+ * on construction, then all the values in the map will be
+ * returned.
+ */
+ public Map<K, V> getValues(K key, boolean multiple) throws NoSuchElementException
+ {
+ if (multiple)
+ {
+ if(key == _wildcard)
+ {
+ synchronized(_lock)
+ {
+ return new LinkedHashMap<K, V>(_map);
+ }
+ }
+ else
+ {
+ return getValues(key);
+ }
+ }
+ else
+ {
+ Map<K, V> values = new LinkedHashMap<K, V>();
+ values.put(key, getValue(key));
+ return values;
+ }
+ }
+
+ private V getValue(K key) throws NoSuchElementException
+ {
+ V value;
+ synchronized(_lock)
+ {
+ value = _map.get(key);
+ }
+
+ if(value == null)
+ {
+ throw new NoSuchElementException();
+ }
+ else
+ {
+ return value;
+ }
+ }
+
+ private Map<K, V> getValues(K key) throws NoSuchElementException
+ {
+ Map<K, V> values = new LinkedHashMap<K, V>();
+ synchronized(_lock)
+ {
+ if (!_map.containsKey(key))
+ {
+ throw new NoSuchElementException();
+ }
+
+ for(Map.Entry<K, V> entry : _map.entrySet())
+ {
+ values.put(entry.getKey(), entry.getValue());
+ if (entry.getKey() == key)
+ {
+ break;
+ }
+ }
+ }
+ return values;
+ }
+
+} \ No newline at end of file