diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-10 11:32:18 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-10 11:32:18 +0000 |
commit | 47471d475f9e8109d30c38d885f683a1ae86ce12 (patch) | |
tree | cea7b46ced1982225a918c5400084370056b068e | |
parent | da6682c48f6e384c08f1a8d881da0612af3fb69e (diff) | |
download | qpid-python-47471d475f9e8109d30c38d885f683a1ae86ce12.tar.gz |
merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1566585 13f79535-47bb-0310-9956-ffa450edef68
9 files changed, 104 insertions, 38 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java index b50424868a..efefec58fd 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java @@ -30,6 +30,7 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.TokenMgrError; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -118,11 +119,11 @@ public class FilterSupport } } - static final class NoLocalFilter implements MessageFilter + public static final class NoLocalFilter implements MessageFilter { - private final AMQQueue _queue; + private final MessageSource _queue; - public NoLocalFilter(AMQQueue queue) + public NoLocalFilter(MessageSource queue) { _queue = queue; } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index 6ad9de22cb..114095bace 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -107,7 +107,11 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC boolean closed = false; State state = getState(); - getConsumer().getSendLock(); + final Consumer consumer = getConsumer(); + if(consumer != null) + { + consumer.getSendLock(); + } try { while(!closed && state != State.CLOSED) @@ -122,7 +126,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC } finally { - getConsumer().releaseSendLock(); + if(consumer != null) + { + consumer.releaseSendLock(); + } } return closed; diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 162951f9ef..8becdf853b 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -44,7 +44,10 @@ import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; +import org.apache.qpid.server.filter.FilterSupport; +import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.logging.LogActor; @@ -512,7 +515,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F * @throws AMQException if something goes wrong */ public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks, - FieldTable filters, boolean exclusive) throws AMQException + FieldTable filters, boolean exclusive, boolean noLocal) throws AMQException { if (tag == null) { @@ -549,6 +552,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F options.add(Consumer.Option.EXCLUSIVE); } + // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. // We add before we register as the Async Delivery process may AutoClose the subscriber // so calling _cT2QM.remove before we have done put which was after the register succeeded. @@ -558,9 +562,18 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F try { + FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters)); + if(noLocal) + { + if(filterManager == null) + { + filterManager = new SimpleFilterManager(); + } + filterManager.add(new FilterSupport.NoLocalFilter(source)); + } Consumer sub = source.addConsumer(target, - FilterManagerFactory.createManager(FieldTable.convertToMap(filters)), + filterManager, AMQMessage.class, AMQShortString.toString(tag), options); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 47700f812f..0f0e49bf7f 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.v0_8.handler.BasicGetMethodHandler; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.consumer.AbstractConsumerTarget; @@ -80,6 +81,16 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); } + public static ConsumerTarget_0_8 createGetNoAckTarget(final AMQChannel channel, + final AMQShortString consumerTag, + final FieldTable filters, + final FlowCreditManager creditManager, + final ClientDeliveryMethod deliveryMethod, + final RecordDeliveryMethod recordMethod) throws AMQException + { + return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); + } + static final class BrowserConsumer extends ConsumerTarget_0_8 { public BrowserConsumer(AMQChannel channel, @@ -132,10 +143,10 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel, - AMQShortString consumerTag, FieldTable filters, - FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) throws AMQException + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) throws AMQException { return new NoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); } @@ -223,9 +234,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen */ public static final class GetNoAckConsumer extends NoAckConsumer { - public GetNoAckConsumer(AMQChannel channel, AMQProtocolSession protocolSession, + public GetNoAckConsumer(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager, + FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException @@ -417,7 +428,12 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen boolean closed = false; State state = getState(); - getConsumer().getSendLock(); + final Consumer consumer = getConsumer(); + + if(consumer != null) + { + consumer.getSendLock(); + } try { while(!closed && state != State.CLOSED) @@ -433,7 +449,10 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } finally { - getConsumer().releaseSendLock(); + if(consumer != null) + { + consumer.releaseSendLock(); + } } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java index 526bc9b9fe..b28bb5a0ad 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java @@ -125,7 +125,8 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic queue, !body.getNoAck(), body.getArguments(), - body.getExclusive()); + body.getExclusive(), + body.getNoLocal()); if (!body.getNowait()) { MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index d4bd486a99..b1d2fa5088 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -128,24 +128,8 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L); - final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod() - { - - @Override - public void deliverToClient(final Consumer sub, final ServerMessage message, final - InstanceProperties props, final long deliveryTag) - throws AMQException - { - singleMessageCredit.useCreditForMessage(message.getSize()); - session.getProtocolOutputConverter().writeGetOk(message, - props, - channel.getChannelId(), - deliveryTag, - queue.getMessageCount()); - - - } - }; + final GetDeliveryMethod getDeliveryMethod = + new GetDeliveryMethod(singleMessageCredit, session, channel, queue); final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod() { @@ -167,7 +151,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB } else { - target = ConsumerTarget_0_8.createNoAckTarget(channel, + target = ConsumerTarget_0_8.createGetNoAckTarget(channel, AMQShortString.EMPTY_STRING, null, singleMessageCredit, getDeliveryMethod, getRecordMethod); } @@ -175,10 +159,48 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options); sub.flush(); sub.close(); - return(!singleMessageCredit.hasCredit()); + return(getDeliveryMethod.hasDeliveredMessage()); } + private static class GetDeliveryMethod implements ClientDeliveryMethod + { + + private final FlowCreditManager _singleMessageCredit; + private final AMQProtocolSession _session; + private final AMQChannel _channel; + private final AMQQueue _queue; + private boolean _deliveredMessage; + + public GetDeliveryMethod(final FlowCreditManager singleMessageCredit, + final AMQProtocolSession session, + final AMQChannel channel, final AMQQueue queue) + { + _singleMessageCredit = singleMessageCredit; + _session = session; + _channel = channel; + _queue = queue; + } + + @Override + public void deliverToClient(final Consumer sub, final ServerMessage message, + final InstanceProperties props, final long deliveryTag) throws AMQException + { + _singleMessageCredit.useCreditForMessage(message.getSize()); + _session.getProtocolOutputConverter().writeGetOk(message, + props, + _channel.getChannelId(), + deliveryTag, + _queue.getMessageCount()); + + _deliveredMessage = true; + } + + public boolean hasDeliveredMessage() + { + return _deliveredMessage; + } + } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java index 8aa25e8eb5..f47525097e 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java @@ -140,7 +140,7 @@ public class AcknowledgeTest extends QpidTestCase assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size()); //Subscribe to the queue - AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true); + AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true, false); getQueue().deliverAsync(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java index 0a4bfd13f1..dc687e1075 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java @@ -141,6 +141,6 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase FieldTable filters = new FieldTable(); filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); - return channel.consumeFromSource(null, queue, true, filters, true); + return channel.consumeFromSource(null, queue, true, filters, true, false); } } diff --git a/java/test-profiles/python_tests/Java010PythonExcludes b/java/test-profiles/python_tests/Java010PythonExcludes index de870e5e27..474480bdd0 100644 --- a/java/test-profiles/python_tests/Java010PythonExcludes +++ b/java/test-profiles/python_tests/Java010PythonExcludes @@ -50,6 +50,9 @@ qpid_tests.broker_0_10.priority.PriorityTests.test_prioritised_delivery_with_ali #The broker does not support the autodelete property on exchanges qpid_tests.broker_0_10.exchange.AutodeleteTests.testAutodelete* +# QPID-5531 : Changes to the C++ behaviour in having a default timeout for every transaction not implemented in Java Broker +qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout + ###### Behavioural differences between Java & CPP Broker ###### #Tests changed/added in QPID-5280 and QPID-5283 |