summaryrefslogtreecommitdiff
path: root/qpid/java/management/client/src/example/org/apache/qpid/management/example/ConsumerAndProducerExample.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/management/client/src/example/org/apache/qpid/management/example/ConsumerAndProducerExample.java')
-rw-r--r--qpid/java/management/client/src/example/org/apache/qpid/management/example/ConsumerAndProducerExample.java51
1 files changed, 48 insertions, 3 deletions
diff --git a/qpid/java/management/client/src/example/org/apache/qpid/management/example/ConsumerAndProducerExample.java b/qpid/java/management/client/src/example/org/apache/qpid/management/example/ConsumerAndProducerExample.java
index 93e78707e9..42587d78ff 100644
--- a/qpid/java/management/client/src/example/org/apache/qpid/management/example/ConsumerAndProducerExample.java
+++ b/qpid/java/management/client/src/example/org/apache/qpid/management/example/ConsumerAndProducerExample.java
@@ -26,7 +26,9 @@ import java.util.Date;
import org.apache.muse.util.xml.XPathUtils;
import org.apache.muse.ws.addressing.EndpointReference;
import org.apache.muse.ws.addressing.soap.SoapFault;
+import org.apache.muse.ws.notification.impl.FilterCollection;
import org.apache.muse.ws.notification.impl.MessagePatternFilter;
+import org.apache.muse.ws.notification.impl.ProducerPropertiesFilter;
import org.apache.muse.ws.notification.impl.TopicFilter;
import org.apache.muse.ws.notification.remote.NotificationProducerClient;
import org.apache.qpid.management.Names;
@@ -65,11 +67,11 @@ public class ConsumerAndProducerExample extends AbstractQManExample
void executeExample(String host, int port) throws Exception
{
// This is QMan...
- URI producerURI = URI.create("http://"+host+":"+port+"/qman/services/consumer");
+ URI producerURI = URI.create("http://"+host+":"+port+"/qman/services/adapter");
// ...and this is QMan too! Note that it has an hidden consumer capability that is used in
// order to run successfully this example...
- URI consumerURI = producerURI;
+ URI consumerURI = URI.create("http://"+host+":"+port+"/qman/services/consumer");
EndpointReference producerEPR = new EndpointReference(producerURI);
EndpointReference consumerEPR = new EndpointReference(consumerURI);
@@ -93,8 +95,11 @@ public class ConsumerAndProducerExample extends AbstractQManExample
// Example 6: a MessageFilter is installed in order to listen only for connection events
// (connections created or removed). The subscription will expire in 10 seconds.
allMessagesWithMessageFilterAndTerminationTime(producerEPR,consumerEPR);
+
+ // Example 7 : a subscription with more than one filter.
+ complexSubscription(producerEPR, consumerEPR);
}
-
+
/**
* Makes a subscription on all topics / all messages without an expiry date.
*
@@ -223,6 +228,41 @@ public class ConsumerAndProducerExample extends AbstractQManExample
new Date(System.currentTimeMillis() + 10000)); // Termination Time
}
+ /**
+ * Makes a subscription on a specifc topic with an expiry date.
+ * Only messages published on the given topic will be delivered to the given consumer.
+ * The subscription will end after 10 seconds
+ *
+ * @param producer the producer endpoint reference.
+ * @param consumer the consumer endpoint reference .
+ * @throws SoapFault when the subscription cannot be made.
+ */
+ private void complexSubscription(EndpointReference producer, EndpointReference consumer) throws SoapFault
+ {
+ NotificationProducerClient producerClient = new NotificationProducerClient(producer);
+ producerClient.setTrace(true);
+
+ FilterCollection filter = new FilterCollection();
+
+ TopicFilter topicFilter = new TopicFilter(Names.EVENTS_LIFECYLE_TOPIC_NAME);
+ MessagePatternFilter messageFilter= new MessagePatternFilter(
+ "/wsnt:NotificationMessage/wsnt:Message/qman:LifeCycleEvent/qman:Resource/qman:Name/text()='connection'", // expression (XPath)
+ XPathUtils.NAMESPACE_URI); // Dialect : the only supported dialect is XPath 1.0
+
+ ProducerPropertiesFilter producerFilter = new ProducerPropertiesFilter(
+ "boolean(/*/MgtPubInterval > 100 and /*/MsgTotalEnqueues > 56272)",
+ XPathUtils.NAMESPACE_URI);
+
+ filter.addFilter(topicFilter);
+ filter.addFilter(messageFilter);
+ filter.addFilter(producerFilter);
+
+ producerClient.subscribe(
+ consumer, // Consumer Endpoint reference
+ filter, // Topic Filter
+ new Date(System.currentTimeMillis() + 10000)); // Termination Time
+ }
+
@Override
void printOutExampleDescription()
{
@@ -245,4 +285,9 @@ public class ConsumerAndProducerExample extends AbstractQManExample
System.out.println("A subscription with a termination time will have a predefined expiry");
System.out.println("date while if there's no termination the subscription will never expire.");
}
+
+ public static void main(String[] args)
+ {
+ new ConsumerAndProducerExample().execute(new String[]{"localhost","8080"});
+ }
}