diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-25 15:03:03 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-25 15:03:03 +0000 |
commit | abd8126799b786e8e9a73df8dd637e6aa2b0ae4f (patch) | |
tree | ba7549949f8c4192b74836dec0904e916cd49d95 | |
parent | 2a7c8b3061fda47cc53ef997c339599dd2285395 (diff) | |
download | qpid-python-abd8126799b786e8e9a73df8dd637e6aa2b0ae4f.tar.gz |
Merging from trunk r1616716:1616818 in the Java tree
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.30@1620333 13f79535-47bb-0310-9956-ffa450edef68
52 files changed, 1163 insertions, 161 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 338882e6df..835846a5ec 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -388,10 +388,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore OperationStatus status = getMessageMetaDataDb().delete(tx, key); if (status == OperationStatus.NOTFOUND) { - getLogger().info( - "Message not found (attempt to remove failed - probably application initiated rollback) " - + - messageId); + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Message id " + messageId + + " not found (attempt to remove failed - probably application initiated rollback)"); + } } if (getLogger().isDebugEnabled()) @@ -426,7 +427,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore catch(DatabaseException e2) { getLogger().warn( - "Unable to abort transaction after LockConflictExcption on removal of message with id " + "Unable to abort transaction after LockConflictException on removal of message with id " + messageId, e2); // rethrow the original log conflict exception, the secondary exception should already have diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java index faf5a724f3..f8585344b0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java @@ -29,6 +29,8 @@ public interface ConsumerTarget { + void acquisitionRemoved(MessageInstance node); + enum State { ACTIVE, SUSPENDED, CLOSED diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 58ffd88b85..e41bb948dc 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -511,7 +511,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> Exchange altExchange = getAlternateExchange(); if(altExchange != null) { - return ((ExchangeImpl)altExchange).send(message, routingAddress, instanceProperties, txn, postEnqueueAction); + return altExchange.send(message, routingAddress, instanceProperties, txn, postEnqueueAction); } else { @@ -520,7 +520,24 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } else { - final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]); + final BaseQueue[] baseQueues; + + if(message.isReferenced()) + { + ArrayList<BaseQueue> uniqueQueues = new ArrayList<>(queues.size()); + for(BaseQueue q : queues) + { + if(!message.isReferenced(q)) + { + uniqueQueues.add(q); + } + } + baseQueues = uniqueQueues.toArray(new BaseQueue[uniqueQueues.size()]); + } + else + { + baseQueues = queues.toArray(new BaseQueue[queues.size()]); + } txn.enqueue(queues,message, new ServerTransaction.Action() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java index d397dd57b6..d2789bfe58 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java @@ -21,10 +21,17 @@ package org.apache.qpid.server.message; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.ServerScopedRuntimeException; public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T> @@ -33,10 +40,14 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount"); + private static final AtomicReferenceFieldUpdater<AbstractServerMessageImpl, Collection> _resourcesUpdater = + AtomicReferenceFieldUpdater.newUpdater(AbstractServerMessageImpl.class, Collection.class,"_resources"); + private volatile int _referenceCount = 0; private final StoredMessage<T> _handle; private final Object _connectionReference; + private volatile Collection<UUID> _resources; public AbstractServerMessageImpl(StoredMessage<T> handle, Object connectionReference) @@ -117,6 +128,26 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI } @Override + final public MessageReference<X> newReference(TransactionLogResource object) + { + return new Reference(this, object); + } + + @Override + final public boolean isReferenced(TransactionLogResource resource) + { + Collection<UUID> resources = _resources; + return resources != null && resources.contains(resource.getId()); + } + + @Override + final public boolean isReferenced() + { + Collection<UUID> resources = _resources; + return resources != null && !resources.isEmpty(); + } + + @Override final public boolean isPersistent() { return _handle.getMetaData().isPersistent(); @@ -156,15 +187,52 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI AtomicIntegerFieldUpdater.newUpdater(Reference.class, "_released"); private AbstractServerMessageImpl<X, T> _message; + private final UUID _resourceId; private volatile int _released; private Reference(final AbstractServerMessageImpl<X, T> message) { + this(message, null); + } + private Reference(final AbstractServerMessageImpl<X, T> message, TransactionLogResource resource) + { _message = message; + if(resource != null) + { + Collection<UUID> currentValue; + Collection<UUID> newValue; + _resourceId = resource.getId(); + do + { + currentValue = _message._resources; + + if(currentValue == null) + { + newValue = Collections.singleton(_resourceId); + } + else + { + if(currentValue.contains(_resourceId)) + { + throw new MessageAlreadyReferencedException(_message.getMessageNumber(), resource); + } + newValue = new ArrayList<>(currentValue.size()+1); + newValue.addAll(currentValue); + newValue.add(_resourceId); + } + + } + while(!_resourcesUpdater.compareAndSet(_message, currentValue, newValue)); + } + else + { + _resourceId = null; + } if(!_message.incrementReference()) { throw new MessageDeletedException(message.getMessageNumber()); } + } public X getMessage() @@ -176,6 +244,34 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI { if(_releasedUpdater.compareAndSet(this,0,1)) { + if(_resourceId != null) + { + Collection<UUID> currentValue; + Collection<UUID> newValue; + do + { + currentValue = _message._resources; + if(currentValue.size() == 1) + { + newValue = null; + } + else + { + UUID[] array = new UUID[currentValue.size()-1]; + int pos = 0; + for(UUID uuid : currentValue) + { + if(!_resourceId.equals(uuid)) + { + array[pos++] = uuid; + } + } + newValue = Arrays.asList(array); + } + } + while(!_resourcesUpdater.compareAndSet(_message, currentValue, newValue)); + + } _message.decrementReference(); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java new file mode 100644 index 0000000000..7ab2625e63 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import org.apache.qpid.server.store.TransactionLogResource; + +public class MessageAlreadyReferencedException extends RuntimeException +{ + MessageAlreadyReferencedException(final long messageNumber, TransactionLogResource resource) + { + super("The message with id " + messageNumber + " is already referenced by resource " + resource.getName()); + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java index 4ee47e05e9..1bf451948d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -51,6 +51,8 @@ public interface MessageInstance boolean isAcquiredBy(ConsumerImpl consumer); + boolean removeAcquisitionFromConsumer(ConsumerImpl consumer); + void setRedelivered(); boolean isRedelivered(); @@ -67,6 +69,10 @@ public interface MessageInstance boolean acquire(ConsumerImpl sub); + boolean lockAcquisition(); + + boolean unlockAcquisition(); + int getMaximumDeliveryCount(); int routeToAlternate(Action<? super MessageInstance> action, ServerTransaction txn); @@ -99,6 +105,7 @@ public interface MessageInstance State currentState = getState(); return currentState == State.DEQUEUED || currentState == State.DELETED; } + } @@ -162,10 +169,12 @@ public interface MessageInstance public final class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState { private final C _consumer; + private final LockedAcquiredState<C> _lockedState; public ConsumerAcquiredState(C consumer) { _consumer = consumer; + _lockedState = new LockedAcquiredState<>(this); } @@ -183,6 +192,43 @@ public interface MessageInstance { return "{" + getState().name() + " : " + _consumer +"}"; } + + public LockedAcquiredState<C> getLockedState() + { + return _lockedState; + } + + } + + public final class LockedAcquiredState<C extends ConsumerImpl> extends EntryState + { + private final ConsumerAcquiredState<C> _acquiredState; + + public LockedAcquiredState(final ConsumerAcquiredState<C> acquiredState) + { + _acquiredState = acquiredState; + } + + @Override + public State getState() + { + return State.ACQUIRED; + } + + public C getConsumer() + { + return _acquiredState.getConsumer(); + } + + public String toString() + { + return "{" + getState().name() + " : " + _acquiredState.getConsumer() +"}"; + } + + public ConsumerAcquiredState<C> getUnlockedState() + { + return _acquiredState; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java index 8c35af8be4..81e6b13ffd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java @@ -20,10 +20,11 @@ */ package org.apache.qpid.server.message; +import java.nio.ByteBuffer; + import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; - -import java.nio.ByteBuffer; +import org.apache.qpid.server.store.TransactionLogResource; public interface ServerMessage<T extends StorableMessageMetaData> extends EnqueueableMessage, MessageContentSource { @@ -41,6 +42,12 @@ public interface ServerMessage<T extends StorableMessageMetaData> extends Enqueu MessageReference newReference(); + MessageReference newReference(TransactionLogResource object); + + boolean isReferenced(TransactionLogResource resource); + + boolean isReferenced(); + long getMessageNumber(); long getArrivalTime(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 54f3c4de09..545a1d941d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -1164,6 +1164,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> else { deliverMessage(sub, entry, false); + if(sub.acquires()) + { + entry.unlockAcquisition(); + } } } } @@ -2001,6 +2005,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> else { deliverMessage(sub, node, batch); + if(sub.acquires()) + { + node.unlockAcquisition(); + } } } @@ -2253,14 +2261,28 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> if (!node.isDeleted()) { // If the node has expired then acquire it - if (node.expired() && node.acquire()) + if (node.expired()) { - if (_logger.isDebugEnabled()) + boolean acquiredForDequeueing = node.acquire(); + if(!acquiredForDequeueing && node.getDeliveredToConsumer()) + { + QueueConsumer consumer = (QueueConsumer) node.getDeliveredConsumer(); + acquiredForDequeueing = node.removeAcquisitionFromConsumer(consumer); + if(acquiredForDequeueing) + { + consumer.acquisitionRemoved(node); + } + } + + if(acquiredForDequeueing) { - _logger.debug("Dequeuing expired node " + node); + if (_logger.isDebugEnabled()) + { + _logger.debug("Dequeuing expired node " + node); + } + // Then dequeue it. + dequeueEntry(node); } - // Then dequeue it. - dequeueEntry(node); } else { @@ -2527,7 +2549,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> final ServerTransaction txn, final Action<? super MessageInstance> postEnqueueAction) { - txn.enqueue(this,message, new ServerTransaction.Action() + if(!message.isReferenced(this)) + { + txn.enqueue(this, message, new ServerTransaction.Action() { MessageReference _reference = message.newReference(); @@ -2549,6 +2573,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } }); return 1; + } + else + { + return 0; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index 5ffbc0dbaa..71b7636159 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -39,6 +39,8 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl, void send(QueueEntry entry, boolean batch); + void acquisitionRemoved(QueueEntry node); + void queueDeleted(); SubFlushRunner getRunner(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index 55782ac095..d80aa92007 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -477,6 +477,13 @@ class QueueConsumerImpl } @Override + public void acquisitionRemoved(final QueueEntry node) + { + _target.acquisitionRemoved(node); + _queue.decrementUnackedMsgCount(node); + } + + @Override public String getDistributionMode() { return _distributionMode; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 49644f8d76..6c541d78ef 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -103,7 +103,7 @@ public abstract class QueueEntryImpl implements QueueEntry { _queueEntryList = queueEntryList; - _message = message == null ? null : message.newReference(); + _message = message == null ? null : message.newReference(queueEntryList.getQueue()); _entryIdUpdater.set(this, entryId); populateInstanceProperties(); @@ -112,7 +112,7 @@ public abstract class QueueEntryImpl implements QueueEntry public QueueEntryImpl(QueueEntryList queueEntryList, ServerMessage message) { _queueEntryList = queueEntryList; - _message = message == null ? null : message.newReference(); + _message = message == null ? null : message.newReference(queueEntryList.getQueue()); populateInstanceProperties(); } @@ -210,7 +210,7 @@ public abstract class QueueEntryImpl implements QueueEntry public boolean acquire(ConsumerImpl sub) { - final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState()); + final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState().getLockedState()); if(acquired) { _deliveryCountUpdater.compareAndSet(this,-1,0); @@ -218,17 +218,57 @@ public abstract class QueueEntryImpl implements QueueEntry return acquired; } + @Override + public boolean lockAcquisition() + { + EntryState state = _state; + if(state instanceof ConsumerAcquiredState) + { + return _stateUpdater.compareAndSet(this, state, ((ConsumerAcquiredState)state).getLockedState()); + } + return state instanceof LockedAcquiredState; + } + + @Override + public boolean unlockAcquisition() + { + EntryState state = _state; + if(state instanceof LockedAcquiredState) + { + return _stateUpdater.compareAndSet(this, state, ((LockedAcquiredState)state).getUnlockedState()); + } + return false; + } + public boolean acquiredByConsumer() { - return (_state instanceof ConsumerAcquiredState); + return (_state instanceof ConsumerAcquiredState) || (_state instanceof LockedAcquiredState); } + @Override public boolean isAcquiredBy(ConsumerImpl consumer) { EntryState state = _state; - return state instanceof ConsumerAcquiredState - && ((ConsumerAcquiredState)state).getConsumer() == consumer; + return (state instanceof ConsumerAcquiredState + && ((ConsumerAcquiredState)state).getConsumer() == consumer) + || (state instanceof LockedAcquiredState + && ((LockedAcquiredState)state).getConsumer() == consumer); + } + + @Override + public boolean removeAcquisitionFromConsumer(ConsumerImpl consumer) + { + EntryState state = _state; + if(state instanceof ConsumerAcquiredState + && ((ConsumerAcquiredState)state).getConsumer() == consumer) + { + return _stateUpdater.compareAndSet(this,state,NON_CONSUMER_ACQUIRED_STATE); + } + else + { + return false; + } } public void release() @@ -238,7 +278,7 @@ public abstract class QueueEntryImpl implements QueueEntry if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE)) { - if(state instanceof ConsumerAcquiredState) + if(state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState) { getQueue().decrementUnackedMsgCount(this); } @@ -268,6 +308,10 @@ public abstract class QueueEntryImpl implements QueueEntry { return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer(); } + else if (state instanceof LockedAcquiredState) + { + return (QueueConsumer) ((LockedAcquiredState) state).getConsumer(); + } else { return null; @@ -312,7 +356,7 @@ public abstract class QueueEntryImpl implements QueueEntry if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) { - if (state instanceof ConsumerAcquiredState) + if (state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState) { getQueue().decrementUnackedMsgCount(this); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index 28dfc73a27..d4a91f2c0b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -24,7 +24,7 @@ import org.apache.qpid.server.message.ServerMessage; public interface QueueEntryList { - AMQQueue getQueue(); + AMQQueue<?> getQueue(); QueueEntry add(ServerMessage message); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java index ac28619d2d..d4aeca0437 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java @@ -151,6 +151,10 @@ public class FileKeyStoreImpl extends AbstractConfiguredObject<FileKeyStoreImpl> { super.validateChange(proxyForValidation, changedAttributes); FileKeyStore changedStore = (FileKeyStore) proxyForValidation; + if (changedAttributes.contains(KeyStore.DESIRED_STATE) && changedStore.getDesiredState() == State.DELETED) + { + return; + } if(changedAttributes.contains(NAME) && !getName().equals(changedStore.getName())) { throw new IllegalConfigurationException("Changing the key store name is not allowed"); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java index d71670fbe0..0596c21291 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java @@ -166,7 +166,12 @@ public class FileTrustStoreImpl extends AbstractConfiguredObject<FileTrustStoreI protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes) { super.validateChange(proxyForValidation, changedAttributes); + FileTrustStore updated = (FileTrustStore) proxyForValidation; + if (changedAttributes.contains(TrustStore.DESIRED_STATE) && updated.getDesiredState() == State.DELETED) + { + return; + } if(changedAttributes.contains(TrustStore.NAME) && !getName().equals(updated.getName())) { throw new IllegalConfigurationException("Changing the trust store name is not allowed"); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index bb7a726a0c..57142e6e1f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -473,7 +473,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore if (results == 0) { - getLogger().warn("Message metadata not found for message id " + messageId); + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Message id " + messageId + + " not found (attempt to remove failed - probably application initiated rollback)"); + } } if (getLogger().isDebugEnabled()) @@ -482,7 +486,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT); - stmt.setLong(1,messageId); + stmt.setLong(1, messageId); results = stmt.executeUpdate(); } finally @@ -1492,7 +1496,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore if(stored()) { checkMessageStoreOpen(); - getLogger().debug("GET CONTENT for message id " + _messageId); data = AbstractJDBCMessageStore.this.getAllContent(_messageId); T metaData = _messageDataRef.getMetaData(); if (metaData == null) @@ -1568,7 +1571,10 @@ public abstract class AbstractJDBCMessageStore implements MessageStore @Override public void remove() { - getLogger().debug("REMOVE called on message: " + _messageId); + if (getLogger().isDebugEnabled()) + { + getLogger().debug("REMOVE called on message: " + _messageId); + } checkMessageStoreOpen(); int delta = getMetaData().getContentSize(); @@ -1605,7 +1611,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { if (!stored()) { - getLogger().debug("STORING message id " + _messageId); storeMetaData(conn, _messageId, _messageDataRef.getMetaData()); AbstractJDBCMessageStore.this.addContent(conn, _messageId, _messageDataRef.getData() == null @@ -1636,7 +1641,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore Pointer(final MessageData<T> ref) { - getLogger().debug("POST COMMIT for message id " + _messageId); _ref = ref; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index f15f608907..b72d44debf 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -68,6 +68,8 @@ import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueConsumer; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.stats.StatisticsCounter; @@ -953,15 +955,26 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte op.withinTransaction(new Transaction() { - public void dequeue(final MessageInstance entry) + public void dequeue(final MessageInstance messageInstance) { - if(entry.acquire()) + boolean acquired = messageInstance.acquire(); + if(!acquired && messageInstance instanceof QueueEntry) + { + QueueEntry entry = (QueueEntry) messageInstance; + QueueConsumer consumer = (QueueConsumer) entry.getDeliveredConsumer(); + acquired = messageInstance.removeAcquisitionFromConsumer(consumer); + if(acquired) + { + consumer.acquisitionRemoved((QueueEntry)messageInstance); + } + } + if(acquired) { - txn.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action() + txn.dequeue(messageInstance.getOwningResource(), messageInstance.getMessage(), new ServerTransaction.Action() { public void postCommit() { - entry.delete(); + messageInstance.delete(); } public void onRollback() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index f614ff5847..8d025c50dc 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -181,6 +181,12 @@ public class MockConsumer implements ConsumerTarget } + @Override + public void acquisitionRemoved(final MessageInstance node) + { + + } + public State getState() { return _state; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index d7779390b1..c775a70cb8 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.exchange; import static org.apache.qpid.common.AMQPFilterTypes.*; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -43,6 +44,7 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.QueueExistsException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -497,6 +499,7 @@ public class TopicExchangeTest extends QpidTestCase MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(ref); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref); when(message.getMessageNumber()).thenReturn(messageNumber); for(BaseQueue q : queues) { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java new file mode 100644 index 0000000000..c90e406ba9 --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java @@ -0,0 +1,146 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.UUID; + +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.test.utils.QpidTestCase; + +public class AbstractServerMessageTest extends QpidTestCase +{ + private static class TestMessage<T extends StorableMessageMetaData> extends AbstractServerMessageImpl<TestMessage<T>,T> + { + + public TestMessage(final StoredMessage<T> handle, + final Object connectionReference) + { + super(handle, connectionReference); + } + + @Override + public String getInitialRoutingAddress() + { + return null; + } + + @Override + public AMQMessageHeader getMessageHeader() + { + return null; + } + + @Override + public long getSize() + { + return 0; + } + + @Override + public long getExpiration() + { + return 0; + } + + @Override + public long getArrivalTime() + { + return 0; + } + } + + private TransactionLogResource createQueue(String name) + { + TransactionLogResource queue = mock(TransactionLogResource.class); + when(queue.getId()).thenReturn(UUID.randomUUID()); + when(queue.getName()).thenReturn(name); + return queue; + } + + public void testReferences() + { + TransactionLogResource q1 = createQueue("1"); + TransactionLogResource q2 = createQueue("2"); + + TestMessage<StorableMessageMetaData> msg = new TestMessage<StorableMessageMetaData>(mock(StoredMessage.class),this); + assertFalse(msg.isReferenced()); + assertFalse(msg.isReferenced(q1)); + + MessageReference<TestMessage<StorableMessageMetaData>> nonQueueRef = msg.newReference(); + assertFalse(msg.isReferenced()); + assertFalse(msg.isReferenced(q1)); + + MessageReference<TestMessage<StorableMessageMetaData>> q1ref = msg.newReference(q1); + assertTrue(msg.isReferenced()); + assertTrue(msg.isReferenced(q1)); + assertFalse(msg.isReferenced(q2)); + + q1ref.release(); + assertFalse(msg.isReferenced()); + assertFalse(msg.isReferenced(q1)); + + q1ref = msg.newReference(q1); + assertTrue(msg.isReferenced()); + assertTrue(msg.isReferenced(q1)); + assertFalse(msg.isReferenced(q2)); + + MessageReference<TestMessage<StorableMessageMetaData>> q2ref = msg.newReference(q2); + assertTrue(msg.isReferenced()); + assertTrue(msg.isReferenced(q1)); + assertTrue(msg.isReferenced(q2)); + + try + { + msg.newReference(q1); + fail("Should not be able to create a second reference to the same queue"); + } + catch (MessageAlreadyReferencedException e) + { + // pass + } + q2ref.release(); + assertTrue(msg.isReferenced()); + assertTrue(msg.isReferenced(q1)); + assertFalse(msg.isReferenced(q2)); + + q1ref.release(); + assertFalse(msg.isReferenced()); + assertFalse(msg.isReferenced(q1)); + + nonQueueRef.release(); + + try + { + msg.newReference(q1); + fail("Message should not allow new references as all references had been removed"); + } + catch(MessageDeletedException e) + { + // pass + } + + } +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java index 0def708fed..9255dbf42e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.contains; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.atLeastOnce; @@ -60,6 +61,7 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -1157,6 +1159,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase when(message.newReference()).thenReturn(ref); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref); return message; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java index 70a35dc4aa..799fc71d74 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java @@ -19,6 +19,7 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -38,6 +39,7 @@ import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class LastValueQueueListTest extends TestCase @@ -220,6 +222,8 @@ public class LastValueQueueListTest extends TestCase MessageReference messageReference = mock(MessageReference.class); when(mockMessage.newReference()).thenReturn(messageReference); + when(mockMessage.newReference(any(TransactionLogResource.class))).thenReturn(messageReference); + when(messageReference.getMessage()).thenReturn(mockMessage); return mockMessage; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java index 74a2262265..37c4eeb127 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java @@ -65,6 +65,12 @@ public class MockMessageInstance implements MessageInstance return false; } + @Override + public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer) + { + return false; + } + public void delete() { @@ -81,6 +87,18 @@ public class MockMessageInstance implements MessageInstance return false; } + @Override + public boolean lockAcquisition() + { + return false; + } + + @Override + public boolean unlockAcquisition() + { + return false; + } + public boolean isAvailable() { return false; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java index cc5f36098e..631731ecc0 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -37,6 +38,7 @@ import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.test.utils.QpidTestCase; @@ -79,6 +81,7 @@ public class PriorityQueueListTest extends QpidTestCase when(message.getMessageHeader()).thenReturn(header); when(message.newReference()).thenReturn(ref); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref); when(ref.getMessage()).thenReturn(message); when(header.getPriority()).thenReturn(PRIORITIES[i]); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index 3189010284..40b6c1bebd 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -18,6 +18,7 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -38,6 +39,7 @@ import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.virtualhost.VirtualHostImpl; /** @@ -137,6 +139,42 @@ public abstract class QueueEntryImplTestBase extends TestCase return consumer; } + + public void testLocking() + { + QueueConsumer consumer = newConsumer(); + QueueConsumer consumer2 = newConsumer(); + + _queueEntry.acquire(consumer); + assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method", + _queueEntry.isAcquired()); + + assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer)); + assertTrue("Should be able to unlock locked queue entry", _queueEntry.unlockAcquisition()); + assertFalse("Acquisition should not be able to be removed from the wrong consumer", + _queueEntry.removeAcquisitionFromConsumer(consumer2)); + assertTrue("Acquisition should be able to be removed once unlocked", + _queueEntry.removeAcquisitionFromConsumer(consumer)); + assertTrue("Queue Entry should still be acquired", _queueEntry.isAcquired()); + assertFalse("Queue Entry should not be marked as acquired by a consumer", _queueEntry.acquiredByConsumer()); + + _queueEntry.release(); + + assertFalse("Hijacked queue entry should be able to be released", _queueEntry.isAcquired()); + + _queueEntry.acquire(consumer); + assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method", + _queueEntry.isAcquired()); + + assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer)); + assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition()); + assertTrue("Should be able to unlock locked queue entry",_queueEntry.lockAcquisition()); + assertFalse("Acquisition should not be able to be hijacked when locked",_queueEntry.removeAcquisitionFromConsumer(consumer)); + + _queueEntry.delete(); + assertTrue("Locked queue entry should be able to be deleted", _queueEntry.isDeleted()); + } + /** * A helper method to get entry state * @@ -220,6 +258,7 @@ public abstract class QueueEntryImplTestBase extends TestCase final MessageReference reference = mock(MessageReference.class); when(reference.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(reference); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference); QueueEntryImpl entry = (QueueEntryImpl) queueEntryList.add(message); entries[i] = entry; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java index c89d2abeae..a0ab7cd454 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java @@ -19,14 +19,16 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import junit.framework.TestCase; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import org.apache.qpid.server.store.TransactionLogResource; /** * Abstract test class for QueueEntryList implementations. @@ -96,6 +98,7 @@ public abstract class QueueEntryListTestBase extends TestCase AMQMessageHeader hdr = mock(AMQMessageHeader.class); when(ref.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(ref); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref); when(message.getMessageHeader()).thenReturn(hdr); return message; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java index bd23aaa50a..79d7628a9c 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -33,6 +34,7 @@ import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -162,6 +164,7 @@ public class QueueMessageRecoveryTest extends QpidTestCase MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(msg); when(msg.newReference()).thenReturn(ref); + when(msg.newReference(any(TransactionLogResource.class))).thenReturn(ref); when(msg.getStoredMessage()).thenReturn(mock(StoredMessage.class)); return msg; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java index eaed1427b2..a2d314d629 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -35,6 +36,7 @@ import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase @@ -70,6 +72,7 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase final MessageReference reference = mock(MessageReference.class); when(reference.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(reference); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference); return (QueueEntryImpl) queueEntryList.add(message); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java index bcc1e7bc0e..0c7f19bbd5 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java @@ -19,6 +19,7 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -40,6 +41,7 @@ import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class SortedQueueEntryListTest extends QueueEntryListTestBase @@ -180,6 +182,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(ref); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref); when(message.getMessageNumber()).thenReturn(id); return message; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java index 268d334949..d9a176c688 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java @@ -19,6 +19,7 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -38,6 +39,7 @@ import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class SortedQueueEntryTest extends QueueEntryImplTestBase @@ -97,6 +99,7 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase final MessageReference reference = mock(MessageReference.class); when(reference.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(reference); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference); return _queueEntryList.add(message); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java index 89bb32e133..95c53c8428 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -36,6 +37,7 @@ import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class StandardQueueEntryListTest extends QueueEntryListTestBase @@ -73,6 +75,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(ref); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref); final QueueEntry bleh = _sqel.add(message); assertNotNull("QE should not have been null", bleh); @@ -163,6 +166,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(ref); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref); QueueEntry bleh = sqel.add(message); assertNotNull("QE should not have been null", bleh); entriesMap.put(i,bleh); @@ -264,6 +268,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase final MessageReference reference = mock(MessageReference.class); when(reference.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(reference); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference); entries[i] = (OrderedQueueEntry) queueEntryList.add(message); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index d328e21a94..ce1c95e674 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -342,5 +342,17 @@ public class StandardQueueTest extends AbstractQueueTestBase return super.acquire(sub); } } + + @Override + public boolean lockAcquisition() + { + return true; + } + + @Override + public boolean unlockAcquisition() + { + return true; + } } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java index e16ba66391..848675bf5d 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java @@ -165,6 +165,24 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM } @Override + public MessageReference newReference(final TransactionLogResource object) + { + return _messageReference; + } + + @Override + public boolean isReferenced(final TransactionLogResource resource) + { + return false; + } + + @Override + public boolean isReferenced() + { + return false; + } + + @Override public int hashCode() { final int prime = 31; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java index 8992cf62c9..e0fbb6dcc3 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java @@ -20,14 +20,15 @@ */ package org.apache.qpid.server.txn; +import java.nio.ByteBuffer; + import org.apache.commons.lang.NotImplementedException; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.StoredMessage; - -import java.nio.ByteBuffer; +import org.apache.qpid.server.store.TransactionLogResource; /** * Mock Server Message allowing its persistent flag to be controlled from test. @@ -57,6 +58,24 @@ class MockServerMessage implements ServerMessage throw new NotImplementedException(); } + @Override + public MessageReference newReference(final TransactionLogResource object) + { + throw new NotImplementedException(); + } + + @Override + public boolean isReferenced(final TransactionLogResource resource) + { + return false; + } + + @Override + public boolean isReferenced() + { + return false; + } + public boolean isImmediate() { throw new NotImplementedException(); @@ -113,4 +132,4 @@ class MockServerMessage implements ServerMessage { return 0L; } -}
\ No newline at end of file +} diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index d73d019000..7ab3fbb1f5 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -534,15 +534,25 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC return _stopped.get(); } - public void acknowledge(MessageInstance entry) + public boolean deleteAcquired(MessageInstance entry) { - // TODO Fix Store Context / cleanup if(entry.isAcquiredBy(getConsumer())) { - _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize()); - _unacknowledgedCount.decrementAndGet(); + acquisitionRemoved(entry); entry.delete(); + return true; } + else + { + return false; + } + } + + @Override + public void acquisitionRemoved(final MessageInstance entry) + { + _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize()); + _unacknowledgedCount.decrementAndGet(); } public void flush() diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java index 4420709a91..94f04bbae3 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java @@ -41,13 +41,13 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onAccept() { - if(_target != null && _entry.isAcquiredBy(_target.getConsumer())) + if(_target != null && _entry.isAcquiredBy(_target.getConsumer()) && _entry.lockAcquisition()) { _target.getSessionModel().acknowledge(_target, _entry); } else { - _logger.warn("MessageAccept received for message which has not been acquired (likely client error)"); + _logger.info("MessageAccept received for message which is not been acquired - message may have expired or been removed"); } } @@ -60,7 +60,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi } else { - _logger.warn("MessageRelease received for message which has not been acquired (likely client error)"); + _logger.warn("MessageRelease received for message which has not been acquired - message may have expired or been removed"); } } @@ -72,7 +72,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi } else { - _logger.warn("MessageReject received for message which has not been acquired (likely client error)"); + _logger.warn("MessageReject received for message which has not been acquired - message may have expired or been removed"); } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java index cd1146ac0b..7917b7989a 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java @@ -29,6 +29,7 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene private final ConsumerTarget_0_10 _sub; private final MessageInstance _entry; private final ServerSession _session; + private long _messageSize; private boolean _restoreCredit; public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit) @@ -38,15 +39,19 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene _entry = entry; _session = session; _restoreCredit = restoreCredit; + if(restoreCredit) + { + _messageSize = entry.getMessage().getSize(); + } } public void onComplete(Method method) { if(_restoreCredit) { - _sub.restoreCredit(_entry.getMessage()); + _sub.getCreditManager().restoreCredit(1l, _messageSize); } - if(_entry.isAcquiredBy(_sub.getConsumer())) + if(_entry.isAcquiredBy(_sub.getConsumer()) && _entry.lockAcquisition()) { _session.acknowledge(_sub, _entry); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 3fe1515b18..b1c22fe823 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -460,7 +460,7 @@ public class ServerSession extends Session public void postCommit() { - sub.acknowledge(entry); + sub.deleteAcquired(entry); } public void onRollback() diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index b6e1b7dd6a..7877812d84 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -1413,7 +1413,11 @@ public class AMQChannel<T extends AMQProtocolSession<T>> // explicit rollbacks resend the message after the rollback-ok is sent if(_rollingBack) { - _resendList.addAll(_ackedMessages); + for(MessageInstance entry : _ackedMessages) + { + entry.unlockAcquisition(); + } + _resendList.addAll(_ackedMessages); } else { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index ae63c16025..7c2efe64e6 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -20,11 +20,16 @@ */ package org.apache.qpid.server.protocol.v0_8; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.message.InstanceProperties; @@ -34,14 +39,10 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.StateChangeListener; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - /** * Encapsulation of a subscription to a queue. * <p> @@ -59,7 +60,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen final MessageInstance.State oldSate, final MessageInstance.State newState) { - if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED)) + if (oldSate == QueueEntry.State.ACQUIRED && newState != QueueEntry.State.ACQUIRED) { restoreCredit(entry.getMessage()); } @@ -76,8 +77,8 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel, - AMQShortString consumerTag, FieldTable filters, - FlowCreditManager creditManager) throws AMQException + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager) throws AMQException { return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); } @@ -555,6 +556,11 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen }); } + @Override + public void acquisitionRemoved(final MessageInstance node) + { + } + public long getUnacknowledgedBytes() { return _unacknowledgedBytes.longValue(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java index 8d70e769d3..1bd9ab079e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java @@ -20,31 +20,28 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.queue.QueueEntry; - +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageInstance; + public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { private final Object _lock = new Object(); - private long _unackedSize; - private Map<Long, MessageInstance> _map; - private long _lastDeliveryTag; - private final int _prefetchLimit; public UnacknowledgedMessageMapImpl(int prefetchLimit) { _prefetchLimit = prefetchLimit; - _map = new LinkedHashMap<Long, MessageInstance>(prefetchLimit); + _map = new LinkedHashMap<>(prefetchLimit); } public void collect(long deliveryTag, boolean multiple, Map<Long, MessageInstance> msgs) @@ -81,12 +78,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { MessageInstance message = _map.remove(deliveryTag); - if(message != null) - { - _unackedSize -= message.getMessage().getSize(); - - } - return message; } } @@ -109,8 +100,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap synchronized (_lock) { _map.put(deliveryTag, message); - _unackedSize += message.getMessage().getSize(); - _lastDeliveryTag = deliveryTag; } } @@ -119,8 +108,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap synchronized (_lock) { Collection<MessageInstance> currentEntries = _map.values(); - _map = new LinkedHashMap<Long, MessageInstance>(_prefetchLimit); - _unackedSize = 0l; + _map = new LinkedHashMap<>(_prefetchLimit); return currentEntries; } } @@ -138,7 +126,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap synchronized (_lock) { _map.clear(); - _unackedSize = 0l; } } @@ -163,6 +150,14 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<Long,MessageInstance>(); collect(deliveryTag, multiple, ackedMessageMap); remove(ackedMessageMap); + List<MessageInstance> acknowledged = new ArrayList<>(); + for(MessageInstance instance : ackedMessageMap.values()) + { + if(instance.lockAcquisition()) + { + acknowledged.add(instance); + } + } return ackedMessageMap.values(); } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index adb2f8ea6a..bceae85896 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.protocol.v1_0; +import java.nio.ByteBuffer; +import java.util.List; + import org.apache.qpid.amqp_1_0.codec.ValueHandler; import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl; @@ -37,19 +40,16 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; -import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -import java.nio.ByteBuffer; -import java.util.List; - class ConsumerTarget_1_0 extends AbstractConsumerTarget { private final boolean _acquires; @@ -378,6 +378,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget if(outcome instanceof Accepted) { + _queueEntry.lockAcquisition(); txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(), new ServerTransaction.Action() { @@ -412,6 +413,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget modified.setDeliveryFailed(true); _link.getEndpoint().updateDisposition(_deliveryTag, modified, true); _link.getEndpoint().sendFlowConditional(); + _queueEntry.unlockAcquisition(); } } }); @@ -498,6 +500,11 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } @Override + public void acquisitionRemoved(final MessageInstance node) + { + } + + @Override public void consumerAdded(final ConsumerImpl sub) { _consumer = sub; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 7a844cbc79..a8fc5387b4 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -636,19 +636,21 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore()); if(_consumer.acquires()) { - txn.dequeue(Collections.singleton(queueEntry), - new ServerTransaction.Action() - { - public void postCommit() - { - queueEntry.delete(); - } - - public void onRollback() - { - //To change body of implemented methods use File | Settings | File Templates. - } - }); + if(queueEntry.acquire() || queueEntry.isAcquired()) + { + txn.dequeue(Collections.singleton(queueEntry), + new ServerTransaction.Action() + { + public void postCommit() + { + queueEntry.delete(); + } + + public void onRollback() + { + } + }); + } } } else if(outcome instanceof Released) diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index e73d177599..34f08615ad 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -1071,6 +1071,12 @@ class ManagementNode implements MessageSource, MessageDestination } @Override + public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer) + { + return false; + } + + @Override public void setRedelivered() { @@ -1119,6 +1125,18 @@ class ManagementNode implements MessageSource, MessageDestination } @Override + public boolean lockAcquisition() + { + return false; + } + + @Override + public boolean unlockAcquisition() + { + return false; + } + + @Override public int getMaximumDeliveryCount() { return 0; diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java index ae2828d392..03e7eab61b 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java @@ -90,6 +90,12 @@ class ManagementResponse implements MessageInstance } @Override + public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer) + { + return consumer == _consumer; + } + + @Override public void setRedelivered() { _isRedelivered = true; @@ -138,6 +144,18 @@ class ManagementResponse implements MessageInstance } @Override + public boolean lockAcquisition() + { + return false; + } + + @Override + public boolean unlockAcquisition() + { + return false; + } + + @Override public int getMaximumDeliveryCount() { return 0; @@ -190,7 +208,7 @@ class ManagementResponse implements MessageInstance @Override public void delete() { - // TODO + _isDeleted = true; } @Override diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java index 9866207234..8c77876e1a 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java @@ -33,6 +33,7 @@ import javax.servlet.http.HttpServletResponse; import org.apache.log4j.Logger; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; + import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageDeletedException; @@ -44,6 +45,7 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntryVisitor; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; +import org.apache.qpid.server.store.TransactionLogResource; public class MessageServlet extends AbstractServlet { @@ -212,7 +214,11 @@ public class MessageServlet extends AbstractServlet @Override protected void updateEntry(QueueEntry entry, VirtualHost.Transaction txn) { - txn.move(entry, _destinationQueue); + ServerMessage msg = entry.getMessage(); + if(msg != null && !msg.isReferenced((TransactionLogResource)_destinationQueue)) + { + txn.move(entry, _destinationQueue); + } } } @@ -229,7 +235,11 @@ public class MessageServlet extends AbstractServlet @Override protected void updateEntry(QueueEntry entry, VirtualHost.Transaction txn) { - txn.copy(entry, _destinationQueue); + ServerMessage msg = entry.getMessage(); + if(msg != null && !msg.isReferenced((TransactionLogResource)_destinationQueue)) + { + txn.copy(entry, _destinationQueue); + } } } diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/addBinding.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/addBinding.html index 9aebca90d7..b57e3a1a24 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/addBinding.html +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/addBinding.html @@ -15,26 +15,48 @@ ~ limitations under the License. --> <div class="dijitHidden"> - <div data-dojo-type="dijit.Dialog" style="width:600px;" data-dojo-props="title:'Add Binding'" id="addBinding"> - <form id="formAddBinding" method="post" dojoType="dijit.form.Form"> - <table cellpadding="0" cellspacing="2"> - <tr> - <td valign="top"><strong>Exchange Name*: </strong></td> - <td><div id="addBinding.selectExchangeDiv"></div></td> - </tr> - <tr> - <td valign="top"><strong>Queue Name*: </strong></td> - <td><div id="addBinding.selectQueueDiv"></div></td> - </tr> - <tr> - <td valign="top"><strong>Binding Key*: </strong></td> - <td><input type="text" required="true" name="name" id="formAddbinding.bindingKey" placeholder="Binding Key" - dojoType="dijit.form.ValidationTextBox" missingMessage="A name must be supplied" /></td> - </tr> - </table> + <div data-dojo-type="dijit/Dialog" style="width:600px;" data-dojo-props="title:'Add Binding'" id="addBinding"> + <form id="formAddBinding" method="post" data-dojo-type="dijit/form/Form"> + + <div class="clear"> + <div class="formLabel-labelCell tableContainer-labelCell">Exchange Name*:</div> + <div class="formLabel-controlCell tableContainer-valueCell"> + <div id="addBinding.selectExchangeDiv"></div> + </div> + </div> + <div class="clear"> + <div class="formLabel-labelCell tableContainer-labelCell">Queue Name*: </div> + <div class="formLabel-controlCell tableContainer-valueCell"> + <div id="addBinding.selectQueueDiv"></div> + </div> + </div> + <div class="clear"> + <div class="formLabel-labelCell tableContainer-labelCell">Binding Key*:</div> + <div class="formLabel-controlCell tableContainer-valueCell"> + <input type="text" id="formAddbinding.bindingKey" + data-dojo-type="dijit/form/ValidationTextBox" + data-dojo-props=" + name: 'name', + placeHolder: 'Binding Key', + missingMessage: 'A binding key must be supplied', + title: 'Enter binding key'" /> + </div> + </div> + + <div class="clear formBox"> + <fieldset> + <legend>Binding Arguments</legend> + <div class="editNoteBanner">NOTE: Only arguments with name and value will be submitted. To edit, please, click on a grid cell.</div> + <div id="formAddbinding.bindingArguments"></div> + <div> + <button data-dojo-type="dijit/form/Button" id="formAddbinding.addArgumentButton" type="button">Add</button> + <button data-dojo-type="dijit/form/Button" id="formAddbinding.deleteArgumentButton" type="button">Delete</button> + </div> + </fieldset> + </div> + <div class="dijitDialogPaneActionBar"> - <!-- submit buttons --> - <input type="submit" value="Create Binding" label="Create Binding" dojoType="dijit.form.Button" /> + <input type="submit" value="Create Binding" label="Create Binding" data-dojo-type="dijit/form/Button" /> </div> </form> </div> diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addBinding.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addBinding.js index 7bdd72525c..deda3f35d5 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addBinding.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addBinding.js @@ -23,6 +23,8 @@ define(["dojo/_base/xhr", "dojo/_base/array", "dojo/_base/event", 'dojo/_base/json', + "dojo/_base/lang", + "dojo/_base/declare", "dojo/store/Memory", "dijit/form/FilteringSelect", "dijit/form/NumberSpinner", // required by the form @@ -38,8 +40,77 @@ define(["dojo/_base/xhr", "dijit/form/DateTextBox", /* basic dojox classes */ "dojox/form/BusyButton", "dojox/form/CheckedMultiSelect", + "dojox/grid/EnhancedGrid", + "dojo/data/ObjectStore", "dojo/domReady!"], - function (xhr, dom, construct, win, registry, parser, array, event, json, Memory, FilteringSelect) { + function (xhr, dom, construct, win, registry, parser, array, event, json, lang, declare, Memory, FilteringSelect) { + + var noLocalValues = new Memory({ + data: [ + {name:"", id:null}, + {name:"true", id:true}, + {name:"false", id:false} + ] + }); + + var xMatchValues = new Memory({ + data: [ + {name:"all", id:"all"}, + {name:"any", id:"any"} + ] + }); + + var defaultBindingArguments = [ + {id: 0, name:"x-filter-jms-selector", value: null}, + {id: 1, name:"x-qpid-no-local", value: null} + ]; + + var GridWidgetProxy = declare("qpid.dojox.grid.cells.GridWidgetProxy", dojox.grid.cells._Widget, { + createWidget: function(inNode, inDatum, inRowIndex) + { + var WidgetClass = this.widgetClass; + var widgetProperties = this.getWidgetProps(inDatum); + var getWidgetProperties = widgetProperties.getWidgetProperties; + if (typeof getWidgetProperties == "function") + { + var item = this.grid.getItem(inRowIndex); + if (item) + { + var additionalWidgetProperties = getWidgetProperties(inDatum, inRowIndex, item); + if (additionalWidgetProperties) + { + WidgetClass = additionalWidgetProperties.widgetClass; + for(var prop in additionalWidgetProperties) + { + if(additionalWidgetProperties.hasOwnProperty(prop) && !widgetProperties[prop]) + { + widgetProperties[prop] = additionalWidgetProperties[ prop ]; + } + } + } + } + } + var widget = new WidgetClass(widgetProperties, inNode); + return widget; + }, + getValue: function(inRowIndex) + { + if (this.widget) + { + return this.widget.get('value'); + } + return null; + }, + _finish: function(inRowIndex) + { + if (this.widget) + { + this.inherited(arguments); + this.widget.destroyRecursive(); + this.widget = null; + } + } + }); var addBinding = {}; @@ -73,6 +144,28 @@ define(["dojo/_base/xhr", if(addBinding.exchange) { newBinding.exchange = addBinding.exchange; } + + addBinding.bindingArgumentsGrid.store.fetch({ + onComplete:function(items,request) + { + if(items.length) + { + array.forEach(items, function(item) + { + if (item && item.name && item.value) + { + var bindingArguments = newBinding.arguments; + if (!bindingArguments) + { + bindingArguments = {}; + newBinding.arguments = bindingArguments; + } + bindingArguments[item.name]=item.value; + } + }); + } + } + }); return newBinding; }; @@ -106,6 +199,98 @@ define(["dojo/_base/xhr", }); + var argumentsGridNode = dom.byId("formAddbinding.bindingArguments"); + var objectStore = new dojo.data.ObjectStore({objectStore: new Memory({data:lang.clone(defaultBindingArguments), idProperty: "id"})}); + + var layout = [[ + { name: "Argument Name", field: "name", width: "50%", editable: true }, + { name: 'Argument Value', field: 'value', width: '50%', editable: true, type: GridWidgetProxy, + widgetProps: { + getWidgetProperties: function(inDatum, inRowIndex, item) + { + if (item.name == "x-qpid-no-local") + { + return { + labelAttr: "name", + searchAttr: "id", + selectOnClick: false, + query: { id: "*"}, + required: false, + store: noLocalValues, + widgetClass: dijit.form.FilteringSelect + }; + } + else if (item.name && item.name.toLowerCase() == "x-match") + { + return { + labelAttr: "name", + searchAttr: "id", + selectOnClick: false, + query: { id: "*"}, + required: false, + store: xMatchValues, + widgetClass: dijit.form.FilteringSelect + }; + } + return {widgetClass: dijit.form.TextBox }; + } + } + } + ]]; + + var grid = new dojox.grid.EnhancedGrid({ + selectionMode: "multiple", + store: objectStore, + singleClickEdit: true, + structure: layout, + height: "150px", + plugins: {indirectSelection: true} + }, argumentsGridNode); + grid.startup(); + + addBinding.bindingArgumentsGrid = grid; + addBinding.idGenerator = 1; + var addArgumentButton = registry.byId("formAddbinding.addArgumentButton"); + var deleteArgumentButton = registry.byId("formAddbinding.deleteArgumentButton"); + + addArgumentButton.on("click", + function(event) + { + addBinding.idGenerator = addBinding.idGenerator + 1; + var newItem = {id:addBinding.idGenerator, name: "", value: ""}; + grid.store.newItem(newItem); + grid.store.save(); + grid.store.fetch( + { + onComplete:function(items,request) + { + var rowIndex = items.length - 1; + window.setTimeout(function() + { + grid.focus.setFocusIndex(rowIndex, 1 ); + },10); + } + }); + } + ); + + deleteArgumentButton.on("click", + function(event) + { + var data = grid.selection.getSelected(); + if(data.length) + { + array.forEach(data, function(selectedItem) { + if (selectedItem !== null) + { + grid.store.deleteItem(selectedItem); + } + }); + grid.store.save(); + } + } + ); + theForm.on("submit", function(e) { event.stop(e); @@ -154,7 +339,24 @@ define(["dojo/_base/xhr", addBinding.exchange = obj.exchange; registry.byId("formAddBinding").reset(); - + var grid = addBinding.bindingArgumentsGrid; + grid.store.fetch({ + onComplete:function(items,request) + { + if(items.length) + { + array.forEach(items, function(item) + { + if (item !== null) + { + grid.store.deleteItem(item); + } + }); + } + } + }); + array.forEach(lang.clone(defaultBindingArguments), function(item) {grid.store.newItem(item); }); + grid.store.save(); xhr.get({url: "api/latest/queue/" + encodeURIComponent(obj.virtualhostnode) + "/" + encodeURIComponent(obj.virtualhost) + "?depth=0", handleAs: "json"}).then( diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java index ca092fe6f8..5f5d6e7efe 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java @@ -59,6 +59,7 @@ import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.NotificationCheck; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntryVisitor; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.ServerScopedRuntimeException; public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener @@ -519,7 +520,8 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN final long messageId = message.getMessageNumber(); if ((messageId >= fromMessageId) - && (messageId <= toMessageId)) + && (messageId <= toMessageId) + && !(message.isReferenced((TransactionLogResource)destinationQueue))) { txn.move(entry, destinationQueue); } @@ -571,8 +573,8 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN } VirtualHost<?,?,?> vhost = _queue.getParent(VirtualHost.class); - final Queue<?> queue = vhost.getChildByName(Queue.class, toQueue); - if (queue == null) + final Queue<?> destinationQueue = vhost.getChildByName(Queue.class, toQueue); + if (destinationQueue == null) { throw new OperationsException("No such queue \""+ toQueue +"\""); } @@ -591,9 +593,10 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN final long messageId = message.getMessageNumber(); if ((messageId >= fromMessageId) - && (messageId <= toMessageId)) + && (messageId <= toMessageId) + && !(message.isReferenced((TransactionLogResource)destinationQueue))) { - txn.copy(entry, queue); + txn.copy(entry, destinationQueue); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 08f05cc8d6..681082526c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,8 +20,16 @@ */ package org.apache.qpid.client.protocol; -import org.apache.qpid.client.HeartbeatListener; -import org.apache.qpid.util.BytesDataOutput; +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +39,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.HeartbeatListener; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverHandler; import org.apache.qpid.client.failover.FailoverState; @@ -59,16 +68,7 @@ import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; - -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import org.apache.qpid.util.BytesDataOutput; /** * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the @@ -177,6 +177,7 @@ public class AMQProtocolHandler implements ProtocolEngine private long _lastReadTime = System.currentTimeMillis(); private long _lastWriteTime = System.currentTimeMillis(); private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT; + private Throwable _initialConnectionException; /** * Creates a new protocol handler, associated with the specified client connection instance. @@ -214,6 +215,8 @@ public class AMQProtocolHandler implements ProtocolEngine // in order to execute AMQConnection#exceptionRecievedout out of synchronization block, // otherwise it might deadlock with failover mutex boolean failoverNotAllowed = false; + boolean failedWithoutConnecting = false; + Throwable initialConnectionException = null; synchronized (this) { if (_logger.isDebugEnabled()) @@ -251,8 +254,11 @@ public class AMQProtocolHandler implements ProtocolEngine } else { + failedWithoutConnecting = true; + initialConnectionException = _initialConnectionException; _logger.debug("We are in process of establishing the initial connection"); } + _initialConnectionException = null; } else { @@ -265,6 +271,16 @@ public class AMQProtocolHandler implements ProtocolEngine _connection.exceptionReceived(new AMQDisconnectedException( "Server closed connection and reconnection not permitted.", _stateManager.getLastException())); } + else if(failedWithoutConnecting) + { + if(initialConnectionException == null) + { + initialConnectionException = _stateManager.getLastException(); + } + String message = initialConnectionException == null ? "" : initialConnectionException.getMessage(); + _connection.exceptionReceived(new AMQDisconnectedException( + "Connection could not be established: " + message, initialConnectionException)); + } } if (_logger.isDebugEnabled()) @@ -338,6 +354,7 @@ public class AMQProtocolHandler implements ProtocolEngine if (causeIsAConnectionProblem) { _logger.info("Connection exception caught therefore going to attempt failover: " + cause, cause); + _initialConnectionException = cause; } else { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java index 13a16d07b5..1bbf166d82 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java @@ -20,17 +20,18 @@ */ package org.apache.qpid.transport.network.security.ssl; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.security.SSLStatus; -import org.apache.qpid.transport.util.Logger; +import java.nio.ByteBuffer; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.Status; import javax.net.ssl.SSLException; -import java.nio.ByteBuffer; + +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.security.SSLStatus; +import org.apache.qpid.transport.util.Logger; public class SSLReceiver implements Receiver<ByteBuffer> { @@ -192,7 +193,7 @@ public class SSLReceiver implements Receiver<ByteBuffer> { _sslStatus.getSslLock().notifyAll(); } - exception(new TransportException("Error in SSLReceiver",e)); + exception(new TransportException("Error in SSLReceiver: " + e.getMessage(),e)); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java index fedb88d008..e606df3f7d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java @@ -21,15 +21,8 @@ package org.apache.qpid.server.queue; -import org.junit.Assert; -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.test.utils.QpidBrokerTestCase; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import javax.jms.Connection; import javax.jms.JMSException; @@ -39,8 +32,17 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TopicSubscriber; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; +import javax.naming.NamingException; + +import org.apache.log4j.Logger; +import org.junit.Assert; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.test.utils.QpidBrokerTestCase; public class TimeToLiveTest extends QpidBrokerTestCase { @@ -53,18 +55,29 @@ public class TimeToLiveTest extends QpidBrokerTestCase private static final int MSG_COUNT = 50; private static final long SERVER_TTL_TIMEOUT = 60000L; + public void testPassiveTTLWithPrefetch() throws Exception + { + doTestPassiveTTL(true); + } + public void testPassiveTTL() throws Exception { + doTestPassiveTTL(false); + + } + + private void doTestPassiveTTL(boolean prefetchMessages) throws JMSException, NamingException + { //Create Client 1 Connection clientConnection = getConnection(); - + Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = clientSession.createQueue(QUEUE); - + Queue queue = clientSession.createQueue(QUEUE); + // Create then close the consumer so the queue is actually created // Closing it then reopening it ensures that the consumer shouldn't get messages // which should have expired and allows a shorter sleep period. See QPID-1418 - + MessageConsumer consumer = clientSession.createConsumer(queue); consumer.close(); @@ -79,6 +92,12 @@ public class TimeToLiveTest extends QpidBrokerTestCase MessageProducer producer = producerSession.createProducer(queue); + consumer = clientSession.createConsumer(queue); + if(prefetchMessages) + { + clientConnection.start(); + } + //Set TTL int msg = 0; producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer)); @@ -96,7 +115,6 @@ public class TimeToLiveTest extends QpidBrokerTestCase producerSession.commit(); - consumer = clientSession.createConsumer(queue); // Ensure we sleep the required amount of time. ReentrantLock waitLock = new ReentrantLock(); @@ -124,6 +142,16 @@ public class TimeToLiveTest extends QpidBrokerTestCase } + if(prefetchMessages) + { + clientConnection.close(); + clientConnection = getConnection(); + + clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queue = clientSession.createQueue(QUEUE); + consumer = clientSession.createConsumer(queue); + } + clientConnection.start(); //Receive Message 0 @@ -131,14 +159,14 @@ public class TimeToLiveTest extends QpidBrokerTestCase Message receivedFirst = consumer.receive(5000); Message receivedSecond = consumer.receive(5000); Message receivedThird = consumer.receive(1000); - + // Log the messages to help diagnosis incase of failure _logger.info("First:"+receivedFirst); _logger.info("Second:"+receivedSecond); _logger.info("Third:"+receivedThird); // Only first and last messages sent should survive expiry - Assert.assertNull("More messages received", receivedThird); + Assert.assertNull("More messages received", receivedThird); Assert.assertNotNull("First message not received", receivedFirst); Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first")); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java index a6a08d83f9..d0f133aa73 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java @@ -456,6 +456,61 @@ public class QueueManagementTest extends QpidBrokerTestCase assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8); } + + /** + * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface. + */ + public void testCopyMessagesBetweenQueuesWithDuplicates() throws Exception + { + final int numberOfMessagesToSend = 10; + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", + numberOfMessagesToSend, + _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + // Copy first three messages to destination + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(2); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + + assertEquals("Unexpected queue depth on destination queue after first copy", + 3, + _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after first copy", + numberOfMessagesToSend, + _managedSourceQueue.getMessageCount().intValue()); + + // Now copy a further two messages to destination + fromMessageId = amqMessagesIds.get(7); + toMessageId = amqMessagesIds.get(8); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + assertEquals("Unexpected queue depth on destination queue after second copy", + 5, + _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after second copy", + numberOfMessagesToSend, + _managedSourceQueue.getMessageCount().intValue()); + + // Attempt to copy mixture of messages already on and some not already on the queue + + fromMessageId = amqMessagesIds.get(5); + toMessageId = amqMessagesIds.get(8); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + assertEquals("Unexpected queue depth on destination queue after second copy", + 7, + _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after second copy", + numberOfMessagesToSend, + _managedSourceQueue.getMessageCount().intValue()); + + assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8, 5, 6); + + + } + public void testMoveMessagesBetweenQueuesWithActiveConsumerOnSourceQueue() throws Exception { setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); |