diff options
author | Keith Wall <kwall@apache.org> | 2011-11-09 08:59:26 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2011-11-09 08:59:26 +0000 |
commit | 3c20bd66ba063f52522fb19ca4a241f6c4e720d4 (patch) | |
tree | 82431fdd1dad8e5c53c0431425b4088609bf7d52 | |
parent | cc6ee2720ce235498297d76f35da4aea2c557de2 (diff) | |
download | qpid-python-3c20bd66ba063f52522fb19ca4a241f6c4e720d4.tar.gz |
QPID-3519: refactor consumer argument handling
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1199664 13f79535-47bb-0310-9956-ffa450edef68
9 files changed, 94 insertions, 66 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 4c4d2c75b1..ef44221ec1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -92,7 +92,6 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; @@ -2011,28 +2010,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic AMQDestination amqd = (AMQDestination) destination; - // TODO: Define selectors in AMQP - // TODO: construct the rawSelector from the selector string if rawSelector == null - final FieldTable ft = FieldTableFactory.newFieldTable(); - // if (rawSelector != null) - // ft.put("headers", rawSelector.getDataAsBytes()); - // rawSelector is used by HeadersExchange and is not a JMS Selector - if (rawSelector != null) - { - ft.addAll(rawSelector); - } - - // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a - // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise - // possible to determine when querying the broker whether there are no arguments or just a non-matching selector - // argument, as specifying null for the arguments when querying means they should not be checked at all - ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector); - C consumer; try { consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, - noLocal, exclusive, messageSelector, ft, noConsume, autoClose); + noLocal, exclusive, messageSelector, rawSelector, noConsume, autoClose); } catch(TransportException e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 6bab715e4b..7e5edef38d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -509,13 +509,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, - final FieldTable ft, final boolean noConsume, + final FieldTable rawSelector, final boolean noConsume, final boolean autoClose) throws JMSException { final AMQProtocolHandler protocolHandler = getProtocolHandler(); return new BasicMessageConsumer_0_10(_channelId, _connection, destination, messageSelector, noLocal, - _messageFactoryRegistry, this, protocolHandler, ft, prefetchHigh, + _messageFactoryRegistry, this, protocolHandler, rawSelector, prefetchHigh, prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 85fc857014..e33410f5fe 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -337,21 +337,6 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B MessageFilter messageSelector, int tag) throws AMQException, FailoverException { - FieldTable arguments = FieldTableFactory.newFieldTable(); - if (messageSelector != null) - { - arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector.getSelector()); - } - - if (consumer.isAutoClose()) - { - arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); - } - - if (consumer.isBrowseOnly()) - { - arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); - } BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(), queueName, @@ -360,7 +345,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, consumer.isExclusive(), nowait, - arguments); + consumer.getArguments()); AMQFrame jmsConsume = body.generateFrame(_channelId); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index b1975338b7..7bb400fada 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -27,6 +27,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.*; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; @@ -150,7 +151,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, - FieldTable arguments, int prefetchHigh, int prefetchLow, + FieldTable rawSelector, int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { _channelId = channelId; @@ -160,7 +161,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa _messageFactory = messageFactory; _session = session; _protocolHandler = protocolHandler; - _arguments = arguments; _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; _exclusive = exclusive; @@ -196,6 +196,21 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { _acknowledgeMode = acknowledgeMode; } + + final FieldTable ft = FieldTableFactory.newFieldTable(); + // rawSelector is used by HeadersExchange and is not a JMS Selector + if (rawSelector != null) + { + ft.addAll(rawSelector); + } + + // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a + // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise + // possible to determine when querying the broker whether there are no arguments or just a non-matching selector + // argument, as specifying null for the arguments when querying means they should not be checked at all + ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector); + + _arguments = ft; } public AMQDestination getDestination() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index ae2068b75b..47c20b683c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -72,12 +72,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession<?,?> session, AMQProtocolHandler protocolHandler, - FieldTable arguments, int prefetchHigh, int prefetchLow, + FieldTable rawSelector, int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler, - arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose); + rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose); _0_10session = (AMQSession_0_10) session; _preAcquire = evaluatePreAcquire(browseOnly, destination); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index cc061e35cb..cf1d7cedeb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -27,6 +27,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,12 +38,23 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow, + AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session, - protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive, + protocolHandler, rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose); + final FieldTable consumerArguments = getArguments(); + if (isAutoClose()) + { + consumerArguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); + } + + if (isBrowseOnly()) + { + consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); + } + } void sendCancel() throws AMQException, FailoverException diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java index 849827216c..68531eee84 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java @@ -29,7 +29,6 @@ import javax.jms.MessageProducer; import junit.framework.TestCase; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.transport.Binary; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.Connection.SessionFactory; @@ -334,7 +333,7 @@ public class AMQSession_0_10Test extends TestCase try { BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, - null, new FieldTable(), false, true); + null, null, false, true); session.sendConsume(consumer, new AMQShortString("test"), null, true, null, 1); } catch (Exception e) @@ -383,7 +382,7 @@ public class AMQSession_0_10Test extends TestCase try { BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, - null, new FieldTable(), false, true); + null, null, false, true); session.start(); consumer.receive(1); fail("JMSException should be thrown"); @@ -401,7 +400,7 @@ public class AMQSession_0_10Test extends TestCase try { BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, - null, new FieldTable(), false, true); + null, null, false, true); session.start(); consumer.receive(1); } @@ -419,7 +418,7 @@ public class AMQSession_0_10Test extends TestCase try { BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, - null, new FieldTable(), false, true); + null, null, false, true); session.start(); consumer.receiveNoWait(); fail("JMSException should be thrown"); @@ -437,7 +436,7 @@ public class AMQSession_0_10Test extends TestCase try { BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, - null, new FieldTable(), false, true); + null, null, false, true); consumer.setMessageListener(new MockMessageListener()); fail("JMSException should be thrown"); } @@ -454,7 +453,7 @@ public class AMQSession_0_10Test extends TestCase try { BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, - null, new FieldTable(), false, true); + null, null, false, true); consumer.setMessageListener(new MockMessageListener()); } catch (Exception e) @@ -471,7 +470,7 @@ public class AMQSession_0_10Test extends TestCase try { BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, - null, new FieldTable(), false, true); + null, null, false, true); consumer.close(); } catch (Exception e) @@ -488,7 +487,7 @@ public class AMQSession_0_10Test extends TestCase try { BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, - null, new FieldTable(), false, true); + null, null, false, true); consumer.close(); fail("JMSException should be thrown"); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 721c821bab..4a126b8504 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -832,9 +832,12 @@ public class FieldTable public void addAll(FieldTable fieldTable) { initMapIfNecessary(); - _encodedForm = null; - _properties.putAll(fieldTable._properties); - recalculateEncodedSize(); + if (fieldTable._properties != null) + { + _encodedForm = null; + _properties.putAll(fieldTable._properties); + recalculateEncodedSize(); + } } public static Map<String, Object> convertToMap(final FieldTable fieldTable) diff --git a/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java b/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java index bb4c9c3884..bd189feb1c 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java @@ -20,17 +20,19 @@ */ package org.apache.qpid.framing; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + import junit.framework.Assert; import junit.framework.TestCase; -import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQPInvalidClassException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; - public class PropertyFieldTableTest extends TestCase { private static final Logger _logger = LoggerFactory.getLogger(PropertyFieldTableTest.class); @@ -106,7 +108,7 @@ public class PropertyFieldTableTest extends TestCase table1.setByte("value", Byte.MAX_VALUE); Assert.assertTrue(table1.propertyExists("value")); - // Tets lookups we shouldn't get anything back for other gets + // Tests lookups we shouldn't get anything back for other gets // we should get right value back for this type .... Assert.assertEquals(null, table1.getBoolean("value")); Assert.assertEquals(Byte.MAX_VALUE, (byte) table1.getByte("value")); @@ -139,7 +141,7 @@ public class PropertyFieldTableTest extends TestCase table1.setShort("value", Short.MAX_VALUE); Assert.assertTrue(table1.propertyExists("value")); - // Tets lookups we shouldn't get anything back for other gets + // Tests lookups we shouldn't get anything back for other gets // we should get right value back for this type .... Assert.assertEquals(null, table1.getBoolean("value")); Assert.assertEquals(null, table1.getByte("value")); @@ -172,7 +174,7 @@ public class PropertyFieldTableTest extends TestCase table1.setChar("value", 'c'); Assert.assertTrue(table1.propertyExists("value")); - // Tets lookups we shouldn't get anything back for other gets + // Tests lookups we shouldn't get anything back for other gets // we should get right value back for this type .... Assert.assertEquals(null, table1.getBoolean("value")); Assert.assertEquals(null, table1.getByte("value")); @@ -206,7 +208,7 @@ public class PropertyFieldTableTest extends TestCase table1.setDouble("value", Double.MAX_VALUE); Assert.assertTrue(table1.propertyExists("value")); - // Tets lookups we shouldn't get anything back for other gets + // Tests lookups we shouldn't get anything back for other gets // we should get right value back for this type .... Assert.assertEquals(null, table1.getBoolean("value")); Assert.assertEquals(null, table1.getByte("value")); @@ -241,7 +243,7 @@ public class PropertyFieldTableTest extends TestCase table1.setFloat("value", Float.MAX_VALUE); Assert.assertTrue(table1.propertyExists("value")); - // Tets lookups we shouldn't get anything back for other gets + // Tests lookups we shouldn't get anything back for other gets // we should get right value back for this type .... Assert.assertEquals(null, table1.getBoolean("value")); Assert.assertEquals(null, table1.getByte("value")); @@ -404,7 +406,7 @@ public class PropertyFieldTableTest extends TestCase table1.setString("value", "Hello"); Assert.assertTrue(table1.propertyExists("value")); - // Tets lookups we shouldn't get anything back for other gets + // Test lookups we shouldn't get anything back for other gets // we should get right value back for this type .... Assert.assertEquals(null, table1.getBoolean("value")); Assert.assertEquals(null, table1.getByte("value")); @@ -569,7 +571,7 @@ public class PropertyFieldTableTest extends TestCase Assert.assertEquals("Hello", table.getObject("object-string")); } - public void testwriteBuffer() throws IOException + public void testWriteBuffer() throws IOException { byte[] bytes = { 99, 98, 97, 96, 95 }; @@ -950,6 +952,36 @@ public class PropertyFieldTableTest extends TestCase } + public void testAddAll() + { + final FieldTable table1 = new FieldTable(); + table1.setInteger("int1", 1); + table1.setInteger("int2", 2); + assertEquals("Unexpected number of entries in table1", 2, table1.size()); + + final FieldTable table2 = new FieldTable(); + table2.setInteger("int3", 3); + table2.setInteger("int4", 4); + assertEquals("Unexpected number of entries in table2", 2, table2.size()); + + table1.addAll(table2); + assertEquals("Unexpected number of entries in table1 after addAll", 4, table1.size()); + assertEquals(Integer.valueOf(3), table1.getInteger("int3")); + } + + public void testAddAllWithEmptyFieldTable() + { + final FieldTable table1 = new FieldTable(); + table1.setInteger("int1", 1); + table1.setInteger("int2", 2); + assertEquals("Unexpected number of entries in table1", 2, table1.size()); + + final FieldTable emptyFieldTable = new FieldTable(); + + table1.addAll(emptyFieldTable); + assertEquals("Unexpected number of entries in table1 after addAll", 2, table1.size()); + } + private void assertBytesEqual(byte[] expected, byte[] actual) { Assert.assertEquals(expected.length, actual.length); |