diff options
author | Arnaud Simon <arnaudsimon@apache.org> | 2009-02-05 15:56:26 +0000 |
---|---|---|
committer | Arnaud Simon <arnaudsimon@apache.org> | 2009-02-05 15:56:26 +0000 |
commit | 1a79789263ca6f33ac9a888a709126ba17d42b92 (patch) | |
tree | 1a330041801ed4baf937744610a23bb404b2a49d | |
parent | b04984bca4642dc166bdb9cf93e4e50d92b24fcb (diff) | |
download | qpid-python-1a79789263ca6f33ac9a888a709126ba17d42b92.tar.gz |
qpid-1647: Added headers exchange example
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@741151 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 267 insertions, 0 deletions
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/DeclareQueue.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/DeclareQueue.java new file mode 100644 index 0000000000..6fd73a59c4 --- /dev/null +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/DeclareQueue.java @@ -0,0 +1,74 @@ +package org.apache.qpid.example.amqpexample.headers; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.Session; + +import java.util.Map; +import java.util.HashMap; + + +public class DeclareQueue +{ + + /** + * Creates 2 queues and bind them to an headers exchange. One queue receives messages with both + * properties H1 and H2 and the other queue receives messages with either one of those properties. + */ + public static void main(String[] args) + { + // Create connection + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest",false); + + // Create session + Session session = con.createSession(0); + + // declare and bind queues + session.queueDeclare("headers_queue_any", null, null); + session.queueDeclare("headers_queue_all", null, null); + // we need to declare the header: name, type, alternate exchange + session.exchangeDeclare("test.headers", "headers", "amq.direct", null); + // The matching algorithm is controlled by 'x-match' property + // 'x-match' can take one of two values, + // (i) 'all' implies that all the other pairs must match the headers + // property of a message for that message to be routed (i.e. an AND match) + // (ii) 'any' implies that the message should be routed if any of the + // fields in the headers property match one of the fields in the arguments table (i.e. an OR match) + Map<String, Object> arguments = new HashMap<String, Object>(); + arguments.put("x-match", "any"); + arguments.put("h1", "v1"); + arguments.put("h2", "v2"); + session.exchangeBind("headers_queue_any", "test.headers", "useless", arguments); + arguments = new HashMap<String, Object>(); + arguments.put("x-match", "all"); + arguments.put("h1", "v1"); + arguments.put("h2", "v2"); + session.exchangeBind("headers_queue_all", "test.headers", "useless", arguments); + // confirm completion + session.sync(); + //cleanup + session.close(); + con.close(); + } +} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java new file mode 100644 index 0000000000..aa1c9b0a41 --- /dev/null +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.amqpexample.headers; + + +import org.apache.qpid.transport.*; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class Listener implements SessionListener +{ + private static CountDownLatch _countDownLatch = new CountDownLatch(1); + + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) + { + String body = xfr.getBodyString(); + System.out.println("Message: " + body); + if ( body.equals("That's all, folks!")) + { + System.out.println("Received final message"); + _countDownLatch.countDown(); + } + } + + public void exception(Session ssn, SessionException exc) + { + exc.printStackTrace(); + } + + public void closed(Session ssn) {} + + /** + * Receives messages from queue ANY and then ALL + */ + public static void main(String[] args) throws InterruptedException + { + // Create connection + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest",false); + + // Create session + Session session = con.createSession(0); + // we expect to receive all the messages + Consume(session, "headers_queue_any"); + // we expect to receive only messages that have both properties set. + Consume(session, "headers_queue_all"); + + //cleanup + session.close(); + con.close(); + } + + private static void Consume(Session session, String queueName) throws InterruptedException + { + System.out.println("Consuming messages for queue " + queueName); + _countDownLatch = new CountDownLatch(1); + // Create an instance of the listener + Listener listener = new Listener(); + session.setSessionListener(listener); + + // create a subscription + session.messageSubscribe(queueName, + "listener_destination", + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); + + + // issue credits + session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT); + session.messageFlow("listener_destination", MessageCreditUnit.MESSAGE, 100); + // confirm completion + session.sync(); + + // wait to receive all the messages + System.out.println("Waiting 100 seconds for messages from queue " + queueName); + + _countDownLatch.await(30, TimeUnit.SECONDS); + System.out.println("Shutting down listener for " + queueName); + System.out.println("========================================="); + session.messageCancel("listener_destination"); + } + +} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Producer.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Producer.java new file mode 100644 index 0000000000..a54069889a --- /dev/null +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Producer.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.amqpexample.headers; + +import org.apache.qpid.transport.*; +import java.util.Map; +import java.util.HashMap; + + +public class Producer +{ + /** + * Sends 10 messages with a single property and 10 messages + * with 2 properties to a headers exchange. + */ + public static void main(String[] args) + { + // Create connection + org.apache.qpid.transport.Connection con = new org.apache.qpid.transport.Connection(); + con.connect("localhost", 5672, "test", "guest", "guest",false); + + // Create session + org.apache.qpid.transport.Session session = con.createSession(0); + DeliveryProperties deliveryProps = new DeliveryProperties(); + + // set message headers + MessageProperties messageProperties = new MessageProperties(); + Map<String, Object> messageHeaders = new HashMap<String, Object>(); + // set the message property + messageHeaders.put("h1", "v1"); + messageProperties.setApplicationHeaders(messageHeaders); + Header header = new Header(deliveryProps, messageProperties); + + for (int i=0; i<10; i++) + { + session.messageTransfer("test.headers", MessageAcceptMode.EXPLICIT,MessageAcquireMode.PRE_ACQUIRED, + header, + "Message H1: " + i); + } + + // set message headers + messageProperties = new MessageProperties(); + messageHeaders = new HashMap<String, Object>(); + // set the message properties + messageHeaders.put("h1", "v1"); + messageHeaders.put("h2", "v2"); + messageProperties.setApplicationHeaders(messageHeaders); + header = new Header(deliveryProps, messageProperties); + + for (int i=0; i<10; i++) + { + session.messageTransfer("test.headers", MessageAcceptMode.EXPLICIT,MessageAcquireMode.PRE_ACQUIRED, + header, + "Message H1 and H2: " + i); + } + + + session.messageTransfer("test.headers", MessageAcceptMode.EXPLICIT,MessageAcquireMode.PRE_ACQUIRED, + header, + "That's all, folks!" ); + + // confirm completion + session.sync(); + + //cleanup + session.close(); + con.close(); + } + +} |