summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'M4-RCs/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java')
-rw-r--r--M4-RCs/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java385
1 files changed, 0 insertions, 385 deletions
diff --git a/M4-RCs/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/M4-RCs/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
deleted file mode 100644
index 7ce41db4d0..0000000000
--- a/M4-RCs/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport;
-
-import org.apache.mina.util.AvailablePortFinder;
-
-import org.apache.qpid.util.concurrent.Condition;
-
-import org.apache.qpid.transport.network.ConnectionBinding;
-import org.apache.qpid.transport.network.io.IoAcceptor;
-import org.apache.qpid.transport.util.Logger;
-import org.apache.qpid.transport.util.Waiter;
-
-import junit.framework.TestCase;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Collections;
-import java.io.IOException;
-
-import static org.apache.qpid.transport.Option.*;
-
-/**
- * ConnectionTest
- */
-
-public class ConnectionTest extends TestCase implements SessionListener
-{
-
- private static final Logger log = Logger.get(ConnectionTest.class);
-
- private int port;
- private volatile boolean queue = false;
- private List<MessageTransfer> messages = new ArrayList<MessageTransfer>();
- private List<MessageTransfer> incoming = new ArrayList<MessageTransfer>();
-
- private IoAcceptor _ioa = null;
-
-
- protected void setUp() throws Exception
- {
- super.setUp();
-
- port = AvailablePortFinder.getNextAvailable(12000);
- }
-
- protected void tearDown() throws Exception
- {
- if (_ioa != null)
- {
- _ioa.close();
- }
-
- super.tearDown();
- }
-
- public void opened(Session ssn) {}
-
- public void message(final Session ssn, MessageTransfer xfr)
- {
- if (queue)
- {
- messages.add(xfr);
- ssn.processed(xfr);
- return;
- }
-
- String body = xfr.getBodyString();
-
- if (body.startsWith("CLOSE"))
- {
- ssn.getConnection().close();
- }
- else if (body.startsWith("DELAYED_CLOSE"))
- {
- ssn.processed(xfr);
- new Thread()
- {
- public void run()
- {
- try
- {
- sleep(3000);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- ssn.getConnection().close();
- }
- }.start();
- }
- else if (body.startsWith("ECHO"))
- {
- int id = xfr.getId();
- ssn.invoke(xfr);
- ssn.processed(id);
- }
- else if (body.startsWith("SINK"))
- {
- ssn.processed(xfr);
- }
- else if (body.startsWith("DROP"))
- {
- // do nothing
- }
- else
- {
- throw new IllegalArgumentException
- ("unrecognized message: " + body);
- }
- }
-
- public void exception(Session ssn, SessionException exc)
- {
- throw exc;
- }
-
- public void closed(Session ssn) {}
-
- private void send(Session ssn, String msg)
- {
- ssn.messageTransfer
- ("xxx", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
- null, msg);
- }
-
- private Connection connect(final Condition closed)
- {
- Connection conn = new Connection();
- conn.setConnectionListener(new ConnectionListener()
- {
- public void opened(Connection conn) {}
- public void exception(Connection conn, ConnectionException exc)
- {
- exc.printStackTrace();
- }
- public void closed(Connection conn)
- {
- if (closed != null)
- {
- closed.set();
- }
- }
- });
- conn.connect("localhost", port, null, "guest", "guest", false);
- return conn;
- }
-
- public void testProtocolNegotiationExceptionOverridesCloseException() throws Exception
- {
- // Force os.name to be windows to exercise code in IoReceiver
- // that looks for the value of os.name
- System.setProperty("os.name","windows");
-
- // Start server as 0-9 to froce a ProtocolVersionException
- startServer(new ProtocolHeader(1, 0, 9));
-
- Condition closed = new Condition();
-
- try
- {
- connect(closed);
- fail("ProtocolVersionException expected");
- }
- catch (ProtocolVersionException pve)
- {
- //Expected code path
- }
- catch (Exception e)
- {
- fail("ProtocolVersionException expected. Got:" + e.getMessage());
- }
- }
-
- private void startServer()
- {
- startServer(new ProtocolHeader(1, 0, 10));
- }
-
- private void startServer(final ProtocolHeader protocolHeader)
- {
- ConnectionDelegate server = new ServerDelegate()
- {
- @Override
- public void init(Connection conn, ProtocolHeader hdr)
- {
- conn.send(protocolHeader);
- List<Object> utf8 = new ArrayList<Object>();
- utf8.add("utf8");
- conn.connectionStart(null, Collections.EMPTY_LIST, utf8);
- }
-
- @Override
- public Session getSession(Connection conn, SessionAttach atc)
- {
- Session ssn = super.getSession(conn, atc);
- ssn.setSessionListener(ConnectionTest.this);
- return ssn;
- }
- };
-
- try
- {
- _ioa = new IoAcceptor("localhost", port, ConnectionBinding.get(server));
- }
- catch (IOException e)
- {
- e.printStackTrace();
- fail("Unable to start Server for test due to:" + e.getMessage());
- }
-
- _ioa.start();
- }
-
- public void testClosedNotificationAndWriteToClosed() throws Exception
- {
- startServer();
-
- Condition closed = new Condition();
- Connection conn = connect(closed);
-
- Session ssn = conn.createSession(1);
- send(ssn, "CLOSE");
-
- if (!closed.get(3000))
- {
- fail("never got notified of connection close");
- }
-
- try
- {
- conn.connectionCloseOk();
- fail("writing to a closed socket succeeded");
- }
- catch (TransportException e)
- {
- // expected
- }
- }
-
- class FailoverConnectionListener implements ConnectionListener
- {
- public void opened(Connection conn) {}
-
- public void exception(Connection conn, ConnectionException e)
- {
- throw e;
- }
-
- public void closed(Connection conn)
- {
- queue = true;
- conn.connect("localhost", port, null, "guest", "guest");
- conn.resume();
- }
- }
-
- class TestSessionListener implements SessionListener
- {
- public void opened(Session s) {}
- public void exception(Session s, SessionException e) {}
- public void message(Session s, MessageTransfer xfr)
- {
- synchronized (incoming)
- {
- incoming.add(xfr);
- incoming.notifyAll();
- }
-
- s.processed(xfr);
- }
- public void closed(Session s) {}
- }
-
- public void testResumeNonemptyReplayBuffer() throws Exception
- {
- startServer();
-
- Connection conn = new Connection();
- conn.setConnectionListener(new FailoverConnectionListener());
- conn.connect("localhost", port, null, "guest", "guest");
- Session ssn = conn.createSession(1);
- ssn.setSessionListener(new TestSessionListener());
-
- send(ssn, "SINK 0");
- send(ssn, "ECHO 1");
- send(ssn, "ECHO 2");
-
- ssn.sync();
-
- String[] msgs = { "DROP 3", "DROP 4", "DROP 5", "CLOSE 6", "SINK 7" };
- for (String m : msgs)
- {
- send(ssn, m);
- }
-
- ssn.sync();
-
- assertEquals(msgs.length, messages.size());
- for (int i = 0; i < msgs.length; i++)
- {
- assertEquals(msgs[i], messages.get(i).getBodyString());
- }
-
- queue = false;
-
- send(ssn, "ECHO 8");
- send(ssn, "ECHO 9");
-
- synchronized (incoming)
- {
- Waiter w = new Waiter(incoming, 30000);
- while (w.hasTime() && incoming.size() < 4)
- {
- w.await();
- }
-
- assertEquals(4, incoming.size());
- assertEquals("ECHO 1", incoming.get(0).getBodyString());
- assertEquals(0, incoming.get(0).getId());
- assertEquals("ECHO 2", incoming.get(1).getBodyString());
- assertEquals(1, incoming.get(1).getId());
- assertEquals("ECHO 8", incoming.get(2).getBodyString());
- assertEquals(0, incoming.get(0).getId());
- assertEquals("ECHO 9", incoming.get(3).getBodyString());
- assertEquals(1, incoming.get(1).getId());
- }
- }
-
- public void testResumeEmptyReplayBuffer() throws InterruptedException
- {
- startServer();
-
- Connection conn = new Connection();
- conn.setConnectionListener(new FailoverConnectionListener());
- conn.connect("localhost", port, null, "guest", "guest");
- Session ssn = conn.createSession(1);
- ssn.setSessionListener(new TestSessionListener());
-
- send(ssn, "SINK 0");
- send(ssn, "SINK 1");
- send(ssn, "DELAYED_CLOSE 2");
- ssn.sync();
- Thread.sleep(6000);
- send(ssn, "SINK 3");
- ssn.sync();
- System.out.println(messages);
- assertEquals(1, messages.size());
- assertEquals("SINK 3", messages.get(0).getBodyString());
- }
-
- public void testFlushExpected() throws InterruptedException
- {
- startServer();
-
- Connection conn = new Connection();
- conn.connect("localhost", port, null, "guest", "guest");
- Session ssn = conn.createSession();
- ssn.sessionFlush(EXPECTED);
- send(ssn, "SINK 0");
- ssn.sessionFlush(EXPECTED);
- send(ssn, "SINK 1");
- ssn.sync();
- }
-
-}