diff options
author | Aidan Skinner <aidan@apache.org> | 2008-04-24 01:54:20 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-04-24 01:54:20 +0000 |
commit | 470ebf87ec336e81277c550669839161f2630b5f (patch) | |
tree | 44f6a2152a768c429ceb4e8181b8001a77f0e75f /qpid/java/client/example | |
parent | d3d44fd5452e21c4a8fc3d66596e6ff9f8274f2f (diff) | |
download | qpid-python-470ebf87ec336e81277c550669839161f2630b5f.tar.gz |
QPID-832 merge M2.x
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@651133 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/example')
-rw-r--r-- | qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java | 171 |
1 files changed, 171 insertions, 0 deletions
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java new file mode 100644 index 0000000000..d7eb138523 --- /dev/null +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java @@ -0,0 +1,171 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.example.transport; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.channels.SocketChannel; +import java.util.UUID; + +/** + * This is a simple application that demonstrates how you can use the Qpid AMQP interfaces to use existing sockets as + * the transport for the Client API. + * + * The Demo here runs twice: + * 1. Just to show a simple publish and receive. + * 2. To demonstrate how to use existing sockets and utilise the underlying client failover mechnaism. + */ +public class ExistingSocketConnectorDemo implements ConnectionListener +{ + private static boolean DEMO_FAILOVER = false; + + public static void main(String[] args) throws IOException, URLSyntaxException, AMQException, JMSException + { + System.out.println("Testing socket connection to localhost:5672."); + + new ExistingSocketConnectorDemo(); + + System.out.println("Testing socket connection failover between localhost:5672 and localhost:5673."); + + DEMO_FAILOVER = true; + + new ExistingSocketConnectorDemo(); + } + + Connection _connection; + MessageProducer _producer; + Session _session; + + String Socket1_ID = UUID.randomUUID().toString(); + String Socket2_ID = UUID.randomUUID().toString(); + + + + /** Here we can see the broker we are connecting to is set to be 'socket:///' signifying we will provide the socket. */ + public final String CONNECTION = "amqp://guest:guest@id/test?brokerlist='socket://" + Socket1_ID + ";socket://" + Socket2_ID + "'"; + + + public ExistingSocketConnectorDemo() throws IOException, URLSyntaxException, AMQException, JMSException + { + + Socket socket = SocketChannel.open().socket(); + socket.connect(new InetSocketAddress("localhost", 5672)); + + TransportConnection.registerOpenSocket(Socket1_ID, socket); + + + _connection = new AMQConnection(CONNECTION); + + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer consumer = _session.createConsumer(_session.createQueue("Queue")); + + _producer = _session.createProducer(_session.createQueue("Queue")); + + _connection.start(); + + if (!DEMO_FAILOVER) + { + _producer.send(_session.createTextMessage("Simple Test")); + } + else + { + // Using the Qpid interfaces we can set a listener that allows us to demonstrate failover + ((AMQConnection) _connection).setConnectionListener(this); + + System.out.println("Testing failover: Please ensure second broker running on localhost:5673 and shutdown broker on 5672."); + } + + //We do a blocking receive here so that we can demonstrate failover. + Message message = consumer.receive(); + + System.out.println("Recevied :" + message); + + _connection.close(); + } + + // ConnectionListener Interface + + public void bytesSent(long count) + { + //not used in this example + } + public void bytesReceived(long count) + { + //not used in this example + } + + public boolean preFailover(boolean redirect) + { + /** + * This method is called before the underlying client library starts to reconnect. This gives us the opportunity + * to set a new socket for the failover to occur on. + */ + try + { + Socket socket = SocketChannel.open().socket(); + + socket.connect(new InetSocketAddress("localhost", 5673)); + + // This is the new method to pass in an open socket for the connection to use. + TransportConnection.registerOpenSocket(Socket2_ID, socket); + } + catch (IOException e) + { + e.printStackTrace(); + return false; + } + return true; + } + + public boolean preResubscribe() + { + //not used in this example - but must return true to allow the resubscription of existing clients. + return true; + } + + public void failoverComplete() + { + // Now that failover has completed we can send a message that the receiving thread will pick up + try + { + _producer.send(_session.createTextMessage("Simple Failover Test")); + } + catch (JMSException e) + { + e.printStackTrace(); + } + } +} |