summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-10 11:32:18 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-10 11:32:18 +0000
commit47471d475f9e8109d30c38d885f683a1ae86ce12 (patch)
treecea7b46ced1982225a918c5400084370056b068e
parentda6682c48f6e384c08f1a8d881da0612af3fb69e (diff)
downloadqpid-python-47471d475f9e8109d30c38d885f683a1ae86ce12.tar.gz
merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1566585 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java7
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java11
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java17
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java35
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java3
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java62
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java2
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java2
-rw-r--r--java/test-profiles/python_tests/Java010PythonExcludes3
9 files changed, 104 insertions, 38 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
index b50424868a..efefec58fd 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
@@ -30,6 +30,7 @@ import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -118,11 +119,11 @@ public class FilterSupport
}
}
- static final class NoLocalFilter implements MessageFilter
+ public static final class NoLocalFilter implements MessageFilter
{
- private final AMQQueue _queue;
+ private final MessageSource _queue;
- public NoLocalFilter(AMQQueue queue)
+ public NoLocalFilter(MessageSource queue)
{
_queue = queue;
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 6ad9de22cb..114095bace 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -107,7 +107,11 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
boolean closed = false;
State state = getState();
- getConsumer().getSendLock();
+ final Consumer consumer = getConsumer();
+ if(consumer != null)
+ {
+ consumer.getSendLock();
+ }
try
{
while(!closed && state != State.CLOSED)
@@ -122,7 +126,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
}
finally
{
- getConsumer().releaseSendLock();
+ if(consumer != null)
+ {
+ consumer.releaseSendLock();
+ }
}
return closed;
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 162951f9ef..8becdf853b 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -44,7 +44,10 @@ import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.filter.FilterSupport;
+import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.logging.LogActor;
@@ -512,7 +515,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
* @throws AMQException if something goes wrong
*/
public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
- FieldTable filters, boolean exclusive) throws AMQException
+ FieldTable filters, boolean exclusive, boolean noLocal) throws AMQException
{
if (tag == null)
{
@@ -549,6 +552,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
options.add(Consumer.Option.EXCLUSIVE);
}
+
// So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
// We add before we register as the Async Delivery process may AutoClose the subscriber
// so calling _cT2QM.remove before we have done put which was after the register succeeded.
@@ -558,9 +562,18 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
try
{
+ FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters));
+ if(noLocal)
+ {
+ if(filterManager == null)
+ {
+ filterManager = new SimpleFilterManager();
+ }
+ filterManager.add(new FilterSupport.NoLocalFilter(source));
+ }
Consumer sub =
source.addConsumer(target,
- FilterManagerFactory.createManager(FieldTable.convertToMap(filters)),
+ filterManager,
AMQMessage.class,
AMQShortString.toString(tag),
options);
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 47700f812f..0f0e49bf7f 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.handler.BasicGetMethodHandler;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
@@ -80,6 +81,16 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
}
+ public static ConsumerTarget_0_8 createGetNoAckTarget(final AMQChannel channel,
+ final AMQShortString consumerTag,
+ final FieldTable filters,
+ final FlowCreditManager creditManager,
+ final ClientDeliveryMethod deliveryMethod,
+ final RecordDeliveryMethod recordMethod) throws AMQException
+ {
+ return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+ }
+
static final class BrowserConsumer extends ConsumerTarget_0_8
{
public BrowserConsumer(AMQChannel channel,
@@ -132,10 +143,10 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
}
public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
- AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod) throws AMQException
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod) throws AMQException
{
return new NoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
}
@@ -223,9 +234,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
*/
public static final class GetNoAckConsumer extends NoAckConsumer
{
- public GetNoAckConsumer(AMQChannel channel, AMQProtocolSession protocolSession,
+ public GetNoAckConsumer(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
+ FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod)
throws AMQException
@@ -417,7 +428,12 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
boolean closed = false;
State state = getState();
- getConsumer().getSendLock();
+ final Consumer consumer = getConsumer();
+
+ if(consumer != null)
+ {
+ consumer.getSendLock();
+ }
try
{
while(!closed && state != State.CLOSED)
@@ -433,7 +449,10 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
}
finally
{
- getConsumer().releaseSendLock();
+ if(consumer != null)
+ {
+ consumer.releaseSendLock();
+ }
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
index 526bc9b9fe..b28bb5a0ad 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
@@ -125,7 +125,8 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
queue,
!body.getNoAck(),
body.getArguments(),
- body.getExclusive());
+ body.getExclusive(),
+ body.getNoLocal());
if (!body.getNowait())
{
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index d4bd486a99..b1d2fa5088 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -128,24 +128,8 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
- final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
- {
-
- @Override
- public void deliverToClient(final Consumer sub, final ServerMessage message, final
- InstanceProperties props, final long deliveryTag)
- throws AMQException
- {
- singleMessageCredit.useCreditForMessage(message.getSize());
- session.getProtocolOutputConverter().writeGetOk(message,
- props,
- channel.getChannelId(),
- deliveryTag,
- queue.getMessageCount());
-
-
- }
- };
+ final GetDeliveryMethod getDeliveryMethod =
+ new GetDeliveryMethod(singleMessageCredit, session, channel, queue);
final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
{
@@ -167,7 +151,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
}
else
{
- target = ConsumerTarget_0_8.createNoAckTarget(channel,
+ target = ConsumerTarget_0_8.createGetNoAckTarget(channel,
AMQShortString.EMPTY_STRING, null,
singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
@@ -175,10 +159,48 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
sub.flush();
sub.close();
- return(!singleMessageCredit.hasCredit());
+ return(getDeliveryMethod.hasDeliveredMessage());
}
+ private static class GetDeliveryMethod implements ClientDeliveryMethod
+ {
+
+ private final FlowCreditManager _singleMessageCredit;
+ private final AMQProtocolSession _session;
+ private final AMQChannel _channel;
+ private final AMQQueue _queue;
+ private boolean _deliveredMessage;
+
+ public GetDeliveryMethod(final FlowCreditManager singleMessageCredit,
+ final AMQProtocolSession session,
+ final AMQChannel channel, final AMQQueue queue)
+ {
+ _singleMessageCredit = singleMessageCredit;
+ _session = session;
+ _channel = channel;
+ _queue = queue;
+ }
+
+ @Override
+ public void deliverToClient(final Consumer sub, final ServerMessage message,
+ final InstanceProperties props, final long deliveryTag) throws AMQException
+ {
+ _singleMessageCredit.useCreditForMessage(message.getSize());
+ _session.getProtocolOutputConverter().writeGetOk(message,
+ props,
+ _channel.getChannelId(),
+ deliveryTag,
+ _queue.getMessageCount());
+
+ _deliveredMessage = true;
+ }
+
+ public boolean hasDeliveredMessage()
+ {
+ return _deliveredMessage;
+ }
+ }
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
index 8aa25e8eb5..f47525097e 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
@@ -140,7 +140,7 @@ public class AcknowledgeTest extends QpidTestCase
assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size());
//Subscribe to the queue
- AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true);
+ AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true, false);
getQueue().deliverAsync();
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
index 0a4bfd13f1..dc687e1075 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
@@ -141,6 +141,6 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase
FieldTable filters = new FieldTable();
filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
- return channel.consumeFromSource(null, queue, true, filters, true);
+ return channel.consumeFromSource(null, queue, true, filters, true, false);
}
}
diff --git a/java/test-profiles/python_tests/Java010PythonExcludes b/java/test-profiles/python_tests/Java010PythonExcludes
index de870e5e27..474480bdd0 100644
--- a/java/test-profiles/python_tests/Java010PythonExcludes
+++ b/java/test-profiles/python_tests/Java010PythonExcludes
@@ -50,6 +50,9 @@ qpid_tests.broker_0_10.priority.PriorityTests.test_prioritised_delivery_with_ali
#The broker does not support the autodelete property on exchanges
qpid_tests.broker_0_10.exchange.AutodeleteTests.testAutodelete*
+# QPID-5531 : Changes to the C++ behaviour in having a default timeout for every transaction not implemented in Java Broker
+qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout
+
###### Behavioural differences between Java & CPP Broker ######
#Tests changed/added in QPID-5280 and QPID-5283