summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java25
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java25
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java24
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java69
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java23
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java15
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java56
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java18
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java21
20 files changed, 218 insertions, 127 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 26ac562fb2..5fde08cbdd 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -491,7 +491,7 @@ public class AMQChannel
if (!unacked.isQueueDeleted())
{
// Mark message redelivered
- unacked.getMessage().setRedelivered(true);
+ unacked.setRedelivered(true);
// Ensure message is released for redelivery
unacked.release();
@@ -522,7 +522,7 @@ public class AMQChannel
if (unacked != null)
{
// Mark message redelivered
- unacked.getMessage().setRedelivered(true);
+ unacked.setRedelivered(true);
// Ensure message is released for redelivery
if (!unacked.isQueueDeleted())
@@ -611,13 +611,10 @@ public class AMQChannel
for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet())
{
- QueueEntry message = entry.getValue();
+ QueueEntry queueEntry = entry.getValue();
long deliveryTag = entry.getKey();
-
-
- AMQMessage msg = message.getMessage();
- AMQQueue queue = message.getQueue();
+ AMQQueue queue = queueEntry.getQueue();
// Our Java Client will always suspend the channel when resending!
// If the client has requested the messages be resent then it is
@@ -635,16 +632,16 @@ public class AMQChannel
// Without any details from the client about what has been processed we have to mark
// all messages in the unacked map as redelivered.
- msg.setRedelivered(true);
+ queueEntry.setRedelivered(true);
- Subscription sub = message.getDeliveredSubscription();
+ Subscription sub = queueEntry.getDeliveredSubscription();
if (sub != null)
{
- if(!queue.resend(message, sub))
+ if(!queue.resend(queueEntry, sub))
{
- msgToRequeue.put(deliveryTag, message);
+ msgToRequeue.put(deliveryTag, queueEntry);
}
}
else
@@ -652,11 +649,11 @@ public class AMQChannel
if (_log.isInfoEnabled())
{
- _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString()
+ _log.info("DeliveredSubscription not recorded so just requeueing(" + queueEntry.toString()
+ ")to prevent loss");
}
// move this message to requeue
- msgToRequeue.put(deliveryTag, message);
+ msgToRequeue.put(deliveryTag, queueEntry);
}
} // for all messages
// } else !isSuspend
@@ -888,7 +885,7 @@ public class AMQChannel
public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
throws AMQException
{
- getProtocolSession().getProtocolOutputConverter().writeDeliver(entry.getMessage(), getChannelId(), deliveryTag, sub.getConsumerTag());
+ getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(), deliveryTag, sub.getConsumerTag());
}
};
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
index 29494c4118..097ac27399 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
@@ -54,22 +54,21 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
_storeContext = storeContext;
}
- public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
+ public boolean callback(final long deliveryTag, QueueEntry queueEntry) throws AMQException
{
-
- AMQMessage msg = message.getMessage();
- msg.setRedelivered(true);
- final Subscription subscription = message.getDeliveredSubscription();
+
+ queueEntry.setRedelivered(true);
+ final Subscription subscription = queueEntry.getDeliveredSubscription();
if (subscription != null)
{
// Consumer exists
if (!subscription.isClosed())
{
- _msgToResend.put(deliveryTag, message);
+ _msgToResend.put(deliveryTag, queueEntry);
}
else // consumer has gone
{
- _msgToRequeue.put(deliveryTag, message);
+ _msgToRequeue.put(deliveryTag, queueEntry);
}
}
else
@@ -77,22 +76,22 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
// Message has no consumer tag, so was "delivered" to a GET
// or consumer no longer registered
// cannot resend, so re-queue.
- if (!message.isQueueDeleted())
+ if (!queueEntry.isQueueDeleted())
{
if (_requeueIfUnabletoResend)
{
- _msgToRequeue.put(deliveryTag, message);
+ _msgToRequeue.put(deliveryTag, queueEntry);
}
else
{
- message.discard(_storeContext);
- _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
+ queueEntry.discard(_storeContext);
+ _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + queueEntry);
}
}
else
{
- message.discard(_storeContext);
- _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
+ queueEntry.discard(_storeContext);
+ _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + queueEntry);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
index b30c70dac3..fa276169bf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
@@ -71,13 +71,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E>
JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new ExpirationExpression());
- JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression<E>()
- {
- public Object evaluate(Filterable message) throws E
- {
- return message.isRedelivered();
- }
- });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new RedeliveredExpression());
}
private final String name;
@@ -265,4 +259,13 @@ public class PropertyExpression<E extends Exception> implements Expression<E>
}
}
+
+ private static class RedeliveredExpression<E extends Exception> implements Expression<E>
+ {
+ public Object evaluate(Filterable<E> message) throws E
+ {
+ return message.isRedelivered();
+ }
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index 001b7858ec..0f492a21bb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -130,7 +130,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
throws AMQException
{
singleMessageCredit.useCreditForMessage(entry.getMessage());
- session.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
+ session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
deliveryTag, queue.getMessageCount());
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java
index e01c5aabbf..5438e0a7da 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java
@@ -27,6 +27,7 @@
package org.apache.qpid.server.output;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.AMQDataBlock;
@@ -41,10 +42,10 @@ public interface ProtocolOutputConverter
ProtocolOutputConverter newInstance(AMQProtocolSession session);
}
- void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ void writeDeliver(QueueEntry queueEntry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException;
- void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException;
+ void writeGetOk(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize) throws AMQException;
byte getProtocolMinorVersion();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
index 2b55d294b5..4949e5b41d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
@@ -29,6 +29,7 @@ package org.apache.qpid.server.output.amqp0_8;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.framing.*;
@@ -69,10 +70,12 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return _protocolSession;
}
- public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ public void writeDeliver(QueueEntry queueEntry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ final AMQMessage message = queueEntry.getMessage();
+
+ AMQDataBlock deliver = createEncodedDeliverFrame(queueEntry, channelId, deliveryTag, consumerTag);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
@@ -120,13 +123,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
- public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ public void writeGetOk(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize) throws AMQException
{
-
+ final AMQMessage message = queueEntry.getMessage();
final AMQMessageHandle messageHandle = message.getMessageHandle();
final StoreContext storeContext = message.getStoreContext();
- AMQDataBlock deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+ AMQDataBlock deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
@@ -170,9 +173,11 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
- private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ private AMQDataBlock createEncodedDeliverFrame(QueueEntry queueEntry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
+ final AMQMessage message = queueEntry.getMessage();
+
final MessagePublishInfo pb = message.getMessagePublishInfo();
final AMQMessageHandle messageHandle = message.getMessageHandle();
@@ -180,7 +185,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
BasicDeliverBody deliverBody =
methodRegistry.createBasicDeliverBody(consumerTag,
deliveryTag,
- messageHandle.isRedelivered(),
+ queueEntry.isRedelivered(),
pb.getExchange(),
pb.getRoutingKey());
AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
@@ -189,16 +194,17 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return deliverFrame;
}
- private AMQDataBlock createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ private AMQDataBlock createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
+ final AMQMessage message = queueEntry.getMessage();
final MessagePublishInfo pb = message.getMessagePublishInfo();
final AMQMessageHandle messageHandle = message.getMessageHandle();
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicGetOkBody getOkBody =
methodRegistry.createBasicGetOkBody(deliveryTag,
- messageHandle.isRedelivered(),
+ queueEntry.isRedelivered(),
pb.getExchange(),
pb.getRoutingKey(),
queueSize);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
index 65184fe744..00a15d2d50 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
@@ -1,25 +1,25 @@
package org.apache.qpid.server.output.amqp0_9;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import org.apache.mina.common.ByteBuffer;
@@ -29,6 +29,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.ContentChunk;
@@ -68,10 +69,12 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return _protocolSession;
}
- public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ public void writeDeliver(QueueEntry queueEntry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- AMQBody deliverBody = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ AMQMessage message = queueEntry.getMessage();
+
+ AMQBody deliverBody = createEncodedDeliverFrame(queueEntry, channelId, deliveryTag, consumerTag);
final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
@@ -126,13 +129,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
- public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ public void writeGetOk(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize) throws AMQException
{
+ final AMQMessage message = queueEntry.getMessage();
final AMQMessageHandle messageHandle = message.getMessageHandle();
final StoreContext storeContext = message.getStoreContext();
- AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+ AMQFrame deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
@@ -175,14 +179,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
- private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
+ private AMQBody createEncodedDeliverFrame(QueueEntry queueEntry, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
throws AMQException
{
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
+ AMQMessage message= queueEntry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
- final boolean isRedelivered = messageHandle.isRedelivered();
+ final boolean isRedelivered = queueEntry.isRedelivered();
final AMQShortString exchangeName = pb.getExchange();
final AMQShortString routingKey = pb.getRoutingKey();
@@ -237,16 +241,15 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return returnBlock;
}
- private AMQFrame createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ private AMQFrame createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
+ final AMQMessage message = queueEntry.getMessage();
final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
-
BasicGetOkBody getOkBody =
METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
- messageHandle.isRedelivered(),
+ queueEntry.isRedelivered(),
pb.getExchange(),
pb.getRoutingKey(),
queueSize);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index a485649410..d73d37f48d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -41,7 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* A deliverable message.
*/
-public class AMQMessage implements Filterable<AMQException>
+public class AMQMessage
{
/** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -396,16 +396,6 @@ public class AMQMessage implements Filterable<AMQException>
return _messageHandle.getMessagePublishInfo(getStoreContext());
}
- public boolean isRedelivered()
- {
- return _messageHandle.isRedelivered();
- }
-
- public void setRedelivered(boolean redelivered)
- {
- _messageHandle.setRedelivered(redelivered);
- }
-
public long getArrivalTime()
{
return _messageHandle.getArrivalTime();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
index 0ddd4e4d92..93ac21fc7c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
@@ -63,10 +63,6 @@ public interface AMQMessageHandle
MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException;
- boolean isRedelivered();
-
- void setRedelivered(boolean redelivered);
-
boolean isPersistent();
void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 2ed6be77c6..6f478dffd7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -397,11 +397,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
// Create the tabular list of message header contents
for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
{
- AMQMessage msg = list.get(i - 1).getMessage();
+ QueueEntry queueEntry = list.get(i - 1);
+ AMQMessage msg = queueEntry.getMessage();
ContentHeaderBody headerBody = msg.getContentHeaderBody();
// Create header attributes list
String[] headerAttributes = getMessageHeaderProperties(headerBody);
- Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered() };
+ Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize,
+ queueEntry.isRedelivered() };
CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
_messageList.put(messageData);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
index 35ad5be4e0..2a7c90a81e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
@@ -109,17 +109,6 @@ public class InMemoryMessageHandle implements AMQMessageHandle
return _messagePublishInfo;
}
- public boolean isRedelivered()
- {
- return _redelivered;
- }
-
-
- public void setRedelivered(boolean redelivered)
- {
- _redelivered = redelivered;
- }
-
public boolean isPersistent()
{
return false;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
index 0b214ca336..7573a629c1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
@@ -42,5 +42,8 @@ public class MessageHandleFactory
{
return new InMemoryMessageHandle(messageId);
}
+
+// return new AMQMessage(messageId, store, persistent);
}
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 2657c459a9..0df976a620 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -24,11 +24,8 @@ import org.apache.qpid.server.subscription.Subscription;
* under the License.
*
*/
-public interface QueueEntry extends Comparable<QueueEntry>
+public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQException>
{
-
-
-
public static enum State
{
AVAILABLE,
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index dbad5438dc..fe9686e906 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.log4j.Logger;
@@ -44,6 +45,7 @@ public class QueueEntryImpl implements QueueEntry
private AMQMessage _message;
+ private boolean _redelivered;
private Set<Subscription> _rejectedBy = null;
@@ -186,9 +188,26 @@ public class QueueEntryImpl implements QueueEntry
return _message.immediateAndNotDelivered();
}
- public void setRedelivered(boolean b)
+ public ContentHeaderBody getContentHeaderBody() throws AMQException
{
- getMessage().setRedelivered(b);
+ return _message.getContentHeaderBody();
+ }
+
+ public boolean isPersistent() throws AMQException
+ {
+ return _message.isPersistent();
+ }
+
+ public boolean isRedelivered()
+ {
+ return _redelivered;
+ }
+
+ public void setRedelivered(boolean redelivered)
+ {
+ _redelivered = redelivered;
+ // todo - here we could mark this message as redelivered so we don't have to mark
+ // all messages on recover as redelivered.
}
public Subscription getDeliveredSubscription()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index a616c2ea35..be11eb7b84 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -434,7 +434,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
private boolean checkFilters(QueueEntry msg)
{
- return (_filters == null) || _filters.allAllow(msg.getMessage());
+ return (_filters == null) || _filters.allAllow(msg);
}
public boolean isAutoClose()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
index 2fa017fc64..b5a91c8da6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
@@ -358,7 +358,7 @@ public class Show extends AbstractCommand
ispersitent.add("n/a");
}
- isredelivered.add(msg.isRedelivered() ? "true" : "false");
+ isredelivered.add(entry.isRedelivered() ? "true" : "false");
isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false");
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 6dcb187a37..12fa4ef952 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -405,6 +405,21 @@ public class AbstractHeadersExchangeTestBase extends TestCase
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
+
+ public ContentHeaderBody getContentHeaderBody() throws AMQException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isPersistent() throws AMQException
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isRedelivered()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
};
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java
new file mode 100644
index 0000000000..9344efd4a8
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.MockQueueEntry;
+
+public class PropertyExpressionTest extends TestCase
+{
+
+ public void testJMSRedelivered()
+ {
+ PropertyExpression<AMQException> pe = new PropertyExpression<AMQException>("JMSRedelivered");
+
+ MockQueueEntry queueEntry = new MockQueueEntry();
+
+ try
+ {
+ assertEquals("MockQueueEntry.redelivered should initialy be false", Boolean.FALSE, pe.evaluate(queueEntry));
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+
+ queueEntry.setRedelivered(true);
+
+ try
+ {
+ assertEquals("MockQueueEntry.redelivered not updated", Boolean.TRUE, pe.evaluate(queueEntry));
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index da35ddc594..08f6fae230 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -25,8 +25,8 @@ import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.AMQChannel;
import java.util.ArrayList;
import java.util.HashMap;
@@ -99,7 +99,7 @@ public class InternalTestProtocolSession extends AMQMinaProtocolSession implemen
{
}
- public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
+ public void writeDeliver(QueueEntry queueEntry, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
{
_deliveryCount.incrementAndGet();
@@ -121,11 +121,11 @@ public class InternalTestProtocolSession extends AMQMinaProtocolSession implemen
consumers.put(consumerTag, consumerDelivers);
}
- consumerDelivers.add(new DeliveryPair(deliveryTag, message));
+ consumerDelivers.add(new DeliveryPair(deliveryTag, queueEntry));
}
}
- public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ public void writeGetOk(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize) throws AMQException
{
}
@@ -147,17 +147,17 @@ public class InternalTestProtocolSession extends AMQMinaProtocolSession implemen
public class DeliveryPair
{
private long _deliveryTag;
- private AMQMessage _message;
+ private QueueEntry _queueEntry;
- public DeliveryPair(long deliveryTag, AMQMessage message)
+ public DeliveryPair(long deliveryTag, QueueEntry queueEntry)
{
_deliveryTag = deliveryTag;
- _message = message;
+ _queueEntry = queueEntry;
}
- public AMQMessage getMessage()
+ public QueueEntry getMessage()
{
- return _message;
+ return _queueEntry;
}
public long getDeliveryTag()
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index 37f91e7464..ed7b2923e7 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
@@ -28,6 +29,7 @@ public class MockQueueEntry implements QueueEntry
{
private AMQMessage _message;
+ private boolean _redelivered;
public boolean acquire()
{
@@ -176,10 +178,9 @@ public class MockQueueEntry implements QueueEntry
}
- public void setRedelivered(boolean b)
+ public void setRedelivered(boolean redelivered)
{
-
-
+ _redelivered = redelivered;
}
@@ -194,4 +195,18 @@ public class MockQueueEntry implements QueueEntry
_message = msg;
}
+ public ContentHeaderBody getContentHeaderBody() throws AMQException
+ {
+ return _message.getContentHeaderBody();
+ }
+
+ public boolean isPersistent() throws AMQException
+ {
+ return _message.isPersistent();
+ }
+
+ public boolean isRedelivered()
+ {
+ return _redelivered;
+ }
}