diff options
20 files changed, 218 insertions, 127 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 26ac562fb2..5fde08cbdd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -491,7 +491,7 @@ public class AMQChannel if (!unacked.isQueueDeleted()) { // Mark message redelivered - unacked.getMessage().setRedelivered(true); + unacked.setRedelivered(true); // Ensure message is released for redelivery unacked.release(); @@ -522,7 +522,7 @@ public class AMQChannel if (unacked != null) { // Mark message redelivered - unacked.getMessage().setRedelivered(true); + unacked.setRedelivered(true); // Ensure message is released for redelivery if (!unacked.isQueueDeleted()) @@ -611,13 +611,10 @@ public class AMQChannel for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet()) { - QueueEntry message = entry.getValue(); + QueueEntry queueEntry = entry.getValue(); long deliveryTag = entry.getKey(); - - - AMQMessage msg = message.getMessage(); - AMQQueue queue = message.getQueue(); + AMQQueue queue = queueEntry.getQueue(); // Our Java Client will always suspend the channel when resending! // If the client has requested the messages be resent then it is @@ -635,16 +632,16 @@ public class AMQChannel // Without any details from the client about what has been processed we have to mark // all messages in the unacked map as redelivered. - msg.setRedelivered(true); + queueEntry.setRedelivered(true); - Subscription sub = message.getDeliveredSubscription(); + Subscription sub = queueEntry.getDeliveredSubscription(); if (sub != null) { - if(!queue.resend(message, sub)) + if(!queue.resend(queueEntry, sub)) { - msgToRequeue.put(deliveryTag, message); + msgToRequeue.put(deliveryTag, queueEntry); } } else @@ -652,11 +649,11 @@ public class AMQChannel if (_log.isInfoEnabled()) { - _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + _log.info("DeliveredSubscription not recorded so just requeueing(" + queueEntry.toString() + ")to prevent loss"); } // move this message to requeue - msgToRequeue.put(deliveryTag, message); + msgToRequeue.put(deliveryTag, queueEntry); } } // for all messages // } else !isSuspend @@ -888,7 +885,7 @@ public class AMQChannel public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) throws AMQException { - getProtocolSession().getProtocolOutputConverter().writeDeliver(entry.getMessage(), getChannelId(), deliveryTag, sub.getConsumerTag()); + getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(), deliveryTag, sub.getConsumerTag()); } }; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java index 29494c4118..097ac27399 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java @@ -54,22 +54,21 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor _storeContext = storeContext; } - public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException + public boolean callback(final long deliveryTag, QueueEntry queueEntry) throws AMQException { - - AMQMessage msg = message.getMessage(); - msg.setRedelivered(true); - final Subscription subscription = message.getDeliveredSubscription(); + + queueEntry.setRedelivered(true); + final Subscription subscription = queueEntry.getDeliveredSubscription(); if (subscription != null) { // Consumer exists if (!subscription.isClosed()) { - _msgToResend.put(deliveryTag, message); + _msgToResend.put(deliveryTag, queueEntry); } else // consumer has gone { - _msgToRequeue.put(deliveryTag, message); + _msgToRequeue.put(deliveryTag, queueEntry); } } else @@ -77,22 +76,22 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor // Message has no consumer tag, so was "delivered" to a GET // or consumer no longer registered // cannot resend, so re-queue. - if (!message.isQueueDeleted()) + if (!queueEntry.isQueueDeleted()) { if (_requeueIfUnabletoResend) { - _msgToRequeue.put(deliveryTag, message); + _msgToRequeue.put(deliveryTag, queueEntry); } else { - message.discard(_storeContext); - _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message); + queueEntry.discard(_storeContext); + _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + queueEntry); } } else { - message.discard(_storeContext); - _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + message); + queueEntry.discard(_storeContext); + _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + queueEntry); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java index b30c70dac3..fa276169bf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java @@ -71,13 +71,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E> JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new ExpirationExpression()); - JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression<E>() - { - public Object evaluate(Filterable message) throws E - { - return message.isRedelivered(); - } - }); + JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new RedeliveredExpression()); } private final String name; @@ -265,4 +259,13 @@ public class PropertyExpression<E extends Exception> implements Expression<E> } } + + private static class RedeliveredExpression<E extends Exception> implements Expression<E> + { + public Object evaluate(Filterable<E> message) throws E + { + return message.isRedelivered(); + } + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index 001b7858ec..0f492a21bb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -130,7 +130,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB throws AMQException
{
singleMessageCredit.useCreditForMessage(entry.getMessage());
- session.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
+ session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
deliveryTag, queue.getMessageCount());
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java index e01c5aabbf..5438e0a7da 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java @@ -27,6 +27,7 @@ package org.apache.qpid.server.output;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.AMQDataBlock;
@@ -41,10 +42,10 @@ public interface ProtocolOutputConverter ProtocolOutputConverter newInstance(AMQProtocolSession session);
}
- void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ void writeDeliver(QueueEntry queueEntry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException;
- void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException;
+ void writeGetOk(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize) throws AMQException;
byte getProtocolMinorVersion();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java index 2b55d294b5..4949e5b41d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -29,6 +29,7 @@ package org.apache.qpid.server.output.amqp0_8; import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.framing.*;
@@ -69,10 +70,12 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return _protocolSession;
}
- public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ public void writeDeliver(QueueEntry queueEntry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ final AMQMessage message = queueEntry.getMessage();
+
+ AMQDataBlock deliver = createEncodedDeliverFrame(queueEntry, channelId, deliveryTag, consumerTag);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
@@ -120,13 +123,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter }
- public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ public void writeGetOk(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize) throws AMQException
{
-
+ final AMQMessage message = queueEntry.getMessage();
final AMQMessageHandle messageHandle = message.getMessageHandle();
final StoreContext storeContext = message.getStoreContext();
- AMQDataBlock deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+ AMQDataBlock deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
@@ -170,9 +173,11 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter }
- private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ private AMQDataBlock createEncodedDeliverFrame(QueueEntry queueEntry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
+ final AMQMessage message = queueEntry.getMessage();
+
final MessagePublishInfo pb = message.getMessagePublishInfo();
final AMQMessageHandle messageHandle = message.getMessageHandle();
@@ -180,7 +185,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter BasicDeliverBody deliverBody =
methodRegistry.createBasicDeliverBody(consumerTag,
deliveryTag,
- messageHandle.isRedelivered(),
+ queueEntry.isRedelivered(),
pb.getExchange(),
pb.getRoutingKey());
AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
@@ -189,16 +194,17 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return deliverFrame;
}
- private AMQDataBlock createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ private AMQDataBlock createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
+ final AMQMessage message = queueEntry.getMessage();
final MessagePublishInfo pb = message.getMessagePublishInfo();
final AMQMessageHandle messageHandle = message.getMessageHandle();
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicGetOkBody getOkBody =
methodRegistry.createBasicGetOkBody(deliveryTag,
- messageHandle.isRedelivered(),
+ queueEntry.isRedelivered(),
pb.getExchange(),
pb.getRoutingKey(),
queueSize);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java index 65184fe744..00a15d2d50 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java @@ -1,25 +1,25 @@ package org.apache.qpid.server.output.amqp0_9;
-/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import org.apache.mina.common.ByteBuffer;
@@ -29,6 +29,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.ContentChunk;
@@ -68,10 +69,12 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return _protocolSession;
}
- public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ public void writeDeliver(QueueEntry queueEntry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- AMQBody deliverBody = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ AMQMessage message = queueEntry.getMessage();
+
+ AMQBody deliverBody = createEncodedDeliverFrame(queueEntry, channelId, deliveryTag, consumerTag);
final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
@@ -126,13 +129,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter }
- public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ public void writeGetOk(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize) throws AMQException
{
+ final AMQMessage message = queueEntry.getMessage();
final AMQMessageHandle messageHandle = message.getMessageHandle();
final StoreContext storeContext = message.getStoreContext();
- AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+ AMQFrame deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
@@ -175,14 +179,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter }
- private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
+ private AMQBody createEncodedDeliverFrame(QueueEntry queueEntry, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
throws AMQException
{
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
+ AMQMessage message= queueEntry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
- final boolean isRedelivered = messageHandle.isRedelivered();
+ final boolean isRedelivered = queueEntry.isRedelivered();
final AMQShortString exchangeName = pb.getExchange();
final AMQShortString routingKey = pb.getRoutingKey();
@@ -237,16 +241,15 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return returnBlock;
}
- private AMQFrame createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ private AMQFrame createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
+ final AMQMessage message = queueEntry.getMessage();
final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
-
BasicGetOkBody getOkBody =
METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
- messageHandle.isRedelivered(),
+ queueEntry.isRedelivered(),
pb.getExchange(),
pb.getRoutingKey(),
queueSize);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index a485649410..d73d37f48d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -41,7 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger; /** * A deliverable message. */ -public class AMQMessage implements Filterable<AMQException> +public class AMQMessage { /** Used for debugging purposes. */ private static final Logger _log = Logger.getLogger(AMQMessage.class); @@ -396,16 +396,6 @@ public class AMQMessage implements Filterable<AMQException> return _messageHandle.getMessagePublishInfo(getStoreContext()); } - public boolean isRedelivered() - { - return _messageHandle.isRedelivered(); - } - - public void setRedelivered(boolean redelivered) - { - _messageHandle.setRedelivered(redelivered); - } - public long getArrivalTime() { return _messageHandle.getArrivalTime(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java index 0ddd4e4d92..93ac21fc7c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java @@ -63,10 +63,6 @@ public interface AMQMessageHandle MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException; - boolean isRedelivered(); - - void setRedelivered(boolean redelivered); - boolean isPersistent(); void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 2ed6be77c6..6f478dffd7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -397,11 +397,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que // Create the tabular list of message header contents for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++) { - AMQMessage msg = list.get(i - 1).getMessage(); + QueueEntry queueEntry = list.get(i - 1); + AMQMessage msg = queueEntry.getMessage(); ContentHeaderBody headerBody = msg.getContentHeaderBody(); // Create header attributes list String[] headerAttributes = getMessageHeaderProperties(headerBody); - Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered() }; + Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, + queueEntry.isRedelivered() }; CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); _messageList.put(messageData); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java index 35ad5be4e0..2a7c90a81e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java @@ -109,17 +109,6 @@ public class InMemoryMessageHandle implements AMQMessageHandle return _messagePublishInfo; } - public boolean isRedelivered() - { - return _redelivered; - } - - - public void setRedelivered(boolean redelivered) - { - _redelivered = redelivered; - } - public boolean isPersistent() { return false; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java index 0b214ca336..7573a629c1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java @@ -42,5 +42,8 @@ public class MessageHandleFactory { return new InMemoryMessageHandle(messageId); } + +// return new AMQMessage(messageId, store, persistent); } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 2657c459a9..0df976a620 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -24,11 +24,8 @@ import org.apache.qpid.server.subscription.Subscription; * under the License. * */ -public interface QueueEntry extends Comparable<QueueEntry> +public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQException> { - - - public static enum State { AVAILABLE, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index dbad5438dc..fe9686e906 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; import org.apache.log4j.Logger; @@ -44,6 +45,7 @@ public class QueueEntryImpl implements QueueEntry private AMQMessage _message; + private boolean _redelivered; private Set<Subscription> _rejectedBy = null; @@ -186,9 +188,26 @@ public class QueueEntryImpl implements QueueEntry return _message.immediateAndNotDelivered(); } - public void setRedelivered(boolean b) + public ContentHeaderBody getContentHeaderBody() throws AMQException { - getMessage().setRedelivered(b); + return _message.getContentHeaderBody(); + } + + public boolean isPersistent() throws AMQException + { + return _message.isPersistent(); + } + + public boolean isRedelivered() + { + return _redelivered; + } + + public void setRedelivered(boolean redelivered) + { + _redelivered = redelivered; + // todo - here we could mark this message as redelivered so we don't have to mark + // all messages on recover as redelivered. } public Subscription getDeliveredSubscription() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index a616c2ea35..be11eb7b84 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -434,7 +434,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private boolean checkFilters(QueueEntry msg) { - return (_filters == null) || _filters.allAllow(msg.getMessage()); + return (_filters == null) || _filters.allAllow(msg); } public boolean isAutoClose() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java index 2fa017fc64..b5a91c8da6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java @@ -358,7 +358,7 @@ public class Show extends AbstractCommand ispersitent.add("n/a"); } - isredelivered.add(msg.isRedelivered() ? "true" : "false"); + isredelivered.add(entry.isRedelivered() ? "true" : "false"); isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false"); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 6dcb187a37..12fa4ef952 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -405,6 +405,21 @@ public class AbstractHeadersExchangeTestBase extends TestCase { return 0; //To change body of implemented methods use File | Settings | File Templates. } + + public ContentHeaderBody getContentHeaderBody() throws AMQException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isPersistent() throws AMQException + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isRedelivered() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } }; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java new file mode 100644 index 0000000000..9344efd4a8 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.filter; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.MockQueueEntry; + +public class PropertyExpressionTest extends TestCase +{ + + public void testJMSRedelivered() + { + PropertyExpression<AMQException> pe = new PropertyExpression<AMQException>("JMSRedelivered"); + + MockQueueEntry queueEntry = new MockQueueEntry(); + + try + { + assertEquals("MockQueueEntry.redelivered should initialy be false", Boolean.FALSE, pe.evaluate(queueEntry)); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + + queueEntry.setRedelivered(true); + + try + { + assertEquals("MockQueueEntry.redelivered not updated", Boolean.TRUE, pe.evaluate(queueEntry)); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index da35ddc594..08f6fae230 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -25,8 +25,8 @@ import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.AMQChannel; import java.util.ArrayList; import java.util.HashMap; @@ -99,7 +99,7 @@ public class InternalTestProtocolSession extends AMQMinaProtocolSession implemen { } - public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException + public void writeDeliver(QueueEntry queueEntry, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException { _deliveryCount.incrementAndGet(); @@ -121,11 +121,11 @@ public class InternalTestProtocolSession extends AMQMinaProtocolSession implemen consumers.put(consumerTag, consumerDelivers); } - consumerDelivers.add(new DeliveryPair(deliveryTag, message)); + consumerDelivers.add(new DeliveryPair(deliveryTag, queueEntry)); } } - public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException + public void writeGetOk(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize) throws AMQException { } @@ -147,17 +147,17 @@ public class InternalTestProtocolSession extends AMQMinaProtocolSession implemen public class DeliveryPair { private long _deliveryTag; - private AMQMessage _message; + private QueueEntry _queueEntry; - public DeliveryPair(long deliveryTag, AMQMessage message) + public DeliveryPair(long deliveryTag, QueueEntry queueEntry) { _deliveryTag = deliveryTag; - _message = message; + _queueEntry = queueEntry; } - public AMQMessage getMessage() + public QueueEntry getMessage() { - return _message; + return _queueEntry; } public long getDeliveryTag() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 37f91e7464..ed7b2923e7 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; @@ -28,6 +29,7 @@ public class MockQueueEntry implements QueueEntry { private AMQMessage _message; + private boolean _redelivered; public boolean acquire() { @@ -176,10 +178,9 @@ public class MockQueueEntry implements QueueEntry } - public void setRedelivered(boolean b) + public void setRedelivered(boolean redelivered) { - - + _redelivered = redelivered; } @@ -194,4 +195,18 @@ public class MockQueueEntry implements QueueEntry _message = msg; } + public ContentHeaderBody getContentHeaderBody() throws AMQException + { + return _message.getContentHeaderBody(); + } + + public boolean isPersistent() throws AMQException + { + return _message.isPersistent(); + } + + public boolean isRedelivered() + { + return _redelivered; + } } |