summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java21
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java96
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java31
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java46
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java41
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java60
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java16
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java21
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java146
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java18
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java39
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java9
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java12
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java18
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java25
32 files changed, 633 insertions, 35 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index faf5a724f3..f8585344b0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -29,6 +29,8 @@ public interface ConsumerTarget
{
+ void acquisitionRemoved(MessageInstance node);
+
enum State
{
ACTIVE, SUSPENDED, CLOSED
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 58ffd88b85..e41bb948dc 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -511,7 +511,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
Exchange altExchange = getAlternateExchange();
if(altExchange != null)
{
- return ((ExchangeImpl)altExchange).send(message, routingAddress, instanceProperties, txn, postEnqueueAction);
+ return altExchange.send(message, routingAddress, instanceProperties, txn, postEnqueueAction);
}
else
{
@@ -520,7 +520,24 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
else
{
- final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]);
+ final BaseQueue[] baseQueues;
+
+ if(message.isReferenced())
+ {
+ ArrayList<BaseQueue> uniqueQueues = new ArrayList<>(queues.size());
+ for(BaseQueue q : queues)
+ {
+ if(!message.isReferenced(q))
+ {
+ uniqueQueues.add(q);
+ }
+ }
+ baseQueues = uniqueQueues.toArray(new BaseQueue[uniqueQueues.size()]);
+ }
+ else
+ {
+ baseQueues = queues.toArray(new BaseQueue[queues.size()]);
+ }
txn.enqueue(queues,message, new ServerTransaction.Action()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index d397dd57b6..d2789bfe58 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -21,10 +21,17 @@
package org.apache.qpid.server.message;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T>
@@ -33,10 +40,14 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount");
+ private static final AtomicReferenceFieldUpdater<AbstractServerMessageImpl, Collection> _resourcesUpdater =
+ AtomicReferenceFieldUpdater.newUpdater(AbstractServerMessageImpl.class, Collection.class,"_resources");
+
private volatile int _referenceCount = 0;
private final StoredMessage<T> _handle;
private final Object _connectionReference;
+ private volatile Collection<UUID> _resources;
public AbstractServerMessageImpl(StoredMessage<T> handle, Object connectionReference)
@@ -117,6 +128,26 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
}
@Override
+ final public MessageReference<X> newReference(TransactionLogResource object)
+ {
+ return new Reference(this, object);
+ }
+
+ @Override
+ final public boolean isReferenced(TransactionLogResource resource)
+ {
+ Collection<UUID> resources = _resources;
+ return resources != null && resources.contains(resource.getId());
+ }
+
+ @Override
+ final public boolean isReferenced()
+ {
+ Collection<UUID> resources = _resources;
+ return resources != null && !resources.isEmpty();
+ }
+
+ @Override
final public boolean isPersistent()
{
return _handle.getMetaData().isPersistent();
@@ -156,15 +187,52 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
AtomicIntegerFieldUpdater.newUpdater(Reference.class, "_released");
private AbstractServerMessageImpl<X, T> _message;
+ private final UUID _resourceId;
private volatile int _released;
private Reference(final AbstractServerMessageImpl<X, T> message)
{
+ this(message, null);
+ }
+ private Reference(final AbstractServerMessageImpl<X, T> message, TransactionLogResource resource)
+ {
_message = message;
+ if(resource != null)
+ {
+ Collection<UUID> currentValue;
+ Collection<UUID> newValue;
+ _resourceId = resource.getId();
+ do
+ {
+ currentValue = _message._resources;
+
+ if(currentValue == null)
+ {
+ newValue = Collections.singleton(_resourceId);
+ }
+ else
+ {
+ if(currentValue.contains(_resourceId))
+ {
+ throw new MessageAlreadyReferencedException(_message.getMessageNumber(), resource);
+ }
+ newValue = new ArrayList<>(currentValue.size()+1);
+ newValue.addAll(currentValue);
+ newValue.add(_resourceId);
+ }
+
+ }
+ while(!_resourcesUpdater.compareAndSet(_message, currentValue, newValue));
+ }
+ else
+ {
+ _resourceId = null;
+ }
if(!_message.incrementReference())
{
throw new MessageDeletedException(message.getMessageNumber());
}
+
}
public X getMessage()
@@ -176,6 +244,34 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
{
if(_releasedUpdater.compareAndSet(this,0,1))
{
+ if(_resourceId != null)
+ {
+ Collection<UUID> currentValue;
+ Collection<UUID> newValue;
+ do
+ {
+ currentValue = _message._resources;
+ if(currentValue.size() == 1)
+ {
+ newValue = null;
+ }
+ else
+ {
+ UUID[] array = new UUID[currentValue.size()-1];
+ int pos = 0;
+ for(UUID uuid : currentValue)
+ {
+ if(!_resourceId.equals(uuid))
+ {
+ array[pos++] = uuid;
+ }
+ }
+ newValue = Arrays.asList(array);
+ }
+ }
+ while(!_resourcesUpdater.compareAndSet(_message, currentValue, newValue));
+
+ }
_message.decrementReference();
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java
new file mode 100644
index 0000000000..7ab2625e63
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import org.apache.qpid.server.store.TransactionLogResource;
+
+public class MessageAlreadyReferencedException extends RuntimeException
+{
+ MessageAlreadyReferencedException(final long messageNumber, TransactionLogResource resource)
+ {
+ super("The message with id " + messageNumber + " is already referenced by resource " + resource.getName());
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index 4ee47e05e9..1bf451948d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -51,6 +51,8 @@ public interface MessageInstance
boolean isAcquiredBy(ConsumerImpl consumer);
+ boolean removeAcquisitionFromConsumer(ConsumerImpl consumer);
+
void setRedelivered();
boolean isRedelivered();
@@ -67,6 +69,10 @@ public interface MessageInstance
boolean acquire(ConsumerImpl sub);
+ boolean lockAcquisition();
+
+ boolean unlockAcquisition();
+
int getMaximumDeliveryCount();
int routeToAlternate(Action<? super MessageInstance> action, ServerTransaction txn);
@@ -99,6 +105,7 @@ public interface MessageInstance
State currentState = getState();
return currentState == State.DEQUEUED || currentState == State.DELETED;
}
+
}
@@ -162,10 +169,12 @@ public interface MessageInstance
public final class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState
{
private final C _consumer;
+ private final LockedAcquiredState<C> _lockedState;
public ConsumerAcquiredState(C consumer)
{
_consumer = consumer;
+ _lockedState = new LockedAcquiredState<>(this);
}
@@ -183,6 +192,43 @@ public interface MessageInstance
{
return "{" + getState().name() + " : " + _consumer +"}";
}
+
+ public LockedAcquiredState<C> getLockedState()
+ {
+ return _lockedState;
+ }
+
+ }
+
+ public final class LockedAcquiredState<C extends ConsumerImpl> extends EntryState
+ {
+ private final ConsumerAcquiredState<C> _acquiredState;
+
+ public LockedAcquiredState(final ConsumerAcquiredState<C> acquiredState)
+ {
+ _acquiredState = acquiredState;
+ }
+
+ @Override
+ public State getState()
+ {
+ return State.ACQUIRED;
+ }
+
+ public C getConsumer()
+ {
+ return _acquiredState.getConsumer();
+ }
+
+ public String toString()
+ {
+ return "{" + getState().name() + " : " + _acquiredState.getConsumer() +"}";
+ }
+
+ public ConsumerAcquiredState<C> getUnlockedState()
+ {
+ return _acquiredState;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index 8c35af8be4..81e6b13ffd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -20,10 +20,11 @@
*/
package org.apache.qpid.server.message;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.server.store.TransactionLogResource;
public interface ServerMessage<T extends StorableMessageMetaData> extends EnqueueableMessage, MessageContentSource
{
@@ -41,6 +42,12 @@ public interface ServerMessage<T extends StorableMessageMetaData> extends Enqueu
MessageReference newReference();
+ MessageReference newReference(TransactionLogResource object);
+
+ boolean isReferenced(TransactionLogResource resource);
+
+ boolean isReferenced();
+
long getMessageNumber();
long getArrivalTime();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 54f3c4de09..545a1d941d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1164,6 +1164,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
else
{
deliverMessage(sub, entry, false);
+ if(sub.acquires())
+ {
+ entry.unlockAcquisition();
+ }
}
}
}
@@ -2001,6 +2005,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
else
{
deliverMessage(sub, node, batch);
+ if(sub.acquires())
+ {
+ node.unlockAcquisition();
+ }
}
}
@@ -2253,14 +2261,28 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if (!node.isDeleted())
{
// If the node has expired then acquire it
- if (node.expired() && node.acquire())
+ if (node.expired())
{
- if (_logger.isDebugEnabled())
+ boolean acquiredForDequeueing = node.acquire();
+ if(!acquiredForDequeueing && node.getDeliveredToConsumer())
+ {
+ QueueConsumer consumer = (QueueConsumer) node.getDeliveredConsumer();
+ acquiredForDequeueing = node.removeAcquisitionFromConsumer(consumer);
+ if(acquiredForDequeueing)
+ {
+ consumer.acquisitionRemoved(node);
+ }
+ }
+
+ if(acquiredForDequeueing)
{
- _logger.debug("Dequeuing expired node " + node);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Dequeuing expired node " + node);
+ }
+ // Then dequeue it.
+ dequeueEntry(node);
}
- // Then dequeue it.
- dequeueEntry(node);
}
else
{
@@ -2527,7 +2549,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
final ServerTransaction txn,
final Action<? super MessageInstance> postEnqueueAction)
{
- txn.enqueue(this,message, new ServerTransaction.Action()
+ if(!message.isReferenced(this))
+ {
+ txn.enqueue(this, message, new ServerTransaction.Action()
{
MessageReference _reference = message.newReference();
@@ -2549,6 +2573,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
});
return 1;
+ }
+ else
+ {
+ return 0;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
index 5ffbc0dbaa..71b7636159 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
@@ -39,6 +39,8 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl,
void send(QueueEntry entry, boolean batch);
+ void acquisitionRemoved(QueueEntry node);
+
void queueDeleted();
SubFlushRunner getRunner();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 55782ac095..d80aa92007 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -477,6 +477,13 @@ class QueueConsumerImpl
}
@Override
+ public void acquisitionRemoved(final QueueEntry node)
+ {
+ _target.acquisitionRemoved(node);
+ _queue.decrementUnackedMsgCount(node);
+ }
+
+ @Override
public String getDistributionMode()
{
return _distributionMode;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 49644f8d76..6c541d78ef 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -103,7 +103,7 @@ public abstract class QueueEntryImpl implements QueueEntry
{
_queueEntryList = queueEntryList;
- _message = message == null ? null : message.newReference();
+ _message = message == null ? null : message.newReference(queueEntryList.getQueue());
_entryIdUpdater.set(this, entryId);
populateInstanceProperties();
@@ -112,7 +112,7 @@ public abstract class QueueEntryImpl implements QueueEntry
public QueueEntryImpl(QueueEntryList queueEntryList, ServerMessage message)
{
_queueEntryList = queueEntryList;
- _message = message == null ? null : message.newReference();
+ _message = message == null ? null : message.newReference(queueEntryList.getQueue());
populateInstanceProperties();
}
@@ -210,7 +210,7 @@ public abstract class QueueEntryImpl implements QueueEntry
public boolean acquire(ConsumerImpl sub)
{
- final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState());
+ final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState().getLockedState());
if(acquired)
{
_deliveryCountUpdater.compareAndSet(this,-1,0);
@@ -218,17 +218,57 @@ public abstract class QueueEntryImpl implements QueueEntry
return acquired;
}
+ @Override
+ public boolean lockAcquisition()
+ {
+ EntryState state = _state;
+ if(state instanceof ConsumerAcquiredState)
+ {
+ return _stateUpdater.compareAndSet(this, state, ((ConsumerAcquiredState)state).getLockedState());
+ }
+ return state instanceof LockedAcquiredState;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ EntryState state = _state;
+ if(state instanceof LockedAcquiredState)
+ {
+ return _stateUpdater.compareAndSet(this, state, ((LockedAcquiredState)state).getUnlockedState());
+ }
+ return false;
+ }
+
public boolean acquiredByConsumer()
{
- return (_state instanceof ConsumerAcquiredState);
+ return (_state instanceof ConsumerAcquiredState) || (_state instanceof LockedAcquiredState);
}
+ @Override
public boolean isAcquiredBy(ConsumerImpl consumer)
{
EntryState state = _state;
- return state instanceof ConsumerAcquiredState
- && ((ConsumerAcquiredState)state).getConsumer() == consumer;
+ return (state instanceof ConsumerAcquiredState
+ && ((ConsumerAcquiredState)state).getConsumer() == consumer)
+ || (state instanceof LockedAcquiredState
+ && ((LockedAcquiredState)state).getConsumer() == consumer);
+ }
+
+ @Override
+ public boolean removeAcquisitionFromConsumer(ConsumerImpl consumer)
+ {
+ EntryState state = _state;
+ if(state instanceof ConsumerAcquiredState
+ && ((ConsumerAcquiredState)state).getConsumer() == consumer)
+ {
+ return _stateUpdater.compareAndSet(this,state,NON_CONSUMER_ACQUIRED_STATE);
+ }
+ else
+ {
+ return false;
+ }
}
public void release()
@@ -238,7 +278,7 @@ public abstract class QueueEntryImpl implements QueueEntry
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
{
- if(state instanceof ConsumerAcquiredState)
+ if(state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
}
@@ -268,6 +308,10 @@ public abstract class QueueEntryImpl implements QueueEntry
{
return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer();
}
+ else if (state instanceof LockedAcquiredState)
+ {
+ return (QueueConsumer) ((LockedAcquiredState) state).getConsumer();
+ }
else
{
return null;
@@ -312,7 +356,7 @@ public abstract class QueueEntryImpl implements QueueEntry
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
- if (state instanceof ConsumerAcquiredState)
+ if (state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index 28dfc73a27..d4a91f2c0b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -24,7 +24,7 @@ import org.apache.qpid.server.message.ServerMessage;
public interface QueueEntryList
{
- AMQQueue getQueue();
+ AMQQueue<?> getQueue();
QueueEntry add(ServerMessage message);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
index ac28619d2d..d4aeca0437 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
@@ -151,6 +151,10 @@ public class FileKeyStoreImpl extends AbstractConfiguredObject<FileKeyStoreImpl>
{
super.validateChange(proxyForValidation, changedAttributes);
FileKeyStore changedStore = (FileKeyStore) proxyForValidation;
+ if (changedAttributes.contains(KeyStore.DESIRED_STATE) && changedStore.getDesiredState() == State.DELETED)
+ {
+ return;
+ }
if(changedAttributes.contains(NAME) && !getName().equals(changedStore.getName()))
{
throw new IllegalConfigurationException("Changing the key store name is not allowed");
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
index d71670fbe0..0596c21291 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
@@ -166,7 +166,12 @@ public class FileTrustStoreImpl extends AbstractConfiguredObject<FileTrustStoreI
protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
{
super.validateChange(proxyForValidation, changedAttributes);
+
FileTrustStore updated = (FileTrustStore) proxyForValidation;
+ if (changedAttributes.contains(TrustStore.DESIRED_STATE) && updated.getDesiredState() == State.DELETED)
+ {
+ return;
+ }
if(changedAttributes.contains(TrustStore.NAME) && !getName().equals(updated.getName()))
{
throw new IllegalConfigurationException("Changing the trust store name is not allowed");
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index bb7a726a0c..57142e6e1f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -473,7 +473,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
if (results == 0)
{
- getLogger().warn("Message metadata not found for message id " + messageId);
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Message id " + messageId
+ + " not found (attempt to remove failed - probably application initiated rollback)");
+ }
}
if (getLogger().isDebugEnabled())
@@ -482,7 +486,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
- stmt.setLong(1,messageId);
+ stmt.setLong(1, messageId);
results = stmt.executeUpdate();
}
finally
@@ -1492,7 +1496,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
if(stored())
{
checkMessageStoreOpen();
- getLogger().debug("GET CONTENT for message id " + _messageId);
data = AbstractJDBCMessageStore.this.getAllContent(_messageId);
T metaData = _messageDataRef.getMetaData();
if (metaData == null)
@@ -1568,7 +1571,10 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
@Override
public void remove()
{
- getLogger().debug("REMOVE called on message: " + _messageId);
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("REMOVE called on message: " + _messageId);
+ }
checkMessageStoreOpen();
int delta = getMetaData().getContentSize();
@@ -1605,7 +1611,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
if (!stored())
{
- getLogger().debug("STORING message id " + _messageId);
storeMetaData(conn, _messageId, _messageDataRef.getMetaData());
AbstractJDBCMessageStore.this.addContent(conn, _messageId,
_messageDataRef.getData() == null
@@ -1636,7 +1641,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
Pointer(final MessageData<T> ref)
{
- getLogger().debug("POST COMMIT for message id " + _messageId);
_ref = ref;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index f15f608907..b72d44debf 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -68,6 +68,8 @@ import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueConsumer;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -953,15 +955,26 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
op.withinTransaction(new Transaction()
{
- public void dequeue(final MessageInstance entry)
+ public void dequeue(final MessageInstance messageInstance)
{
- if(entry.acquire())
+ boolean acquired = messageInstance.acquire();
+ if(!acquired && messageInstance instanceof QueueEntry)
+ {
+ QueueEntry entry = (QueueEntry) messageInstance;
+ QueueConsumer consumer = (QueueConsumer) entry.getDeliveredConsumer();
+ acquired = messageInstance.removeAcquisitionFromConsumer(consumer);
+ if(acquired)
+ {
+ consumer.acquisitionRemoved((QueueEntry)messageInstance);
+ }
+ }
+ if(acquired)
{
- txn.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action()
+ txn.dequeue(messageInstance.getOwningResource(), messageInstance.getMessage(), new ServerTransaction.Action()
{
public void postCommit()
{
- entry.delete();
+ messageInstance.delete();
}
public void onRollback()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index f614ff5847..8d025c50dc 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -181,6 +181,12 @@ public class MockConsumer implements ConsumerTarget
}
+ @Override
+ public void acquisitionRemoved(final MessageInstance node)
+ {
+
+ }
+
public State getState()
{
return _state;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index d7779390b1..c775a70cb8 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.exchange;
import static org.apache.qpid.common.AMQPFilterTypes.*;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -43,6 +44,7 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.QueueExistsException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -497,6 +499,7 @@ public class TopicExchangeTest extends QpidTestCase
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
when(message.getMessageNumber()).thenReturn(messageNumber);
for(BaseQueue q : queues)
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
new file mode 100644
index 0000000000..c90e406ba9
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class AbstractServerMessageTest extends QpidTestCase
+{
+ private static class TestMessage<T extends StorableMessageMetaData> extends AbstractServerMessageImpl<TestMessage<T>,T>
+ {
+
+ public TestMessage(final StoredMessage<T> handle,
+ final Object connectionReference)
+ {
+ super(handle, connectionReference);
+ }
+
+ @Override
+ public String getInitialRoutingAddress()
+ {
+ return null;
+ }
+
+ @Override
+ public AMQMessageHeader getMessageHeader()
+ {
+ return null;
+ }
+
+ @Override
+ public long getSize()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getExpiration()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getArrivalTime()
+ {
+ return 0;
+ }
+ }
+
+ private TransactionLogResource createQueue(String name)
+ {
+ TransactionLogResource queue = mock(TransactionLogResource.class);
+ when(queue.getId()).thenReturn(UUID.randomUUID());
+ when(queue.getName()).thenReturn(name);
+ return queue;
+ }
+
+ public void testReferences()
+ {
+ TransactionLogResource q1 = createQueue("1");
+ TransactionLogResource q2 = createQueue("2");
+
+ TestMessage<StorableMessageMetaData> msg = new TestMessage<StorableMessageMetaData>(mock(StoredMessage.class),this);
+ assertFalse(msg.isReferenced());
+ assertFalse(msg.isReferenced(q1));
+
+ MessageReference<TestMessage<StorableMessageMetaData>> nonQueueRef = msg.newReference();
+ assertFalse(msg.isReferenced());
+ assertFalse(msg.isReferenced(q1));
+
+ MessageReference<TestMessage<StorableMessageMetaData>> q1ref = msg.newReference(q1);
+ assertTrue(msg.isReferenced());
+ assertTrue(msg.isReferenced(q1));
+ assertFalse(msg.isReferenced(q2));
+
+ q1ref.release();
+ assertFalse(msg.isReferenced());
+ assertFalse(msg.isReferenced(q1));
+
+ q1ref = msg.newReference(q1);
+ assertTrue(msg.isReferenced());
+ assertTrue(msg.isReferenced(q1));
+ assertFalse(msg.isReferenced(q2));
+
+ MessageReference<TestMessage<StorableMessageMetaData>> q2ref = msg.newReference(q2);
+ assertTrue(msg.isReferenced());
+ assertTrue(msg.isReferenced(q1));
+ assertTrue(msg.isReferenced(q2));
+
+ try
+ {
+ msg.newReference(q1);
+ fail("Should not be able to create a second reference to the same queue");
+ }
+ catch (MessageAlreadyReferencedException e)
+ {
+ // pass
+ }
+ q2ref.release();
+ assertTrue(msg.isReferenced());
+ assertTrue(msg.isReferenced(q1));
+ assertFalse(msg.isReferenced(q2));
+
+ q1ref.release();
+ assertFalse(msg.isReferenced());
+ assertFalse(msg.isReferenced(q1));
+
+ nonQueueRef.release();
+
+ try
+ {
+ msg.newReference(q1);
+ fail("Message should not allow new references as all references had been removed");
+ }
+ catch(MessageDeletedException e)
+ {
+ // pass
+ }
+
+ }
+}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 0def708fed..9255dbf42e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.contains;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
@@ -60,6 +61,7 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -1157,6 +1159,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
return message;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
index 70a35dc4aa..799fc71d74 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
@@ -19,6 +19,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -38,6 +39,7 @@ import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class LastValueQueueListTest extends TestCase
@@ -220,6 +222,8 @@ public class LastValueQueueListTest extends TestCase
MessageReference messageReference = mock(MessageReference.class);
when(mockMessage.newReference()).thenReturn(messageReference);
+ when(mockMessage.newReference(any(TransactionLogResource.class))).thenReturn(messageReference);
+
when(messageReference.getMessage()).thenReturn(mockMessage);
return mockMessage;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
index 74a2262265..37c4eeb127 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
@@ -65,6 +65,12 @@ public class MockMessageInstance implements MessageInstance
return false;
}
+ @Override
+ public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+ {
+ return false;
+ }
+
public void delete()
{
@@ -81,6 +87,18 @@ public class MockMessageInstance implements MessageInstance
return false;
}
+ @Override
+ public boolean lockAcquisition()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ return false;
+ }
+
public boolean isAvailable()
{
return false;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
index cc5f36098e..631731ecc0 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -37,6 +38,7 @@ import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -79,6 +81,7 @@ public class PriorityQueueListTest extends QpidTestCase
when(message.getMessageHeader()).thenReturn(header);
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
when(ref.getMessage()).thenReturn(message);
when(header.getPriority()).thenReturn(PRIORITIES[i]);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 3189010284..40b6c1bebd 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -18,6 +18,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -38,6 +39,7 @@ import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
/**
@@ -137,6 +139,42 @@ public abstract class QueueEntryImplTestBase extends TestCase
return consumer;
}
+
+ public void testLocking()
+ {
+ QueueConsumer consumer = newConsumer();
+ QueueConsumer consumer2 = newConsumer();
+
+ _queueEntry.acquire(consumer);
+ assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
+ _queueEntry.isAcquired());
+
+ assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+ assertTrue("Should be able to unlock locked queue entry", _queueEntry.unlockAcquisition());
+ assertFalse("Acquisition should not be able to be removed from the wrong consumer",
+ _queueEntry.removeAcquisitionFromConsumer(consumer2));
+ assertTrue("Acquisition should be able to be removed once unlocked",
+ _queueEntry.removeAcquisitionFromConsumer(consumer));
+ assertTrue("Queue Entry should still be acquired", _queueEntry.isAcquired());
+ assertFalse("Queue Entry should not be marked as acquired by a consumer", _queueEntry.acquiredByConsumer());
+
+ _queueEntry.release();
+
+ assertFalse("Hijacked queue entry should be able to be released", _queueEntry.isAcquired());
+
+ _queueEntry.acquire(consumer);
+ assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
+ _queueEntry.isAcquired());
+
+ assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+ assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition());
+ assertTrue("Should be able to unlock locked queue entry",_queueEntry.lockAcquisition());
+ assertFalse("Acquisition should not be able to be hijacked when locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+
+ _queueEntry.delete();
+ assertTrue("Locked queue entry should be able to be deleted", _queueEntry.isDeleted());
+ }
+
/**
* A helper method to get entry state
*
@@ -220,6 +258,7 @@ public abstract class QueueEntryImplTestBase extends TestCase
final MessageReference reference = mock(MessageReference.class);
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
QueueEntryImpl entry = (QueueEntryImpl) queueEntryList.add(message);
entries[i] = entry;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
index c89d2abeae..a0ab7cd454 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
@@ -19,14 +19,16 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import junit.framework.TestCase;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.qpid.server.store.TransactionLogResource;
/**
* Abstract test class for QueueEntryList implementations.
@@ -96,6 +98,7 @@ public abstract class QueueEntryListTestBase extends TestCase
AMQMessageHeader hdr = mock(AMQMessageHeader.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
when(message.getMessageHeader()).thenReturn(hdr);
return message;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
index bd23aaa50a..79d7628a9c 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -33,6 +34,7 @@ import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -162,6 +164,7 @@ public class QueueMessageRecoveryTest extends QpidTestCase
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(msg);
when(msg.newReference()).thenReturn(ref);
+ when(msg.newReference(any(TransactionLogResource.class))).thenReturn(ref);
when(msg.getStoredMessage()).thenReturn(mock(StoredMessage.class));
return msg;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
index eaed1427b2..a2d314d629 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -35,6 +36,7 @@ import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
@@ -70,6 +72,7 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
final MessageReference reference = mock(MessageReference.class);
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
return (QueueEntryImpl) queueEntryList.add(message);
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
index bcc1e7bc0e..0c7f19bbd5 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
@@ -19,6 +19,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -40,6 +41,7 @@ import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class SortedQueueEntryListTest extends QueueEntryListTestBase
@@ -180,6 +182,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
when(message.getMessageNumber()).thenReturn(id);
return message;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
index 268d334949..d9a176c688 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
@@ -19,6 +19,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -38,6 +39,7 @@ import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class SortedQueueEntryTest extends QueueEntryImplTestBase
@@ -97,6 +99,7 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase
final MessageReference reference = mock(MessageReference.class);
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
return _queueEntryList.add(message);
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
index 89bb32e133..95c53c8428 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -36,6 +37,7 @@ import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class StandardQueueEntryListTest extends QueueEntryListTestBase
@@ -73,6 +75,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
final QueueEntry bleh = _sqel.add(message);
assertNotNull("QE should not have been null", bleh);
@@ -163,6 +166,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
QueueEntry bleh = sqel.add(message);
assertNotNull("QE should not have been null", bleh);
entriesMap.put(i,bleh);
@@ -264,6 +268,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase
final MessageReference reference = mock(MessageReference.class);
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
entries[i] = (OrderedQueueEntry) queueEntryList.add(message);
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
index d328e21a94..ce1c95e674 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
@@ -342,5 +342,17 @@ public class StandardQueueTest extends AbstractQueueTestBase
return super.acquire(sub);
}
}
+
+ @Override
+ public boolean lockAcquisition()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ return true;
+ }
}
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index e16ba66391..848675bf5d 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -165,6 +165,24 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
}
@Override
+ public MessageReference newReference(final TransactionLogResource object)
+ {
+ return _messageReference;
+ }
+
+ @Override
+ public boolean isReferenced(final TransactionLogResource resource)
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isReferenced()
+ {
+ return false;
+ }
+
+ @Override
public int hashCode()
{
final int prime = 31;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
index 8992cf62c9..e0fbb6dcc3 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
@@ -20,14 +20,15 @@
*/
package org.apache.qpid.server.txn;
+import java.nio.ByteBuffer;
+
import org.apache.commons.lang.NotImplementedException;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.StoredMessage;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.server.store.TransactionLogResource;
/**
* Mock Server Message allowing its persistent flag to be controlled from test.
@@ -57,6 +58,24 @@ class MockServerMessage implements ServerMessage
throw new NotImplementedException();
}
+ @Override
+ public MessageReference newReference(final TransactionLogResource object)
+ {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean isReferenced(final TransactionLogResource resource)
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isReferenced()
+ {
+ return false;
+ }
+
public boolean isImmediate()
{
throw new NotImplementedException();
@@ -113,4 +132,4 @@ class MockServerMessage implements ServerMessage
{
return 0L;
}
-} \ No newline at end of file
+}