diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid')
15 files changed, 336 insertions, 29 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java index faf5a724f3..f8585344b0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java @@ -29,6 +29,8 @@ public interface ConsumerTarget { + void acquisitionRemoved(MessageInstance node); + enum State { ACTIVE, SUSPENDED, CLOSED diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 58ffd88b85..e41bb948dc 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -511,7 +511,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> Exchange altExchange = getAlternateExchange(); if(altExchange != null) { - return ((ExchangeImpl)altExchange).send(message, routingAddress, instanceProperties, txn, postEnqueueAction); + return altExchange.send(message, routingAddress, instanceProperties, txn, postEnqueueAction); } else { @@ -520,7 +520,24 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } else { - final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]); + final BaseQueue[] baseQueues; + + if(message.isReferenced()) + { + ArrayList<BaseQueue> uniqueQueues = new ArrayList<>(queues.size()); + for(BaseQueue q : queues) + { + if(!message.isReferenced(q)) + { + uniqueQueues.add(q); + } + } + baseQueues = uniqueQueues.toArray(new BaseQueue[uniqueQueues.size()]); + } + else + { + baseQueues = queues.toArray(new BaseQueue[queues.size()]); + } txn.enqueue(queues,message, new ServerTransaction.Action() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java index d397dd57b6..d2789bfe58 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java @@ -21,10 +21,17 @@ package org.apache.qpid.server.message; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.ServerScopedRuntimeException; public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T> @@ -33,10 +40,14 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount"); + private static final AtomicReferenceFieldUpdater<AbstractServerMessageImpl, Collection> _resourcesUpdater = + AtomicReferenceFieldUpdater.newUpdater(AbstractServerMessageImpl.class, Collection.class,"_resources"); + private volatile int _referenceCount = 0; private final StoredMessage<T> _handle; private final Object _connectionReference; + private volatile Collection<UUID> _resources; public AbstractServerMessageImpl(StoredMessage<T> handle, Object connectionReference) @@ -117,6 +128,26 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI } @Override + final public MessageReference<X> newReference(TransactionLogResource object) + { + return new Reference(this, object); + } + + @Override + final public boolean isReferenced(TransactionLogResource resource) + { + Collection<UUID> resources = _resources; + return resources != null && resources.contains(resource.getId()); + } + + @Override + final public boolean isReferenced() + { + Collection<UUID> resources = _resources; + return resources != null && !resources.isEmpty(); + } + + @Override final public boolean isPersistent() { return _handle.getMetaData().isPersistent(); @@ -156,15 +187,52 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI AtomicIntegerFieldUpdater.newUpdater(Reference.class, "_released"); private AbstractServerMessageImpl<X, T> _message; + private final UUID _resourceId; private volatile int _released; private Reference(final AbstractServerMessageImpl<X, T> message) { + this(message, null); + } + private Reference(final AbstractServerMessageImpl<X, T> message, TransactionLogResource resource) + { _message = message; + if(resource != null) + { + Collection<UUID> currentValue; + Collection<UUID> newValue; + _resourceId = resource.getId(); + do + { + currentValue = _message._resources; + + if(currentValue == null) + { + newValue = Collections.singleton(_resourceId); + } + else + { + if(currentValue.contains(_resourceId)) + { + throw new MessageAlreadyReferencedException(_message.getMessageNumber(), resource); + } + newValue = new ArrayList<>(currentValue.size()+1); + newValue.addAll(currentValue); + newValue.add(_resourceId); + } + + } + while(!_resourcesUpdater.compareAndSet(_message, currentValue, newValue)); + } + else + { + _resourceId = null; + } if(!_message.incrementReference()) { throw new MessageDeletedException(message.getMessageNumber()); } + } public X getMessage() @@ -176,6 +244,34 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI { if(_releasedUpdater.compareAndSet(this,0,1)) { + if(_resourceId != null) + { + Collection<UUID> currentValue; + Collection<UUID> newValue; + do + { + currentValue = _message._resources; + if(currentValue.size() == 1) + { + newValue = null; + } + else + { + UUID[] array = new UUID[currentValue.size()-1]; + int pos = 0; + for(UUID uuid : currentValue) + { + if(!_resourceId.equals(uuid)) + { + array[pos++] = uuid; + } + } + newValue = Arrays.asList(array); + } + } + while(!_resourcesUpdater.compareAndSet(_message, currentValue, newValue)); + + } _message.decrementReference(); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java new file mode 100644 index 0000000000..7ab2625e63 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import org.apache.qpid.server.store.TransactionLogResource; + +public class MessageAlreadyReferencedException extends RuntimeException +{ + MessageAlreadyReferencedException(final long messageNumber, TransactionLogResource resource) + { + super("The message with id " + messageNumber + " is already referenced by resource " + resource.getName()); + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java index 4ee47e05e9..1bf451948d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -51,6 +51,8 @@ public interface MessageInstance boolean isAcquiredBy(ConsumerImpl consumer); + boolean removeAcquisitionFromConsumer(ConsumerImpl consumer); + void setRedelivered(); boolean isRedelivered(); @@ -67,6 +69,10 @@ public interface MessageInstance boolean acquire(ConsumerImpl sub); + boolean lockAcquisition(); + + boolean unlockAcquisition(); + int getMaximumDeliveryCount(); int routeToAlternate(Action<? super MessageInstance> action, ServerTransaction txn); @@ -99,6 +105,7 @@ public interface MessageInstance State currentState = getState(); return currentState == State.DEQUEUED || currentState == State.DELETED; } + } @@ -162,10 +169,12 @@ public interface MessageInstance public final class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState { private final C _consumer; + private final LockedAcquiredState<C> _lockedState; public ConsumerAcquiredState(C consumer) { _consumer = consumer; + _lockedState = new LockedAcquiredState<>(this); } @@ -183,6 +192,43 @@ public interface MessageInstance { return "{" + getState().name() + " : " + _consumer +"}"; } + + public LockedAcquiredState<C> getLockedState() + { + return _lockedState; + } + + } + + public final class LockedAcquiredState<C extends ConsumerImpl> extends EntryState + { + private final ConsumerAcquiredState<C> _acquiredState; + + public LockedAcquiredState(final ConsumerAcquiredState<C> acquiredState) + { + _acquiredState = acquiredState; + } + + @Override + public State getState() + { + return State.ACQUIRED; + } + + public C getConsumer() + { + return _acquiredState.getConsumer(); + } + + public String toString() + { + return "{" + getState().name() + " : " + _acquiredState.getConsumer() +"}"; + } + + public ConsumerAcquiredState<C> getUnlockedState() + { + return _acquiredState; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java index 8c35af8be4..81e6b13ffd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java @@ -20,10 +20,11 @@ */ package org.apache.qpid.server.message; +import java.nio.ByteBuffer; + import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; - -import java.nio.ByteBuffer; +import org.apache.qpid.server.store.TransactionLogResource; public interface ServerMessage<T extends StorableMessageMetaData> extends EnqueueableMessage, MessageContentSource { @@ -41,6 +42,12 @@ public interface ServerMessage<T extends StorableMessageMetaData> extends Enqueu MessageReference newReference(); + MessageReference newReference(TransactionLogResource object); + + boolean isReferenced(TransactionLogResource resource); + + boolean isReferenced(); + long getMessageNumber(); long getArrivalTime(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 54f3c4de09..545a1d941d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -1164,6 +1164,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> else { deliverMessage(sub, entry, false); + if(sub.acquires()) + { + entry.unlockAcquisition(); + } } } } @@ -2001,6 +2005,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> else { deliverMessage(sub, node, batch); + if(sub.acquires()) + { + node.unlockAcquisition(); + } } } @@ -2253,14 +2261,28 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> if (!node.isDeleted()) { // If the node has expired then acquire it - if (node.expired() && node.acquire()) + if (node.expired()) { - if (_logger.isDebugEnabled()) + boolean acquiredForDequeueing = node.acquire(); + if(!acquiredForDequeueing && node.getDeliveredToConsumer()) + { + QueueConsumer consumer = (QueueConsumer) node.getDeliveredConsumer(); + acquiredForDequeueing = node.removeAcquisitionFromConsumer(consumer); + if(acquiredForDequeueing) + { + consumer.acquisitionRemoved(node); + } + } + + if(acquiredForDequeueing) { - _logger.debug("Dequeuing expired node " + node); + if (_logger.isDebugEnabled()) + { + _logger.debug("Dequeuing expired node " + node); + } + // Then dequeue it. + dequeueEntry(node); } - // Then dequeue it. - dequeueEntry(node); } else { @@ -2527,7 +2549,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> final ServerTransaction txn, final Action<? super MessageInstance> postEnqueueAction) { - txn.enqueue(this,message, new ServerTransaction.Action() + if(!message.isReferenced(this)) + { + txn.enqueue(this, message, new ServerTransaction.Action() { MessageReference _reference = message.newReference(); @@ -2549,6 +2573,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } }); return 1; + } + else + { + return 0; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index 5ffbc0dbaa..71b7636159 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -39,6 +39,8 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl, void send(QueueEntry entry, boolean batch); + void acquisitionRemoved(QueueEntry node); + void queueDeleted(); SubFlushRunner getRunner(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index 55782ac095..d80aa92007 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -477,6 +477,13 @@ class QueueConsumerImpl } @Override + public void acquisitionRemoved(final QueueEntry node) + { + _target.acquisitionRemoved(node); + _queue.decrementUnackedMsgCount(node); + } + + @Override public String getDistributionMode() { return _distributionMode; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 49644f8d76..6c541d78ef 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -103,7 +103,7 @@ public abstract class QueueEntryImpl implements QueueEntry { _queueEntryList = queueEntryList; - _message = message == null ? null : message.newReference(); + _message = message == null ? null : message.newReference(queueEntryList.getQueue()); _entryIdUpdater.set(this, entryId); populateInstanceProperties(); @@ -112,7 +112,7 @@ public abstract class QueueEntryImpl implements QueueEntry public QueueEntryImpl(QueueEntryList queueEntryList, ServerMessage message) { _queueEntryList = queueEntryList; - _message = message == null ? null : message.newReference(); + _message = message == null ? null : message.newReference(queueEntryList.getQueue()); populateInstanceProperties(); } @@ -210,7 +210,7 @@ public abstract class QueueEntryImpl implements QueueEntry public boolean acquire(ConsumerImpl sub) { - final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState()); + final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState().getLockedState()); if(acquired) { _deliveryCountUpdater.compareAndSet(this,-1,0); @@ -218,17 +218,57 @@ public abstract class QueueEntryImpl implements QueueEntry return acquired; } + @Override + public boolean lockAcquisition() + { + EntryState state = _state; + if(state instanceof ConsumerAcquiredState) + { + return _stateUpdater.compareAndSet(this, state, ((ConsumerAcquiredState)state).getLockedState()); + } + return state instanceof LockedAcquiredState; + } + + @Override + public boolean unlockAcquisition() + { + EntryState state = _state; + if(state instanceof LockedAcquiredState) + { + return _stateUpdater.compareAndSet(this, state, ((LockedAcquiredState)state).getUnlockedState()); + } + return false; + } + public boolean acquiredByConsumer() { - return (_state instanceof ConsumerAcquiredState); + return (_state instanceof ConsumerAcquiredState) || (_state instanceof LockedAcquiredState); } + @Override public boolean isAcquiredBy(ConsumerImpl consumer) { EntryState state = _state; - return state instanceof ConsumerAcquiredState - && ((ConsumerAcquiredState)state).getConsumer() == consumer; + return (state instanceof ConsumerAcquiredState + && ((ConsumerAcquiredState)state).getConsumer() == consumer) + || (state instanceof LockedAcquiredState + && ((LockedAcquiredState)state).getConsumer() == consumer); + } + + @Override + public boolean removeAcquisitionFromConsumer(ConsumerImpl consumer) + { + EntryState state = _state; + if(state instanceof ConsumerAcquiredState + && ((ConsumerAcquiredState)state).getConsumer() == consumer) + { + return _stateUpdater.compareAndSet(this,state,NON_CONSUMER_ACQUIRED_STATE); + } + else + { + return false; + } } public void release() @@ -238,7 +278,7 @@ public abstract class QueueEntryImpl implements QueueEntry if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE)) { - if(state instanceof ConsumerAcquiredState) + if(state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState) { getQueue().decrementUnackedMsgCount(this); } @@ -268,6 +308,10 @@ public abstract class QueueEntryImpl implements QueueEntry { return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer(); } + else if (state instanceof LockedAcquiredState) + { + return (QueueConsumer) ((LockedAcquiredState) state).getConsumer(); + } else { return null; @@ -312,7 +356,7 @@ public abstract class QueueEntryImpl implements QueueEntry if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) { - if (state instanceof ConsumerAcquiredState) + if (state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState) { getQueue().decrementUnackedMsgCount(this); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index 28dfc73a27..d4a91f2c0b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -24,7 +24,7 @@ import org.apache.qpid.server.message.ServerMessage; public interface QueueEntryList { - AMQQueue getQueue(); + AMQQueue<?> getQueue(); QueueEntry add(ServerMessage message); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java index ac28619d2d..d4aeca0437 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java @@ -151,6 +151,10 @@ public class FileKeyStoreImpl extends AbstractConfiguredObject<FileKeyStoreImpl> { super.validateChange(proxyForValidation, changedAttributes); FileKeyStore changedStore = (FileKeyStore) proxyForValidation; + if (changedAttributes.contains(KeyStore.DESIRED_STATE) && changedStore.getDesiredState() == State.DELETED) + { + return; + } if(changedAttributes.contains(NAME) && !getName().equals(changedStore.getName())) { throw new IllegalConfigurationException("Changing the key store name is not allowed"); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java index d71670fbe0..0596c21291 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java @@ -166,7 +166,12 @@ public class FileTrustStoreImpl extends AbstractConfiguredObject<FileTrustStoreI protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes) { super.validateChange(proxyForValidation, changedAttributes); + FileTrustStore updated = (FileTrustStore) proxyForValidation; + if (changedAttributes.contains(TrustStore.DESIRED_STATE) && updated.getDesiredState() == State.DELETED) + { + return; + } if(changedAttributes.contains(TrustStore.NAME) && !getName().equals(updated.getName())) { throw new IllegalConfigurationException("Changing the trust store name is not allowed"); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index bb7a726a0c..57142e6e1f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -473,7 +473,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore if (results == 0) { - getLogger().warn("Message metadata not found for message id " + messageId); + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Message id " + messageId + + " not found (attempt to remove failed - probably application initiated rollback)"); + } } if (getLogger().isDebugEnabled()) @@ -482,7 +486,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT); - stmt.setLong(1,messageId); + stmt.setLong(1, messageId); results = stmt.executeUpdate(); } finally @@ -1492,7 +1496,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore if(stored()) { checkMessageStoreOpen(); - getLogger().debug("GET CONTENT for message id " + _messageId); data = AbstractJDBCMessageStore.this.getAllContent(_messageId); T metaData = _messageDataRef.getMetaData(); if (metaData == null) @@ -1568,7 +1571,10 @@ public abstract class AbstractJDBCMessageStore implements MessageStore @Override public void remove() { - getLogger().debug("REMOVE called on message: " + _messageId); + if (getLogger().isDebugEnabled()) + { + getLogger().debug("REMOVE called on message: " + _messageId); + } checkMessageStoreOpen(); int delta = getMetaData().getContentSize(); @@ -1605,7 +1611,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { if (!stored()) { - getLogger().debug("STORING message id " + _messageId); storeMetaData(conn, _messageId, _messageDataRef.getMetaData()); AbstractJDBCMessageStore.this.addContent(conn, _messageId, _messageDataRef.getData() == null @@ -1636,7 +1641,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore Pointer(final MessageData<T> ref) { - getLogger().debug("POST COMMIT for message id " + _messageId); _ref = ref; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index f15f608907..b72d44debf 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -68,6 +68,8 @@ import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueConsumer; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.stats.StatisticsCounter; @@ -953,15 +955,26 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte op.withinTransaction(new Transaction() { - public void dequeue(final MessageInstance entry) + public void dequeue(final MessageInstance messageInstance) { - if(entry.acquire()) + boolean acquired = messageInstance.acquire(); + if(!acquired && messageInstance instanceof QueueEntry) + { + QueueEntry entry = (QueueEntry) messageInstance; + QueueConsumer consumer = (QueueConsumer) entry.getDeliveredConsumer(); + acquired = messageInstance.removeAcquisitionFromConsumer(consumer); + if(acquired) + { + consumer.acquisitionRemoved((QueueEntry)messageInstance); + } + } + if(acquired) { - txn.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action() + txn.dequeue(messageInstance.getOwningResource(), messageInstance.getMessage(), new ServerTransaction.Action() { public void postCommit() { - entry.delete(); + messageInstance.delete(); } public void onRollback() |