diff options
Diffstat (limited to 'qpid/java/broker/src')
12 files changed, 599 insertions, 100 deletions
diff --git a/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java index 146a048b3f..78889e69b5 100644 --- a/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java @@ -19,6 +19,7 @@ package org.apache.qpid.server; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; @@ -26,16 +27,20 @@ import org.apache.qpid.server.exchange.MessageRouter; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.NoConsumersException; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.TxnBuffer; import org.apache.qpid.server.txn.TxnOp; +import org.apache.qpid.server.util.OrderedMapHelper; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -94,6 +99,8 @@ public class AMQChannel private final TxnBuffer _txnBuffer; + private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>(); + public static class UnacknowledgedMessage { public final AMQMessage message; @@ -202,18 +209,53 @@ public class AMQChannel //don't create a transaction unless needed if(_currentMessage.isPersistent()) { - _txnBuffer.setPersistentMessageRecevied(); + _txnBuffer.containsPersistentChanges(); } - //don't route this until commit - _txnBuffer.enlist(new Publish(_currentMessage)); - _currentMessage = null; + //A publication will result in the enlisting of several + //TxnOps. The first is an op that will store the message. + //Following that (and ordering is important), an op will + //be added for every queue onto which the message is + //enqueued. Finally a cleanup op will be added to decrement + //the reference associated with the routing. + Store storeOp = new Store(_currentMessage); + _txnBuffer.enlist(storeOp); + _currentMessage.setTxnBuffer(_txnBuffer); + try + { + _exchanges.routeContent(_currentMessage); + _txnBuffer.enlist(new Cleanup(_currentMessage)); + } + catch(RequiredDeliveryException e) + { + //Can only be due to the mandatory flag, as no attempt + //has yet been made to deliver the message. The + //message will thus not have been delivered to any + //queue so we can return the message (without killing + //the transaction) and for efficiency remove the store + //operation from the buffer. + _txnBuffer.cancel(storeOp); + throw e; + } + finally + { + _currentMessage = null; + } } else { - _exchanges.routeContent(_currentMessage); - _currentMessage.decrementReference(); - _currentMessage = null; + try + { + _exchanges.routeContent(_currentMessage); + //following check implements the functionality + //required by the 'immediate' flag: + _currentMessage.checkDeliveredToConsumer(); + } + finally + { + _currentMessage.decrementReference(); + _currentMessage = null; + } } } @@ -402,8 +444,14 @@ public class AMQChannel { if (_transactional) { - //don't handle this until commit - _txnBuffer.enlist(new Ack(deliveryTag, multiple)); + try + { + _txnBuffer.enlist(new Ack(getUnackedMessageFinder().getValues(deliveryTag, multiple))); + } + catch(NoSuchElementException e) + { + throw new AMQException("Received ack for unrecognised delivery tag: " + deliveryTag); + } } else { @@ -540,6 +588,7 @@ public class AMQChannel public void commit() throws AMQException { _txnBuffer.commit(); + //TODO: may need to return 'immediate' messages at this point } public void rollback() throws AMQException @@ -578,20 +627,74 @@ public class AMQChannel return _defaultQueue; } + public void processReturns(AMQProtocolSession session) + { + for(AMQDataBlock block : _returns) + { + session.writeFrame(block); + } + _returns.clear(); + } + + private OrderedMapHelper<Long, UnacknowledgedMessage> getUnackedMessageFinder() + { + return new OrderedMapHelper<Long, UnacknowledgedMessage>(_unacknowledgedMessageMap, _unacknowledgedMessageMapLock, 0L); + } + + private class Ack implements TxnOp { - private final long _msgId; - private final boolean _multi; + private final Map<Long, UnacknowledgedMessage> _unacked; - Ack(long msgId, boolean multi) + Ack(Map<Long, UnacknowledgedMessage> unacked) throws AMQException { - _msgId = msgId; - _multi = multi; + _unacked = unacked; + + //if any of the messages in unacked are persistent the txn + //buffer must be marked as persistent: + for(UnacknowledgedMessage msg : _unacked.values()) + { + if(msg.message.isPersistent()) + { + _txnBuffer.containsPersistentChanges(); + break; + } + } } - public void commit() throws AMQException + public void prepare() throws AMQException { - handleAcknowledgement(_msgId, _multi); + //make persistent changes, i.e. dequeue and decrementReference + for(UnacknowledgedMessage msg : _unacked.values()) + { + msg.discard(); + } + } + + public void undoPrepare() + { + //decrementReference is annoyingly untransactional (due to + //in memory counter) so if we failed in prepare for full + //txn, this op will have to compensate by fixing the count + //in memory (persistent changes will be rolled back by store) + for(UnacknowledgedMessage msg : _unacked.values()) + { + + msg.message.incrementReference(); + } + } + + public void commit() + { + //remove the unacked messages from the channels map + synchronized(_unacknowledgedMessageMapLock) + { + for(long tag : _unacked.keySet()) + { + _unacknowledgedMessageMap.remove(tag); + } + } + } public void rollback() @@ -599,40 +702,76 @@ public class AMQChannel } } - //TODO: - //implement a scheme whereby messages can be stored on disk - //until commit, then reloaded... - private class Publish implements TxnOp + private class Store implements TxnOp { + //just use this to do a store of the message during the + //prepare phase. Any enqueueing etc is done by TxnOps enlisted + //by the queues themselves. private final AMQMessage _msg; - Publish(AMQMessage msg) + Store(AMQMessage msg) { _msg = msg; } - public boolean isPersistent() throws AMQException + public void prepare() throws AMQException { - return _msg.isPersistent(); + _msg.storeMessage(); + //the routers reference can now be released + _msg.decrementReference(); } - public void commit() throws AMQException + public void undoPrepare() + { + } + + public void commit() { - _exchanges.routeContent(_msg); - _msg.decrementReference(); } public void rollback() { + } + } + + private class Cleanup implements TxnOp + { + private final AMQMessage _msg; + + Cleanup(AMQMessage msg) + { + _msg = msg; + } + + public void prepare() throws AMQException + { + //the routers reference can now be released + _msg.decrementReference(); try { - _msg.decrementReference(); + _msg.checkDeliveredToConsumer(); } - catch (AMQException e) + catch(NoConsumersException e) { - _log.error("Error rolling back a publish request: " + e, e); + //TODO: store this for delivery after the commit-ok + _returns.add(e.getReturnMessage(_channelId)); } } + + public void undoPrepare() + { + //don't need to do anything here, if the store's txn failed + //when processing prepare then the message was not stored + //or enqueued on any queues and can be discarded + } + + public void commit() + { + } + + public void rollback() + { + } } } diff --git a/qpid/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java b/qpid/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java index ce18c94c2b..7cd1612815 100644 --- a/qpid/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java +++ b/qpid/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java @@ -20,6 +20,7 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.framing.TxCommitOkBody; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -46,8 +47,10 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> { try{ - protocolSession.getChannel(evt.getChannelId()).commit(); - protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId())); + AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + channel.commit(); + protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId())); + channel.processReturns(protocolSession); }catch(AMQException e){ throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage()); } diff --git a/qpid/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java index 481941a128..6573be782b 100644 --- a/qpid/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java @@ -19,8 +19,9 @@ package org.apache.qpid.server.queue; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.framing.*; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.txn.TxnBuffer; import org.apache.qpid.AMQException; import java.util.ArrayList; @@ -61,17 +62,44 @@ public class AMQMessage */ private transient final MessageStore _store; + /** + * For non transactional publishes, a message can be stored as + * soon as it is complete. For transactional messages it doesnt + * need to be stored until the transaction is committed. + */ + private boolean _storeWhenComplete; + + /** + * TxnBuffer for transactionally published messages + */ + private TxnBuffer _txnBuffer; + + /** + * Flag to indicate whether message has been delivered to a + * consumer. Used in implementing return functionality for + * messages published with the 'immediate' flag. + */ + private boolean _deliveredToConsumer; + + public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody) { + this(messageStore, publishBody, true); + } + + public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody, boolean storeWhenComplete) + { _messageId = messageStore.getNewMessageId(); _publishBody = publishBody; _store = messageStore; _contentBodies = new LinkedList<ContentBody>(); + _storeWhenComplete = storeWhenComplete; } public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) throws AMQException + { _publishBody = publishBody; _contentHeaderBody = contentHeaderBody; @@ -93,7 +121,7 @@ public class AMQMessage this(msg._store, msg._messageId, msg._publishBody, msg._contentHeaderBody, msg._contentBodies); } - private void storeMessage() throws AMQException + public void storeMessage() throws AMQException { if (isPersistent()) { @@ -149,7 +177,7 @@ public class AMQMessage public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException { _contentHeaderBody = contentHeaderBody; - if (isAllContentReceived()) + if (_storeWhenComplete && isAllContentReceived()) { storeMessage(); } @@ -169,7 +197,7 @@ public class AMQMessage { _contentBodies.add(contentBody); _bodyLengthReceived += contentBody.getSize(); - if (isAllContentReceived()) + if (_storeWhenComplete && isAllContentReceived()) { storeMessage(); } @@ -294,4 +322,35 @@ public class AMQMessage return _contentHeaderBody.properties instanceof BasicContentHeaderProperties &&((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2; } + + public void setTxnBuffer(TxnBuffer buffer) + { + _txnBuffer = buffer; + } + + public TxnBuffer getTxnBuffer() + { + return _txnBuffer; + } + + /** + * Called to enforce the 'immediate' flag. + * @throws NoConsumersException if the message is marked for + * immediate delivery but has not been marked as delivered to a + * consumer + */ + public void checkDeliveredToConsumer() throws NoConsumersException{ + if(isImmediate() && !_deliveredToConsumer) + { + throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies); + } + } + + /** + * Called when this message is delivered to a consumer. (used to + * implement the 'immediate' flag functionality). + */ + public void setDeliveredToConsumer(){ + _deliveredToConsumer = true; + } } diff --git a/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java index bf510d379e..886846c0a5 100644 --- a/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java @@ -27,6 +27,8 @@ import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.txn.TxnBuffer; +import org.apache.qpid.server.txn.TxnOp; import javax.management.JMException; import javax.management.MBeanException; @@ -610,11 +612,55 @@ public class AMQQueue implements Managable public void deliver(AMQMessage msg) throws AMQException { + TxnBuffer buffer = msg.getTxnBuffer(); + if(buffer == null) + { + //non-transactional + record(msg); + process(msg); + } + else + { + buffer.enlist(new Deliver(msg)); + } + } + + private void record(AMQMessage msg) throws AMQException + { msg.enqueue(this); + msg.incrementReference(); + } + + private void process(AMQMessage msg) throws FailedDequeueException + { _deliveryMgr.deliver(getName(), msg); - updateReceivedMessageCount(); + try + { + msg.checkDeliveredToConsumer(); + updateReceivedMessageCount(); + } + catch(NoConsumersException e) + { + // as this message will be returned, it should be removed + // from the queue: + dequeue(msg); + } + } + void dequeue(AMQMessage msg) throws FailedDequeueException + { + try + { + msg.decrementReference(); + msg.dequeue(this); + } + catch(AMQException e) + { + throw new FailedDequeueException(_name, e); + } + } + public void deliverAsync() { _deliveryMgr.processAsync(_asyncDelivery); @@ -664,4 +710,49 @@ public class AMQQueue implements Managable _logger.debug(MessageFormat.format(msg, args)); } } + + private class Deliver implements TxnOp + { + private final AMQMessage _msg; + + Deliver(AMQMessage msg) + { + _msg = msg; + } + + public void prepare() throws AMQException + { + record(_msg); + } + + public void undoPrepare() + { + } + + public void commit() + { + try + { + process(_msg); + } + catch(FailedDequeueException e) + { + //TODO: is there anything else we can do here? I think not... + _logger.error("Error during commit of a queue delivery: " + e, e); + } + } + + public void rollback() + { + try + { + _msg.decrementReference(); + } + catch (AMQException e) + { + _logger.error("Error rolling back a queue delivery: " + e, e); + } + } + } + } diff --git a/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java b/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java index fbd952073e..64a54fe803 100644 --- a/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java +++ b/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java @@ -72,8 +72,16 @@ class DeliveryManager { if (_queueing) { - _messages.offer(msg); - return true; + if(msg.isImmediate()) + { + //can't enqueue messages for whom immediate delivery is required + return false; + } + else + { + _messages.offer(msg); + return true; + } } else { @@ -147,14 +155,7 @@ class DeliveryManager //We don't synchronize access to subscribers so need to re-check if (next != null) { - try - { - next.send(poll(), _queue); - } - catch (AMQException e) - { - _log.error("Unable to deliver message: " + e, e); - } + next.send(poll(), _queue); } else { @@ -162,6 +163,10 @@ class DeliveryManager } } } + catch (FailedDequeueException e) + { + _log.error("Unable to deliver message as dequeue failed: " + e, e); + } finally { _processing.set(false); @@ -211,10 +216,10 @@ class DeliveryManager * @param msg the message to deliver * @throws NoConsumersException if there are no active subscribers to deliver * the message to + * @throws FailedDequeueException if the message could not be dequeued */ - void deliver(String name, AMQMessage msg) throws AMQException + void deliver(String name, AMQMessage msg) throws FailedDequeueException { - msg.incrementReference(); // first check whether we are queueing, and enqueue if we are if (!enqueue(msg)) { @@ -222,11 +227,7 @@ class DeliveryManager Subscription s = _subscriptions.nextSubscriber(msg); if (s == null) { - if (msg.isImmediate()) - { - throw msg.getNoConsumersException(name); - } - else + if (!msg.isImmediate()) { // no subscribers yet so enter 'queueing' mode and queue this message startQueueing(msg); @@ -235,19 +236,7 @@ class DeliveryManager else { s.send(msg, _queue); - } - } - - else - { - if (msg.isImmediate()) - { - //todo check with spec to see if enqueing for immediate client delivery is ok. - Subscription s = _subscriptions.nextSubscriber(msg); - if (s == null) - { - throw msg.getNoConsumersException(name); - } + msg.setDeliveredToConsumer(); } } } diff --git a/qpid/java/broker/src/org/apache/qpid/server/queue/FailedDequeueException.java b/qpid/java/broker/src/org/apache/qpid/server/queue/FailedDequeueException.java new file mode 100644 index 0000000000..f27d935c25 --- /dev/null +++ b/qpid/java/broker/src/org/apache/qpid/server/queue/FailedDequeueException.java @@ -0,0 +1,36 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; + +/** + * Signals that the dequeue of a message from a queue failed + */ +public class FailedDequeueException extends AMQException +{ + public FailedDequeueException(String queue) + { + super("Failed to dequeue message from " + queue); + } + + public FailedDequeueException(String queue, AMQException e) + { + super("Failed to dequeue message from " + queue, e); + } +} diff --git a/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java b/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java index 0616f75633..277d7e2793 100644 --- a/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java +++ b/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java @@ -40,6 +40,13 @@ public class NoConsumersException extends RequiredDeliveryException super("Immediate delivery to " + queue + " is not possible.", publishBody, contentHeaderBody, contentBodies); } + public NoConsumersException(BasicPublishBody publishBody, + ContentHeaderBody contentHeaderBody, + List<ContentBody> contentBodies) + { + super("Immediate delivery is not possible.", publishBody, contentHeaderBody, contentBodies); + } + public int getReplyCode() { return AMQConstant.NO_CONSUMERS.getCode(); diff --git a/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java b/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java index d76e9ec105..2c073427da 100644 --- a/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java +++ b/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java @@ -21,7 +21,7 @@ import org.apache.qpid.AMQException; public interface Subscription { - void send(AMQMessage msg, AMQQueue queue) throws AMQException; + void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException; boolean isSuspended(); diff --git a/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java index 902e75bf48..b0855999a2 100644 --- a/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -122,10 +122,23 @@ public class SubscriptionImpl implements Subscription * @param queue * @throws AMQException */ - public void send(AMQMessage msg, AMQQueue queue) throws AMQException + public void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException { if (msg != null) { + // if we do not need to wait for client acknowledgements + // we can decrement the reference count immediately. + + // By doing this _before_ the send we ensure that it + // doesn't get sent if it can't be dequeued, preventing + // duplicate delivery on recovery. + + // The send may of course still fail, in which case, as + // the message is unacked, it will be lost. + if (!_acks) + { + queue.dequeue(msg); + } synchronized(channel) { long deliveryTag = channel.getNextDeliveryTag(); @@ -140,13 +153,6 @@ public class SubscriptionImpl implements Subscription protocolSession.writeFrame(frame); } - // if we do not need to wait for client acknowledgements we can decrement - // the reference count immediately - if (!_acks) - { - msg.decrementReference(); - msg.dequeue(queue); - } } else { diff --git a/qpid/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java b/qpid/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java index a75ddf4b63..9c4d8558ef 100644 --- a/qpid/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java +++ b/qpid/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java @@ -24,9 +24,13 @@ import org.apache.qpid.server.store.MessageStore; import java.util.ArrayList; import java.util.List; +/** + * Holds a list of TxnOp instance representing transactional + * operations. + */ public class TxnBuffer { - private boolean _persistentMessageRecevied = false; + private boolean _containsPersistentChanges = false; private final MessageStore _store; private final List<TxnOp> _ops = new ArrayList<TxnOp>(); private static final Logger _log = Logger.getLogger(TxnBuffer.class); @@ -36,45 +40,64 @@ public class TxnBuffer _store = store; } - public void setPersistentMessageRecevied() + public void containsPersistentChanges() { - _persistentMessageRecevied = true; + _containsPersistentChanges = true; } public void commit() throws AMQException { - if (_persistentMessageRecevied) + if (_containsPersistentChanges) { _log.debug("Begin Transaction."); _store.beginTran(); - } - boolean failed = true; - try - { - for (TxnOp op : _ops) + if(prepare()) { - op.commit(); + _log.debug("Transaction Succeeded"); + _store.commitTran(); + for (TxnOp op : _ops) + { + op.commit(); + } } - _ops.clear(); - failed = false; - } - finally - { - if (_persistentMessageRecevied) + else + { + _log.debug("Transaction Failed"); + _store.abortTran(); + } + }else{ + if(prepare()) { - if (failed) + for (TxnOp op : _ops) { - _log.debug("Transaction Failed"); - _store.abortTran(); + op.commit(); } - else + } + } + _ops.clear(); + } + + private boolean prepare() + { + for (int i = 0; i < _ops.size(); i++) + { + TxnOp op = _ops.get(i); + try + { + op.prepare(); + } + catch(Exception e) + { + //compensate previously prepared ops + for(int j = 0; j < i; j++) { - _log.debug("Transaction Succeeded"); - _store.commitTran(); - } + _ops.get(j).undoPrepare(); + } + return false; } } - } + return true; + } public void rollback() throws AMQException { @@ -89,4 +112,9 @@ public class TxnBuffer { _ops.add(op); } + + public void cancel(TxnOp op) + { + _ops.remove(op); + } } diff --git a/qpid/java/broker/src/org/apache/qpid/server/txn/TxnOp.java b/qpid/java/broker/src/org/apache/qpid/server/txn/TxnOp.java index 402e75c64b..f3f6d2122f 100644 --- a/qpid/java/broker/src/org/apache/qpid/server/txn/TxnOp.java +++ b/qpid/java/broker/src/org/apache/qpid/server/txn/TxnOp.java @@ -19,8 +19,33 @@ package org.apache.qpid.server.txn; import org.apache.qpid.AMQException; +/** + * This provides the abstraction of an individual operation within a + * transaction. It is used by the TxnBuffer class. + */ public interface TxnOp { - public void commit() throws AMQException; + /** + * Do the part of the operation that updates persistent state + */ + public void prepare() throws AMQException; + /** + * Complete the operation started by prepare. Can now update in + * memory state or make netork transfers. + */ + public void commit(); + /** + * This is not the same as rollback. Unfortunately the use of an + * in memory reference count as a locking mechanism and a test for + * whether a message should be deleted means that as things are, + * handling an acknowledgement unavoidably alters both memory and + * persistent state on prepare. This is needed to 'compensate' or + * undo the in-memory change if the peristent update of later ops + * fails. + */ + public void undoPrepare(); + /** + * Rolls back the operation. + */ public void rollback(); } diff --git a/qpid/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java b/qpid/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java new file mode 100644 index 0000000000..2c24944317 --- /dev/null +++ b/qpid/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java @@ -0,0 +1,116 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.util; + +import java.util.List; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.NoSuchElementException; + +/** + * Utility class used by AMQChannel to retrieve unacknowledged + * messages. Made generic to avoid exposing the inner class in + * AMQChannel. Put in this package to keep ot out the way. + */ +public class OrderedMapHelper<K, V> +{ + private final Map<K, V> _map; + private final Object _lock; + private final K _wildcard; + + public OrderedMapHelper(Map<K, V> map, Object lock, K wildcard) + { + _map = map; + _lock = lock; + _wildcard = wildcard; + } + + /** + * Assumes the map passed in is ordered. Returns a copy of the + * map containing an individual key-value pair or a list of + * key-values upto and including the one matching the given + * key. If multiple == true and the key == the wildcard specified + * on construction, then all the values in the map will be + * returned. + */ + public Map<K, V> getValues(K key, boolean multiple) throws NoSuchElementException + { + if (multiple) + { + if(key == _wildcard) + { + synchronized(_lock) + { + return new LinkedHashMap<K, V>(_map); + } + } + else + { + return getValues(key); + } + } + else + { + Map<K, V> values = new LinkedHashMap<K, V>(); + values.put(key, getValue(key)); + return values; + } + } + + private V getValue(K key) throws NoSuchElementException + { + V value; + synchronized(_lock) + { + value = _map.get(key); + } + + if(value == null) + { + throw new NoSuchElementException(); + } + else + { + return value; + } + } + + private Map<K, V> getValues(K key) throws NoSuchElementException + { + Map<K, V> values = new LinkedHashMap<K, V>(); + synchronized(_lock) + { + if (!_map.containsKey(key)) + { + throw new NoSuchElementException(); + } + + for(Map.Entry<K, V> entry : _map.entrySet()) + { + values.put(entry.getKey(), entry.getValue()); + if (entry.getKey() == key) + { + break; + } + } + } + return values; + } + +}
\ No newline at end of file |