summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java')
-rw-r--r--M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java469
1 files changed, 0 insertions, 469 deletions
diff --git a/M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java b/M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java
deleted file mode 100644
index f375eda4d1..0000000000
--- a/M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java
+++ /dev/null
@@ -1,469 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.framework.distributedcircuit;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.test.framework.*;
-import org.apache.qpid.test.utils.ConversationFactory;
-
-import org.apache.qpid.junit.extensions.TimingController;
-import org.apache.qpid.junit.extensions.TimingControllerAware;
-import org.apache.qpid.junit.extensions.util.ParsedProperties;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Session;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * DistributedCircuitImpl is a distributed implementation of the test {@link Circuit}. Many publishers and receivers
- * accross multiple machines may be combined to form a single test circuit. The test circuit extracts reports from
- * all of its publishers and receivers, and applies its assertions to these reports.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Supply the publishing and receiving ends of a test messaging circuit.
- * <tr><td> Start the circuit running.
- * <tr><td> Close the circuit down.
- * <tr><td> Take a reading of the circuits state.
- * <tr><td> Apply assertions against the circuits state.
- * <tr><td> Send test messages over the circuit.
- * <tr><td> Perform the default test procedue on the circuit.
- * </table>
- *
- * @todo There is a short pause after receiving sender reports before asking for receiver reports, because receivers may
- * not have finished receiving all their test messages before the report request arrives. This is going to be a
- * problem for taking test timings and needs to be eliminiated. Suggested solution: have receiver send back reports
- * asynchronously, on test batch size boundaries, and do so automatically rather than having to have the report
- * request sent to them. Number each test run, or otherwise uniquely identify it, when a receiver does not get
- * any more messages on a test run for more than a timeout, it can assume the test is complete and send a final
- * report. On the coordinator end a future will need to be created to wait for all final reports to come in, and
- * to register results and timings for the test. This must work in such a way that a new test cycle can be started
- * without waiting for the results of the old one to come in.
- *
- * @todo Add in setting of timing controller, from timing aware test cases.
- */
-public class DistributedCircuitImpl implements Circuit, TimingControllerAware
-{
- /** Used for debugging purposes. */
- private static final Logger log = Logger.getLogger(DistributedCircuitImpl.class);
-
- /** Holds the conversation factory over which to coordinate the test. */
- protected ConversationFactory conversationFactory;
-
- /** Holds the controlSession over which to hold the control conversation. */
- protected Session controlSession;
-
- /** Holds the sender nodes in the test circuit. */
- protected List<TestClientDetails> senders;
-
- /** Holds the receiver nodes in the test circuit. */
- protected List<TestClientDetails> receivers;
-
- /** Holds the sender control conversations. */
- protected ConversationFactory.Conversation[] senderConversation;
-
- /** Holds the receiver control conversations. */
- protected ConversationFactory.Conversation[] receiverConversation;
-
- /** Holds the control topics for the senders in the test circuit. */
- protected Destination[] senderControlTopic;
-
- /** Holds the control topics for the receivers in the test circuit. */
- protected Destination[] receiverControlTopic;
-
- /** Holds the number of messages to send per test run. */
- protected int numMessages;
-
- /**
- * Holds the timing controller for the circuit. This is used to log test times asynchronously, when reciever nodes
- * return their reports after senders have completed a test case.
- */
- TimingController timingController;
-
- /**
- * Creates a distributed test circuit on the specified senders and receivers.
- *
- * @param session The controlSession for all control conversations.
- * @param senders The senders.
- * @param receivers The receivers.
- * @param senderConversation A control conversation with the senders.
- * @param receiverConversation A control conversation with the receivers.
- * @param senderControlTopic The senders control topic.
- * @param receiverControlTopic The receivers control topic.
- */
- protected DistributedCircuitImpl(Session session, List<TestClientDetails> senders, List<TestClientDetails> receivers,
- ConversationFactory.Conversation[] senderConversation, ConversationFactory.Conversation[] receiverConversation,
- Destination[] senderControlTopic, Destination[] receiverControlTopic)
- {
- this.controlSession = session;
- this.senders = senders;
- this.receivers = receivers;
- this.senderConversation = senderConversation;
- this.receiverConversation = receiverConversation;
- this.senderControlTopic = senderControlTopic;
- this.receiverControlTopic = receiverControlTopic;
- }
-
- /**
- * Creates a distributed test circuit from the specified test parameters, on the senders and receivers
- * given.
- *
- * @param testProps The test parameters.
- * @param senders The sender ends in the test circuit.
- * @param receivers The receiver ends in the test circuit.
- * @param conversationFactory A conversation factory for creating the control conversations with senders and receivers.
- *
- * @return A connected and ready to start, test circuit.
- */
- public static Circuit createCircuit(ParsedProperties testProps, List<TestClientDetails> senders,
- List<TestClientDetails> receivers, ConversationFactory conversationFactory)
- {
- log.debug("public static Circuit createCircuit(ParsedProperties testProps, List<TestClientDetails> senders, "
- + " List<TestClientDetails> receivers, ConversationFactory conversationFactory)");
-
- try
- {
- Session session = conversationFactory.getSession();
-
- // Create control conversations with each of the senders.
- ConversationFactory.Conversation[] senderConversation = new ConversationFactory.Conversation[senders.size()];
- Destination[] senderControlTopic = new Destination[senders.size()];
-
- for (int i = 0; i < senders.size(); i++)
- {
- TestClientDetails sender = senders.get(i);
-
- senderControlTopic[i] = session.createTopic(sender.privateControlKey);
- senderConversation[i] = conversationFactory.startConversation();
- }
-
- log.debug("Sender conversations created.");
-
- // Create control conversations with each of the receivers.
- ConversationFactory.Conversation[] receiverConversation = new ConversationFactory.Conversation[receivers.size()];
- Destination[] receiverControlTopic = new Destination[receivers.size()];
-
- for (int i = 0; i < receivers.size(); i++)
- {
- TestClientDetails receiver = receivers.get(i);
-
- receiverControlTopic[i] = session.createTopic(receiver.privateControlKey);
- receiverConversation[i] = conversationFactory.startConversation();
- }
-
- log.debug("Receiver conversations created.");
-
- // Assign the sender role to each of the sending test clients.
- for (int i = 0; i < senders.size(); i++)
- {
- TestClientDetails sender = senders.get(i);
-
- Message assignSender = conversationFactory.getSession().createMessage();
- TestUtils.setPropertiesOnMessage(assignSender, testProps);
- assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
- assignSender.setStringProperty("ROLE", "SENDER");
-
- senderConversation[i].send(senderControlTopic[i], assignSender);
- }
-
- log.debug("Sender role assignments sent.");
-
- // Assign the receivers role to each of the receiving test clients.
- for (int i = 0; i < receivers.size(); i++)
- {
- TestClientDetails receiver = receivers.get(i);
-
- Message assignReceiver = session.createMessage();
- TestUtils.setPropertiesOnMessage(assignReceiver, testProps);
- assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
- assignReceiver.setStringProperty("ROLE", "RECEIVER");
-
- receiverConversation[i].send(receiverControlTopic[i], assignReceiver);
- }
-
- log.debug("Receiver role assignments sent.");
-
- // Wait for the senders and receivers to confirm their roles.
- for (int i = 0; i < senders.size(); i++)
- {
- senderConversation[i].receive();
- }
-
- log.debug("Got all sender role confirmations");
-
- for (int i = 0; i < receivers.size(); i++)
- {
- receiverConversation[i].receive();
- }
-
- log.debug("Got all receiver role confirmations");
-
- // Package everything up as a circuit.
- return new DistributedCircuitImpl(session, senders, receivers, senderConversation, receiverConversation,
- senderControlTopic, receiverControlTopic);
- }
- catch (JMSException e)
- {
- throw new RuntimeException("JMSException not handled.");
- }
- }
-
- /**
- * Used by tests cases that can supply a {@link org.apache.qpid.junit.extensions.TimingController} to set the
- * controller on an aware test.
- *
- * @param controller The timing controller.
- */
- public void setTimingController(TimingController controller)
- {
- this.timingController = controller;
- }
-
- /**
- * Gets the interface on the publishing end of the circuit.
- *
- * @return The publishing end of the circuit.
- */
- public Publisher getPublisher()
- {
- throw new RuntimeException("Not Implemented.");
- }
-
- /**
- * Gets the interface on the receiving end of the circuit.
- *
- * @return The receiving end of the circuit.
- */
- public Receiver getReceiver()
- {
- throw new RuntimeException("Not Implemented.");
- }
-
- /**
- * Connects and starts the circuit. After this method is called the circuit is ready to send messages.
- */
- public void start()
- {
- log.debug("public void start(): called");
-
- try
- {
- // Start the test on each of the senders.
- Message start = controlSession.createMessage();
- start.setStringProperty("CONTROL_TYPE", "START");
- start.setIntProperty("MESSAGE_COUNT", numMessages);
-
- for (int i = 0; i < senders.size(); i++)
- {
- senderConversation[i].send(senderControlTopic[i], start);
- }
-
- log.debug("All senders told to start their tests.");
- }
- catch (JMSException e)
- {
- throw new RuntimeException("Unhandled JMSException.", e);
- }
- }
-
- /**
- * Checks the test circuit. The effect of this is to gather the circuits state, for both ends of the circuit,
- * into a report, against which assertions may be checked.
- *
- * @todo Replace the asynch receiver report thread with a choice of direct or asynch executor, so that asynch
- * or synch logging of test timings is optional. Also need to provide an onMessage method that is capable
- * of receiving timing reports that receivers will generate during an ongoing test, on the test sample
- * size boundaries. The message timing logging code should be factored out as a common method that can
- * be called in response to the final report responses, or the onMessage method. Another alternative is
- * to abandon the final report request altogether and just use the onMessage method? I think the two
- * differ though, as the final report is used to apply assertions, and the ongoing report is just for
- * periodic timing results... In which case, maybe there needs to be a way for the onMessage method
- * to process just some of the incoming messages, and forward the rest on to the conversion helper, as
- * a sort of pre-conversation helper filter? Make conversation expose its onMessage method (it should
- * already) and allow another delivery thread to filter the incoming messages to the conversation.
- */
- public void check()
- {
- log.debug("public void check(): called");
-
- try
- {
- // Wait for all the test senders to return their reports.
- for (int i = 0; i < senders.size(); i++)
- {
- Message senderReport = senderConversation[i].receive();
- log.debug("Sender " + senderReport.getStringProperty("CLIENT_NAME") + " reports message count: "
- + senderReport.getIntProperty("MESSAGE_COUNT"));
- log.debug("Sender " + senderReport.getStringProperty("CLIENT_NAME") + " reports message time: "
- + senderReport.getLongProperty("TEST_TIME"));
- }
-
- log.debug("Got all sender test reports.");
-
- // Apply sender assertions to pass/fail the tests.
-
- // Inject a short pause to give the receivers time to finish receiving their test messages.
- TestUtils.pause(500);
-
- // Ask the receivers for their reports.
- Message statusRequest = controlSession.createMessage();
- statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST");
-
- for (int i = 0; i < receivers.size(); i++)
- {
- receiverConversation[i].send(receiverControlTopic[i], statusRequest);
- }
-
- log.debug("All receiver test reports requested.");
-
- // Wait for all receiver reports to come in, but do so asynchronously.
- Runnable gatherAllReceiverReports =
- new Runnable()
- {
- public void run()
- {
- try
- {
- // Wait for all the receivers to send their reports.
- for (int i = 0; i < receivers.size(); i++)
- {
- Message receiverReport = receiverConversation[i].receive();
-
- String clientName = receiverReport.getStringProperty("CLIENT_NAME");
- int messageCount = receiverReport.getIntProperty("MESSAGE_COUNT");
- long testTime = receiverReport.getLongProperty("TEST_TIME");
-
- log.debug("Receiver " + clientName + " reports message count: " + messageCount);
- log.debug("Receiver " + receiverReport.getStringProperty("CLIENT_NAME")
- + " reports message time: " + testTime);
-
- // Apply receiver assertions to pass/fail the tests.
-
- // Log the test timings on the asynchronous test timing controller.
- /*try
- {
- timingController.completeTest(true, messageCount, testTime);
- }
- // The timing controll can throw InterruptedException is the current test is to be
- // interrupted.
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }*/
- }
-
- log.debug("All receiver test reports received.");
- }
- catch (JMSException e)
- {
- throw new RuntimeException(e);
- }
- }
- };
-
- Thread receiverReportsThread = new Thread(gatherAllReceiverReports);
- receiverReportsThread.start();
-
- // return new Message[] { senderReport, receiverReport };
-
- }
- catch (JMSException e)
- {
- throw new RuntimeException("Unhandled JMSException.", e);
- }
- }
-
- /**
- * Closes the circuit. All associated resources are closed.
- */
- public void close()
- {
- log.debug("public void close(): called");
-
- // End the current test on all senders and receivers.
- }
-
- /**
- * Applies a list of assertions against the test circuit. The {@link #check()} method should be called before doing
- * this, to ensure that the circuit has gathered its state into a report to assert against.
- *
- * @param assertions The list of assertions to apply.
- *
- * @return Any assertions that failed.
- */
- public List<Assertion> applyAssertions(List<Assertion> assertions)
- {
- log.debug("public List<Assertion> applyAssertions(List<Assertion> assertions = " + assertions + "): called");
-
- List<Assertion> failures = new LinkedList<Assertion>();
-
- for (Assertion assertion : assertions)
- {
- if (!assertion.apply())
- {
- failures.add(assertion);
- }
- }
-
- return failures;
- }
-
- /**
- * Runs the default test procedure against the circuit, and checks that all of the specified assertions hold.
- *
- * @param numMessages The number of messages to send using the default test procedure.
- * @param assertions The list of assertions to apply.
- *
- * @return Any assertions that failed.
- *
- * @todo From check onwards needs to be handled as a future. The future must call back onto the test case to
- * report results asynchronously.
- */
- public List<Assertion> test(int numMessages, List<Assertion> assertions)
- {
- log.debug("public List<Assertion> test(int numMessages = " + numMessages + ", List<Assertion> assertions = "
- + assertions + "): called");
-
- // Keep the number of messages to send per test run, where the send method can reference it.
- this.numMessages = numMessages;
-
- // Start the test running on all sender circuit ends.
- start();
-
- // Request status reports to be handed in.
- check();
-
- // Assert conditions on the publishing end of the circuit.
- // Assert conditions on the receiving end of the circuit.
- List<Assertion> failures = applyAssertions(assertions);
-
- // Close the circuit ending the current test case.
- close();
-
- // Pass with no failed assertions or fail with a list of failed assertions.
- return failures;
- }
-}