/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.example.transport; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.url.URLSyntaxException; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.SocketChannel; import java.util.UUID; /** * This is a simple application that demonstrates how you can use the Qpid AMQP interfaces to use existing sockets as * the transport for the Client API. * * The Demo here runs twice: * 1. Just to show a simple publish and receive. * 2. To demonstrate how to use existing sockets and utilise the underlying client failover mechnaism. */ public class ExistingSocketConnectorDemo implements ConnectionListener { private static boolean DEMO_FAILOVER = false; public static void main(String[] args) throws IOException, URLSyntaxException, AMQException, JMSException { System.out.println("Testing socket connection to localhost:5672."); new ExistingSocketConnectorDemo(); System.out.println("Testing socket connection failover between localhost:5672 and localhost:5673."); DEMO_FAILOVER = true; new ExistingSocketConnectorDemo(); } Connection _connection; MessageProducer _producer; Session _session; String Socket1_ID = UUID.randomUUID().toString(); String Socket2_ID = UUID.randomUUID().toString(); /** Here we can see the broker we are connecting to is set to be 'socket:///' signifying we will provide the socket. */ public final String CONNECTION = "amqp://guest:guest@id/test?brokerlist='socket://" + Socket1_ID + ";socket://" + Socket2_ID + "'"; public ExistingSocketConnectorDemo() throws IOException, URLSyntaxException, AMQException, JMSException { Socket socket = SocketChannel.open().socket(); socket.connect(new InetSocketAddress("localhost", 5672)); TransportConnection.registerOpenSocket(Socket1_ID, socket); _connection = new AMQConnection(CONNECTION); _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = _session.createConsumer(_session.createQueue("Queue")); _producer = _session.createProducer(_session.createQueue("Queue")); _connection.start(); if (!DEMO_FAILOVER) { _producer.send(_session.createTextMessage("Simple Test")); } else { // Using the Qpid interfaces we can set a listener that allows us to demonstrate failover ((AMQConnection) _connection).setConnectionListener(this); System.out.println("Testing failover: Please ensure second broker running on localhost:5673 and shutdown broker on 5672."); } //We do a blocking receive here so that we can demonstrate failover. Message message = consumer.receive(); System.out.println("Recevied :" + message); _connection.close(); } // ConnectionListener Interface public void bytesSent(long count) { //not used in this example } public void bytesReceived(long count) { //not used in this example } public boolean preFailover(boolean redirect) { /** * This method is called before the underlying client library starts to reconnect. This gives us the opportunity * to set a new socket for the failover to occur on. */ try { Socket socket = SocketChannel.open().socket(); socket.connect(new InetSocketAddress("localhost", 5673)); // This is the new method to pass in an open socket for the connection to use. TransportConnection.registerOpenSocket(Socket2_ID, socket); } catch (IOException e) { e.printStackTrace(); return false; } return true; } public boolean preResubscribe() { //not used in this example - but must return true to allow the resubscription of existing clients. return true; } public void failoverComplete() { // Now that failover has completed we can send a message that the receiving thread will pick up try { _producer.send(_session.createTextMessage("Simple Failover Test")); } catch (JMSException e) { e.printStackTrace(); } } }