summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-02-27 16:05:01 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-02-27 16:05:01 +0000
commitfd4963007c35cd1c8e3b3cc88366a685920001e1 (patch)
tree88128615ff5846dcce9ce684230499e407ab431e
parent1a7e18ae07ed88605daf3c0277632ce9269eef71 (diff)
downloadqpid-python-fd4963007c35cd1c8e3b3cc88366a685920001e1.tar.gz
QPID-792 : [Java Client] Validate queue browser selector on client side, not broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1294194 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java64
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java34
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java15
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java11
7 files changed, 110 insertions, 41 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
index 2313bce474..bd83e8b37b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
@@ -20,6 +20,12 @@
*/
package org.apache.qpid.client;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.client.filter.JMSSelectorFilter;
+import org.apache.qpid.protocol.AMQConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,12 +54,50 @@ public class AMQQueueBrowser implements QueueBrowser
_session = session;
_queue = queue;
_messageSelector = ((messageSelector == null) || (messageSelector.trim().length() == 0)) ? null : messageSelector;
- // Create Consumer to verify message selector.
- BasicMessageConsumer consumer =
- (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
- // Close this consumer as we are not looking to consume only to establish that, at least for now,
- // the QB can be created
- consumer.close();
+
+
+ validateQueue((AMQDestination) queue);
+
+ if(_messageSelector != null)
+ {
+ validateSelector(_messageSelector);
+ }
+ }
+
+ private void validateSelector(String messageSelector) throws InvalidSelectorException
+ {
+ try
+ {
+ new JMSSelectorFilter(messageSelector);
+ }
+ catch (AMQInternalException e)
+ {
+ throw new InvalidSelectorException(e.getMessage());
+ }
+ }
+
+ private void validateQueue(AMQDestination queue) throws JMSException
+ {
+ try
+ {
+ // Essentially just test the connection/session is still active
+ _session.sync();
+ // TODO - should really validate queue exists, but we often rely on creating the consumer to create the queue :(
+ // _session.declareQueuePassive( queue );
+ }
+ catch (AMQException e)
+ {
+ if(e.getErrorCode() == AMQConstant.NOT_FOUND)
+ {
+ throw new InvalidDestinationException(e.getMessage());
+ }
+ else
+ {
+ final JMSException jmsException = new JMSException(e.getMessage(), String.valueOf(e.getErrorCode().getCode()));
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
+ }
}
public Queue getQueue() throws JMSException
@@ -118,12 +162,12 @@ public class AMQQueueBrowser implements QueueBrowser
_consumer = consumer;
prefetchMessage();
}
- _logger.info("QB:created with first element:" + _nextMessage);
+ _logger.debug("QB:created with first element:" + _nextMessage);
}
public boolean hasMoreElements()
{
- _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
+ _logger.debug("QB:hasMoreElements:" + (_nextMessage != null));
return (_nextMessage != null);
}
@@ -136,9 +180,9 @@ public class AMQQueueBrowser implements QueueBrowser
}
try
{
- _logger.info("QB:nextElement about to receive");
+ _logger.debug("QB:nextElement about to receive");
prefetchMessage();
- _logger.info("QB:nextElement received:" + _nextMessage);
+ _logger.debug("QB:nextElement received:" + _nextMessage);
}
catch (JMSException e)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 30f1dcf8b7..766237006a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1680,7 +1680,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
AMQProtocolHandler protocolHandler = getProtocolHandler();
declareExchange(amqd, protocolHandler, false);
- AMQShortString queueName = declareQueue(amqd, protocolHandler, false);
+ AMQShortString queueName = declareQueue(amqd, false);
bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd);
}
@@ -2714,6 +2714,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
final boolean nowait) throws AMQException, FailoverException;
+
+ void declareQueuePassive(AMQDestination queue) throws AMQException
+ {
+ declareQueue(queue,false,false,true);
+ }
+
/**
* Declares a queue for a JMS destination.
*
@@ -2723,27 +2729,35 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*
* <p/>Note that this operation automatically retries in the event of fail-over.
*
- * @param amqd The destination to declare as a queue.
- * @param protocolHandler The protocol handler to communicate through.
*
+ * @param amqd The destination to declare as a queue.
* @return The name of the decalred queue. This is useful where the broker is generating a queue name on behalf of
* the client.
*
+ *
+ *
* @throws AMQException If the queue cannot be declared for any reason.
* @todo Verify the destiation is valid or throw an exception.
* @todo Be aware of possible changes to parameter order as versions change.
*/
- protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ protected AMQShortString declareQueue(final AMQDestination amqd,
final boolean noLocal) throws AMQException
{
- return declareQueue(amqd, protocolHandler, noLocal, false);
+ return declareQueue(amqd, noLocal, false);
}
- protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ protected AMQShortString declareQueue(final AMQDestination amqd,
final boolean noLocal, final boolean nowait)
+ throws AMQException
+ {
+ return declareQueue(amqd, noLocal, nowait, false);
+ }
+
+ protected AMQShortString declareQueue(final AMQDestination amqd,
+ final boolean noLocal, final boolean nowait, final boolean passive)
throws AMQException
{
- /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
+ final AMQProtocolHandler protocolHandler = getProtocolHandler();
return new FailoverNoopSupport<AMQShortString, AMQException>(
new FailoverProtectedOperation<AMQShortString, AMQException>()
{
@@ -2755,7 +2769,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
amqd.setQueueName(protocolHandler.generateQueueName());
}
- sendQueueDeclare(amqd, protocolHandler, nowait);
+ sendQueueDeclare(amqd, protocolHandler, nowait, passive);
return amqd.getAMQQueueName();
}
@@ -2763,7 +2777,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait) throws AMQException, FailoverException;
+ final boolean nowait, boolean passive) throws AMQException, FailoverException;
/**
* Undeclares the specified queue.
@@ -2904,7 +2918,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (_delareQueues || amqd.isNameRequired())
{
- declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
+ declareQueue(amqd, consumer.isNoLocal(), nowait);
}
bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 36dbad1928..3902c726f3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -721,7 +721,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* Declare a queue with the given queueName
*/
public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait)
+ final boolean nowait, boolean passive)
throws AMQException, FailoverException
{
// do nothing this is only used by 0_8
@@ -731,7 +731,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* Declare a queue with the given queueName
*/
public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal, final boolean nowait)
+ final boolean noLocal, final boolean nowait, boolean passive)
throws AMQException
{
AMQShortString queueName;
@@ -757,7 +757,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
getQpidSession().queueDeclare(queueName.toString(), "" , arguments,
amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
amqd.isDurable() ? Option.DURABLE : Option.NONE,
- amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+ amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE,
+ passive ? Option.PASSIVE : Option.NONE);
}
else
{
@@ -927,11 +928,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return getCurrentException();
}
- protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal, final boolean nowait)
+ protected AMQShortString declareQueue(final AMQDestination amqd,
+ final boolean noLocal, final boolean nowait, final boolean passive)
throws AMQException
{
- /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
+ final AMQProtocolHandler protocolHandler = getProtocolHandler();
+
return new FailoverNoopSupport<AMQShortString, AMQException>(
new FailoverProtectedOperation<AMQShortString, AMQException>()
{
@@ -948,7 +950,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
amqd.setQueueName(new AMQShortString( binddingKey + "@"
+ amqd.getExchangeName().toString() + "_" + UUID.randomUUID()));
}
- return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait);
+ return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait, passive);
}
}, getAMQConnection()).execute();
}
@@ -1205,7 +1207,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
else if(createNode)
{
setLegacyFiledsForQueueType(dest);
- send0_10QueueDeclare(dest,null,noLocal,noWait);
+ send0_10QueueDeclare(dest,null,noLocal,noWait, false);
sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
null,dest.getExchangeName(),dest, false);
break;
@@ -1311,7 +1313,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
node.setExclusive(true);
node.setAutoDelete(!node.isDurable());
- send0_10QueueDeclare(dest,null,noLocal,true);
+ send0_10QueueDeclare(dest,null,noLocal,true, false);
getQpidSession().exchangeBind(dest.getQueueName(),
dest.getAddressName(),
dest.getSubject(),
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index b56da5c2ec..8ab23a240e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -38,7 +38,6 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
-import org.apache.qpid.client.filter.MessageFilter;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
@@ -401,9 +400,17 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait) throws AMQException, FailoverException
- {
- QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
+ final boolean nowait, boolean passive) throws AMQException, FailoverException
+ {
+ QueueDeclareBody body =
+ getMethodRegistry().createQueueDeclareBody(getTicket(),
+ amqd.getAMQQueueName(),
+ passive,
+ amqd.isDurable(),
+ amqd.isExclusive(),
+ amqd.isAutoDelete(),
+ false,
+ null);
AMQFrame queueDeclare = body.generateFrame(getChannelId());
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
index 84d91ee57e..f199961b6f 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
@@ -145,7 +145,7 @@ public class TestAMQSession extends AMQSession_0_8
}
public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler,
- boolean nowait) throws AMQException, FailoverException
+ boolean nowait, boolean passive) throws AMQException, FailoverException
{
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
index ddcb96b4db..236202f323 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.logging;
+import javax.jms.QueueBrowser;
import junit.framework.AssertionFailedError;
import org.apache.qpid.client.AMQConnection;
@@ -166,8 +167,10 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
*/
public void testSubscriptionCreateQueueBrowser() throws JMSException, IOException
{
- _session.createBrowser(_queue);
+ _connection.start();
+ QueueBrowser browser = _session.createBrowser(_queue);
+ browser.getEnumeration();
//Validate
//Ensure that we wait for the SUB log message
waitAndFindMatches("SUB-1001");
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
index 7a4a45a2c8..f3433adb3f 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
@@ -20,11 +20,7 @@
*/
package org.apache.qpid.test.client;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
+import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -35,7 +31,10 @@ import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.NamingException;
-import java.util.Enumeration;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
public class QueueBrowserAutoAckTest extends QpidBrokerTestCase
{