From 2bbee835b29f2ca87fcf459fb37e9d626ee4a297 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Mon, 14 Jan 2019 13:56:36 -0800 Subject: [PATCH] Load balancer support via External Mode (#70) * Fix command line arguments to enable supporting external server mode. * Initial version of external mode server. This can work behind a load balancer. * Intermediate changes for xserver/xclient. * Few fixes for external mode. Add support for 'g' for gap in connection latency test. --- ethr.go | 101 ++++++----- server.go | 1 - session.go | 55 +++++- xclient.go | 102 +++++++++-- xserver.go | 486 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 685 insertions(+), 60 deletions(-) create mode 100644 xserver.go diff --git a/ethr.go b/ethr.go index 1b7b11a..b4b47da 100644 --- a/ethr.go +++ b/ethr.go @@ -19,7 +19,9 @@ const defaultLogFileName = "./ethrs.log for server, ./ethrc.log for client" func main() { isServer := flag.Bool("s", false, "Run as server") clientDest := flag.String("c", "", - "Run as client and connect to server specified by String") + "Run as client and connect to server specified by String\n"+ + "Ethr mode (default): specifies host name or IP address for Ethr server\n"+ + "External mode: specifies IP:port or name:port or URI:port for external server") testTypePtr := flag.String("t", "", "Test to run (\"b\", \"c\", \"p\", \"l\" or \"cl\")\n"+ "b: Bandwidth\n"+ @@ -27,7 +29,7 @@ func main() { "p: Packets/s\n"+ "l: Latency, Loss & Jitter\n"+ "cl: Connection setup latency\n"+ - "Default: b - Regular mode, cl - External mode") + "Default: b - Bandwidth measurement.") thCount := flag.Int("n", 1, "Number of Threads\n"+ "0: Equal to number of CPUs") @@ -45,7 +47,7 @@ func main() { "0: Run forever") showUI := flag.Bool("ui", false, "Show output in text UI. Valid for server only.") rttCount := flag.Int("i", 1000, - "Number of round trip iterations for latency test.") + "Number of round trip iterations for each latency test measurement.") portStr := flag.String("ports", "", "Ports to use for server and client\n"+ "Format: \"key1=value1, key2=value2\"\n"+ @@ -53,13 +55,16 @@ func main() { "For protocols, only base port is specified, so tcp=9999 means:\n"+ "9999 - Bandwidth, 9998 - CPS, 9997 - PPS, 9996 - Latency tests\n"+ "Default: control=8888, tcp=9999, udp=9999, http=9899, https=9799") - xclientDest := flag.String("x", "", - "External mode.\n"+ - "Run as client and connect to non-ethr server\n"+ - "Server can be specified using IP address, name or URI\n"+ - "Please refer to documentation for testing in this mode.") + modeStr := flag.String("m", "", + "Execution mode for Ethr (\"\" or \"x\")\n"+ + "Default: Ethr mode\n"+ + "x: External mode\n"+ + "In external mode, ethr client connects to non-ethr server") use4 := flag.Bool("4", false, "Use IPv4 only") use6 := flag.Bool("6", false, "Use IPv6 only") + gapStr := flag.String("g", "0", + "Time Gap/Interval between successive measurements (format: [ms | s | m | h] \n"+ + "0: No gap") flag.Parse() @@ -68,27 +73,33 @@ func main() { // fmt.Println("Number of incorrect arguments: " + strconv.Itoa(flag.NArg())) // + xMode := false + switch *modeStr { + case "": + case "x": + xMode = true + default: + printUsageError("Invalid value for execution mode (-m).") + } mode := ethrModeInv + if *isServer { - mode = ethrModeServer - if *clientDest != "" || *xclientDest != "" { - fmt.Println("Error: Client parameters are passed in server mode.") - flag.PrintDefaults() - os.Exit(1) + if *clientDest != "" { + printUsageError("Invalid arguments, \"-c\" cannot be used with \"-s\".") + } + if xMode { + mode = ethrModeExtServer + } else { + mode = ethrModeServer } } else if *clientDest != "" { - mode = ethrModeClient - if *xclientDest != "" { - fmt.Println("Error: External client parameters are passed in client mode.") - flag.PrintDefaults() - os.Exit(1) + if xMode { + mode = ethrModeExtClient + } else { + mode = ethrModeClient } - } else if *xclientDest != "" { - mode = ethrModeExtClient } else { - fmt.Println("Error: Invalid arguments, please specify \"-s\", \"-c\" or \"-x\"") - flag.PrintDefaults() - os.Exit(1) + printUsageError("Invalid arguments, use either \"-s\" or \"-c\".") } if *use4 && !*use6 { @@ -116,10 +127,12 @@ func main() { switch mode { case ethrModeServer: testType = All + case ethrModeExtServer: + testType = All case ethrModeClient: testType = Bandwidth case ethrModeExtClient: - testType = ConnLatency + testType = Bandwidth } case "b": testType = Bandwidth @@ -129,11 +142,11 @@ func main() { testType = Pps case "l": testType = Latency + case "cl": + testType = ConnLatency default: - fmt.Printf("Invalid value \"%s\" specified for parameter \"-t\".\n"+ - "Valid parameters and values are:\n", *testTypePtr) - flag.PrintDefaults() - os.Exit(1) + printUsageError(fmt.Sprintf("Invalid value \"%s\" specified for parameter \"-t\".\n"+ + "Valid parameters and values are:\n", *testTypePtr)) } p := strings.ToUpper(*protocol) @@ -150,18 +163,18 @@ func main() { case "ICMP": proto = ICMP default: - fmt.Printf("Invalid value \"%s\" specified for parameter \"-p\".\n"+ - "Valid parameters and values are:\n", *protocol) - flag.PrintDefaults() - os.Exit(1) + printUsageError(fmt.Sprintf("Invalid value \"%s\" specified for parameter \"-p\".\n"+ + "Valid parameters and values are:\n", *protocol)) } duration, err := time.ParseDuration(*durationStr) if err != nil { - fmt.Printf("Invalid value \"%s\" specified for parameter \"-d\".\n", - *durationStr) - flag.PrintDefaults() - os.Exit(1) + printUsageError(fmt.Sprintf("Invalid value \"%s\" specified for parameter \"-d\".\n", *durationStr)) + } + + gap, err := time.ParseDuration(*gapStr) + if err != nil { + printUsageError(fmt.Sprintf("Invalid value \"%s\" specified for parameter \"-g\".\n", *gapStr)) } if *thCount <= 0 { @@ -193,6 +206,8 @@ func main() { switch mode { case ethrModeServer: logFileName = "ethrs.log" + case ethrModeExtServer: + logFileName = "ethrxs.log" case ethrModeClient: logFileName = "ethrc.log" case ethrModeExtClient: @@ -202,16 +217,18 @@ func main() { logInit(logFileName, *debug) } - clientParam := ethrClientParam{duration} + clientParam := ethrClientParam{duration, gap} serverParam := ethrServerParam{*showUI} switch mode { case ethrModeServer: runServer(testParam, serverParam) + case ethrModeExtServer: + runXServer(testParam, serverParam) case ethrModeClient: runClient(testParam, clientParam, *clientDest) case ethrModeExtClient: - runXClient(testParam, clientParam, *xclientDest) + runXClient(testParam, clientParam, *clientDest) } } @@ -261,10 +278,16 @@ func validateTestParam(mode ethrMode, testParam EthrTestParam) bool { return false } } else if mode == ethrModeExtClient { - if testType != ConnLatency || protocol != TCP { + if (protocol != TCP) || (testType != ConnLatency && testType != Bandwidth) { emitUnsupportedTest(testParam) return false } } return true } + +func printUsageError(s string) { + fmt.Printf("Error: %s\n", s) + flag.PrintDefaults() + os.Exit(1) +} diff --git a/server.go b/server.go index ecfa23b..d3268da 100644 --- a/server.go +++ b/server.go @@ -181,7 +181,6 @@ func runTCPBandwidthServer() { } func closeConn(conn net.Conn) { - ui.printDbg("Closing TCP connection: %v", conn) err := conn.Close() if err != nil { ui.printDbg("Failed to close TCP connection, error: %v", err) diff --git a/session.go b/session.go index d59c207..259cf96 100644 --- a/session.go +++ b/session.go @@ -11,6 +11,7 @@ import ( "net" "os" "sync" + "sync/atomic" "time" ) @@ -168,6 +169,7 @@ type ethrTest struct { isActive bool session *ethrSession ctrlConn net.Conn + refCount int32 enc *gob.Encoder dec *gob.Decoder rcvdMsgs chan *EthrMsg @@ -182,6 +184,7 @@ type ethrMode uint32 const ( ethrModeInv ethrMode = iota ethrModeServer + ethrModeExtServer ethrModeClient ethrModeExtClient ) @@ -196,6 +199,7 @@ const ( type ethrClientParam struct { duration time.Duration + gap time.Duration } type ethrServerParam struct { @@ -237,6 +241,10 @@ func deleteKey(key string) { func newTest(remoteAddr string, conn net.Conn, testParam EthrTestParam, enc *gob.Encoder, dec *gob.Decoder) (*ethrTest, error) { gSessionLock.Lock() defer gSessionLock.Unlock() + return newTestInternal(remoteAddr, conn, testParam, enc, dec) +} + +func newTestInternal(remoteAddr string, conn net.Conn, testParam EthrTestParam, enc *gob.Encoder, dec *gob.Decoder) (*ethrTest, error) { var session *ethrSession session, found := gSessions[remoteAddr] if !found { @@ -249,12 +257,13 @@ func newTest(remoteAddr string, conn net.Conn, testParam EthrTestParam, enc *gob test, found := session.tests[testParam.TestID] if found { - return nil, os.ErrExist + return test, os.ErrExist } session.testCount++ test = ðrTest{} test.session = session test.ctrlConn = conn + test.refCount = 0 test.enc = enc test.dec = dec test.rcvdMsgs = make(chan *EthrMsg) @@ -269,6 +278,10 @@ func newTest(remoteAddr string, conn net.Conn, testParam EthrTestParam, enc *gob func deleteTest(test *ethrTest) { gSessionLock.Lock() defer gSessionLock.Unlock() + deleteTestInternal(test) +} + +func deleteTestInternal(test *ethrTest) { session := test.session testID := test.testParam.TestID // @@ -296,17 +309,53 @@ func deleteTest(test *ethrTest) { } func getTest(remoteAddr string, proto EthrProtocol, testType EthrTestType) (test *ethrTest) { - test = nil gSessionLock.RLock() defer gSessionLock.RUnlock() + return getTestInternal(remoteAddr, proto, testType) +} + +func getTestInternal(remoteAddr string, proto EthrProtocol, testType EthrTestType) (test *ethrTest) { + test = nil session, found := gSessions[remoteAddr] if !found { return } - test, found = session.tests[EthrTestID{proto, testType}] + test, _ = session.tests[EthrTestID{proto, testType}] return } +func createOrGetTest(remoteAddr string, proto EthrProtocol, testType EthrTestType) (test *ethrTest, isNew bool) { + gSessionLock.Lock() + defer gSessionLock.Unlock() + isNew = false + test = getTestInternal(remoteAddr, proto, testType) + if test == nil { + isNew = true + testParam := EthrTestParam{TestID: EthrTestID{proto, testType}} + test, _ = newTestInternal(remoteAddr, nil, testParam, nil, nil) + test.isActive = true + } + atomic.AddInt32(&test.refCount, 1) + return +} + +func safeDeleteTest(test *ethrTest) bool { + gSessionLock.Lock() + defer gSessionLock.Unlock() + if atomic.AddInt32(&test.refCount, -1) == 0 { + deleteTestInternal(test) + return true + } + return false +} + +func addRef(test *ethrTest) { + gSessionLock.Lock() + defer gSessionLock.Unlock() + // TODO: Since we already take lock, atomic is not needed. Fix this later. + atomic.AddInt32(&test.refCount, 1) +} + func (test *ethrTest) newConn(conn net.Conn) (ec *ethrConn) { gSessionLock.Lock() defer gSessionLock.Unlock() diff --git a/xclient.go b/xclient.go index 683ef58..0d7b36c 100644 --- a/xclient.go +++ b/xclient.go @@ -8,6 +8,8 @@ package main import ( "fmt" "net" + "os" + "sync/atomic" "time" ) @@ -18,17 +20,20 @@ func runXClient(testParam EthrTestParam, clientParam ethrClientParam, server str ui.printErr("Failed to create the new test.") return } - xclientTest(test, clientParam.duration) + xcRunTest(test, clientParam.duration, clientParam.gap) } func initXClient() { initClientUI() } -func xclientTest(test *ethrTest, d time.Duration) { +func xcRunTest(test *ethrTest, d, g time.Duration) { + startStatsTimer() if test.testParam.TestID.Protocol == TCP { if test.testParam.TestID.Type == ConnLatency { - go xclientTCPLatencyTest(test) + go xcRunTCPConnLatencyTest(test, g) + } else if test.testParam.TestID.Type == Bandwidth { + go xcRunTCPBandwidthTest(test) } } test.isActive = true @@ -36,18 +41,17 @@ func xclientTest(test *ethrTest, d time.Duration) { runDurationTimer(d, toStop) handleCtrlC(toStop) reason := <-toStop + close(test.done) + stopStatsTimer() switch reason { case timeout: ui.printMsg("Ethr done, duration: " + d.String() + ".") case interrupt: ui.printMsg("Ethr done, received interrupt signal.") } - ui.printMsg("") - close(test.done) - time.Sleep(time.Second) } -func xclientTCPLatencyTest(test *ethrTest) { +func xcRunTCPConnLatencyTest(test *ethrTest, g time.Duration) { server := test.session.remoteAddr // TODO: Override NumThreads for now, fix it later to support parallel // threads. @@ -75,7 +79,8 @@ func xclientTCPLatencyTest(test *ethrTest) { conn, err := net.Dial(tcp(ipVer), server) if err != nil { lost++ - ui.printErr("Unable to dial TCP connection to [%s], error: %v", server, err) + ui.printDbg("Unable to dial TCP connection to [%s], error: %v", server, err) + ui.printMsg("[tcp] %sConnection %s: Timed out", warmupText, server) continue } t1 := time.Since(t0) @@ -98,20 +103,83 @@ func xclientTCPLatencyTest(test *ethrTest) { warmupText = "" server = fmt.Sprintf("[%s]:%s", rserver, rport) } - /* - tcpconn, ok := conn.(*net.TCPConn) - if ok { - tcpconn.SetLinger(0) - } - */ + tcpconn, ok := conn.(*net.TCPConn) + if ok { + tcpconn.SetLinger(0) + } conn.Close() - t1 = time.Since(t0) - if t1 < time.Second { - time.Sleep(time.Second - t1) + if t1 < g { + time.Sleep(g - t1) } } } }() } } + +func xcCloseConn(conn net.Conn, test *ethrTest) { + test.delConn(conn) + err := conn.Close() + if err != nil { + ui.printDbg("Failed to close TCP connection, error: %v", err) + } + xcDeleteTest(test) +} + +func xcDeleteTest(test *ethrTest) { + if test != nil { + if safeDeleteTest(test) { + ui.printMsg("Ethr done, server terminated the session.") + os.Exit(0) + } + } +} + +func xcRunTCPBandwidthTest(test *ethrTest) { + server := test.session.remoteAddr + ui.printMsg("Connecting to host %s, port %s", server, tcpBandwidthPort) + for th := uint32(0); th < test.testParam.NumThreads; th++ { + buff := make([]byte, test.testParam.BufferSize) + for i := uint32(0); i < test.testParam.BufferSize; i++ { + buff[i] = byte(i) + } + go func() { + conn, err := net.Dial(tcp(ipVer), server) + if err != nil { + ui.printErr("Error in dialing TCP connection: %v", err) + os.Exit(1) + return + } + ec := test.newConn(conn) + rserver, rport, _ := net.SplitHostPort(conn.RemoteAddr().String()) + lserver, lport, _ := net.SplitHostPort(conn.LocalAddr().String()) + ui.printMsg("[%3d] local %s port %s connected to %s port %s", + ec.fd, lserver, lport, rserver, rport) + blen := len(buff) + addRef(test) + defer xcCloseConn(conn, test) + ExitForLoop: + for { + select { + case <-test.done: + break ExitForLoop + default: + n, err := conn.Write(buff) + if err != nil { + // ui.printErr(err) + // test.ctrlConn.Close() + return + } + if n < blen { + // ui.printErr("Partial write: " + strconv.Itoa(n)) + // test.ctrlConn.Close() + return + } + atomic.AddUint64(&ec.data, uint64(blen)) + atomic.AddUint64(&test.testResult.data, uint64(blen)) + } + } + }() + } +} diff --git a/xserver.go b/xserver.go new file mode 100644 index 0000000..5758fe9 --- /dev/null +++ b/xserver.go @@ -0,0 +1,486 @@ +//----------------------------------------------------------------------------- +// Copyright (C) Microsoft. All rights reserved. +// Licensed under the MIT license. +// See LICENSE.txt file in the project root for full license information. +//----------------------------------------------------------------------------- +package main + +import ( + /* + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/gob" + */ + "fmt" + /* + "io" + "io/ioutil" + "math/big" + */ + "net" + /* + "net/http" + */ + "os" + /* + "runtime" + "sort" + */ + "sync/atomic" + "time" +) + +func runXServer(testParam EthrTestParam, serverParam ethrServerParam) { + defer stopStatsTimer() + initXServer(serverParam.showUI) + xsRunTCPServer() + // runHTTPBandwidthServer() + // runHTTPSBandwidthServer() + startStatsTimer() + toStop := make(chan int, 1) + handleCtrlC(toStop) + <-toStop + ui.printMsg("Ethr done, received interrupt signal.") +} + +func initXServer(showUI bool) { + initServerUI(showUI) +} + +func finiXServer() { + ui.fini() + logFini() +} + +func xsRunTCPServer() { + l, err := net.Listen(tcp(ipVer), hostAddr+":"+tcpBandwidthPort) + if err != nil { + finiXServer() + fmt.Printf("Fatal error listening on "+tcpBandwidthPort+" for TCP tests: %v", err) + os.Exit(1) + } + ui.printMsg("Listening on " + tcpBandwidthPort + " for TCP tests") + go func(l net.Listener) { + defer l.Close() + for { + conn, err := l.Accept() + if err != nil { + ui.printErr("Error accepting new TCP connection: %v", err) + continue + } + go xserverTCPHandler(conn) + } + }(l) +} + +func xsCloseConn(conn net.Conn, cpsTest, bwTest *ethrTest) { + err := conn.Close() + if err != nil { + ui.printDbg("Failed to close TCP connection, error: %v", err) + } + // Delay delete the test. This is to ensure that tests like CPS don't + // end up not printing stats + time.Sleep(2 * time.Second) + if cpsTest != nil { + safeDeleteTest(cpsTest) + } + if bwTest != nil { + safeDeleteTest(bwTest) + } +} + +func xserverTCPHandler(conn net.Conn) { + server, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) + cpsTest, isNew := createOrGetTest(server, TCP, Cps) + if cpsTest != nil { + atomic.AddUint64(&cpsTest.testResult.data, 1) + } + if isNew { + ui.emitTestHdr() + } + bwTest, _ := createOrGetTest(server, TCP, Bandwidth) + defer xsCloseConn(conn, cpsTest, bwTest) + buff := make([]byte, 2048) + for { + size, err := conn.Read(buff) + if err != nil { + return + } + if bwTest != nil { + atomic.AddUint64(&bwTest.testResult.data, uint64(size)) + } + } +} + +/* +func runTCPCpsServer() { + l, err := net.Listen(tcp(ipVer), hostAddr+":"+tcpCpsPort) + if err != nil { + finiServer() + fmt.Printf("Fatal error listening on "+tcpCpsPort+" for TCP conn/s tests: %v", err) + os.Exit(1) + } + ui.printMsg("Listening on " + tcpCpsPort + " for TCP conn/s tests") + go func(l net.Listener) { + defer l.Close() + for { + conn, err := l.Accept() + if err != nil { + // This can happen a lot during load, hence don't log by + // default. + ui.printDbg("Error accepting new conn/s connection: %v", err) + continue + } + go runTCPCpsHandler(conn) + } + }(l) +} + +func runTCPCpsHandler(conn net.Conn) { + defer conn.Close() + server, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) + test := getTest(server, TCP, Cps) + if test != nil { + atomic.AddUint64(&test.testResult.data, 1) + } +} + +func runTCPLatencyServer() { + l, err := net.Listen(tcp(ipVer), hostAddr+":"+tcpLatencyPort) + if err != nil { + finiServer() + fmt.Printf("Fatal error listening on "+tcpLatencyPort+" for TCP latency tests: %v", err) + os.Exit(1) + } + ui.printMsg("Listening on " + tcpLatencyPort + " for TCP latency tests") + go func(l net.Listener) { + defer l.Close() + for { + conn, err := l.Accept() + if err != nil { + ui.printErr("Error accepting new latency connection: %v", err) + continue + } + server, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) + test := getTest(server, TCP, Latency) + if test == nil { + conn.Close() + continue + } + ui.emitLatencyHdr() + go runTCPLatencyHandler(conn, test) + } + }(l) +} + +func runTCPLatencyHandler(conn net.Conn, test *ethrTest) { + defer conn.Close() + bytes := make([]byte, test.testParam.BufferSize) + // TODO Override buffer size to 1 for now. Evaluate if we need to allow + // client to specify the buffer size in future. + bytes = make([]byte, 1) + rttCount := test.testParam.RttCount + latencyNumbers := make([]time.Duration, rttCount) + for { + _, err := io.ReadFull(conn, bytes) + if err != nil { + ui.printDbg("Error receiving data for latency test: %v", err) + return + } + for i := uint32(0); i < rttCount; i++ { + s1 := time.Now() + _, err = conn.Write(bytes) + if err != nil { + ui.printDbg("Error sending data for latency test: %v", err) + return + } + _, err = io.ReadFull(conn, bytes) + if err != nil { + ui.printDbg("Error receiving data for latency test: %v", err) + return + } + e2 := time.Since(s1) + latencyNumbers[i] = e2 + } + sum := int64(0) + for _, d := range latencyNumbers { + sum += d.Nanoseconds() + } + elapsed := time.Duration(sum / int64(rttCount)) + sort.SliceStable(latencyNumbers, func(i, j int) bool { + return latencyNumbers[i] < latencyNumbers[j] + }) + // + // Special handling for rttCount == 1. This prevents negative index + // in the latencyNumber index. The other option is to use + // roundUpToZero() but that is more expensive. + // + rttCountFixed := rttCount + if rttCountFixed == 1 { + rttCountFixed = 2 + } + atomic.SwapUint64(&test.testResult.data, uint64(elapsed.Nanoseconds())) + avg := elapsed + min := latencyNumbers[0] + max := latencyNumbers[rttCount-1] + p50 := latencyNumbers[((rttCountFixed*50)/100)-1] + p90 := latencyNumbers[((rttCountFixed*90)/100)-1] + p95 := latencyNumbers[((rttCountFixed*95)/100)-1] + p99 := latencyNumbers[((rttCountFixed*99)/100)-1] + p999 := latencyNumbers[uint64(((float64(rttCountFixed)*99.9)/100)-1)] + p9999 := latencyNumbers[uint64(((float64(rttCountFixed)*99.99)/100)-1)] + ui.emitLatencyResults( + test.session.remoteAddr, + protoToString(test.testParam.TestID.Protocol), + avg, min, max, p50, p90, p95, p99, p999, p9999) + } +} + +func runUDPBandwidthServer(test *ethrTest) error { + udpAddr, err := net.ResolveUDPAddr(udp(ipVer), hostAddr+":"+udpBandwidthPort) + if err != nil { + ui.printDbg("Unable to resolve UDP address: %v", err) + return err + } + l, err := net.ListenUDP(udp(ipVer), udpAddr) + if err != nil { + ui.printDbg("Error listening on %s for UDP pkt/s tests: %v", udpPpsPort, err) + return err + } + go func(l *net.UDPConn) { + defer l.Close() + // + // We use NumCPU here instead of NumThreads passed from client. The + // reason is that for UDP, there is no connection, so all packets come + // on same CPU, so it isn't clear if there are any benefits to running + // more threads than NumCPU(). TODO: Evaluate this in future. + // + for i := 0; i < runtime.NumCPU(); i++ { + go runUDPBandwidthHandler(test, l) + } + <-test.done + }(l) + return nil +} + +func runUDPBandwidthHandler(test *ethrTest, conn *net.UDPConn) { + buffer := make([]byte, test.testParam.BufferSize) + n, remoteAddr, err := 0, new(net.UDPAddr), error(nil) + for err == nil { + n, remoteAddr, err = conn.ReadFromUDP(buffer) + if err != nil { + ui.printDbg("Error receiving data from UDP for bandwidth test: %v", err) + continue + } + ethrUnused(n) + server, port, _ := net.SplitHostPort(remoteAddr.String()) + test := getTest(server, UDP, Bandwidth) + if test != nil { + atomic.AddUint64(&test.testResult.data, uint64(n)) + } else { + ui.printDbg("Received unsolicited UDP traffic on port %s from %s port %s", udpPpsPort, server, port) + } + } +} + +func runUDPPpsServer(test *ethrTest) error { + udpAddr, err := net.ResolveUDPAddr(udp(ipVer), hostAddr+":"+udpPpsPort) + if err != nil { + ui.printDbg("Unable to resolve UDP address: %v", err) + return err + } + l, err := net.ListenUDP(udp(ipVer), udpAddr) + if err != nil { + ui.printDbg("Error listening on %s for UDP pkt/s tests: %v", udpPpsPort, err) + return err + } + go func(l *net.UDPConn) { + defer l.Close() + // + // We use NumCPU here instead of NumThreads passed from client. The + // reason is that for UDP, there is no connection, so all packets come + // on same CPU, so it isn't clear if there are any benefits to running + // more threads than NumCPU(). TODO: Evaluate this in future. + // + for i := 0; i < runtime.NumCPU(); i++ { + go runUDPPpsHandler(test, l) + } + <-test.done + }(l) + return nil +} + +func runUDPPpsHandler(test *ethrTest, conn *net.UDPConn) { + buffer := make([]byte, test.testParam.BufferSize) + n, remoteAddr, err := 0, new(net.UDPAddr), error(nil) + for err == nil { + n, remoteAddr, err = conn.ReadFromUDP(buffer) + if err != nil { + ui.printDbg("Error receiving data from UDP for pkt/s test: %v", err) + continue + } + ethrUnused(n) + server, port, _ := net.SplitHostPort(remoteAddr.String()) + test := getTest(server, UDP, Pps) + if test != nil { + atomic.AddUint64(&test.testResult.data, 1) + } else { + ui.printDbg("Received unsolicited UDP traffic on port %s from %s port %s", udpPpsPort, server, port) + } + } +} + +func runHTTPBandwidthServer() { + sm := http.NewServeMux() + sm.HandleFunc("/", runHTTPBandwidthHandler) + l, err := net.Listen(tcp(ipVer), ":"+httpBandwidthPort) + if err != nil { + ui.printErr("Unable to start HTTP server. Error in listening on socket: %v", err) + return + } + ui.printMsg("Listening on " + httpBandwidthPort + " for HTTP bandwidth tests") + go runHTTPServer(tcpKeepAliveListener{l.(*net.TCPListener)}, sm) +} + +func runHTTPBandwidthHandler(w http.ResponseWriter, r *http.Request) { + runHTTPandHTTPSBandwidthHandler(w, r, HTTP) +} + +func runHTTPSBandwidthServer() { + cert, err := genX509KeyPair() + if err != nil { + ui.printErr("Unable to start HTTPS server. Error in X509 certificate: %v", err) + return + } + config := &tls.Config{} + config.NextProtos = []string{"http/1.1"} + config.Certificates = make([]tls.Certificate, 1) + config.Certificates[0] = cert + sm := http.NewServeMux() + sm.HandleFunc("/", runHTTPSBandwidthHandler) + l, err := net.Listen(tcp(ipVer), ":"+httpsBandwidthPort) + if err != nil { + ui.printErr("Unable to start HTTPS server. Error in listening on socket: %v", err) + return + } + ui.printMsg("Listening on " + httpsBandwidthPort + " for HTTPS bandwidth tests") + tl := tls.NewListener(tcpKeepAliveListener{l.(*net.TCPListener)}, config) + go runHTTPServer(tl, sm) +} + +func runHTTPSBandwidthHandler(w http.ResponseWriter, r *http.Request) { + runHTTPandHTTPSBandwidthHandler(w, r, HTTPS) +} + +func runHTTPServer(l net.Listener, handler http.Handler) error { + err := http.Serve(l, handler) + if err != nil { + ui.printErr("Unable to start HTTP server, error: %v", err) + } + return err +} + +func genX509KeyPair() (tls.Certificate, error) { + now, _ := time.Parse(time.RFC3339, "2019-01-01T00:00:00Z") + template := &x509.Certificate{ + SerialNumber: big.NewInt(now.Unix()), + Subject: pkix.Name{ + CommonName: "localhost", + Country: []string{"USA"}, + Organization: []string{"localhost"}, + OrganizationalUnit: []string{"127.0.0.1"}, + }, + NotBefore: now, + NotAfter: now.AddDate(100, 0, 0), // Valid for 100 years + SubjectKeyId: []byte{113, 117, 105, 99, 107, 115, 101, 114, 118, 101}, + // IPAddresses: []net.IP{net.ParseIP("127.0.0.1")}, + IPAddresses: allLocalIPs(), + DNSNames: []string{"localhost", "*"}, + BasicConstraintsValid: true, + IsCA: true, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + KeyUsage: x509.KeyUsageKeyEncipherment | + x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign, + } + + priv, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + return tls.Certificate{}, err + } + + cert, err := x509.CreateCertificate(rand.Reader, template, template, + priv.Public(), priv) + if err != nil { + return tls.Certificate{}, err + } + gCert = cert + + var outCert tls.Certificate + outCert.Certificate = append(outCert.Certificate, cert) + outCert.PrivateKey = priv + + return outCert, nil +} + +func allLocalIPs() (ipList []net.IP) { + ifaces, err := net.Interfaces() + if err != nil { + return + } + for _, iface := range ifaces { + if iface.Flags&net.FlagUp == 0 { + continue + } + addrs, err := iface.Addrs() + if err != nil { + continue + } + for _, addr := range addrs { + var ip net.IP + switch v := addr.(type) { + case *net.IPNet: + ip = v.IP + case *net.IPAddr: + ip = v.IP + } + if ip == nil { + continue + } + ipList = append(ipList, ip) + } + } + return +} + +func runHTTPandHTTPSBandwidthHandler(w http.ResponseWriter, r *http.Request, p EthrProtocol) { + _, err := ioutil.ReadAll(r.Body) + if err != nil { + ui.printDbg("Error reading HTTP body: %v", err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + server, _, _ := net.SplitHostPort(r.RemoteAddr) + test := getTest(server, p, Bandwidth) + if test == nil { + http.Error(w, "Unauthorized request.", http.StatusUnauthorized) + return + } + switch r.Method { + case "GET": + w.Write([]byte("OK.")) + case "PUT": + w.Write([]byte("OK.")) + case "POST": + w.Write([]byte("OK.")) + default: + http.Error(w, "Only GET, PUT and POST are supported.", http.StatusMethodNotAllowed) + return + } + if r.ContentLength > 0 { + atomic.AddUint64(&test.testResult.data, uint64(r.ContentLength)) + } +} +*/