summaryrefslogtreecommitdiff
path: root/qpid/dotnet/Qpid.Integration.Tests/framework/clocksynch/UDPClockSynchronizer.csx
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/dotnet/Qpid.Integration.Tests/framework/clocksynch/UDPClockSynchronizer.csx')
-rw-r--r--qpid/dotnet/Qpid.Integration.Tests/framework/clocksynch/UDPClockSynchronizer.csx453
1 files changed, 453 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Integration.Tests/framework/clocksynch/UDPClockSynchronizer.csx b/qpid/dotnet/Qpid.Integration.Tests/framework/clocksynch/UDPClockSynchronizer.csx
new file mode 100644
index 0000000000..98b02a043b
--- /dev/null
+++ b/qpid/dotnet/Qpid.Integration.Tests/framework/clocksynch/UDPClockSynchronizer.csx
@@ -0,0 +1,453 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+
+using uk.co.thebadgerset.junit.extensions.util.CommandLineParser;
+using uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+using java.io.IOException;
+using java.net.*;
+using java.nio.ByteBuffer;
+using java.util.Arrays;
+
+namespace Apache.Qpid.Integration.Tests.framework.clocksynch
+{
+ /// <summary>
+ /// UDPClockSynchronizer is a <see cref="ClockSynchronizer"/> that sends pings as UDP datagrams, and uses the following simple
+ /// algorithm to perform clock synchronization:
+ ///
+ /// <ol>
+ /// <li>Slave initiates synchronization with a Reference clock.</li>
+ /// <li>Slave stamps current local time on a "time request" message and sends to the Reference.</li>
+ /// <li>Upon receipt by Reference, Reference stamps Reference-time and returns.</li>
+ /// <li>Upon receipt by Slave, Slave subtracts current time from sent time and divides by two to compute latency. It
+ /// subtracts current time from Reference time to determine Slave-Reference time delta and adds in the
+ /// half-latency to get the correct clock delta.</li>
+ /// <li>The first result is immediately used to update the clock since it will get the local clock into at least
+ /// the right ballpark.</li>
+ /// <li>The Slave repeats steps 2 through 4, 15 more times.</li>
+ /// <li>The results of the packet receipts are accumulated and sorted in lowest-latency to highest-latency order. The
+ /// median latency is determined by picking the mid-point sample from this ordered list.</li>
+ /// <li>All samples outside 1 standard-deviation from the median are discarded and the remaining samples
+ /// are averaged using an arithmetic mean.</li>
+ /// </ol>
+ ///
+ /// <p/>The use of UDP datagrams, instead of TCP based communication eliminates the hidden delays that TCP can introduce,
+ /// as it can transparently re-order or re-send packets, or introduce delays as packets are naggled.
+ ///
+ /// <p/><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities <th> Collaborations
+ /// <tr><td> Trigger a clock synchronziation.
+ /// <tr><td> Compute a clock delta to apply to the local clock.
+ /// <tr><td> Estimate the error in the synchronzation.
+ /// </table>
+ /// </summary>
+ public class UDPClockSynchronizer : ClockSynchronizer
+ {
+ /// <summary> Used for debugging. </summary>
+ // private static ILog log = LogManager.GetLogger(typeof(UDPClockSynchronizer));
+
+ /// <summary> Defines the timeout to use when waiting for responses to time requests. </summary>
+ private static final int TIMEOUT = 50;
+
+ /// <summary> The clock delta. </summary>
+ private long delta = 0L;
+
+ /// <summary> Holds an estimate of the clock error relative to the reference clock. </summary>
+ private long epsilon = 0L;
+
+ /// <summary> Holds the address of the reference clock. </summary>
+ private InetAddress referenceAddress;
+
+ /// <summary> Holds the socket to communicate with the reference service over. </summary>
+ private DatagramSocket socket;
+
+ /// <summary> Used to control the shutdown in the main test loop. </summary>
+ private static bool doSynch = true;
+
+ /// <summary>
+ /// Creates a clock synchronizer against the specified address for the reference.
+ /// </summary>
+ /// <param name="address"> The address of the reference service. </param>
+ public UDPClockSynchronizer(string address)
+ {
+ try
+ {
+ referenceAddress = InetAddress.getByName(address);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /// <summary>
+ /// The slave side should call this to compute a clock delta with the reference.
+ /// </summary>
+ /// <exception cref="ClockSynchFailureException"> If synchronization cannot be achieved, due to unavailability of the reference
+ /// time service. </exception>
+ public void synch() throws ClockSynchFailureException
+ {
+ try
+ {
+ socket = new DatagramSocket();
+ socket.setSoTimeout(TIMEOUT);
+
+ // Synchronize on a single ping, to get the clock into the right ball-park.
+ synch(1);
+
+ // Synchronize on 15 pings.
+ synch(15);
+
+ // And again, for greater accuracy, on 31.
+ synch(31);
+
+ socket.close();
+ }
+ catch (SocketException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /// <summary>
+ /// Updates the synchronization delta by performing the specified number of reference clock requests.
+ /// </summary>
+ /// <param name="n"> The number of reference clock request cycles to perform. </param>
+ ///
+ /// <exception cref="ClockSynchFailureException"> If synchronization cannot be achieved, due to unavailability of the reference
+ /// time service. </exception>
+ protected void synch(int n) throws ClockSynchFailureException
+ {
+ // log.debug("protected void synch(int n = " + n + "): called");
+
+ // Create an array of deltas by performing n reference pings.
+ long[] delta = new long[n];
+
+ for (int i = 0; i < n; i++)
+ {
+ delta[i] = ping();
+ }
+
+ // Reject any deltas that are larger than 1 s.d. above the median.
+ long median = median(delta);
+ long sd = standardDeviation(delta);
+
+ // log.debug("median = " + median);
+ // log.debug("sd = " + sd);
+
+ long[] tempDeltas = new long[n];
+ int count = 0;
+
+ for (int i = 0; i < n; i++)
+ {
+ if ((delta[i] <= (median + sd)) && (delta[i] >= (median - sd)))
+ {
+ tempDeltas[count] = delta[i];
+ count++;
+ }
+ else
+ {
+ // log.debug("Rejected: " + delta[i]);
+ }
+ }
+
+ System.arraycopy(tempDeltas, 0, delta, 0, count);
+
+ // Estimate the delta as the mean of the remaining deltas.
+ this.delta += mean(delta);
+
+ // Estimate the error as the standard deviation of the remaining deltas.
+ this.epsilon = standardDeviation(delta);
+
+ // log.debug("this.delta = " + this.delta);
+ // log.debug("this.epsilon = " + this.epsilon);
+ }
+
+ /// <summary>
+ /// Performs a single reference clock request cycle and returns the estimated delta relative to the local clock.
+ /// This is computed as the half-latency of the requst cycle, plus the reference clock, minus the local clock.
+ /// </summary>
+ /// <return> The estimated clock delta. </return>
+ ///
+ /// <exception cref="ClockSynchFailureException"> If the reference service is not responding. </exception>
+ protected long ping() throws ClockSynchFailureException
+ {
+ // log.debug("protected long ping(): called");
+
+ try
+ {
+ byte[] buf = new byte[256];
+
+ bool timedOut = false;
+ long start = 0L;
+ long refTime = 0L;
+ long localTime = 0L;
+ long latency = 0L;
+ int failCount = 0;
+
+ // Keep trying the ping until it gets a response, or 10 tries in a row all time out.
+ do
+ {
+ // Start timing the request latency.
+ start = nanoTime();
+
+ // Get the reference time.
+ DatagramPacket packet =
+ new DatagramPacket(buf, buf.length, referenceAddress, UDPClockReference.REFERENCE_PORT);
+ socket.send(packet);
+ packet = new DatagramPacket(buf, buf.length);
+
+ timedOut = false;
+
+ try
+ {
+ socket.receive(packet);
+ }
+ catch (SocketTimeoutException e)
+ {
+ timedOut = true;
+ failCount++;
+
+ continue;
+ }
+
+ ByteBuffer bbuf = ByteBuffer.wrap(packet.getData());
+ refTime = bbuf.getLong();
+
+ // Stop timing the request latency.
+ localTime = nanoTime();
+ latency = localTime - start;
+
+ // log.debug("refTime = " + refTime);
+ // log.debug("localTime = " + localTime);
+ // log.debug("start = " + start);
+ // log.debug("latency = " + latency);
+ // log.debug("delta = " + ((latency / 2) + (refTime - localTime)));
+
+ }
+ while (timedOut && (failCount < 10));
+
+ // Fail completely if the fail count is too high.
+ if (failCount >= 10)
+ {
+ throw new ClockSynchFailureException("Clock reference not responding.", null);
+ }
+
+ // Estimate delta as (ref clock + half-latency) - local clock.
+ return (latency / 2) + (refTime - localTime);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /// <summary>
+ /// Gets the clock delta in nano seconds.
+ /// </summary>
+ /// <return> The clock delta in nano seconds. </return>
+ public long getDelta()
+ {
+ return delta;
+ }
+
+ /// <summary>
+ /// Gets an estimate of the clock error in nan seconds.
+ /// </summary>
+ /// <return> An estimate of the clock error in nan seconds. </return>
+ public long getEpsilon()
+ {
+ return epsilon;
+ }
+
+ /// <summary>
+ /// Gets the local clock time with any computed delta added in.
+ /// </summary>
+ /// <return> The local clock time with any computed delta added in. </return>
+ public long nanoTime()
+ {
+ return System.nanoTime() + delta;
+ }
+
+ /// <summary>
+ /// Computes the median of a series of values.
+ /// </summary>
+ /// <param name="values"> The values. </param>
+ ///
+ /// <return> The median. </return>
+ public static long median(long[] values)
+ {
+ // log.debug("public static long median(long[] values = " + Arrays.ToString(values) + "): called");
+
+ long median;
+
+ // Order the list of values.
+ long[] orderedValues = new long[values.length];
+ System.arraycopy(values, 0, orderedValues, 0, values.length);
+ Arrays.sort(orderedValues);
+
+ // Check if the median is computed from a pair of middle value.
+ if ((orderedValues.length % 2) == 0)
+ {
+ int middle = orderedValues.length / 2;
+
+ median = (orderedValues[middle] + orderedValues[middle - 1]) / 2;
+ }
+ // The median is computed from a single middle value.
+ else
+ {
+ median = orderedValues[orderedValues.length / 2];
+ }
+
+ // log.debug("median = " + median);
+
+ return median;
+ }
+
+ /// <summary>
+ /// Computes the mean of a series of values.
+ /// </summary>
+ /// <param name="values"> The values. </param>
+ ///
+ /// <return> The mean. </return>
+ public static long mean(long[] values)
+ {
+ // log.debug("public static long mean(long[] values = " + Arrays.ToString(values) + "): called");
+
+ long total = 0L;
+
+ for (long value : values)
+ {
+ total += value;
+ }
+
+ long mean = total / values.length;
+
+ // log.debug("mean = " + mean);
+
+ return mean;
+ }
+
+ /// <summary>
+ /// Computes the variance of series of values.
+ /// </summary>
+ /// <param name="values"> The values. </param>
+ ///
+ /// <return> The variance of the values. </return>
+ public static long variance(long[] values)
+ {
+ // log.debug("public static long variance(long[] values = " + Arrays.ToString(values) + "): called");
+
+ long mean = mean(values);
+
+ long totalVariance = 0;
+
+ for (long value : values)
+ {
+ long diff = (value - mean);
+ totalVariance += diff/// diff;
+ }
+
+ long variance = totalVariance / values.length;
+
+ // log.debug("variance = " + variance);
+
+ return variance;
+ }
+
+ /// <summary>
+ /// Computes the standard deviation of a series of values.
+ /// </summary>
+ /// <param name="values"> The values. </param>
+ ///
+ /// <return> The standard deviation. </return>
+ public static long standardDeviation(long[] values)
+ {
+ // log.debug("public static long standardDeviation(long[] values = " + Arrays.ToString(values) + "): called");
+
+ long sd = Double.valueOf(Math.sqrt(variance(values))).longValue();
+
+ // log.debug("sd = " + sd);
+
+ return sd;
+ }
+
+ /// <summary>
+ /// For testing purposes. Supply address of reference clock as arg 1.
+ /// </summary>
+ /// <param name="args"> Address of reference clock as arg 1. </param>
+ public static void main(String[] args)
+ {
+ ParsedProperties options =
+ new ParsedProperties(CommandLineParser.processCommandLine(args,
+ new CommandLineParser(
+ new String[][]
+ {
+ { "1", "Address of clock reference service.", "address", "true" }
+ }), System.getProperties()));
+
+ string address = options.getProperty("1");
+
+ // Create a clock synchronizer.
+ UDPClockSynchronizer clockSyncher = new UDPClockSynchronizer(address);
+
+ // Set up a shutdown hook for it.
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable()
+ {
+ public void run()
+ {
+ doSynch = false;
+ }
+ }));
+
+ // Repeat the clock synching until the user kills the progam.
+ while (doSynch)
+ {
+ // Perform a clock clockSynch.
+ try
+ {
+ clockSyncher.synch();
+
+ // Print out the clock delta and estimate of the error.
+ System.out.println("Delta = " + clockSyncher.getDelta());
+ System.out.println("Epsilon = " + clockSyncher.getEpsilon());
+
+ try
+ {
+ Thread.sleep(250);
+ }
+ catch (InterruptedException e)
+ {
+ // Restore the interrupted status and terminate the loop.
+ Thread.currentThread().interrupt();
+ doSynch = false;
+ }
+ }
+ // Terminate if the reference time service is unavailable.
+ catch (ClockSynchFailureException e)
+ {
+ doSynch = false;
+ }
+ }
+ }
+ }
+} \ No newline at end of file