diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java | 213 |
1 files changed, 213 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java new file mode 100644 index 0000000000..28af36e3db --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -0,0 +1,213 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.txn; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.ack.UnacknowledgedMessageMap; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.*; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; + +/** @author Apache Software Foundation */ +public class NonTransactionalContext implements TransactionalContext +{ + private static final Logger _log = Logger.getLogger(NonTransactionalContext.class); + + /** Channel is useful for logging */ + private final AMQChannel _channel; + + /** Where to put undeliverable messages */ + private final List<RequiredDeliveryException> _returnMessages; + + + + private final MessageStore _messageStore; + + private final StoreContext _storeContext; + + /** Whether we are in a transaction */ + private boolean _inTran; + + public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel, + List<RequiredDeliveryException> returnMessages) + { + _channel = channel; + _storeContext = storeContext; + _returnMessages = returnMessages; + _messageStore = messageStore; + + } + + + public StoreContext getStoreContext() + { + return _storeContext; + } + + public void beginTranIfNecessary() throws AMQException + { + if (!_inTran) + { + _messageStore.beginTran(_storeContext); + _inTran = true; + } + } + + public void commit() throws AMQException + { + // Does not apply to this context + } + + public void rollback() throws AMQException + { + // Does not apply to this context + } + + public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException + { + QueueEntry entry = queue.enqueue(_storeContext, message); + + //following check implements the functionality + //required by the 'immediate' flag: + if(entry.immediateAndNotDelivered()) + { + _returnMessages.add(new NoConsumersException(entry.getMessage())); + } + + } + + public void requeue(QueueEntry entry) throws AMQException + { + entry.requeue(_storeContext); + } + + public void acknowledgeMessage(final long deliveryTag, long lastDeliveryTag, + boolean multiple, final UnacknowledgedMessageMap unacknowledgedMessageMap) + throws AMQException + { + + final boolean debug = _log.isDebugEnabled(); + ; + if (multiple) + { + if (deliveryTag == 0) + { + + //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, + // tells the server to acknowledge all outstanding mesages. + _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + + unacknowledgedMessageMap.size()); + unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() + { + public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException + { + if (debug) + { + _log.debug("Discarding message: " + message.getMessage().getMessageId()); + } + if(message.getMessage().isPersistent()) + { + beginTranIfNecessary(); + } + //Message has been ack so discard it. This will dequeue and decrement the reference. + message.discard(_storeContext); + + return false; + } + + public void visitComplete() + { + unacknowledgedMessageMap.clear(); + } + }); + } + else + { + if (!unacknowledgedMessageMap.contains(deliveryTag)) + { + throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel"); + } + + unacknowledgedMessageMap.drainTo(deliveryTag, _storeContext); + } + } + else + { + QueueEntry msg; + msg = unacknowledgedMessageMap.get(deliveryTag); + + if (msg == null) + { + _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + + _channel.getChannelId()); + throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + + _channel.getChannelId()); + } + + if (debug) + { + _log.debug("Discarding message: " + msg.getMessage().getMessageId()); + } + if(msg.getMessage().isPersistent()) + { + beginTranIfNecessary(); + } + + //Message has been ack so discard it. This will dequeue and decrement the reference. + msg.discard(_storeContext); + + unacknowledgedMessageMap.remove(deliveryTag); + + + if (debug) + { + _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + + msg.getMessage().getMessageId()); + } + } + if(_inTran) + { + _messageStore.commitTran(_storeContext); + _inTran = false; + } + } + + public void messageFullyReceived(boolean persistent) throws AMQException + { + if (persistent) + { + _messageStore.commitTran(_storeContext); + _inTran = false; + } + } + + public void messageProcessed(AMQProtocolSession protocolSession) throws AMQException + { + _channel.processReturns(); + } +} |