summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-11-09 08:59:26 +0000
committerKeith Wall <kwall@apache.org>2011-11-09 08:59:26 +0000
commit3c20bd66ba063f52522fb19ca4a241f6c4e720d4 (patch)
tree82431fdd1dad8e5c53c0431425b4088609bf7d52
parentcc6ee2720ce235498297d76f35da4aea2c557de2 (diff)
downloadqpid-python-3c20bd66ba063f52522fb19ca4a241f6c4e720d4.tar.gz
QPID-3519: refactor consumer argument handling
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1199664 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java17
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java19
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java16
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java17
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java9
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java54
9 files changed, 94 insertions, 66 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 4c4d2c75b1..ef44221ec1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -92,7 +92,6 @@ import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
@@ -2011,28 +2010,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
AMQDestination amqd = (AMQDestination) destination;
- // TODO: Define selectors in AMQP
- // TODO: construct the rawSelector from the selector string if rawSelector == null
- final FieldTable ft = FieldTableFactory.newFieldTable();
- // if (rawSelector != null)
- // ft.put("headers", rawSelector.getDataAsBytes());
- // rawSelector is used by HeadersExchange and is not a JMS Selector
- if (rawSelector != null)
- {
- ft.addAll(rawSelector);
- }
-
- // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a
- // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
- // possible to determine when querying the broker whether there are no arguments or just a non-matching selector
- // argument, as specifying null for the arguments when querying means they should not be checked at all
- ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
-
C consumer;
try
{
consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
- noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
+ noLocal, exclusive, messageSelector, rawSelector, noConsume, autoClose);
}
catch(TransportException e)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 6bab715e4b..7e5edef38d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -509,13 +509,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal,
final boolean exclusive, String messageSelector,
- final FieldTable ft, final boolean noConsume,
+ final FieldTable rawSelector, final boolean noConsume,
final boolean autoClose) throws JMSException
{
final AMQProtocolHandler protocolHandler = getProtocolHandler();
return new BasicMessageConsumer_0_10(_channelId, _connection, destination, messageSelector, noLocal,
- _messageFactoryRegistry, this, protocolHandler, ft, prefetchHigh,
+ _messageFactoryRegistry, this, protocolHandler, rawSelector, prefetchHigh,
prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 85fc857014..e33410f5fe 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -337,21 +337,6 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
MessageFilter messageSelector,
int tag) throws AMQException, FailoverException
{
- FieldTable arguments = FieldTableFactory.newFieldTable();
- if (messageSelector != null)
- {
- arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector.getSelector());
- }
-
- if (consumer.isAutoClose())
- {
- arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
- }
-
- if (consumer.isBrowseOnly())
- {
- arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
- }
BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
queueName,
@@ -360,7 +345,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
consumer.isExclusive(),
nowait,
- arguments);
+ consumer.getArguments());
AMQFrame jmsConsume = body.generateFrame(_channelId);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index b1975338b7..7bb400fada 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -27,6 +27,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
@@ -150,7 +151,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
- FieldTable arguments, int prefetchHigh, int prefetchLow,
+ FieldTable rawSelector, int prefetchHigh, int prefetchLow,
boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
{
_channelId = channelId;
@@ -160,7 +161,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
_messageFactory = messageFactory;
_session = session;
_protocolHandler = protocolHandler;
- _arguments = arguments;
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
@@ -196,6 +196,21 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
_acknowledgeMode = acknowledgeMode;
}
+
+ final FieldTable ft = FieldTableFactory.newFieldTable();
+ // rawSelector is used by HeadersExchange and is not a JMS Selector
+ if (rawSelector != null)
+ {
+ ft.addAll(rawSelector);
+ }
+
+ // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a
+ // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
+ // possible to determine when querying the broker whether there are no arguments or just a non-matching selector
+ // argument, as specifying null for the arguments when querying means they should not be checked at all
+ ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
+
+ _arguments = ft;
}
public AMQDestination getDestination()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index ae2068b75b..47c20b683c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -72,12 +72,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession<?,?> session, AMQProtocolHandler protocolHandler,
- FieldTable arguments, int prefetchHigh, int prefetchLow,
+ FieldTable rawSelector, int prefetchHigh, int prefetchLow,
boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose)
throws JMSException
{
super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
- arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
+ rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
_0_10session = (AMQSession_0_10) session;
_preAcquire = evaluatePreAcquire(browseOnly, destination);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index cc061e35cb..cf1d7cedeb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -27,6 +27,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,12 +38,23 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow,
+ AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow,
boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
{
super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
- protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive,
+ protocolHandler, rawSelector, prefetchHigh, prefetchLow, exclusive,
acknowledgeMode, browseOnly, autoClose);
+ final FieldTable consumerArguments = getArguments();
+ if (isAutoClose())
+ {
+ consumerArguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
+ }
+
+ if (isBrowseOnly())
+ {
+ consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
+ }
+
}
void sendCancel() throws AMQException, FailoverException
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
index 849827216c..68531eee84 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
@@ -29,7 +29,6 @@ import javax.jms.MessageProducer;
import junit.framework.TestCase;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.Connection.SessionFactory;
@@ -334,7 +333,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
session.sendConsume(consumer, new AMQShortString("test"), null, true, null, 1);
}
catch (Exception e)
@@ -383,7 +382,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
session.start();
consumer.receive(1);
fail("JMSException should be thrown");
@@ -401,7 +400,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
session.start();
consumer.receive(1);
}
@@ -419,7 +418,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
session.start();
consumer.receiveNoWait();
fail("JMSException should be thrown");
@@ -437,7 +436,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
consumer.setMessageListener(new MockMessageListener());
fail("JMSException should be thrown");
}
@@ -454,7 +453,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
consumer.setMessageListener(new MockMessageListener());
}
catch (Exception e)
@@ -471,7 +470,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
consumer.close();
}
catch (Exception e)
@@ -488,7 +487,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
consumer.close();
fail("JMSException should be thrown");
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
index 721c821bab..4a126b8504 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -832,9 +832,12 @@ public class FieldTable
public void addAll(FieldTable fieldTable)
{
initMapIfNecessary();
- _encodedForm = null;
- _properties.putAll(fieldTable._properties);
- recalculateEncodedSize();
+ if (fieldTable._properties != null)
+ {
+ _encodedForm = null;
+ _properties.putAll(fieldTable._properties);
+ recalculateEncodedSize();
+ }
}
public static Map<String, Object> convertToMap(final FieldTable fieldTable)
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java b/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java
index bb4c9c3884..bd189feb1c 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java
@@ -20,17 +20,19 @@
*/
package org.apache.qpid.framing;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
import junit.framework.Assert;
import junit.framework.TestCase;
-import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQPInvalidClassException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.*;
-
public class PropertyFieldTableTest extends TestCase
{
private static final Logger _logger = LoggerFactory.getLogger(PropertyFieldTableTest.class);
@@ -106,7 +108,7 @@ public class PropertyFieldTableTest extends TestCase
table1.setByte("value", Byte.MAX_VALUE);
Assert.assertTrue(table1.propertyExists("value"));
- // Tets lookups we shouldn't get anything back for other gets
+ // Tests lookups we shouldn't get anything back for other gets
// we should get right value back for this type ....
Assert.assertEquals(null, table1.getBoolean("value"));
Assert.assertEquals(Byte.MAX_VALUE, (byte) table1.getByte("value"));
@@ -139,7 +141,7 @@ public class PropertyFieldTableTest extends TestCase
table1.setShort("value", Short.MAX_VALUE);
Assert.assertTrue(table1.propertyExists("value"));
- // Tets lookups we shouldn't get anything back for other gets
+ // Tests lookups we shouldn't get anything back for other gets
// we should get right value back for this type ....
Assert.assertEquals(null, table1.getBoolean("value"));
Assert.assertEquals(null, table1.getByte("value"));
@@ -172,7 +174,7 @@ public class PropertyFieldTableTest extends TestCase
table1.setChar("value", 'c');
Assert.assertTrue(table1.propertyExists("value"));
- // Tets lookups we shouldn't get anything back for other gets
+ // Tests lookups we shouldn't get anything back for other gets
// we should get right value back for this type ....
Assert.assertEquals(null, table1.getBoolean("value"));
Assert.assertEquals(null, table1.getByte("value"));
@@ -206,7 +208,7 @@ public class PropertyFieldTableTest extends TestCase
table1.setDouble("value", Double.MAX_VALUE);
Assert.assertTrue(table1.propertyExists("value"));
- // Tets lookups we shouldn't get anything back for other gets
+ // Tests lookups we shouldn't get anything back for other gets
// we should get right value back for this type ....
Assert.assertEquals(null, table1.getBoolean("value"));
Assert.assertEquals(null, table1.getByte("value"));
@@ -241,7 +243,7 @@ public class PropertyFieldTableTest extends TestCase
table1.setFloat("value", Float.MAX_VALUE);
Assert.assertTrue(table1.propertyExists("value"));
- // Tets lookups we shouldn't get anything back for other gets
+ // Tests lookups we shouldn't get anything back for other gets
// we should get right value back for this type ....
Assert.assertEquals(null, table1.getBoolean("value"));
Assert.assertEquals(null, table1.getByte("value"));
@@ -404,7 +406,7 @@ public class PropertyFieldTableTest extends TestCase
table1.setString("value", "Hello");
Assert.assertTrue(table1.propertyExists("value"));
- // Tets lookups we shouldn't get anything back for other gets
+ // Test lookups we shouldn't get anything back for other gets
// we should get right value back for this type ....
Assert.assertEquals(null, table1.getBoolean("value"));
Assert.assertEquals(null, table1.getByte("value"));
@@ -569,7 +571,7 @@ public class PropertyFieldTableTest extends TestCase
Assert.assertEquals("Hello", table.getObject("object-string"));
}
- public void testwriteBuffer() throws IOException
+ public void testWriteBuffer() throws IOException
{
byte[] bytes = { 99, 98, 97, 96, 95 };
@@ -950,6 +952,36 @@ public class PropertyFieldTableTest extends TestCase
}
+ public void testAddAll()
+ {
+ final FieldTable table1 = new FieldTable();
+ table1.setInteger("int1", 1);
+ table1.setInteger("int2", 2);
+ assertEquals("Unexpected number of entries in table1", 2, table1.size());
+
+ final FieldTable table2 = new FieldTable();
+ table2.setInteger("int3", 3);
+ table2.setInteger("int4", 4);
+ assertEquals("Unexpected number of entries in table2", 2, table2.size());
+
+ table1.addAll(table2);
+ assertEquals("Unexpected number of entries in table1 after addAll", 4, table1.size());
+ assertEquals(Integer.valueOf(3), table1.getInteger("int3"));
+ }
+
+ public void testAddAllWithEmptyFieldTable()
+ {
+ final FieldTable table1 = new FieldTable();
+ table1.setInteger("int1", 1);
+ table1.setInteger("int2", 2);
+ assertEquals("Unexpected number of entries in table1", 2, table1.size());
+
+ final FieldTable emptyFieldTable = new FieldTable();
+
+ table1.addAll(emptyFieldTable);
+ assertEquals("Unexpected number of entries in table1 after addAll", 2, table1.size());
+ }
+
private void assertBytesEqual(byte[] expected, byte[] actual)
{
Assert.assertEquals(expected.length, actual.length);