//--------------------------------------------------------------------------- // CSETPNode.java -- Sample solution for CSEP590SG HW2a // // Here is my sample solution/authoritative server/test engine. // The code is basically divided up as follows: // // * class CSETPNode handles arguments and sets up the proper // set of client and server "sides". It also keeps the // authoritative clock, and passes references to it to the // sides. // // * class CSETPForwardServerSide implements the client-initiation server, // handling the bound socket and starting up CSETPServerConversations // // * class CSETPInverseServerSide implements the server-initiation server, // watching for name/port pairs on stdin, and then kickstarting connections // to those clients and starting up a CSETPServerConversation for them // // Note the use of generic server conversations for both kinds of servers, // this is combined with a generic CSETPServerSide interface, which allows // the conversations to register themselves with the parent and let it // know when they are done. The client sides do not share implementation // this way, because one of them is a normal client and the other is the // test engine, which doesn't really syncronize // // * class CSETPForwardClientSide implements the client-initiation client. // Note that there is no separate conversation; the side has only one // connection. The CSETPNode ensures that only one client side is // created. This has a reference to the authoritative clock so it can // update it. Note that this uses the constants from the node to // control the assumed min RTT and such. // // * class CSETPInverseClientSide implements the server-initiation client, // which is the test engine. This *does* follow the side/conversation // metaphor, so the side listens for kickstarts, and spawns off // conversations to run the tests. // // * class CSETPInverseClientConversation implements a single test. // Note that this does not use the constants from the node, and // in particular, probes for an estimated min RTT before starting // the real test. It also keeps both a local clock, which it // updates, and a reference to the authoritative clock, to check // against. // // * Rand is my own random number generator, which I moved here // so you can compile this without my packages. // // Usage: there are two basic ways to start this up. With no arguments // (or with only a logfile), it starts in the authoritativer server // mode, with a forward client side and an inverse server side. In this // mode it prints a periodic status on stdout, using a CR to overwrite. // // If given at least a server name, it starts in the normal mode, with // a forward client side connected to that server. By default, it // enables a forward server side but not an inverse server side, but // by using the +/-iserv and +/-fserv options these may be run in // any combination. // // A final note for anyone thinking about running this: my email // is hardcoded in the inverse server side, so change that to your // email. Also, this uses elm externally for email, and odds are // you don't have that. // // Andy Collins - acollins@cs.washington.edu - University of Washington //--------------------------------------------------------------------------- import java.io.*; import java.util.*; import java.net.*; import java.text.*; public class CSETPNode implements Runnable { // These are the control constants for the clock // synchronization calculations performed by the // client sides public static long minRTT = 40; // assumed minimum RTT, in ms public static double rho = 0.01; // assumed worst-case clock drift public static long overallError = 200; // from assignment, worst case clock error, in ms public static long syncError = 150; // from experience, worst case error after sync, in ms // These constants are for the test engine only; they have // no basis in clock syncrhonization theory public static int numSyncTests = 10; public static double meanSyncTestSpacing = 60.0; // in seconds public static int syncTestExpectedTier = 2; static int numMinRTTProbes = 20; static double meanMinRTTProbeSpacing = 0.5; private static Rand rand = new Rand (42); public static PrintStream log = System.err; public static void main (String[] args) { try { boolean enableForwardServer = true; boolean enableInverseServer = false; String inverseServerEmail = "acollins@cs.washington.edu"; String forwardClientServerName = null; int forwardClientServerPort = 21319; int forwardServerBindPort = 21319; int inverseClientBindPort = 21320; String logFilename = null; for (int i=0; i= 0 && conversations.size () < 200) { CSETPServerConversation c = new CSETPServerConversation (this, localClock, tierNumber, conversation); conversations.put (c, Boolean.TRUE); } else { if (tierNumber < 0) CSETPNode.log.println ("Warning: rejected connection attempt due to no good clock"); else CSETPNode.log.println ("Warning: rejected connection attempt due to too many connections"); conversation.close (); } } } catch (IOException ioe) { System.err.println ("IOException in listen loop: " + ioe); System.exit (1); } } } class CSETPServerConversation implements Runnable { private static Rand rand = new Rand (37); public CSETPServerConversation (CSETPServerSide server, OffsetClock clock, int tier, Socket sock) throws IOException { listener = server; localClock = clock; tierNumber = tier; socket = sock; input = new BufferedReader (new InputStreamReader (socket.getInputStream ())); output = new PrintStream (socket.getOutputStream ()); thread = new Thread (this); thread.start (); } private CSETPServerSide listener; private OffsetClock localClock; private int tierNumber; private Socket socket; private BufferedReader input; private PrintStream output; private Thread thread; public void run () { DateFormat df = DateFormat.getDateTimeInstance (); try { CSETPNode.log.println ("client " + socket.getInetAddress ().getHostAddress () + "/" + socket.getPort () + ": opened connection at " + df.format (new Date (localClock.CurrentTimeMillis ()))); String line = input.readLine (); while (line != null) { StringTokenizer tokens = new StringTokenizer (line); int seqnum = Integer.parseInt (tokens.nextToken ()); try { Thread.currentThread ().sleep (20 + (long) rand.nextExponential (50.0)); } catch (InterruptedException ie) { CSETPNode.log.println ("WARNING: interrupted"); } long reportedTime = localClock.CurrentTimeMillis (); try { Thread.currentThread ().sleep (20 + (long) rand.nextExponential (50.0)); } catch (InterruptedException ie) { CSETPNode.log.println ("WARNING: interrupted"); } CSETPNode.log.println ("client " + socket.getInetAddress ().getHostAddress () + "/" + socket.getPort () + ": sent (" + seqnum + "\t" + tierNumber + "\t" + df.format (new Date (reportedTime)) + "." + (reportedTime % 1000) + ")"); output.print ("" + seqnum + "\t" + tierNumber + "\t" + reportedTime + "\r\n"); line = input.readLine (); } } catch (Exception e) { CSETPNode.log.println ("client " + socket.getInetAddress ().getHostAddress () + "/" + socket.getPort () + ": at " + df.format (new Date (localClock.CurrentTimeMillis ())) + " exception: " + e); } try { CSETPNode.log.println ("client " + socket.getInetAddress ().getHostAddress () + "/" + socket.getPort () + ": closed connection at " + df.format (new Date (localClock.CurrentTimeMillis ()))); listener.RemoveConversation (this); socket.close (); } catch (IOException e) { } } } interface CSETPClientSide { public int GetTierNumber (); } class CSETPForwardClientSide implements Runnable, CSETPClientSide { public CSETPForwardClientSide (OffsetClock clock, InetAddress serverAddr, int serverPort) throws IOException { localClock = clock; tierNumber = -1; socket = new Socket (serverAddr, serverPort); output = new PrintStream (socket.getOutputStream ()); input = new BufferedReader (new InputStreamReader (socket.getInputStream ())); seqnum = 0; // These calculations both assume the local clock is as bad as rho, // so we take them to be conservative maxAcceptableRTT = (long) (2.0 * (double) (CSETPNode.syncError + CSETPNode.minRTT / 2.0) / (1.0 + 2.0 * CSETPNode.rho)); syncSpacing = (long) ((double) (CSETPNode.overallError - CSETPNode.syncError) / (2.0 * CSETPNode.rho)); assumedMinRTT = CSETPNode.minRTT; thread = new Thread (this); thread.start (); } private long assumedMinRTT; private long maxAcceptableRTT; private long syncSpacing; private OffsetClock localClock; private int tierNumber; private Socket socket; private PrintStream output; private BufferedReader input; private int seqnum; private Thread thread; public int GetTierNumber () { return tierNumber; } public void run () { CSETPNode.log.println ("Local client: max RTT " + maxAcceptableRTT + "\t min sync interval " + syncSpacing); try { for (;;) { boolean goodSync = false; while (!goodSync) { long before = localClock.CurrentTimeMillis (); output.print ("" + (++seqnum) + "\r\n"); String response = input.readLine (); long after = localClock.CurrentTimeMillis (); if (response == null) { System.err.println ("Unexpected EOF on forward client socket"); System.exit (1); } if (after - before < assumedMinRTT) { System.err.println ("ERROR: violation of assumed min RTT"); System.exit (1); } int reportedSeqnum = 0; int reportedTier = -1; long reportedTime = 0; try { StringTokenizer tokens = new StringTokenizer (response); reportedSeqnum = Integer.parseInt (tokens.nextToken ()); reportedTier = Integer.parseInt (tokens.nextToken ()); reportedTime = Long.parseLong (tokens.nextToken ()); } catch (Exception e) { System.err.println ("Parse error on response '" + response + "'"); System.exit (1); } if (reportedSeqnum == seqnum) { if (tierNumber < 0) tierNumber = reportedTier; else if (tierNumber != reportedTier) { System.err.println ("ERROR: tier number mismatch"); System.exit (1); } if (after - before <= maxAcceptableRTT) { long oldTime = localClock.CurrentTimeMillis (); long newTime = reportedTime + (after - before) / 2; DateFormat df = DateFormat.getDateTimeInstance (); System.out.println ("Got seq: " + reportedSeqnum + " " + df.format (new Date (reportedTime)) + "." + (reportedTime % 1000) + " rtt: " + (after-before) + "\n setting time from: " + df.format (new Date (oldTime)) + "." + (oldTime % 1000) + "\n setting time to: " + df.format (new Date (newTime)) + "." + (newTime % 1000)); localClock.SetCurrentTimeMillis (newTime); goodSync = true; } } else { System.err.println ("Bad seqnum on forward client response '" + response + "'"); System.exit (1); } } try { Thread.currentThread ().sleep (syncSpacing); } catch (InterruptedException ie) { } } } catch (IOException ioe) { System.err.println ("IOException in client loop: " + ioe); System.exit (1); } } } class CSETPInverseServerSide implements CSETPServerSide, Runnable { public CSETPInverseServerSide (OffsetClock clock, CSETPClientSide client, String email) throws IOException { localClock = clock; clientSide = client; emailAddr = email; conversations = new Hashtable (); thread = new Thread (this); thread.start (); } public int NumConnections () { return conversations.size (); } private CSETPServerConversation conversation; private OffsetClock localClock; private CSETPClientSide clientSide; private String emailAddr; private Hashtable conversations; private Thread thread; public void RemoveConversation (CSETPServerConversation c) { conversations.remove (c); } public void run () { try { BufferedReader input = new BufferedReader (new InputStreamReader (System.in)); String line = input.readLine (); while (line != null) { try { StringTokenizer tokens = new StringTokenizer (line); String clientName = tokens.nextToken (); InetAddress clientAddr = InetAddress.getByName (clientName); int clientPort = Integer.parseInt (tokens.nextToken ()); // Open a socket, to give to the conversation Socket socket = new Socket (clientAddr, clientPort); PrintStream output = new PrintStream (socket.getOutputStream ()); // Either this is the root, or else we need to have gotten a tier num int tierNumber = 0; if (clientSide != null) { tierNumber = clientSide.GetTierNumber () + 1; } if (tierNumber >= 0) { CSETPServerConversation c = new CSETPServerConversation (this, localClock, tierNumber, socket); conversations.put (c, Boolean.TRUE); } else { System.out.println ("ERROR: rejected inverse server startup: no local tier number"); socket.close (); } // And kick the client to get things going output.print ("" + emailAddr + "\r\n"); } catch (Exception e) { System.out.println ("ERROR initiating inverse server connection '" + line + "': " + e); } line = input.readLine (); } } catch (IOException ioe) { System.err.println ("Bad IOException: " + ioe); System.exit (1); } } } class CSETPInverseClientSide implements Runnable { public CSETPInverseClientSide (OffsetClock clock, int wellKnownPort) throws IOException { authoritativeClock = clock; listenSocket = new ServerSocket (wellKnownPort); conversations = new Hashtable (); thread = new Thread (this); thread.start (); } public int NumConnections () { return conversations.size (); } private OffsetClock authoritativeClock; private ServerSocket listenSocket; private Hashtable conversations; private Thread thread; public void RemoveConversation (CSETPInverseClientConversation c) { conversations.remove (c); } public void run () { try { for (;;) { Socket conversation = listenSocket.accept (); if (conversations.size () < 200) { CSETPInverseClientConversation c = new CSETPInverseClientConversation (this, authoritativeClock, conversation); conversations.put (c, Boolean.TRUE); } else { PrintStream output = new PrintStream (conversation.getOutputStream ()); output.println ("Too many threads, try again later"); conversation.close (); } } } catch (IOException ioe) { System.err.println ("IOException in listen loop: " + ioe); System.exit (1); } } } class CSETPInverseClientConversation implements Runnable { private static Rand rand = new Rand (37); public CSETPInverseClientConversation (CSETPInverseClientSide server, OffsetClock clock, Socket sock) throws IOException { listener = server; authoritativeClock = clock; testClock = new OffsetClock (); socket = sock; input = new BufferedReader (new InputStreamReader (socket.getInputStream ())); output = new PrintStream (socket.getOutputStream ()); seqnum = 42; tierNumber = -1; // This calculation assumes the local clock is as bad as rho, // so we take it to be conservative. We also force minRTT to zero, // because we don't know anything about the servers we might be // talking to, but we refine it later on. maxAcceptableRTT = (long) (2.0 * (double) (CSETPNode.syncError) / (1.0 + 2.0*CSETPNode.rho)); assumedMinRTT = 0; thread = new Thread (this); thread.start (); } private long assumedMinRTT; private long maxAcceptableRTT; private CSETPInverseClientSide listener; private OffsetClock authoritativeClock; private OffsetClock testClock; private Socket socket; private BufferedReader input; private PrintStream output; private int seqnum; private int tierNumber; private Thread thread; // returns null if email is okay private static String CheckEmailAddr (String emailAddr) { int atIndex = emailAddr.indexOf ("@"); if (atIndex < 0) return "No @ sign"; else if (atIndex == 0) return "No addr before @ sign"; String domain = emailAddr.substring (atIndex+1); if (domain.equals ("")) return "No domain"; /* try { InetAddress domainAddr = InetAddress.getByName (domain); } catch (UnknownHostException uhe) { return "unknown host " + domain; } */ return null; } public void run () { DateFormat df = DateFormat.getDateTimeInstance (); String emailAddr = "UNKNOWN"; try { emailAddr = input.readLine (); String badAddrReason = CheckEmailAddr (emailAddr); if (badAddrReason == null) { // Open the email process, and get a PrintStream we can write to Process p = Runtime.getRuntime ().exec ("elm -s CSETPOTF_response acollins@cs.washington.edu " + emailAddr); OutputStream emailOS = p.getOutputStream (); PrintStream email = new PrintStream (emailOS); CSETPNode.log.println ("iserver " + socket.getInetAddress ().getHostAddress () + "/" + socket.getPort () + ": initiated test for '" + emailAddr + "' at " + df.format (new Date (authoritativeClock.CurrentTimeMillis ()))); email.println ("CSETP Online Test Facility"); long authStartTime = authoritativeClock.CurrentTimeMillis (); email.println (" test started at " + df.format (new Date (authStartTime)) + "." + (authStartTime % 1000) + "\n wall-clock time " + df.format (new Date ())); // First, run a few queries to get a feel for the min RTT // Note that we can be a little sloppy about the min RTT because our // test goal is so conservative, and because we aren't running a clock // or serving up anything based on this. The only danger is that we // will see only bad samples here, and then allow a bad sample later // on, rather than squelching it and retrying like we should long minRTT = -1; for (int i=0; i < CSETPNode.numMinRTTProbes; ++i) { try { Thread.currentThread ().sleep (1000 * (long) rand.nextExponential (CSETPNode.meanMinRTTProbeSpacing)); } catch (InterruptedException e) { } CSETPNode.log.println ("iserver " + socket.getInetAddress ().getHostAddress () + "/" + socket.getPort () + " '" + emailAddr + "' sent probe (" + (seqnum+1) + ") at " + df.format (new Date (authoritativeClock.CurrentTimeMillis ()))); long before = testClock.CurrentTimeMillis (); output.print ("" + (++seqnum) + "\r\n"); String response = input.readLine (); long after = testClock.CurrentTimeMillis (); CSETPNode.log.println ("iserver " + socket.getInetAddress ().getHostAddress () + "/" + socket.getPort () + " '" + emailAddr + "' got literal response (" + response + ") at " + df.format (new Date (authoritativeClock.CurrentTimeMillis ()))); CSETPNode.log.println ("iserver " + socket.getInetAddress ().getHostAddress () + "/" + socket.getPort () + " '" + emailAddr + "' RTT sample was " + (after-before) + " ms"); if (minRTT < 0 || (after-before) < minRTT) minRTT = after-before; } maxAcceptableRTT = (long) (2.0 * (double) (CSETPNode.syncError + minRTT / 2.0) / (1.0 + 2.0 * CSETPNode.rho)); assumedMinRTT = minRTT; CSETPNode.log.println ("iserver " + socket.getInetAddress ().getHostAddress () + "/" + socket.getPort () + " '" + emailAddr + "' set min RTT to " + minRTT + "; starting real test at " + df.format (new Date (authoritativeClock.CurrentTimeMillis ()))); email.println (""); email.println ("Sampled min RTT of " + minRTT + "ms"); email.println (""); // Now run the actual test boolean testFailed = false; long errors[] = new long[CSETPNode.numSyncTests]; for (int i=0; i 10) { email.println (" too many attempts; bailing out"); testFailed = true; break; } email.println (" query seqnum " + seqnum + ":"); if (response == null) { email.println (" unexpected EOF"); testFailed = true; break; } if (after - before < assumedMinRTT) { minRTT = after - before; maxAcceptableRTT = (long) (2.0 * (double) (CSETPNode.syncError + minRTT / 2.0) / (1.0 + 2.0 * CSETPNode.rho)); assumedMinRTT = minRTT; CSETPNode.log.println ("iserver " + socket.getInetAddress ().getHostAddress () + "/" + socket.getPort () + " '" + emailAddr + "' adjusted min RTT to " + minRTT + " at " + df.format (new Date (authoritativeClock.CurrentTimeMillis ()))); break; } int reportedSeqnum = 0; int reportedTier = -1; long reportedTime = 0; try { StringTokenizer tokens = new StringTokenizer (response); reportedSeqnum = Integer.parseInt (tokens.nextToken ()); reportedTier = Integer.parseInt (tokens.nextToken ()); reportedTime = Long.parseLong (tokens.nextToken ()); } catch (Exception e) { email.println (" parse error on response '" + response + "'"); testFailed = true; break; } if (reportedSeqnum == seqnum) { if (tierNumber < 0) tierNumber = reportedTier; else if (tierNumber != reportedTier) { email.println (" ERROR: tier number mismatch"); testFailed = true; break; } if (after - before <= maxAcceptableRTT) { long oldTime = testClock.CurrentTimeMillis (); long newTime = reportedTime + (after - before) / 2; long authTime = authoritativeClock.CurrentTimeMillis (); email.println (" RTT " + (after - before) + " usable:" + "\n setting time to: " + df.format (new Date (newTime)) + "." + (newTime % 1000) + "\n true time now: " + df.format (new Date (authTime)) + "." + (authTime % 1000)); testClock.SetCurrentTimeMillis (newTime); errors[i] = Math.abs (newTime - authTime); goodSync = true; } else email.println (" RTT " + (after - before) + " exceeded max of " + maxAcceptableRTT + "; so retry"); } else { email.println (" bad seqnum on forward client response '" + response + "'"); } } } email.println (""); email.println (""); email.println (""); if (testFailed) { CSETPNode.log.println ("iserver " + socket.getInetAddress ().getHostAddress () + "/" + socket.getPort () + " '" + emailAddr + "' test failed. connection closed at " + df.format (new Date (authoritativeClock.CurrentTimeMillis ()))); email.println ("Test failed. Fix the problem and try again"); } else { if (tierNumber == CSETPNode.syncTestExpectedTier) { email.println ("Test completed. Reported tier number was " + tierNumber + ", as expected"); } else { email.println ("Test completed. Reported tier number was " + tierNumber + ", but should have been " + CSETPNode.syncTestExpectedTier); } email.println (""); email.println ("Summary of errors: "); email.println (""); long maxError = -1; long secondError = -1; for (int i=0; i maxError) { secondError = maxError; maxError = errors[i]; } else if (errors[i] > secondError) secondError = errors[i]; email.println (" test " + i + ":\t" + errors[i] + " ms error"); } email.println (""); email.println ("max error = " + maxError + " ms; second error = " + secondError + "ms"); email.println (""); if (secondError < 600 && tierNumber == CSETPNode.syncTestExpectedTier) email.println ("Congratulations, you passed the CSETPOTF test."); else if (secondError < 600) email.println ("Congratulations, you passed the error test,\nbut need to fix the tier numbers and/or server setup"); else email.println ("Sorry, this error exceeds the specification of 600 ms.\nPlease try again."); CSETPNode.log.println ("iserver " + socket.getInetAddress ().getHostAddress () + "/" + socket.getPort () + " '" + emailAddr + "' test completed. connection closed at " + df.format (new Date (authoritativeClock.CurrentTimeMillis ()))); } email.println (""); // Closing the stream will send the message emailOS.close (); p.waitFor (); } else CSETPNode.log.println ("iserver " + socket.getInetAddress ().getHostAddress () + "/" + socket.getPort () + ": bad email '" + emailAddr + "' (" + badAddrReason + ") at " + df.format (new Date (authoritativeClock.CurrentTimeMillis ()))); } catch (Exception e) { CSETPNode.log.println ("Warning: exception in online-test for '" + emailAddr + "': " + e); } try { listener.RemoveConversation (this); socket.close (); } catch (IOException e) { } } } //--------------------------------------------------------------------------- // Andy's own random number generator //--------------------------------------------------------------------------- class Rand extends Random { public Rand () { super (); } public Rand (long seed) { super (seed); } public double nextExponential (double mean) { return - mean * Math.log (nextDouble ()); } public double nextChiSquared (int degree, double mean, double sd) { double val = nextGaussian (); double extra = 1.0; for (int i=1; i