diff options
Diffstat (limited to 'M4-RCs/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java')
-rw-r--r-- | M4-RCs/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java | 385 |
1 files changed, 0 insertions, 385 deletions
diff --git a/M4-RCs/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/M4-RCs/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java deleted file mode 100644 index 7ce41db4d0..0000000000 --- a/M4-RCs/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ /dev/null @@ -1,385 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport; - -import org.apache.mina.util.AvailablePortFinder; - -import org.apache.qpid.util.concurrent.Condition; - -import org.apache.qpid.transport.network.ConnectionBinding; -import org.apache.qpid.transport.network.io.IoAcceptor; -import org.apache.qpid.transport.util.Logger; -import org.apache.qpid.transport.util.Waiter; - -import junit.framework.TestCase; - -import java.util.ArrayList; -import java.util.List; -import java.util.Collections; -import java.io.IOException; - -import static org.apache.qpid.transport.Option.*; - -/** - * ConnectionTest - */ - -public class ConnectionTest extends TestCase implements SessionListener -{ - - private static final Logger log = Logger.get(ConnectionTest.class); - - private int port; - private volatile boolean queue = false; - private List<MessageTransfer> messages = new ArrayList<MessageTransfer>(); - private List<MessageTransfer> incoming = new ArrayList<MessageTransfer>(); - - private IoAcceptor _ioa = null; - - - protected void setUp() throws Exception - { - super.setUp(); - - port = AvailablePortFinder.getNextAvailable(12000); - } - - protected void tearDown() throws Exception - { - if (_ioa != null) - { - _ioa.close(); - } - - super.tearDown(); - } - - public void opened(Session ssn) {} - - public void message(final Session ssn, MessageTransfer xfr) - { - if (queue) - { - messages.add(xfr); - ssn.processed(xfr); - return; - } - - String body = xfr.getBodyString(); - - if (body.startsWith("CLOSE")) - { - ssn.getConnection().close(); - } - else if (body.startsWith("DELAYED_CLOSE")) - { - ssn.processed(xfr); - new Thread() - { - public void run() - { - try - { - sleep(3000); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - ssn.getConnection().close(); - } - }.start(); - } - else if (body.startsWith("ECHO")) - { - int id = xfr.getId(); - ssn.invoke(xfr); - ssn.processed(id); - } - else if (body.startsWith("SINK")) - { - ssn.processed(xfr); - } - else if (body.startsWith("DROP")) - { - // do nothing - } - else - { - throw new IllegalArgumentException - ("unrecognized message: " + body); - } - } - - public void exception(Session ssn, SessionException exc) - { - throw exc; - } - - public void closed(Session ssn) {} - - private void send(Session ssn, String msg) - { - ssn.messageTransfer - ("xxx", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, - null, msg); - } - - private Connection connect(final Condition closed) - { - Connection conn = new Connection(); - conn.setConnectionListener(new ConnectionListener() - { - public void opened(Connection conn) {} - public void exception(Connection conn, ConnectionException exc) - { - exc.printStackTrace(); - } - public void closed(Connection conn) - { - if (closed != null) - { - closed.set(); - } - } - }); - conn.connect("localhost", port, null, "guest", "guest", false); - return conn; - } - - public void testProtocolNegotiationExceptionOverridesCloseException() throws Exception - { - // Force os.name to be windows to exercise code in IoReceiver - // that looks for the value of os.name - System.setProperty("os.name","windows"); - - // Start server as 0-9 to froce a ProtocolVersionException - startServer(new ProtocolHeader(1, 0, 9)); - - Condition closed = new Condition(); - - try - { - connect(closed); - fail("ProtocolVersionException expected"); - } - catch (ProtocolVersionException pve) - { - //Expected code path - } - catch (Exception e) - { - fail("ProtocolVersionException expected. Got:" + e.getMessage()); - } - } - - private void startServer() - { - startServer(new ProtocolHeader(1, 0, 10)); - } - - private void startServer(final ProtocolHeader protocolHeader) - { - ConnectionDelegate server = new ServerDelegate() - { - @Override - public void init(Connection conn, ProtocolHeader hdr) - { - conn.send(protocolHeader); - List<Object> utf8 = new ArrayList<Object>(); - utf8.add("utf8"); - conn.connectionStart(null, Collections.EMPTY_LIST, utf8); - } - - @Override - public Session getSession(Connection conn, SessionAttach atc) - { - Session ssn = super.getSession(conn, atc); - ssn.setSessionListener(ConnectionTest.this); - return ssn; - } - }; - - try - { - _ioa = new IoAcceptor("localhost", port, ConnectionBinding.get(server)); - } - catch (IOException e) - { - e.printStackTrace(); - fail("Unable to start Server for test due to:" + e.getMessage()); - } - - _ioa.start(); - } - - public void testClosedNotificationAndWriteToClosed() throws Exception - { - startServer(); - - Condition closed = new Condition(); - Connection conn = connect(closed); - - Session ssn = conn.createSession(1); - send(ssn, "CLOSE"); - - if (!closed.get(3000)) - { - fail("never got notified of connection close"); - } - - try - { - conn.connectionCloseOk(); - fail("writing to a closed socket succeeded"); - } - catch (TransportException e) - { - // expected - } - } - - class FailoverConnectionListener implements ConnectionListener - { - public void opened(Connection conn) {} - - public void exception(Connection conn, ConnectionException e) - { - throw e; - } - - public void closed(Connection conn) - { - queue = true; - conn.connect("localhost", port, null, "guest", "guest"); - conn.resume(); - } - } - - class TestSessionListener implements SessionListener - { - public void opened(Session s) {} - public void exception(Session s, SessionException e) {} - public void message(Session s, MessageTransfer xfr) - { - synchronized (incoming) - { - incoming.add(xfr); - incoming.notifyAll(); - } - - s.processed(xfr); - } - public void closed(Session s) {} - } - - public void testResumeNonemptyReplayBuffer() throws Exception - { - startServer(); - - Connection conn = new Connection(); - conn.setConnectionListener(new FailoverConnectionListener()); - conn.connect("localhost", port, null, "guest", "guest"); - Session ssn = conn.createSession(1); - ssn.setSessionListener(new TestSessionListener()); - - send(ssn, "SINK 0"); - send(ssn, "ECHO 1"); - send(ssn, "ECHO 2"); - - ssn.sync(); - - String[] msgs = { "DROP 3", "DROP 4", "DROP 5", "CLOSE 6", "SINK 7" }; - for (String m : msgs) - { - send(ssn, m); - } - - ssn.sync(); - - assertEquals(msgs.length, messages.size()); - for (int i = 0; i < msgs.length; i++) - { - assertEquals(msgs[i], messages.get(i).getBodyString()); - } - - queue = false; - - send(ssn, "ECHO 8"); - send(ssn, "ECHO 9"); - - synchronized (incoming) - { - Waiter w = new Waiter(incoming, 30000); - while (w.hasTime() && incoming.size() < 4) - { - w.await(); - } - - assertEquals(4, incoming.size()); - assertEquals("ECHO 1", incoming.get(0).getBodyString()); - assertEquals(0, incoming.get(0).getId()); - assertEquals("ECHO 2", incoming.get(1).getBodyString()); - assertEquals(1, incoming.get(1).getId()); - assertEquals("ECHO 8", incoming.get(2).getBodyString()); - assertEquals(0, incoming.get(0).getId()); - assertEquals("ECHO 9", incoming.get(3).getBodyString()); - assertEquals(1, incoming.get(1).getId()); - } - } - - public void testResumeEmptyReplayBuffer() throws InterruptedException - { - startServer(); - - Connection conn = new Connection(); - conn.setConnectionListener(new FailoverConnectionListener()); - conn.connect("localhost", port, null, "guest", "guest"); - Session ssn = conn.createSession(1); - ssn.setSessionListener(new TestSessionListener()); - - send(ssn, "SINK 0"); - send(ssn, "SINK 1"); - send(ssn, "DELAYED_CLOSE 2"); - ssn.sync(); - Thread.sleep(6000); - send(ssn, "SINK 3"); - ssn.sync(); - System.out.println(messages); - assertEquals(1, messages.size()); - assertEquals("SINK 3", messages.get(0).getBodyString()); - } - - public void testFlushExpected() throws InterruptedException - { - startServer(); - - Connection conn = new Connection(); - conn.connect("localhost", port, null, "guest", "guest"); - Session ssn = conn.createSession(); - ssn.sessionFlush(EXPECTED); - send(ssn, "SINK 0"); - ssn.sessionFlush(EXPECTED); - send(ssn, "SINK 1"); - ssn.sync(); - } - -} |