diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java | 468 |
1 files changed, 0 insertions, 468 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java deleted file mode 100644 index 226c84611d..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java +++ /dev/null @@ -1,468 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.framework.clocksynch; - -import org.apache.qpid.junit.extensions.util.CommandLineParser; -import org.apache.qpid.junit.extensions.util.ParsedProperties; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.net.SocketException; -import java.net.SocketTimeoutException; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.util.Arrays; - -/** - * UDPClockSynchronizer is a {@link ClockSynchronizer} that sends pings as UDP datagrams, and uses the following simple - * algorithm to perform clock synchronization: - * - * <ol> - * <li>Slave initiates synchronization with a Reference clock.</li> - * <li>Slave stamps current local time on a "time request" message and sends to the Reference.</li> - * <li>Upon receipt by Reference, Reference stamps Reference-time and returns.</li> - * <li>Upon receipt by Slave, Slave subtracts current time from sent time and divides by two to compute latency. It - * subtracts current time from Reference time to determine Slave-Reference time delta and adds in the - * half-latency to get the correct clock delta.</li> - * <li>The first result is immediately used to update the clock since it will get the local clock into at least - * the right ballpark.</li> - * <li>The Slave repeats steps 2 through 4, 15 more times.</li> - * <li>The results of the packet receipts are accumulated and sorted in lowest-latency to highest-latency order. The - * median latency is determined by picking the mid-point sample from this ordered list.</li> - * <li>All samples outside 1 standard-deviation from the median are discarded and the remaining samples - * are averaged using an arithmetic mean.</li> - * </ol> - * - * <p/>The use of UDP datagrams, instead of TCP based communication eliminates the hidden delays that TCP can introduce, - * as it can transparently re-order or re-send packets, or introduce delays as packets are naggled. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Trigger a clock synchronziation. - * <tr><td> Compute a clock delta to apply to the local clock. - * <tr><td> Estimate the error in the synchronzation. - * </table> - */ -public class UDPClockSynchronizer implements ClockSynchronizer -{ - /** Used for debugging. */ - // private static final Logger log = Logger.getLogger(UDPClockSynchronizer.class); - - /** Defines the timeout to use when waiting for responses to time requests. */ - private static final int TIMEOUT = 50; - - /** The clock delta. */ - private long delta = 0L; - - /** Holds an estimate of the clock error relative to the reference clock. */ - private long epsilon = 0L; - - /** Holds the address of the reference clock. */ - private InetAddress referenceAddress; - - /** Holds the socket to communicate with the reference service over. */ - private DatagramSocket socket; - - /** Used to control the shutdown in the main test loop. */ - private static boolean doSynch = true; - - /** - * Creates a clock synchronizer against the specified address for the reference. - * - * @param address The address of the reference service. - */ - public UDPClockSynchronizer(String address) - { - try - { - referenceAddress = InetAddress.getByName(address); - } - catch (UnknownHostException e) - { - throw new RuntimeException(e); - } - } - - /** - * The slave side should call this to compute a clock delta with the reference. - * - * @throws ClockSynchFailureException If synchronization cannot be achieved, due to unavailability of the reference - * time service. - */ - public void synch() throws ClockSynchFailureException - { - try - { - socket = new DatagramSocket(); - socket.setSoTimeout(TIMEOUT); - - // Synchronize on a single ping, to get the clock into the right ball-park. - synch(1); - - // Synchronize on 15 pings. - synch(15); - - // And again, for greater accuracy, on 31. - synch(31); - - socket.close(); - } - catch (SocketException e) - { - throw new RuntimeException(e); - } - } - - /** - * Updates the synchronization delta by performing the specified number of reference clock requests. - * - * @param n The number of reference clock request cycles to perform. - * - * @throws ClockSynchFailureException If synchronization cannot be achieved, due to unavailability of the reference - * time service. - */ - protected void synch(int n) throws ClockSynchFailureException - { - // log.debug("protected void synch(int n = " + n + "): called"); - - // Create an array of deltas by performing n reference pings. - long[] delta = new long[n]; - - for (int i = 0; i < n; i++) - { - delta[i] = ping(); - } - - // Reject any deltas that are larger than 1 s.d. above the median. - long median = median(delta); - long sd = standardDeviation(delta); - - // log.debug("median = " + median); - // log.debug("sd = " + sd); - - long[] tempDeltas = new long[n]; - int count = 0; - - for (int i = 0; i < n; i++) - { - if ((delta[i] <= (median + sd)) && (delta[i] >= (median - sd))) - { - tempDeltas[count] = delta[i]; - count++; - } - else - { - // log.debug("Rejected: " + delta[i]); - } - } - - System.arraycopy(tempDeltas, 0, delta, 0, count); - - // Estimate the delta as the mean of the remaining deltas. - this.delta += mean(delta); - - // Estimate the error as the standard deviation of the remaining deltas. - this.epsilon = standardDeviation(delta); - - // log.debug("this.delta = " + this.delta); - // log.debug("this.epsilon = " + this.epsilon); - } - - /** - * Performs a single reference clock request cycle and returns the estimated delta relative to the local clock. - * This is computed as the half-latency of the requst cycle, plus the reference clock, minus the local clock. - * - * @return The estimated clock delta. - * - * @throws ClockSynchFailureException If the reference service is not responding. - */ - protected long ping() throws ClockSynchFailureException - { - // log.debug("protected long ping(): called"); - - try - { - byte[] buf = new byte[256]; - - boolean timedOut = false; - long start = 0L; - long refTime = 0L; - long localTime = 0L; - long latency = 0L; - int failCount = 0; - - // Keep trying the ping until it gets a response, or 10 tries in a row all time out. - do - { - // Start timing the request latency. - start = nanoTime(); - - // Get the reference time. - DatagramPacket packet = - new DatagramPacket(buf, buf.length, referenceAddress, UDPClockReference.REFERENCE_PORT); - socket.send(packet); - packet = new DatagramPacket(buf, buf.length); - - timedOut = false; - - try - { - socket.receive(packet); - } - catch (SocketTimeoutException e) - { - timedOut = true; - failCount++; - - continue; - } - - ByteBuffer bbuf = ByteBuffer.wrap(packet.getData()); - refTime = bbuf.getLong(); - - // Stop timing the request latency. - localTime = nanoTime(); - latency = localTime - start; - - // log.debug("refTime = " + refTime); - // log.debug("localTime = " + localTime); - // log.debug("start = " + start); - // log.debug("latency = " + latency); - // log.debug("delta = " + ((latency / 2) + (refTime - localTime))); - - } - while (timedOut && (failCount < 10)); - - // Fail completely if the fail count is too high. - if (failCount >= 10) - { - throw new ClockSynchFailureException("Clock reference not responding.", null); - } - - // Estimate delta as (ref clock + half-latency) - local clock. - return (latency / 2) + (refTime - localTime); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - - /** - * Gets the clock delta in nano seconds. - * - * @return The clock delta in nano seconds. - */ - public long getDelta() - { - return delta; - } - - /** - * Gets an estimate of the clock error in nan seconds. - * - * @return An estimate of the clock error in nan seconds. - */ - public long getEpsilon() - { - return epsilon; - } - - /** - * Gets the local clock time with any computed delta added in. - * - * @return The local clock time with any computed delta added in. - */ - public long nanoTime() - { - return System.nanoTime() + delta; - } - - /** - * Computes the median of a series of values. - * - * @param values The values. - * - * @return The median. - */ - public static long median(long[] values) - { - // log.debug("public static long median(long[] values = " + Arrays.toString(values) + "): called"); - - long median; - - // Order the list of values. - long[] orderedValues = new long[values.length]; - System.arraycopy(values, 0, orderedValues, 0, values.length); - Arrays.sort(orderedValues); - - // Check if the median is computed from a pair of middle value. - if ((orderedValues.length % 2) == 0) - { - int middle = orderedValues.length / 2; - - median = (orderedValues[middle] + orderedValues[middle - 1]) / 2; - } - // The median is computed from a single middle value. - else - { - median = orderedValues[orderedValues.length / 2]; - } - - // log.debug("median = " + median); - - return median; - } - - /** - * Computes the mean of a series of values. - * - * @param values The values. - * - * @return The mean. - */ - public static long mean(long[] values) - { - // log.debug("public static long mean(long[] values = " + Arrays.toString(values) + "): called"); - - long total = 0L; - - for (long value : values) - { - total += value; - } - - long mean = total / values.length; - - // log.debug("mean = " + mean); - - return mean; - } - - /** - * Computes the variance of series of values. - * - * @param values The values. - * - * @return The variance of the values. - */ - public static long variance(long[] values) - { - // log.debug("public static long variance(long[] values = " + Arrays.toString(values) + "): called"); - - long mean = mean(values); - - long totalVariance = 0; - - for (long value : values) - { - long diff = (value - mean); - totalVariance += diff * diff; - } - - long variance = totalVariance / values.length; - - // log.debug("variance = " + variance); - - return variance; - } - - /** - * Computes the standard deviation of a series of values. - * - * @param values The values. - * - * @return The standard deviation. - */ - public static long standardDeviation(long[] values) - { - // log.debug("public static long standardDeviation(long[] values = " + Arrays.toString(values) + "): called"); - - long sd = Double.valueOf(Math.sqrt(variance(values))).longValue(); - - // log.debug("sd = " + sd); - - return sd; - } - - /** - * For testing purposes. Supply address of reference clock as arg 1. - * - * @param args Address of reference clock as arg 1. - */ - public static void main(String[] args) - { - ParsedProperties options = - new ParsedProperties(CommandLineParser.processCommandLine(args, - new CommandLineParser( - new String[][] - { - { "1", "Address of clock reference service.", "address", "true" } - }), System.getProperties())); - - String address = options.getProperty("1"); - - // Create a clock synchronizer. - UDPClockSynchronizer clockSyncher = new UDPClockSynchronizer(address); - - // Set up a shutdown hook for it. - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() - { - public void run() - { - doSynch = false; - } - })); - - // Repeat the clock synching until the user kills the progam. - while (doSynch) - { - // Perform a clock clockSynch. - try - { - clockSyncher.synch(); - - // Print out the clock delta and estimate of the error. - System.out.println("Delta = " + clockSyncher.getDelta()); - System.out.println("Epsilon = " + clockSyncher.getEpsilon()); - - try - { - Thread.sleep(250); - } - catch (InterruptedException e) - { - // Restore the interrupted status and terminate the loop. - Thread.currentThread().interrupt(); - doSynch = false; - } - } - // Terminate if the reference time service is unavailable. - catch (ClockSynchFailureException e) - { - doSynch = false; - } - } - } -} - |