summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-25 15:03:03 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-25 15:03:03 +0000
commitabd8126799b786e8e9a73df8dd637e6aa2b0ae4f (patch)
treeba7549949f8c4192b74836dec0904e916cd49d95
parent2a7c8b3061fda47cc53ef997c339599dd2285395 (diff)
downloadqpid-python-abd8126799b786e8e9a73df8dd637e6aa2b0ae4f.tar.gz
Merging from trunk r1616716:1616818 in the Java tree
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.30@1620333 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java21
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java96
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java31
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java46
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java41
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java60
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java16
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java21
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java146
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java18
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java39
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java9
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java12
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java18
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java25
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java18
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java8
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java9
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java20
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java35
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java15
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java28
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java18
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java20
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java14
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/addBinding.html60
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addBinding.js206
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java13
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java41
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java13
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java64
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java55
52 files changed, 1163 insertions, 161 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 338882e6df..835846a5ec 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -388,10 +388,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore
OperationStatus status = getMessageMetaDataDb().delete(tx, key);
if (status == OperationStatus.NOTFOUND)
{
- getLogger().info(
- "Message not found (attempt to remove failed - probably application initiated rollback) "
- +
- messageId);
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Message id " + messageId
+ + " not found (attempt to remove failed - probably application initiated rollback)");
+ }
}
if (getLogger().isDebugEnabled())
@@ -426,7 +427,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
catch(DatabaseException e2)
{
getLogger().warn(
- "Unable to abort transaction after LockConflictExcption on removal of message with id "
+ "Unable to abort transaction after LockConflictException on removal of message with id "
+ messageId,
e2);
// rethrow the original log conflict exception, the secondary exception should already have
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index faf5a724f3..f8585344b0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -29,6 +29,8 @@ public interface ConsumerTarget
{
+ void acquisitionRemoved(MessageInstance node);
+
enum State
{
ACTIVE, SUSPENDED, CLOSED
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 58ffd88b85..e41bb948dc 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -511,7 +511,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
Exchange altExchange = getAlternateExchange();
if(altExchange != null)
{
- return ((ExchangeImpl)altExchange).send(message, routingAddress, instanceProperties, txn, postEnqueueAction);
+ return altExchange.send(message, routingAddress, instanceProperties, txn, postEnqueueAction);
}
else
{
@@ -520,7 +520,24 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
else
{
- final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]);
+ final BaseQueue[] baseQueues;
+
+ if(message.isReferenced())
+ {
+ ArrayList<BaseQueue> uniqueQueues = new ArrayList<>(queues.size());
+ for(BaseQueue q : queues)
+ {
+ if(!message.isReferenced(q))
+ {
+ uniqueQueues.add(q);
+ }
+ }
+ baseQueues = uniqueQueues.toArray(new BaseQueue[uniqueQueues.size()]);
+ }
+ else
+ {
+ baseQueues = queues.toArray(new BaseQueue[queues.size()]);
+ }
txn.enqueue(queues,message, new ServerTransaction.Action()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index d397dd57b6..d2789bfe58 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -21,10 +21,17 @@
package org.apache.qpid.server.message;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T>
@@ -33,10 +40,14 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount");
+ private static final AtomicReferenceFieldUpdater<AbstractServerMessageImpl, Collection> _resourcesUpdater =
+ AtomicReferenceFieldUpdater.newUpdater(AbstractServerMessageImpl.class, Collection.class,"_resources");
+
private volatile int _referenceCount = 0;
private final StoredMessage<T> _handle;
private final Object _connectionReference;
+ private volatile Collection<UUID> _resources;
public AbstractServerMessageImpl(StoredMessage<T> handle, Object connectionReference)
@@ -117,6 +128,26 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
}
@Override
+ final public MessageReference<X> newReference(TransactionLogResource object)
+ {
+ return new Reference(this, object);
+ }
+
+ @Override
+ final public boolean isReferenced(TransactionLogResource resource)
+ {
+ Collection<UUID> resources = _resources;
+ return resources != null && resources.contains(resource.getId());
+ }
+
+ @Override
+ final public boolean isReferenced()
+ {
+ Collection<UUID> resources = _resources;
+ return resources != null && !resources.isEmpty();
+ }
+
+ @Override
final public boolean isPersistent()
{
return _handle.getMetaData().isPersistent();
@@ -156,15 +187,52 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
AtomicIntegerFieldUpdater.newUpdater(Reference.class, "_released");
private AbstractServerMessageImpl<X, T> _message;
+ private final UUID _resourceId;
private volatile int _released;
private Reference(final AbstractServerMessageImpl<X, T> message)
{
+ this(message, null);
+ }
+ private Reference(final AbstractServerMessageImpl<X, T> message, TransactionLogResource resource)
+ {
_message = message;
+ if(resource != null)
+ {
+ Collection<UUID> currentValue;
+ Collection<UUID> newValue;
+ _resourceId = resource.getId();
+ do
+ {
+ currentValue = _message._resources;
+
+ if(currentValue == null)
+ {
+ newValue = Collections.singleton(_resourceId);
+ }
+ else
+ {
+ if(currentValue.contains(_resourceId))
+ {
+ throw new MessageAlreadyReferencedException(_message.getMessageNumber(), resource);
+ }
+ newValue = new ArrayList<>(currentValue.size()+1);
+ newValue.addAll(currentValue);
+ newValue.add(_resourceId);
+ }
+
+ }
+ while(!_resourcesUpdater.compareAndSet(_message, currentValue, newValue));
+ }
+ else
+ {
+ _resourceId = null;
+ }
if(!_message.incrementReference())
{
throw new MessageDeletedException(message.getMessageNumber());
}
+
}
public X getMessage()
@@ -176,6 +244,34 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
{
if(_releasedUpdater.compareAndSet(this,0,1))
{
+ if(_resourceId != null)
+ {
+ Collection<UUID> currentValue;
+ Collection<UUID> newValue;
+ do
+ {
+ currentValue = _message._resources;
+ if(currentValue.size() == 1)
+ {
+ newValue = null;
+ }
+ else
+ {
+ UUID[] array = new UUID[currentValue.size()-1];
+ int pos = 0;
+ for(UUID uuid : currentValue)
+ {
+ if(!_resourceId.equals(uuid))
+ {
+ array[pos++] = uuid;
+ }
+ }
+ newValue = Arrays.asList(array);
+ }
+ }
+ while(!_resourcesUpdater.compareAndSet(_message, currentValue, newValue));
+
+ }
_message.decrementReference();
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java
new file mode 100644
index 0000000000..7ab2625e63
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import org.apache.qpid.server.store.TransactionLogResource;
+
+public class MessageAlreadyReferencedException extends RuntimeException
+{
+ MessageAlreadyReferencedException(final long messageNumber, TransactionLogResource resource)
+ {
+ super("The message with id " + messageNumber + " is already referenced by resource " + resource.getName());
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index 4ee47e05e9..1bf451948d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -51,6 +51,8 @@ public interface MessageInstance
boolean isAcquiredBy(ConsumerImpl consumer);
+ boolean removeAcquisitionFromConsumer(ConsumerImpl consumer);
+
void setRedelivered();
boolean isRedelivered();
@@ -67,6 +69,10 @@ public interface MessageInstance
boolean acquire(ConsumerImpl sub);
+ boolean lockAcquisition();
+
+ boolean unlockAcquisition();
+
int getMaximumDeliveryCount();
int routeToAlternate(Action<? super MessageInstance> action, ServerTransaction txn);
@@ -99,6 +105,7 @@ public interface MessageInstance
State currentState = getState();
return currentState == State.DEQUEUED || currentState == State.DELETED;
}
+
}
@@ -162,10 +169,12 @@ public interface MessageInstance
public final class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState
{
private final C _consumer;
+ private final LockedAcquiredState<C> _lockedState;
public ConsumerAcquiredState(C consumer)
{
_consumer = consumer;
+ _lockedState = new LockedAcquiredState<>(this);
}
@@ -183,6 +192,43 @@ public interface MessageInstance
{
return "{" + getState().name() + " : " + _consumer +"}";
}
+
+ public LockedAcquiredState<C> getLockedState()
+ {
+ return _lockedState;
+ }
+
+ }
+
+ public final class LockedAcquiredState<C extends ConsumerImpl> extends EntryState
+ {
+ private final ConsumerAcquiredState<C> _acquiredState;
+
+ public LockedAcquiredState(final ConsumerAcquiredState<C> acquiredState)
+ {
+ _acquiredState = acquiredState;
+ }
+
+ @Override
+ public State getState()
+ {
+ return State.ACQUIRED;
+ }
+
+ public C getConsumer()
+ {
+ return _acquiredState.getConsumer();
+ }
+
+ public String toString()
+ {
+ return "{" + getState().name() + " : " + _acquiredState.getConsumer() +"}";
+ }
+
+ public ConsumerAcquiredState<C> getUnlockedState()
+ {
+ return _acquiredState;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index 8c35af8be4..81e6b13ffd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -20,10 +20,11 @@
*/
package org.apache.qpid.server.message;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.server.store.TransactionLogResource;
public interface ServerMessage<T extends StorableMessageMetaData> extends EnqueueableMessage, MessageContentSource
{
@@ -41,6 +42,12 @@ public interface ServerMessage<T extends StorableMessageMetaData> extends Enqueu
MessageReference newReference();
+ MessageReference newReference(TransactionLogResource object);
+
+ boolean isReferenced(TransactionLogResource resource);
+
+ boolean isReferenced();
+
long getMessageNumber();
long getArrivalTime();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 54f3c4de09..545a1d941d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1164,6 +1164,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
else
{
deliverMessage(sub, entry, false);
+ if(sub.acquires())
+ {
+ entry.unlockAcquisition();
+ }
}
}
}
@@ -2001,6 +2005,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
else
{
deliverMessage(sub, node, batch);
+ if(sub.acquires())
+ {
+ node.unlockAcquisition();
+ }
}
}
@@ -2253,14 +2261,28 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if (!node.isDeleted())
{
// If the node has expired then acquire it
- if (node.expired() && node.acquire())
+ if (node.expired())
{
- if (_logger.isDebugEnabled())
+ boolean acquiredForDequeueing = node.acquire();
+ if(!acquiredForDequeueing && node.getDeliveredToConsumer())
+ {
+ QueueConsumer consumer = (QueueConsumer) node.getDeliveredConsumer();
+ acquiredForDequeueing = node.removeAcquisitionFromConsumer(consumer);
+ if(acquiredForDequeueing)
+ {
+ consumer.acquisitionRemoved(node);
+ }
+ }
+
+ if(acquiredForDequeueing)
{
- _logger.debug("Dequeuing expired node " + node);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Dequeuing expired node " + node);
+ }
+ // Then dequeue it.
+ dequeueEntry(node);
}
- // Then dequeue it.
- dequeueEntry(node);
}
else
{
@@ -2527,7 +2549,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
final ServerTransaction txn,
final Action<? super MessageInstance> postEnqueueAction)
{
- txn.enqueue(this,message, new ServerTransaction.Action()
+ if(!message.isReferenced(this))
+ {
+ txn.enqueue(this, message, new ServerTransaction.Action()
{
MessageReference _reference = message.newReference();
@@ -2549,6 +2573,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
});
return 1;
+ }
+ else
+ {
+ return 0;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
index 5ffbc0dbaa..71b7636159 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
@@ -39,6 +39,8 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl,
void send(QueueEntry entry, boolean batch);
+ void acquisitionRemoved(QueueEntry node);
+
void queueDeleted();
SubFlushRunner getRunner();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 55782ac095..d80aa92007 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -477,6 +477,13 @@ class QueueConsumerImpl
}
@Override
+ public void acquisitionRemoved(final QueueEntry node)
+ {
+ _target.acquisitionRemoved(node);
+ _queue.decrementUnackedMsgCount(node);
+ }
+
+ @Override
public String getDistributionMode()
{
return _distributionMode;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 49644f8d76..6c541d78ef 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -103,7 +103,7 @@ public abstract class QueueEntryImpl implements QueueEntry
{
_queueEntryList = queueEntryList;
- _message = message == null ? null : message.newReference();
+ _message = message == null ? null : message.newReference(queueEntryList.getQueue());
_entryIdUpdater.set(this, entryId);
populateInstanceProperties();
@@ -112,7 +112,7 @@ public abstract class QueueEntryImpl implements QueueEntry
public QueueEntryImpl(QueueEntryList queueEntryList, ServerMessage message)
{
_queueEntryList = queueEntryList;
- _message = message == null ? null : message.newReference();
+ _message = message == null ? null : message.newReference(queueEntryList.getQueue());
populateInstanceProperties();
}
@@ -210,7 +210,7 @@ public abstract class QueueEntryImpl implements QueueEntry
public boolean acquire(ConsumerImpl sub)
{
- final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState());
+ final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState().getLockedState());
if(acquired)
{
_deliveryCountUpdater.compareAndSet(this,-1,0);
@@ -218,17 +218,57 @@ public abstract class QueueEntryImpl implements QueueEntry
return acquired;
}
+ @Override
+ public boolean lockAcquisition()
+ {
+ EntryState state = _state;
+ if(state instanceof ConsumerAcquiredState)
+ {
+ return _stateUpdater.compareAndSet(this, state, ((ConsumerAcquiredState)state).getLockedState());
+ }
+ return state instanceof LockedAcquiredState;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ EntryState state = _state;
+ if(state instanceof LockedAcquiredState)
+ {
+ return _stateUpdater.compareAndSet(this, state, ((LockedAcquiredState)state).getUnlockedState());
+ }
+ return false;
+ }
+
public boolean acquiredByConsumer()
{
- return (_state instanceof ConsumerAcquiredState);
+ return (_state instanceof ConsumerAcquiredState) || (_state instanceof LockedAcquiredState);
}
+ @Override
public boolean isAcquiredBy(ConsumerImpl consumer)
{
EntryState state = _state;
- return state instanceof ConsumerAcquiredState
- && ((ConsumerAcquiredState)state).getConsumer() == consumer;
+ return (state instanceof ConsumerAcquiredState
+ && ((ConsumerAcquiredState)state).getConsumer() == consumer)
+ || (state instanceof LockedAcquiredState
+ && ((LockedAcquiredState)state).getConsumer() == consumer);
+ }
+
+ @Override
+ public boolean removeAcquisitionFromConsumer(ConsumerImpl consumer)
+ {
+ EntryState state = _state;
+ if(state instanceof ConsumerAcquiredState
+ && ((ConsumerAcquiredState)state).getConsumer() == consumer)
+ {
+ return _stateUpdater.compareAndSet(this,state,NON_CONSUMER_ACQUIRED_STATE);
+ }
+ else
+ {
+ return false;
+ }
}
public void release()
@@ -238,7 +278,7 @@ public abstract class QueueEntryImpl implements QueueEntry
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
{
- if(state instanceof ConsumerAcquiredState)
+ if(state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
}
@@ -268,6 +308,10 @@ public abstract class QueueEntryImpl implements QueueEntry
{
return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer();
}
+ else if (state instanceof LockedAcquiredState)
+ {
+ return (QueueConsumer) ((LockedAcquiredState) state).getConsumer();
+ }
else
{
return null;
@@ -312,7 +356,7 @@ public abstract class QueueEntryImpl implements QueueEntry
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
- if (state instanceof ConsumerAcquiredState)
+ if (state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index 28dfc73a27..d4a91f2c0b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -24,7 +24,7 @@ import org.apache.qpid.server.message.ServerMessage;
public interface QueueEntryList
{
- AMQQueue getQueue();
+ AMQQueue<?> getQueue();
QueueEntry add(ServerMessage message);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
index ac28619d2d..d4aeca0437 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
@@ -151,6 +151,10 @@ public class FileKeyStoreImpl extends AbstractConfiguredObject<FileKeyStoreImpl>
{
super.validateChange(proxyForValidation, changedAttributes);
FileKeyStore changedStore = (FileKeyStore) proxyForValidation;
+ if (changedAttributes.contains(KeyStore.DESIRED_STATE) && changedStore.getDesiredState() == State.DELETED)
+ {
+ return;
+ }
if(changedAttributes.contains(NAME) && !getName().equals(changedStore.getName()))
{
throw new IllegalConfigurationException("Changing the key store name is not allowed");
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
index d71670fbe0..0596c21291 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
@@ -166,7 +166,12 @@ public class FileTrustStoreImpl extends AbstractConfiguredObject<FileTrustStoreI
protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
{
super.validateChange(proxyForValidation, changedAttributes);
+
FileTrustStore updated = (FileTrustStore) proxyForValidation;
+ if (changedAttributes.contains(TrustStore.DESIRED_STATE) && updated.getDesiredState() == State.DELETED)
+ {
+ return;
+ }
if(changedAttributes.contains(TrustStore.NAME) && !getName().equals(updated.getName()))
{
throw new IllegalConfigurationException("Changing the trust store name is not allowed");
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index bb7a726a0c..57142e6e1f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -473,7 +473,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
if (results == 0)
{
- getLogger().warn("Message metadata not found for message id " + messageId);
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Message id " + messageId
+ + " not found (attempt to remove failed - probably application initiated rollback)");
+ }
}
if (getLogger().isDebugEnabled())
@@ -482,7 +486,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
- stmt.setLong(1,messageId);
+ stmt.setLong(1, messageId);
results = stmt.executeUpdate();
}
finally
@@ -1492,7 +1496,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
if(stored())
{
checkMessageStoreOpen();
- getLogger().debug("GET CONTENT for message id " + _messageId);
data = AbstractJDBCMessageStore.this.getAllContent(_messageId);
T metaData = _messageDataRef.getMetaData();
if (metaData == null)
@@ -1568,7 +1571,10 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
@Override
public void remove()
{
- getLogger().debug("REMOVE called on message: " + _messageId);
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("REMOVE called on message: " + _messageId);
+ }
checkMessageStoreOpen();
int delta = getMetaData().getContentSize();
@@ -1605,7 +1611,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
if (!stored())
{
- getLogger().debug("STORING message id " + _messageId);
storeMetaData(conn, _messageId, _messageDataRef.getMetaData());
AbstractJDBCMessageStore.this.addContent(conn, _messageId,
_messageDataRef.getData() == null
@@ -1636,7 +1641,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
Pointer(final MessageData<T> ref)
{
- getLogger().debug("POST COMMIT for message id " + _messageId);
_ref = ref;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index f15f608907..b72d44debf 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -68,6 +68,8 @@ import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueConsumer;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -953,15 +955,26 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
op.withinTransaction(new Transaction()
{
- public void dequeue(final MessageInstance entry)
+ public void dequeue(final MessageInstance messageInstance)
{
- if(entry.acquire())
+ boolean acquired = messageInstance.acquire();
+ if(!acquired && messageInstance instanceof QueueEntry)
+ {
+ QueueEntry entry = (QueueEntry) messageInstance;
+ QueueConsumer consumer = (QueueConsumer) entry.getDeliveredConsumer();
+ acquired = messageInstance.removeAcquisitionFromConsumer(consumer);
+ if(acquired)
+ {
+ consumer.acquisitionRemoved((QueueEntry)messageInstance);
+ }
+ }
+ if(acquired)
{
- txn.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action()
+ txn.dequeue(messageInstance.getOwningResource(), messageInstance.getMessage(), new ServerTransaction.Action()
{
public void postCommit()
{
- entry.delete();
+ messageInstance.delete();
}
public void onRollback()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index f614ff5847..8d025c50dc 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -181,6 +181,12 @@ public class MockConsumer implements ConsumerTarget
}
+ @Override
+ public void acquisitionRemoved(final MessageInstance node)
+ {
+
+ }
+
public State getState()
{
return _state;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index d7779390b1..c775a70cb8 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.exchange;
import static org.apache.qpid.common.AMQPFilterTypes.*;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -43,6 +44,7 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.QueueExistsException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -497,6 +499,7 @@ public class TopicExchangeTest extends QpidTestCase
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
when(message.getMessageNumber()).thenReturn(messageNumber);
for(BaseQueue q : queues)
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
new file mode 100644
index 0000000000..c90e406ba9
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class AbstractServerMessageTest extends QpidTestCase
+{
+ private static class TestMessage<T extends StorableMessageMetaData> extends AbstractServerMessageImpl<TestMessage<T>,T>
+ {
+
+ public TestMessage(final StoredMessage<T> handle,
+ final Object connectionReference)
+ {
+ super(handle, connectionReference);
+ }
+
+ @Override
+ public String getInitialRoutingAddress()
+ {
+ return null;
+ }
+
+ @Override
+ public AMQMessageHeader getMessageHeader()
+ {
+ return null;
+ }
+
+ @Override
+ public long getSize()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getExpiration()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getArrivalTime()
+ {
+ return 0;
+ }
+ }
+
+ private TransactionLogResource createQueue(String name)
+ {
+ TransactionLogResource queue = mock(TransactionLogResource.class);
+ when(queue.getId()).thenReturn(UUID.randomUUID());
+ when(queue.getName()).thenReturn(name);
+ return queue;
+ }
+
+ public void testReferences()
+ {
+ TransactionLogResource q1 = createQueue("1");
+ TransactionLogResource q2 = createQueue("2");
+
+ TestMessage<StorableMessageMetaData> msg = new TestMessage<StorableMessageMetaData>(mock(StoredMessage.class),this);
+ assertFalse(msg.isReferenced());
+ assertFalse(msg.isReferenced(q1));
+
+ MessageReference<TestMessage<StorableMessageMetaData>> nonQueueRef = msg.newReference();
+ assertFalse(msg.isReferenced());
+ assertFalse(msg.isReferenced(q1));
+
+ MessageReference<TestMessage<StorableMessageMetaData>> q1ref = msg.newReference(q1);
+ assertTrue(msg.isReferenced());
+ assertTrue(msg.isReferenced(q1));
+ assertFalse(msg.isReferenced(q2));
+
+ q1ref.release();
+ assertFalse(msg.isReferenced());
+ assertFalse(msg.isReferenced(q1));
+
+ q1ref = msg.newReference(q1);
+ assertTrue(msg.isReferenced());
+ assertTrue(msg.isReferenced(q1));
+ assertFalse(msg.isReferenced(q2));
+
+ MessageReference<TestMessage<StorableMessageMetaData>> q2ref = msg.newReference(q2);
+ assertTrue(msg.isReferenced());
+ assertTrue(msg.isReferenced(q1));
+ assertTrue(msg.isReferenced(q2));
+
+ try
+ {
+ msg.newReference(q1);
+ fail("Should not be able to create a second reference to the same queue");
+ }
+ catch (MessageAlreadyReferencedException e)
+ {
+ // pass
+ }
+ q2ref.release();
+ assertTrue(msg.isReferenced());
+ assertTrue(msg.isReferenced(q1));
+ assertFalse(msg.isReferenced(q2));
+
+ q1ref.release();
+ assertFalse(msg.isReferenced());
+ assertFalse(msg.isReferenced(q1));
+
+ nonQueueRef.release();
+
+ try
+ {
+ msg.newReference(q1);
+ fail("Message should not allow new references as all references had been removed");
+ }
+ catch(MessageDeletedException e)
+ {
+ // pass
+ }
+
+ }
+}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 0def708fed..9255dbf42e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.contains;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
@@ -60,6 +61,7 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -1157,6 +1159,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
return message;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
index 70a35dc4aa..799fc71d74 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
@@ -19,6 +19,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -38,6 +39,7 @@ import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class LastValueQueueListTest extends TestCase
@@ -220,6 +222,8 @@ public class LastValueQueueListTest extends TestCase
MessageReference messageReference = mock(MessageReference.class);
when(mockMessage.newReference()).thenReturn(messageReference);
+ when(mockMessage.newReference(any(TransactionLogResource.class))).thenReturn(messageReference);
+
when(messageReference.getMessage()).thenReturn(mockMessage);
return mockMessage;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
index 74a2262265..37c4eeb127 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
@@ -65,6 +65,12 @@ public class MockMessageInstance implements MessageInstance
return false;
}
+ @Override
+ public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+ {
+ return false;
+ }
+
public void delete()
{
@@ -81,6 +87,18 @@ public class MockMessageInstance implements MessageInstance
return false;
}
+ @Override
+ public boolean lockAcquisition()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ return false;
+ }
+
public boolean isAvailable()
{
return false;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
index cc5f36098e..631731ecc0 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -37,6 +38,7 @@ import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -79,6 +81,7 @@ public class PriorityQueueListTest extends QpidTestCase
when(message.getMessageHeader()).thenReturn(header);
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
when(ref.getMessage()).thenReturn(message);
when(header.getPriority()).thenReturn(PRIORITIES[i]);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 3189010284..40b6c1bebd 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -18,6 +18,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -38,6 +39,7 @@ import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
/**
@@ -137,6 +139,42 @@ public abstract class QueueEntryImplTestBase extends TestCase
return consumer;
}
+
+ public void testLocking()
+ {
+ QueueConsumer consumer = newConsumer();
+ QueueConsumer consumer2 = newConsumer();
+
+ _queueEntry.acquire(consumer);
+ assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
+ _queueEntry.isAcquired());
+
+ assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+ assertTrue("Should be able to unlock locked queue entry", _queueEntry.unlockAcquisition());
+ assertFalse("Acquisition should not be able to be removed from the wrong consumer",
+ _queueEntry.removeAcquisitionFromConsumer(consumer2));
+ assertTrue("Acquisition should be able to be removed once unlocked",
+ _queueEntry.removeAcquisitionFromConsumer(consumer));
+ assertTrue("Queue Entry should still be acquired", _queueEntry.isAcquired());
+ assertFalse("Queue Entry should not be marked as acquired by a consumer", _queueEntry.acquiredByConsumer());
+
+ _queueEntry.release();
+
+ assertFalse("Hijacked queue entry should be able to be released", _queueEntry.isAcquired());
+
+ _queueEntry.acquire(consumer);
+ assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
+ _queueEntry.isAcquired());
+
+ assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+ assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition());
+ assertTrue("Should be able to unlock locked queue entry",_queueEntry.lockAcquisition());
+ assertFalse("Acquisition should not be able to be hijacked when locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+
+ _queueEntry.delete();
+ assertTrue("Locked queue entry should be able to be deleted", _queueEntry.isDeleted());
+ }
+
/**
* A helper method to get entry state
*
@@ -220,6 +258,7 @@ public abstract class QueueEntryImplTestBase extends TestCase
final MessageReference reference = mock(MessageReference.class);
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
QueueEntryImpl entry = (QueueEntryImpl) queueEntryList.add(message);
entries[i] = entry;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
index c89d2abeae..a0ab7cd454 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
@@ -19,14 +19,16 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import junit.framework.TestCase;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.qpid.server.store.TransactionLogResource;
/**
* Abstract test class for QueueEntryList implementations.
@@ -96,6 +98,7 @@ public abstract class QueueEntryListTestBase extends TestCase
AMQMessageHeader hdr = mock(AMQMessageHeader.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
when(message.getMessageHeader()).thenReturn(hdr);
return message;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
index bd23aaa50a..79d7628a9c 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -33,6 +34,7 @@ import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -162,6 +164,7 @@ public class QueueMessageRecoveryTest extends QpidTestCase
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(msg);
when(msg.newReference()).thenReturn(ref);
+ when(msg.newReference(any(TransactionLogResource.class))).thenReturn(ref);
when(msg.getStoredMessage()).thenReturn(mock(StoredMessage.class));
return msg;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
index eaed1427b2..a2d314d629 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -35,6 +36,7 @@ import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
@@ -70,6 +72,7 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
final MessageReference reference = mock(MessageReference.class);
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
return (QueueEntryImpl) queueEntryList.add(message);
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
index bcc1e7bc0e..0c7f19bbd5 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
@@ -19,6 +19,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -40,6 +41,7 @@ import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class SortedQueueEntryListTest extends QueueEntryListTestBase
@@ -180,6 +182,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
when(message.getMessageNumber()).thenReturn(id);
return message;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
index 268d334949..d9a176c688 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
@@ -19,6 +19,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -38,6 +39,7 @@ import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class SortedQueueEntryTest extends QueueEntryImplTestBase
@@ -97,6 +99,7 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase
final MessageReference reference = mock(MessageReference.class);
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
return _queueEntryList.add(message);
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
index 89bb32e133..95c53c8428 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -36,6 +37,7 @@ import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class StandardQueueEntryListTest extends QueueEntryListTestBase
@@ -73,6 +75,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
final QueueEntry bleh = _sqel.add(message);
assertNotNull("QE should not have been null", bleh);
@@ -163,6 +166,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
QueueEntry bleh = sqel.add(message);
assertNotNull("QE should not have been null", bleh);
entriesMap.put(i,bleh);
@@ -264,6 +268,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase
final MessageReference reference = mock(MessageReference.class);
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
entries[i] = (OrderedQueueEntry) queueEntryList.add(message);
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
index d328e21a94..ce1c95e674 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
@@ -342,5 +342,17 @@ public class StandardQueueTest extends AbstractQueueTestBase
return super.acquire(sub);
}
}
+
+ @Override
+ public boolean lockAcquisition()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ return true;
+ }
}
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index e16ba66391..848675bf5d 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -165,6 +165,24 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
}
@Override
+ public MessageReference newReference(final TransactionLogResource object)
+ {
+ return _messageReference;
+ }
+
+ @Override
+ public boolean isReferenced(final TransactionLogResource resource)
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isReferenced()
+ {
+ return false;
+ }
+
+ @Override
public int hashCode()
{
final int prime = 31;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
index 8992cf62c9..e0fbb6dcc3 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
@@ -20,14 +20,15 @@
*/
package org.apache.qpid.server.txn;
+import java.nio.ByteBuffer;
+
import org.apache.commons.lang.NotImplementedException;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.StoredMessage;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.server.store.TransactionLogResource;
/**
* Mock Server Message allowing its persistent flag to be controlled from test.
@@ -57,6 +58,24 @@ class MockServerMessage implements ServerMessage
throw new NotImplementedException();
}
+ @Override
+ public MessageReference newReference(final TransactionLogResource object)
+ {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean isReferenced(final TransactionLogResource resource)
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isReferenced()
+ {
+ return false;
+ }
+
public boolean isImmediate()
{
throw new NotImplementedException();
@@ -113,4 +132,4 @@ class MockServerMessage implements ServerMessage
{
return 0L;
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index d73d019000..7ab3fbb1f5 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -534,15 +534,25 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
return _stopped.get();
}
- public void acknowledge(MessageInstance entry)
+ public boolean deleteAcquired(MessageInstance entry)
{
- // TODO Fix Store Context / cleanup
if(entry.isAcquiredBy(getConsumer()))
{
- _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize());
- _unacknowledgedCount.decrementAndGet();
+ acquisitionRemoved(entry);
entry.delete();
+ return true;
}
+ else
+ {
+ return false;
+ }
+ }
+
+ @Override
+ public void acquisitionRemoved(final MessageInstance entry)
+ {
+ _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize());
+ _unacknowledgedCount.decrementAndGet();
}
public void flush()
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
index 4420709a91..94f04bbae3 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
@@ -41,13 +41,13 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onAccept()
{
- if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
+ if(_target != null && _entry.isAcquiredBy(_target.getConsumer()) && _entry.lockAcquisition())
{
_target.getSessionModel().acknowledge(_target, _entry);
}
else
{
- _logger.warn("MessageAccept received for message which has not been acquired (likely client error)");
+ _logger.info("MessageAccept received for message which is not been acquired - message may have expired or been removed");
}
}
@@ -60,7 +60,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
}
else
{
- _logger.warn("MessageRelease received for message which has not been acquired (likely client error)");
+ _logger.warn("MessageRelease received for message which has not been acquired - message may have expired or been removed");
}
}
@@ -72,7 +72,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
}
else
{
- _logger.warn("MessageReject received for message which has not been acquired (likely client error)");
+ _logger.warn("MessageReject received for message which has not been acquired - message may have expired or been removed");
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
index cd1146ac0b..7917b7989a 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
@@ -29,6 +29,7 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene
private final ConsumerTarget_0_10 _sub;
private final MessageInstance _entry;
private final ServerSession _session;
+ private long _messageSize;
private boolean _restoreCredit;
public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
@@ -38,15 +39,19 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene
_entry = entry;
_session = session;
_restoreCredit = restoreCredit;
+ if(restoreCredit)
+ {
+ _messageSize = entry.getMessage().getSize();
+ }
}
public void onComplete(Method method)
{
if(_restoreCredit)
{
- _sub.restoreCredit(_entry.getMessage());
+ _sub.getCreditManager().restoreCredit(1l, _messageSize);
}
- if(_entry.isAcquiredBy(_sub.getConsumer()))
+ if(_entry.isAcquiredBy(_sub.getConsumer()) && _entry.lockAcquisition())
{
_session.acknowledge(_sub, _entry);
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 3fe1515b18..b1c22fe823 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -460,7 +460,7 @@ public class ServerSession extends Session
public void postCommit()
{
- sub.acknowledge(entry);
+ sub.deleteAcquired(entry);
}
public void onRollback()
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index b6e1b7dd6a..7877812d84 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -1413,7 +1413,11 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
// explicit rollbacks resend the message after the rollback-ok is sent
if(_rollingBack)
{
- _resendList.addAll(_ackedMessages);
+ for(MessageInstance entry : _ackedMessages)
+ {
+ entry.unlockAcquisition();
+ }
+ _resendList.addAll(_ackedMessages);
}
else
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index ae63c16025..7c2efe64e6 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -20,11 +20,16 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.message.InstanceProperties;
@@ -34,14 +39,10 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.StateChangeListener;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* Encapsulation of a subscription to a queue.
* <p>
@@ -59,7 +60,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
final MessageInstance.State oldSate,
final MessageInstance.State newState)
{
- if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED))
+ if (oldSate == QueueEntry.State.ACQUIRED && newState != QueueEntry.State.ACQUIRED)
{
restoreCredit(entry.getMessage());
}
@@ -76,8 +77,8 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
- AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager) throws AMQException
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager) throws AMQException
{
return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
}
@@ -555,6 +556,11 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
});
}
+ @Override
+ public void acquisitionRemoved(final MessageInstance node)
+ {
+ }
+
public long getUnacknowledgedBytes()
{
return _unacknowledgedBytes.longValue();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
index 8d70e769d3..1bd9ab079e 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
@@ -20,31 +20,28 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.queue.QueueEntry;
-
+import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
+
public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
private final Object _lock = new Object();
- private long _unackedSize;
-
private Map<Long, MessageInstance> _map;
- private long _lastDeliveryTag;
-
private final int _prefetchLimit;
public UnacknowledgedMessageMapImpl(int prefetchLimit)
{
_prefetchLimit = prefetchLimit;
- _map = new LinkedHashMap<Long, MessageInstance>(prefetchLimit);
+ _map = new LinkedHashMap<>(prefetchLimit);
}
public void collect(long deliveryTag, boolean multiple, Map<Long, MessageInstance> msgs)
@@ -81,12 +78,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
MessageInstance message = _map.remove(deliveryTag);
- if(message != null)
- {
- _unackedSize -= message.getMessage().getSize();
-
- }
-
return message;
}
}
@@ -109,8 +100,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
synchronized (_lock)
{
_map.put(deliveryTag, message);
- _unackedSize += message.getMessage().getSize();
- _lastDeliveryTag = deliveryTag;
}
}
@@ -119,8 +108,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
synchronized (_lock)
{
Collection<MessageInstance> currentEntries = _map.values();
- _map = new LinkedHashMap<Long, MessageInstance>(_prefetchLimit);
- _unackedSize = 0l;
+ _map = new LinkedHashMap<>(_prefetchLimit);
return currentEntries;
}
}
@@ -138,7 +126,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
synchronized (_lock)
{
_map.clear();
- _unackedSize = 0l;
}
}
@@ -163,6 +150,14 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<Long,MessageInstance>();
collect(deliveryTag, multiple, ackedMessageMap);
remove(ackedMessageMap);
+ List<MessageInstance> acknowledged = new ArrayList<>();
+ for(MessageInstance instance : ackedMessageMap.values())
+ {
+ if(instance.lockAcquisition())
+ {
+ acknowledged.add(instance);
+ }
+ }
return ackedMessageMap.values();
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index adb2f8ea6a..bceae85896 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.nio.ByteBuffer;
+import java.util.List;
+
import org.apache.qpid.amqp_1_0.codec.ValueHandler;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
@@ -37,19 +40,16 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
class ConsumerTarget_1_0 extends AbstractConsumerTarget
{
private final boolean _acquires;
@@ -378,6 +378,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
if(outcome instanceof Accepted)
{
+ _queueEntry.lockAcquisition();
txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(),
new ServerTransaction.Action()
{
@@ -412,6 +413,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
modified.setDeliveryFailed(true);
_link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
_link.getEndpoint().sendFlowConditional();
+ _queueEntry.unlockAcquisition();
}
}
});
@@ -498,6 +500,11 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
}
@Override
+ public void acquisitionRemoved(final MessageInstance node)
+ {
+ }
+
+ @Override
public void consumerAdded(final ConsumerImpl sub)
{
_consumer = sub;
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 7a844cbc79..a8fc5387b4 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -636,19 +636,21 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
if(_consumer.acquires())
{
- txn.dequeue(Collections.singleton(queueEntry),
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- queueEntry.delete();
- }
-
- public void onRollback()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
- });
+ if(queueEntry.acquire() || queueEntry.isAcquired())
+ {
+ txn.dequeue(Collections.singleton(queueEntry),
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ queueEntry.delete();
+ }
+
+ public void onRollback()
+ {
+ }
+ });
+ }
}
}
else if(outcome instanceof Released)
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index e73d177599..34f08615ad 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -1071,6 +1071,12 @@ class ManagementNode implements MessageSource, MessageDestination
}
@Override
+ public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+ {
+ return false;
+ }
+
+ @Override
public void setRedelivered()
{
@@ -1119,6 +1125,18 @@ class ManagementNode implements MessageSource, MessageDestination
}
@Override
+ public boolean lockAcquisition()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ return false;
+ }
+
+ @Override
public int getMaximumDeliveryCount()
{
return 0;
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
index ae2828d392..03e7eab61b 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
@@ -90,6 +90,12 @@ class ManagementResponse implements MessageInstance
}
@Override
+ public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+ {
+ return consumer == _consumer;
+ }
+
+ @Override
public void setRedelivered()
{
_isRedelivered = true;
@@ -138,6 +144,18 @@ class ManagementResponse implements MessageInstance
}
@Override
+ public boolean lockAcquisition()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ return false;
+ }
+
+ @Override
public int getMaximumDeliveryCount()
{
return 0;
@@ -190,7 +208,7 @@ class ManagementResponse implements MessageInstance
@Override
public void delete()
{
- // TODO
+ _isDeleted = true;
}
@Override
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
index 9866207234..8c77876e1a 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
@@ -33,6 +33,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
+
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageDeletedException;
@@ -44,6 +45,7 @@ import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
+import org.apache.qpid.server.store.TransactionLogResource;
public class MessageServlet extends AbstractServlet
{
@@ -212,7 +214,11 @@ public class MessageServlet extends AbstractServlet
@Override
protected void updateEntry(QueueEntry entry, VirtualHost.Transaction txn)
{
- txn.move(entry, _destinationQueue);
+ ServerMessage msg = entry.getMessage();
+ if(msg != null && !msg.isReferenced((TransactionLogResource)_destinationQueue))
+ {
+ txn.move(entry, _destinationQueue);
+ }
}
}
@@ -229,7 +235,11 @@ public class MessageServlet extends AbstractServlet
@Override
protected void updateEntry(QueueEntry entry, VirtualHost.Transaction txn)
{
- txn.copy(entry, _destinationQueue);
+ ServerMessage msg = entry.getMessage();
+ if(msg != null && !msg.isReferenced((TransactionLogResource)_destinationQueue))
+ {
+ txn.copy(entry, _destinationQueue);
+ }
}
}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/addBinding.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/addBinding.html
index 9aebca90d7..b57e3a1a24 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/addBinding.html
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/addBinding.html
@@ -15,26 +15,48 @@
~ limitations under the License.
-->
<div class="dijitHidden">
- <div data-dojo-type="dijit.Dialog" style="width:600px;" data-dojo-props="title:'Add Binding'" id="addBinding">
- <form id="formAddBinding" method="post" dojoType="dijit.form.Form">
- <table cellpadding="0" cellspacing="2">
- <tr>
- <td valign="top"><strong>Exchange Name*: </strong></td>
- <td><div id="addBinding.selectExchangeDiv"></div></td>
- </tr>
- <tr>
- <td valign="top"><strong>Queue Name*: </strong></td>
- <td><div id="addBinding.selectQueueDiv"></div></td>
- </tr>
- <tr>
- <td valign="top"><strong>Binding Key*: </strong></td>
- <td><input type="text" required="true" name="name" id="formAddbinding.bindingKey" placeholder="Binding Key"
- dojoType="dijit.form.ValidationTextBox" missingMessage="A name must be supplied" /></td>
- </tr>
- </table>
+ <div data-dojo-type="dijit/Dialog" style="width:600px;" data-dojo-props="title:'Add Binding'" id="addBinding">
+ <form id="formAddBinding" method="post" data-dojo-type="dijit/form/Form">
+
+ <div class="clear">
+ <div class="formLabel-labelCell tableContainer-labelCell">Exchange Name*:</div>
+ <div class="formLabel-controlCell tableContainer-valueCell">
+ <div id="addBinding.selectExchangeDiv"></div>
+ </div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell tableContainer-labelCell">Queue Name*: </div>
+ <div class="formLabel-controlCell tableContainer-valueCell">
+ <div id="addBinding.selectQueueDiv"></div>
+ </div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell tableContainer-labelCell">Binding Key*:</div>
+ <div class="formLabel-controlCell tableContainer-valueCell">
+ <input type="text" id="formAddbinding.bindingKey"
+ data-dojo-type="dijit/form/ValidationTextBox"
+ data-dojo-props="
+ name: 'name',
+ placeHolder: 'Binding Key',
+ missingMessage: 'A binding key must be supplied',
+ title: 'Enter binding key'" />
+ </div>
+ </div>
+
+ <div class="clear formBox">
+ <fieldset>
+ <legend>Binding Arguments</legend>
+ <div class="editNoteBanner">NOTE: Only arguments with name and value will be submitted. To edit, please, click on a grid cell.</div>
+ <div id="formAddbinding.bindingArguments"></div>
+ <div>
+ <button data-dojo-type="dijit/form/Button" id="formAddbinding.addArgumentButton" type="button">Add</button>
+ <button data-dojo-type="dijit/form/Button" id="formAddbinding.deleteArgumentButton" type="button">Delete</button>
+ </div>
+ </fieldset>
+ </div>
+
<div class="dijitDialogPaneActionBar">
- <!-- submit buttons -->
- <input type="submit" value="Create Binding" label="Create Binding" dojoType="dijit.form.Button" />
+ <input type="submit" value="Create Binding" label="Create Binding" data-dojo-type="dijit/form/Button" />
</div>
</form>
</div>
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addBinding.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addBinding.js
index 7bdd72525c..deda3f35d5 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addBinding.js
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addBinding.js
@@ -23,6 +23,8 @@ define(["dojo/_base/xhr",
"dojo/_base/array",
"dojo/_base/event",
'dojo/_base/json',
+ "dojo/_base/lang",
+ "dojo/_base/declare",
"dojo/store/Memory",
"dijit/form/FilteringSelect",
"dijit/form/NumberSpinner", // required by the form
@@ -38,8 +40,77 @@ define(["dojo/_base/xhr",
"dijit/form/DateTextBox",
/* basic dojox classes */
"dojox/form/BusyButton", "dojox/form/CheckedMultiSelect",
+ "dojox/grid/EnhancedGrid",
+ "dojo/data/ObjectStore",
"dojo/domReady!"],
- function (xhr, dom, construct, win, registry, parser, array, event, json, Memory, FilteringSelect) {
+ function (xhr, dom, construct, win, registry, parser, array, event, json, lang, declare, Memory, FilteringSelect) {
+
+ var noLocalValues = new Memory({
+ data: [
+ {name:"", id:null},
+ {name:"true", id:true},
+ {name:"false", id:false}
+ ]
+ });
+
+ var xMatchValues = new Memory({
+ data: [
+ {name:"all", id:"all"},
+ {name:"any", id:"any"}
+ ]
+ });
+
+ var defaultBindingArguments = [
+ {id: 0, name:"x-filter-jms-selector", value: null},
+ {id: 1, name:"x-qpid-no-local", value: null}
+ ];
+
+ var GridWidgetProxy = declare("qpid.dojox.grid.cells.GridWidgetProxy", dojox.grid.cells._Widget, {
+ createWidget: function(inNode, inDatum, inRowIndex)
+ {
+ var WidgetClass = this.widgetClass;
+ var widgetProperties = this.getWidgetProps(inDatum);
+ var getWidgetProperties = widgetProperties.getWidgetProperties;
+ if (typeof getWidgetProperties == "function")
+ {
+ var item = this.grid.getItem(inRowIndex);
+ if (item)
+ {
+ var additionalWidgetProperties = getWidgetProperties(inDatum, inRowIndex, item);
+ if (additionalWidgetProperties)
+ {
+ WidgetClass = additionalWidgetProperties.widgetClass;
+ for(var prop in additionalWidgetProperties)
+ {
+ if(additionalWidgetProperties.hasOwnProperty(prop) && !widgetProperties[prop])
+ {
+ widgetProperties[prop] = additionalWidgetProperties[ prop ];
+ }
+ }
+ }
+ }
+ }
+ var widget = new WidgetClass(widgetProperties, inNode);
+ return widget;
+ },
+ getValue: function(inRowIndex)
+ {
+ if (this.widget)
+ {
+ return this.widget.get('value');
+ }
+ return null;
+ },
+ _finish: function(inRowIndex)
+ {
+ if (this.widget)
+ {
+ this.inherited(arguments);
+ this.widget.destroyRecursive();
+ this.widget = null;
+ }
+ }
+ });
var addBinding = {};
@@ -73,6 +144,28 @@ define(["dojo/_base/xhr",
if(addBinding.exchange) {
newBinding.exchange = addBinding.exchange;
}
+
+ addBinding.bindingArgumentsGrid.store.fetch({
+ onComplete:function(items,request)
+ {
+ if(items.length)
+ {
+ array.forEach(items, function(item)
+ {
+ if (item && item.name && item.value)
+ {
+ var bindingArguments = newBinding.arguments;
+ if (!bindingArguments)
+ {
+ bindingArguments = {};
+ newBinding.arguments = bindingArguments;
+ }
+ bindingArguments[item.name]=item.value;
+ }
+ });
+ }
+ }
+ });
return newBinding;
};
@@ -106,6 +199,98 @@ define(["dojo/_base/xhr",
});
+ var argumentsGridNode = dom.byId("formAddbinding.bindingArguments");
+ var objectStore = new dojo.data.ObjectStore({objectStore: new Memory({data:lang.clone(defaultBindingArguments), idProperty: "id"})});
+
+ var layout = [[
+ { name: "Argument Name", field: "name", width: "50%", editable: true },
+ { name: 'Argument Value', field: 'value', width: '50%', editable: true, type: GridWidgetProxy,
+ widgetProps: {
+ getWidgetProperties: function(inDatum, inRowIndex, item)
+ {
+ if (item.name == "x-qpid-no-local")
+ {
+ return {
+ labelAttr: "name",
+ searchAttr: "id",
+ selectOnClick: false,
+ query: { id: "*"},
+ required: false,
+ store: noLocalValues,
+ widgetClass: dijit.form.FilteringSelect
+ };
+ }
+ else if (item.name && item.name.toLowerCase() == "x-match")
+ {
+ return {
+ labelAttr: "name",
+ searchAttr: "id",
+ selectOnClick: false,
+ query: { id: "*"},
+ required: false,
+ store: xMatchValues,
+ widgetClass: dijit.form.FilteringSelect
+ };
+ }
+ return {widgetClass: dijit.form.TextBox };
+ }
+ }
+ }
+ ]];
+
+ var grid = new dojox.grid.EnhancedGrid({
+ selectionMode: "multiple",
+ store: objectStore,
+ singleClickEdit: true,
+ structure: layout,
+ height: "150px",
+ plugins: {indirectSelection: true}
+ }, argumentsGridNode);
+ grid.startup();
+
+ addBinding.bindingArgumentsGrid = grid;
+ addBinding.idGenerator = 1;
+ var addArgumentButton = registry.byId("formAddbinding.addArgumentButton");
+ var deleteArgumentButton = registry.byId("formAddbinding.deleteArgumentButton");
+
+ addArgumentButton.on("click",
+ function(event)
+ {
+ addBinding.idGenerator = addBinding.idGenerator + 1;
+ var newItem = {id:addBinding.idGenerator, name: "", value: ""};
+ grid.store.newItem(newItem);
+ grid.store.save();
+ grid.store.fetch(
+ {
+ onComplete:function(items,request)
+ {
+ var rowIndex = items.length - 1;
+ window.setTimeout(function()
+ {
+ grid.focus.setFocusIndex(rowIndex, 1 );
+ },10);
+ }
+ });
+ }
+ );
+
+ deleteArgumentButton.on("click",
+ function(event)
+ {
+ var data = grid.selection.getSelected();
+ if(data.length)
+ {
+ array.forEach(data, function(selectedItem) {
+ if (selectedItem !== null)
+ {
+ grid.store.deleteItem(selectedItem);
+ }
+ });
+ grid.store.save();
+ }
+ }
+ );
+
theForm.on("submit", function(e) {
event.stop(e);
@@ -154,7 +339,24 @@ define(["dojo/_base/xhr",
addBinding.exchange = obj.exchange;
registry.byId("formAddBinding").reset();
-
+ var grid = addBinding.bindingArgumentsGrid;
+ grid.store.fetch({
+ onComplete:function(items,request)
+ {
+ if(items.length)
+ {
+ array.forEach(items, function(item)
+ {
+ if (item !== null)
+ {
+ grid.store.deleteItem(item);
+ }
+ });
+ }
+ }
+ });
+ array.forEach(lang.clone(defaultBindingArguments), function(item) {grid.store.newItem(item); });
+ grid.store.save();
xhr.get({url: "api/latest/queue/" + encodeURIComponent(obj.virtualhostnode) + "/" + encodeURIComponent(obj.virtualhost) + "?depth=0",
handleAs: "json"}).then(
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
index ca092fe6f8..5f5d6e7efe 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
@@ -59,6 +59,7 @@ import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.NotificationCheck;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntryVisitor;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
@@ -519,7 +520,8 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
final long messageId = message.getMessageNumber();
if ((messageId >= fromMessageId)
- && (messageId <= toMessageId))
+ && (messageId <= toMessageId)
+ && !(message.isReferenced((TransactionLogResource)destinationQueue)))
{
txn.move(entry, destinationQueue);
}
@@ -571,8 +573,8 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
}
VirtualHost<?,?,?> vhost = _queue.getParent(VirtualHost.class);
- final Queue<?> queue = vhost.getChildByName(Queue.class, toQueue);
- if (queue == null)
+ final Queue<?> destinationQueue = vhost.getChildByName(Queue.class, toQueue);
+ if (destinationQueue == null)
{
throw new OperationsException("No such queue \""+ toQueue +"\"");
}
@@ -591,9 +593,10 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
final long messageId = message.getMessageNumber();
if ((messageId >= fromMessageId)
- && (messageId <= toMessageId))
+ && (messageId <= toMessageId)
+ && !(message.isReferenced((TransactionLogResource)destinationQueue)))
{
- txn.copy(entry, queue);
+ txn.copy(entry, destinationQueue);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 08f05cc8d6..681082526c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -20,8 +20,16 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.qpid.client.HeartbeatListener;
-import org.apache.qpid.util.BytesDataOutput;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +39,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.HeartbeatListener;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverHandler;
import org.apache.qpid.client.failover.FailoverState;
@@ -59,16 +68,7 @@ import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
-
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import org.apache.qpid.util.BytesDataOutput;
/**
* AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
@@ -177,6 +177,7 @@ public class AMQProtocolHandler implements ProtocolEngine
private long _lastReadTime = System.currentTimeMillis();
private long _lastWriteTime = System.currentTimeMillis();
private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
+ private Throwable _initialConnectionException;
/**
* Creates a new protocol handler, associated with the specified client connection instance.
@@ -214,6 +215,8 @@ public class AMQProtocolHandler implements ProtocolEngine
// in order to execute AMQConnection#exceptionRecievedout out of synchronization block,
// otherwise it might deadlock with failover mutex
boolean failoverNotAllowed = false;
+ boolean failedWithoutConnecting = false;
+ Throwable initialConnectionException = null;
synchronized (this)
{
if (_logger.isDebugEnabled())
@@ -251,8 +254,11 @@ public class AMQProtocolHandler implements ProtocolEngine
}
else
{
+ failedWithoutConnecting = true;
+ initialConnectionException = _initialConnectionException;
_logger.debug("We are in process of establishing the initial connection");
}
+ _initialConnectionException = null;
}
else
{
@@ -265,6 +271,16 @@ public class AMQProtocolHandler implements ProtocolEngine
_connection.exceptionReceived(new AMQDisconnectedException(
"Server closed connection and reconnection not permitted.", _stateManager.getLastException()));
}
+ else if(failedWithoutConnecting)
+ {
+ if(initialConnectionException == null)
+ {
+ initialConnectionException = _stateManager.getLastException();
+ }
+ String message = initialConnectionException == null ? "" : initialConnectionException.getMessage();
+ _connection.exceptionReceived(new AMQDisconnectedException(
+ "Connection could not be established: " + message, initialConnectionException));
+ }
}
if (_logger.isDebugEnabled())
@@ -338,6 +354,7 @@ public class AMQProtocolHandler implements ProtocolEngine
if (causeIsAConnectionProblem)
{
_logger.info("Connection exception caught therefore going to attempt failover: " + cause, cause);
+ _initialConnectionException = cause;
}
else
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
index 13a16d07b5..1bbf166d82 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
@@ -20,17 +20,18 @@
*/
package org.apache.qpid.transport.network.security.ssl;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.util.Logger;
+import java.nio.ByteBuffer;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
-import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.security.SSLStatus;
+import org.apache.qpid.transport.util.Logger;
public class SSLReceiver implements Receiver<ByteBuffer>
{
@@ -192,7 +193,7 @@ public class SSLReceiver implements Receiver<ByteBuffer>
{
_sslStatus.getSslLock().notifyAll();
}
- exception(new TransportException("Error in SSLReceiver",e));
+ exception(new TransportException("Error in SSLReceiver: " + e.getMessage(),e));
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
index fedb88d008..e606df3f7d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
@@ -21,15 +21,8 @@
package org.apache.qpid.server.queue;
-import org.junit.Assert;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -39,8 +32,17 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import javax.naming.NamingException;
+
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
public class TimeToLiveTest extends QpidBrokerTestCase
{
@@ -53,18 +55,29 @@ public class TimeToLiveTest extends QpidBrokerTestCase
private static final int MSG_COUNT = 50;
private static final long SERVER_TTL_TIMEOUT = 60000L;
+ public void testPassiveTTLWithPrefetch() throws Exception
+ {
+ doTestPassiveTTL(true);
+ }
+
public void testPassiveTTL() throws Exception
{
+ doTestPassiveTTL(false);
+
+ }
+
+ private void doTestPassiveTTL(boolean prefetchMessages) throws JMSException, NamingException
+ {
//Create Client 1
Connection clientConnection = getConnection();
-
+
Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = clientSession.createQueue(QUEUE);
-
+ Queue queue = clientSession.createQueue(QUEUE);
+
// Create then close the consumer so the queue is actually created
// Closing it then reopening it ensures that the consumer shouldn't get messages
// which should have expired and allows a shorter sleep period. See QPID-1418
-
+
MessageConsumer consumer = clientSession.createConsumer(queue);
consumer.close();
@@ -79,6 +92,12 @@ public class TimeToLiveTest extends QpidBrokerTestCase
MessageProducer producer = producerSession.createProducer(queue);
+ consumer = clientSession.createConsumer(queue);
+ if(prefetchMessages)
+ {
+ clientConnection.start();
+ }
+
//Set TTL
int msg = 0;
producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer));
@@ -96,7 +115,6 @@ public class TimeToLiveTest extends QpidBrokerTestCase
producerSession.commit();
- consumer = clientSession.createConsumer(queue);
// Ensure we sleep the required amount of time.
ReentrantLock waitLock = new ReentrantLock();
@@ -124,6 +142,16 @@ public class TimeToLiveTest extends QpidBrokerTestCase
}
+ if(prefetchMessages)
+ {
+ clientConnection.close();
+ clientConnection = getConnection();
+
+ clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queue = clientSession.createQueue(QUEUE);
+ consumer = clientSession.createConsumer(queue);
+ }
+
clientConnection.start();
//Receive Message 0
@@ -131,14 +159,14 @@ public class TimeToLiveTest extends QpidBrokerTestCase
Message receivedFirst = consumer.receive(5000);
Message receivedSecond = consumer.receive(5000);
Message receivedThird = consumer.receive(1000);
-
+
// Log the messages to help diagnosis incase of failure
_logger.info("First:"+receivedFirst);
_logger.info("Second:"+receivedSecond);
_logger.info("Third:"+receivedThird);
// Only first and last messages sent should survive expiry
- Assert.assertNull("More messages received", receivedThird);
+ Assert.assertNull("More messages received", receivedThird);
Assert.assertNotNull("First message not received", receivedFirst);
Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first"));
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
index a6a08d83f9..d0f133aa73 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
@@ -456,6 +456,61 @@ public class QueueManagementTest extends QpidBrokerTestCase
assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8);
}
+
+ /**
+ * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface.
+ */
+ public void testCopyMessagesBetweenQueuesWithDuplicates() throws Exception
+ {
+ final int numberOfMessagesToSend = 10;
+ sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
+ syncSession(_session);
+ assertEquals("Unexpected queue depth after send",
+ numberOfMessagesToSend,
+ _managedSourceQueue.getMessageCount().intValue());
+
+ List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);
+
+ // Copy first three messages to destination
+ long fromMessageId = amqMessagesIds.get(0);
+ long toMessageId = amqMessagesIds.get(2);
+ _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName);
+
+ assertEquals("Unexpected queue depth on destination queue after first copy",
+ 3,
+ _managedDestinationQueue.getMessageCount().intValue());
+ assertEquals("Unexpected queue depth on source queue after first copy",
+ numberOfMessagesToSend,
+ _managedSourceQueue.getMessageCount().intValue());
+
+ // Now copy a further two messages to destination
+ fromMessageId = amqMessagesIds.get(7);
+ toMessageId = amqMessagesIds.get(8);
+ _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName);
+ assertEquals("Unexpected queue depth on destination queue after second copy",
+ 5,
+ _managedDestinationQueue.getMessageCount().intValue());
+ assertEquals("Unexpected queue depth on source queue after second copy",
+ numberOfMessagesToSend,
+ _managedSourceQueue.getMessageCount().intValue());
+
+ // Attempt to copy mixture of messages already on and some not already on the queue
+
+ fromMessageId = amqMessagesIds.get(5);
+ toMessageId = amqMessagesIds.get(8);
+ _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName);
+ assertEquals("Unexpected queue depth on destination queue after second copy",
+ 7,
+ _managedDestinationQueue.getMessageCount().intValue());
+ assertEquals("Unexpected queue depth on source queue after second copy",
+ numberOfMessagesToSend,
+ _managedSourceQueue.getMessageCount().intValue());
+
+ assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8, 5, 6);
+
+
+ }
+
public void testMoveMessagesBetweenQueuesWithActiveConsumerOnSourceQueue() throws Exception
{
setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString());