summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java21
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java96
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java31
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java46
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java41
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java60
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java16
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java21
15 files changed, 336 insertions, 29 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index faf5a724f3..f8585344b0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -29,6 +29,8 @@ public interface ConsumerTarget
{
+ void acquisitionRemoved(MessageInstance node);
+
enum State
{
ACTIVE, SUSPENDED, CLOSED
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 58ffd88b85..e41bb948dc 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -511,7 +511,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
Exchange altExchange = getAlternateExchange();
if(altExchange != null)
{
- return ((ExchangeImpl)altExchange).send(message, routingAddress, instanceProperties, txn, postEnqueueAction);
+ return altExchange.send(message, routingAddress, instanceProperties, txn, postEnqueueAction);
}
else
{
@@ -520,7 +520,24 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
else
{
- final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]);
+ final BaseQueue[] baseQueues;
+
+ if(message.isReferenced())
+ {
+ ArrayList<BaseQueue> uniqueQueues = new ArrayList<>(queues.size());
+ for(BaseQueue q : queues)
+ {
+ if(!message.isReferenced(q))
+ {
+ uniqueQueues.add(q);
+ }
+ }
+ baseQueues = uniqueQueues.toArray(new BaseQueue[uniqueQueues.size()]);
+ }
+ else
+ {
+ baseQueues = queues.toArray(new BaseQueue[queues.size()]);
+ }
txn.enqueue(queues,message, new ServerTransaction.Action()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index d397dd57b6..d2789bfe58 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -21,10 +21,17 @@
package org.apache.qpid.server.message;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T>
@@ -33,10 +40,14 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount");
+ private static final AtomicReferenceFieldUpdater<AbstractServerMessageImpl, Collection> _resourcesUpdater =
+ AtomicReferenceFieldUpdater.newUpdater(AbstractServerMessageImpl.class, Collection.class,"_resources");
+
private volatile int _referenceCount = 0;
private final StoredMessage<T> _handle;
private final Object _connectionReference;
+ private volatile Collection<UUID> _resources;
public AbstractServerMessageImpl(StoredMessage<T> handle, Object connectionReference)
@@ -117,6 +128,26 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
}
@Override
+ final public MessageReference<X> newReference(TransactionLogResource object)
+ {
+ return new Reference(this, object);
+ }
+
+ @Override
+ final public boolean isReferenced(TransactionLogResource resource)
+ {
+ Collection<UUID> resources = _resources;
+ return resources != null && resources.contains(resource.getId());
+ }
+
+ @Override
+ final public boolean isReferenced()
+ {
+ Collection<UUID> resources = _resources;
+ return resources != null && !resources.isEmpty();
+ }
+
+ @Override
final public boolean isPersistent()
{
return _handle.getMetaData().isPersistent();
@@ -156,15 +187,52 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
AtomicIntegerFieldUpdater.newUpdater(Reference.class, "_released");
private AbstractServerMessageImpl<X, T> _message;
+ private final UUID _resourceId;
private volatile int _released;
private Reference(final AbstractServerMessageImpl<X, T> message)
{
+ this(message, null);
+ }
+ private Reference(final AbstractServerMessageImpl<X, T> message, TransactionLogResource resource)
+ {
_message = message;
+ if(resource != null)
+ {
+ Collection<UUID> currentValue;
+ Collection<UUID> newValue;
+ _resourceId = resource.getId();
+ do
+ {
+ currentValue = _message._resources;
+
+ if(currentValue == null)
+ {
+ newValue = Collections.singleton(_resourceId);
+ }
+ else
+ {
+ if(currentValue.contains(_resourceId))
+ {
+ throw new MessageAlreadyReferencedException(_message.getMessageNumber(), resource);
+ }
+ newValue = new ArrayList<>(currentValue.size()+1);
+ newValue.addAll(currentValue);
+ newValue.add(_resourceId);
+ }
+
+ }
+ while(!_resourcesUpdater.compareAndSet(_message, currentValue, newValue));
+ }
+ else
+ {
+ _resourceId = null;
+ }
if(!_message.incrementReference())
{
throw new MessageDeletedException(message.getMessageNumber());
}
+
}
public X getMessage()
@@ -176,6 +244,34 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
{
if(_releasedUpdater.compareAndSet(this,0,1))
{
+ if(_resourceId != null)
+ {
+ Collection<UUID> currentValue;
+ Collection<UUID> newValue;
+ do
+ {
+ currentValue = _message._resources;
+ if(currentValue.size() == 1)
+ {
+ newValue = null;
+ }
+ else
+ {
+ UUID[] array = new UUID[currentValue.size()-1];
+ int pos = 0;
+ for(UUID uuid : currentValue)
+ {
+ if(!_resourceId.equals(uuid))
+ {
+ array[pos++] = uuid;
+ }
+ }
+ newValue = Arrays.asList(array);
+ }
+ }
+ while(!_resourcesUpdater.compareAndSet(_message, currentValue, newValue));
+
+ }
_message.decrementReference();
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java
new file mode 100644
index 0000000000..7ab2625e63
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import org.apache.qpid.server.store.TransactionLogResource;
+
+public class MessageAlreadyReferencedException extends RuntimeException
+{
+ MessageAlreadyReferencedException(final long messageNumber, TransactionLogResource resource)
+ {
+ super("The message with id " + messageNumber + " is already referenced by resource " + resource.getName());
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index 4ee47e05e9..1bf451948d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -51,6 +51,8 @@ public interface MessageInstance
boolean isAcquiredBy(ConsumerImpl consumer);
+ boolean removeAcquisitionFromConsumer(ConsumerImpl consumer);
+
void setRedelivered();
boolean isRedelivered();
@@ -67,6 +69,10 @@ public interface MessageInstance
boolean acquire(ConsumerImpl sub);
+ boolean lockAcquisition();
+
+ boolean unlockAcquisition();
+
int getMaximumDeliveryCount();
int routeToAlternate(Action<? super MessageInstance> action, ServerTransaction txn);
@@ -99,6 +105,7 @@ public interface MessageInstance
State currentState = getState();
return currentState == State.DEQUEUED || currentState == State.DELETED;
}
+
}
@@ -162,10 +169,12 @@ public interface MessageInstance
public final class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState
{
private final C _consumer;
+ private final LockedAcquiredState<C> _lockedState;
public ConsumerAcquiredState(C consumer)
{
_consumer = consumer;
+ _lockedState = new LockedAcquiredState<>(this);
}
@@ -183,6 +192,43 @@ public interface MessageInstance
{
return "{" + getState().name() + " : " + _consumer +"}";
}
+
+ public LockedAcquiredState<C> getLockedState()
+ {
+ return _lockedState;
+ }
+
+ }
+
+ public final class LockedAcquiredState<C extends ConsumerImpl> extends EntryState
+ {
+ private final ConsumerAcquiredState<C> _acquiredState;
+
+ public LockedAcquiredState(final ConsumerAcquiredState<C> acquiredState)
+ {
+ _acquiredState = acquiredState;
+ }
+
+ @Override
+ public State getState()
+ {
+ return State.ACQUIRED;
+ }
+
+ public C getConsumer()
+ {
+ return _acquiredState.getConsumer();
+ }
+
+ public String toString()
+ {
+ return "{" + getState().name() + " : " + _acquiredState.getConsumer() +"}";
+ }
+
+ public ConsumerAcquiredState<C> getUnlockedState()
+ {
+ return _acquiredState;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index 8c35af8be4..81e6b13ffd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -20,10 +20,11 @@
*/
package org.apache.qpid.server.message;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.server.store.TransactionLogResource;
public interface ServerMessage<T extends StorableMessageMetaData> extends EnqueueableMessage, MessageContentSource
{
@@ -41,6 +42,12 @@ public interface ServerMessage<T extends StorableMessageMetaData> extends Enqueu
MessageReference newReference();
+ MessageReference newReference(TransactionLogResource object);
+
+ boolean isReferenced(TransactionLogResource resource);
+
+ boolean isReferenced();
+
long getMessageNumber();
long getArrivalTime();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 54f3c4de09..545a1d941d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1164,6 +1164,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
else
{
deliverMessage(sub, entry, false);
+ if(sub.acquires())
+ {
+ entry.unlockAcquisition();
+ }
}
}
}
@@ -2001,6 +2005,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
else
{
deliverMessage(sub, node, batch);
+ if(sub.acquires())
+ {
+ node.unlockAcquisition();
+ }
}
}
@@ -2253,14 +2261,28 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if (!node.isDeleted())
{
// If the node has expired then acquire it
- if (node.expired() && node.acquire())
+ if (node.expired())
{
- if (_logger.isDebugEnabled())
+ boolean acquiredForDequeueing = node.acquire();
+ if(!acquiredForDequeueing && node.getDeliveredToConsumer())
+ {
+ QueueConsumer consumer = (QueueConsumer) node.getDeliveredConsumer();
+ acquiredForDequeueing = node.removeAcquisitionFromConsumer(consumer);
+ if(acquiredForDequeueing)
+ {
+ consumer.acquisitionRemoved(node);
+ }
+ }
+
+ if(acquiredForDequeueing)
{
- _logger.debug("Dequeuing expired node " + node);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Dequeuing expired node " + node);
+ }
+ // Then dequeue it.
+ dequeueEntry(node);
}
- // Then dequeue it.
- dequeueEntry(node);
}
else
{
@@ -2527,7 +2549,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
final ServerTransaction txn,
final Action<? super MessageInstance> postEnqueueAction)
{
- txn.enqueue(this,message, new ServerTransaction.Action()
+ if(!message.isReferenced(this))
+ {
+ txn.enqueue(this, message, new ServerTransaction.Action()
{
MessageReference _reference = message.newReference();
@@ -2549,6 +2573,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
});
return 1;
+ }
+ else
+ {
+ return 0;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
index 5ffbc0dbaa..71b7636159 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
@@ -39,6 +39,8 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl,
void send(QueueEntry entry, boolean batch);
+ void acquisitionRemoved(QueueEntry node);
+
void queueDeleted();
SubFlushRunner getRunner();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 55782ac095..d80aa92007 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -477,6 +477,13 @@ class QueueConsumerImpl
}
@Override
+ public void acquisitionRemoved(final QueueEntry node)
+ {
+ _target.acquisitionRemoved(node);
+ _queue.decrementUnackedMsgCount(node);
+ }
+
+ @Override
public String getDistributionMode()
{
return _distributionMode;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 49644f8d76..6c541d78ef 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -103,7 +103,7 @@ public abstract class QueueEntryImpl implements QueueEntry
{
_queueEntryList = queueEntryList;
- _message = message == null ? null : message.newReference();
+ _message = message == null ? null : message.newReference(queueEntryList.getQueue());
_entryIdUpdater.set(this, entryId);
populateInstanceProperties();
@@ -112,7 +112,7 @@ public abstract class QueueEntryImpl implements QueueEntry
public QueueEntryImpl(QueueEntryList queueEntryList, ServerMessage message)
{
_queueEntryList = queueEntryList;
- _message = message == null ? null : message.newReference();
+ _message = message == null ? null : message.newReference(queueEntryList.getQueue());
populateInstanceProperties();
}
@@ -210,7 +210,7 @@ public abstract class QueueEntryImpl implements QueueEntry
public boolean acquire(ConsumerImpl sub)
{
- final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState());
+ final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState().getLockedState());
if(acquired)
{
_deliveryCountUpdater.compareAndSet(this,-1,0);
@@ -218,17 +218,57 @@ public abstract class QueueEntryImpl implements QueueEntry
return acquired;
}
+ @Override
+ public boolean lockAcquisition()
+ {
+ EntryState state = _state;
+ if(state instanceof ConsumerAcquiredState)
+ {
+ return _stateUpdater.compareAndSet(this, state, ((ConsumerAcquiredState)state).getLockedState());
+ }
+ return state instanceof LockedAcquiredState;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ EntryState state = _state;
+ if(state instanceof LockedAcquiredState)
+ {
+ return _stateUpdater.compareAndSet(this, state, ((LockedAcquiredState)state).getUnlockedState());
+ }
+ return false;
+ }
+
public boolean acquiredByConsumer()
{
- return (_state instanceof ConsumerAcquiredState);
+ return (_state instanceof ConsumerAcquiredState) || (_state instanceof LockedAcquiredState);
}
+ @Override
public boolean isAcquiredBy(ConsumerImpl consumer)
{
EntryState state = _state;
- return state instanceof ConsumerAcquiredState
- && ((ConsumerAcquiredState)state).getConsumer() == consumer;
+ return (state instanceof ConsumerAcquiredState
+ && ((ConsumerAcquiredState)state).getConsumer() == consumer)
+ || (state instanceof LockedAcquiredState
+ && ((LockedAcquiredState)state).getConsumer() == consumer);
+ }
+
+ @Override
+ public boolean removeAcquisitionFromConsumer(ConsumerImpl consumer)
+ {
+ EntryState state = _state;
+ if(state instanceof ConsumerAcquiredState
+ && ((ConsumerAcquiredState)state).getConsumer() == consumer)
+ {
+ return _stateUpdater.compareAndSet(this,state,NON_CONSUMER_ACQUIRED_STATE);
+ }
+ else
+ {
+ return false;
+ }
}
public void release()
@@ -238,7 +278,7 @@ public abstract class QueueEntryImpl implements QueueEntry
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
{
- if(state instanceof ConsumerAcquiredState)
+ if(state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
}
@@ -268,6 +308,10 @@ public abstract class QueueEntryImpl implements QueueEntry
{
return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer();
}
+ else if (state instanceof LockedAcquiredState)
+ {
+ return (QueueConsumer) ((LockedAcquiredState) state).getConsumer();
+ }
else
{
return null;
@@ -312,7 +356,7 @@ public abstract class QueueEntryImpl implements QueueEntry
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
- if (state instanceof ConsumerAcquiredState)
+ if (state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index 28dfc73a27..d4a91f2c0b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -24,7 +24,7 @@ import org.apache.qpid.server.message.ServerMessage;
public interface QueueEntryList
{
- AMQQueue getQueue();
+ AMQQueue<?> getQueue();
QueueEntry add(ServerMessage message);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
index ac28619d2d..d4aeca0437 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
@@ -151,6 +151,10 @@ public class FileKeyStoreImpl extends AbstractConfiguredObject<FileKeyStoreImpl>
{
super.validateChange(proxyForValidation, changedAttributes);
FileKeyStore changedStore = (FileKeyStore) proxyForValidation;
+ if (changedAttributes.contains(KeyStore.DESIRED_STATE) && changedStore.getDesiredState() == State.DELETED)
+ {
+ return;
+ }
if(changedAttributes.contains(NAME) && !getName().equals(changedStore.getName()))
{
throw new IllegalConfigurationException("Changing the key store name is not allowed");
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
index d71670fbe0..0596c21291 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
@@ -166,7 +166,12 @@ public class FileTrustStoreImpl extends AbstractConfiguredObject<FileTrustStoreI
protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
{
super.validateChange(proxyForValidation, changedAttributes);
+
FileTrustStore updated = (FileTrustStore) proxyForValidation;
+ if (changedAttributes.contains(TrustStore.DESIRED_STATE) && updated.getDesiredState() == State.DELETED)
+ {
+ return;
+ }
if(changedAttributes.contains(TrustStore.NAME) && !getName().equals(updated.getName()))
{
throw new IllegalConfigurationException("Changing the trust store name is not allowed");
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index bb7a726a0c..57142e6e1f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -473,7 +473,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
if (results == 0)
{
- getLogger().warn("Message metadata not found for message id " + messageId);
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Message id " + messageId
+ + " not found (attempt to remove failed - probably application initiated rollback)");
+ }
}
if (getLogger().isDebugEnabled())
@@ -482,7 +486,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
- stmt.setLong(1,messageId);
+ stmt.setLong(1, messageId);
results = stmt.executeUpdate();
}
finally
@@ -1492,7 +1496,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
if(stored())
{
checkMessageStoreOpen();
- getLogger().debug("GET CONTENT for message id " + _messageId);
data = AbstractJDBCMessageStore.this.getAllContent(_messageId);
T metaData = _messageDataRef.getMetaData();
if (metaData == null)
@@ -1568,7 +1571,10 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
@Override
public void remove()
{
- getLogger().debug("REMOVE called on message: " + _messageId);
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("REMOVE called on message: " + _messageId);
+ }
checkMessageStoreOpen();
int delta = getMetaData().getContentSize();
@@ -1605,7 +1611,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
if (!stored())
{
- getLogger().debug("STORING message id " + _messageId);
storeMetaData(conn, _messageId, _messageDataRef.getMetaData());
AbstractJDBCMessageStore.this.addContent(conn, _messageId,
_messageDataRef.getData() == null
@@ -1636,7 +1641,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
Pointer(final MessageData<T> ref)
{
- getLogger().debug("POST COMMIT for message id " + _messageId);
_ref = ref;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index f15f608907..b72d44debf 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -68,6 +68,8 @@ import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueConsumer;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -953,15 +955,26 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
op.withinTransaction(new Transaction()
{
- public void dequeue(final MessageInstance entry)
+ public void dequeue(final MessageInstance messageInstance)
{
- if(entry.acquire())
+ boolean acquired = messageInstance.acquire();
+ if(!acquired && messageInstance instanceof QueueEntry)
+ {
+ QueueEntry entry = (QueueEntry) messageInstance;
+ QueueConsumer consumer = (QueueConsumer) entry.getDeliveredConsumer();
+ acquired = messageInstance.removeAcquisitionFromConsumer(consumer);
+ if(acquired)
+ {
+ consumer.acquisitionRemoved((QueueEntry)messageInstance);
+ }
+ }
+ if(acquired)
{
- txn.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action()
+ txn.dequeue(messageInstance.getOwningResource(), messageInstance.getMessage(), new ServerTransaction.Action()
{
public void postCommit()
{
- entry.delete();
+ messageInstance.delete();
}
public void onRollback()