diff --git a/client.go b/client.go index 483c4de..0b4ede0 100644 --- a/client.go +++ b/client.go @@ -72,6 +72,8 @@ func establishSession(testParam EthrTestParam, server string) (test *ethrTest, e deleteTest(test) } gCert = ethrMsg.Ack.Cert + napDuration := ethrMsg.Ack.NapDuration + time.Sleep(napDuration) // TODO: Enable this in future, right now there is not much value coming // from this. /** @@ -157,6 +159,7 @@ func runTest(test *ethrTest, d time.Duration) { handleCtrlC(toStop) reason := <-toStop close(test.done) + sendSessionMsg(test.enc, &EthrMsg{}) test.ctrlConn.Close() stopStatsTimer() switch reason { @@ -197,17 +200,14 @@ func runTCPBandwidthTest(test *ethrTest) { case <-test.done: break ExitForLoop default: - n, err := conn.Write(buff) - if err != nil { - // ui.printErr(err) - // test.ctrlConn.Close() - // return - continue + n := 0 + if test.testParam.Reverse { + n, err = io.ReadFull(conn, buff) + } else { + n, err = conn.Write(buff) } - if n < blen { - // ui.printErr("Partial write: " + strconv.Itoa(n)) - // test.ctrlConn.Close() - // return + if err != nil || n < blen { + ui.printDbg("Error sending/receiving data on a connection for bandwidth test: %v", err) continue } atomic.AddUint64(&ec.data, uint64(blen)) diff --git a/clientui.go b/clientui.go index 6c74551..756d411 100644 --- a/clientui.go +++ b/clientui.go @@ -37,7 +37,7 @@ func (u *clientUI) printDbg(format string, a ...interface{}) { } } -func (u *clientUI) paint() { +func (u *clientUI) paint(seconds uint64) { } func (u *clientUI) emitTestResultBegin() { @@ -80,8 +80,9 @@ func initClientUI() { } var gInterval uint64 +var noConnectionStats bool -func printTestResult(test *ethrTest, value uint64) { +func printTestResult(test *ethrTest, value uint64, seconds uint64) { if test.testParam.TestID.Type == Bandwidth && (test.testParam.TestID.Protocol == TCP || test.testParam.TestID.Protocol == UDP) { if gInterval == 0 { @@ -91,18 +92,23 @@ func printTestResult(test *ethrTest, value uint64) { cvalue := uint64(0) ccount := 0 test.connListDo(func(ec *ethrConn) { - value = atomic.SwapUint64(&ec.data, 0) - ui.printMsg("[%3d] %-5s %03d-%03d sec %7s", ec.fd, - protoToString(test.testParam.TestID.Protocol), - gInterval, gInterval+1, bytesToRate(value)) - cvalue += value + val := atomic.SwapUint64(&ec.data, 0) + val /= seconds + if !noConnectionStats { + ui.printMsg("[%3d] %-5s %03d-%03d sec %7s", ec.fd, + protoToString(test.testParam.TestID.Protocol), + gInterval, gInterval+1, bytesToRate(val)) + } + cvalue += val ccount++ }) - if ccount > 1 { + if ccount > 1 || noConnectionStats { ui.printMsg("[SUM] %-5s %03d-%03d sec %7s", protoToString(test.testParam.TestID.Protocol), gInterval, gInterval+1, bytesToRate(cvalue)) - ui.printMsg("- - - - - - - - - - - - - - - - - - - - - - -") + if !noConnectionStats { + ui.printMsg("- - - - - - - - - - - - - - - - - - - - - - -") + } } logResults([]string{test.session.remoteAddr, protoToString(test.testParam.TestID.Protocol), bytesToRate(cvalue), "", "", ""}) @@ -141,7 +147,7 @@ func printTestResult(test *ethrTest, value uint64) { gInterval++ } -func (u *clientUI) emitTestResult(s *ethrSession, proto EthrProtocol) { +func (u *clientUI) emitTestResult(s *ethrSession, proto EthrProtocol, seconds uint64) { var data uint64 var testList = []EthrTestType{Bandwidth, Cps, Pps} @@ -149,7 +155,8 @@ func (u *clientUI) emitTestResult(s *ethrSession, proto EthrProtocol) { test, found := s.tests[EthrTestID{proto, testType}] if found && test.isActive { data = atomic.SwapUint64(&test.testResult.data, 0) - printTestResult(test, data) + data /= seconds + printTestResult(test, data, seconds) } } } diff --git a/ethr.go b/ethr.go index 9272b5a..c90b76e 100644 --- a/ethr.go +++ b/ethr.go @@ -74,7 +74,7 @@ func printExtTestType() { } func printThreadUsage() { - printFlagUsage("n", "", "Number of Threads", + printFlagUsage("n", "", "Number of Parallel Sessions (and Threads).", "0: Equal to number of CPUs", "Default: 1") } @@ -123,6 +123,13 @@ func printModeUsage() { "'-m x' MUST be specified for external mode.") } +func printNoConnStatUsage() { + printFlagUsage("ncs", "", + "No Connection Stats would be printed if this flag is specified.", + "This is useful for running with large number of connections as", + "specified by -n option.") +} + func ethrUsage() { fmt.Println("\nEthr - Tool for comprehensive network performance measurements.") fmt.Println("It supports 4 modes, usage of each mode is described below:") @@ -149,8 +156,10 @@ func ethrUsage() { fmt.Println("\nMode: Client") fmt.Println("================================================================================") printClientUsage() + printFlagUsage("r", "", "For Bandwidth tests, send data from server to client.") printDurationUsage() printThreadUsage() + printNoConnStatUsage() printBufLenUsage() printProtocolUsage() printPortUsage() @@ -169,6 +178,7 @@ func ethrUsage() { printExtClientUsage() printDurationUsage() printThreadUsage() + printNoConnStatUsage() printBufLenUsage() printExtProtocolUsage() printExtTestType() @@ -176,6 +186,15 @@ func ethrUsage() { } func main() { + // + // Set GOMAXPROCS to 1024 as running large number of goroutines in a loop + // to send network traffic results in timer starvation, as well as unfair + // processing time across goroutines resulting in starvation of many TCP + // connections. Using a higher number of threads via GOMAXPROCS solves this + // problem. + // + runtime.GOMAXPROCS(1024) + flag.Usage = ethrUsage isServer := flag.Bool("s", false, "") clientDest := flag.String("c", "", "") @@ -194,6 +213,8 @@ func main() { use4 := flag.Bool("4", false, "") use6 := flag.Bool("6", false, "") gap := flag.Duration("g", 0, "") + reverse := flag.Bool("r", false, "") + ncs := flag.Bool("ncs", false, "") flag.Parse() @@ -202,6 +223,12 @@ func main() { // fmt.Println("Number of incorrect arguments: " + strconv.Itoa(flag.NArg())) // + // + // Only used in client mode, to control whether to display per connection + // statistics or not. + // + noConnectionStats = *ncs + xMode := false switch *modeStr { case "": @@ -231,6 +258,10 @@ func main() { printUsageError("Invalid arguments, use either \"-s\" or \"-c\".") } + if *reverse && mode != ethrModeClient { + printUsageError("Invalid arguments, \"-r\" can only be used in client mode.") + } + if *use4 && !*use6 { ipVer = ethrIPv4 } else if *use6 && !*use4 { @@ -239,15 +270,11 @@ func main() { bufLen := unitToNumber(*bufLenStr) if bufLen == 0 { - fmt.Println("Invalid length specified: " + *bufLenStr) - ethrUsage() - os.Exit(1) + printUsageError(fmt.Sprintf("Invalid length specified: %s" + *bufLenStr)) } if *rttCount <= 0 { - fmt.Println("Invalid RTT count for latency test:", *rttCount) - ethrUsage() - os.Exit(1) + printUsageError(fmt.Sprintf("Invalid RTT count for latency test: %d", *rttCount)) } var testType EthrTestType @@ -312,10 +339,9 @@ func main() { testParam := EthrTestParam{EthrTestID{EthrProtocol(proto), testType}, uint32(*thCount), uint32(bufLen), - uint32(*rttCount)} - if !validateTestParam(mode, testParam) { - os.Exit(1) - } + uint32(*rttCount), + *reverse} + validateTestParam(mode, testParam) generatePortNumbers(*portStr) @@ -352,61 +378,53 @@ func main() { } func emitUnsupportedTest(testParam EthrTestParam) { - fmt.Printf("Error: \"%s\" test for \"%s\" is not supported.\n", - testToString(testParam.TestID.Type), protoToString(testParam.TestID.Protocol)) + printUsageError(fmt.Sprintf("\"%s\" test for \"%s\" is not supported.\n", + testToString(testParam.TestID.Type), protoToString(testParam.TestID.Protocol))) } -func validateTestParam(mode ethrMode, testParam EthrTestParam) bool { +func validateTestParam(mode ethrMode, testParam EthrTestParam) { testType := testParam.TestID.Type protocol := testParam.TestID.Protocol if mode == ethrModeServer { if testType != All || protocol != TCP { emitUnsupportedTest(testParam) - return false } } else if mode == ethrModeClient { switch protocol { case TCP: if testType != Bandwidth && testType != Cps && testType != Latency { emitUnsupportedTest(testParam) - return false } case UDP: if testType != Bandwidth && testType != Pps { emitUnsupportedTest(testParam) - return false } if testType == Bandwidth { if testParam.BufferSize > (64 * 1024) { - fmt.Printf("Error: Maximum supported buffer size for UDP is 64K\n") - return false + printUsageError("Maximum supported buffer size for UDP is 64K\n") } } case HTTP: if testType != Bandwidth { emitUnsupportedTest(testParam) - return false } case HTTPS: if testType != Bandwidth { emitUnsupportedTest(testParam) - return false } default: emitUnsupportedTest(testParam) - return false } } else if mode == ethrModeExtClient { if (protocol != TCP) || (testType != ConnLatency && testType != Bandwidth) { emitUnsupportedTest(testParam) - return false } } - return true } func printUsageError(s string) { fmt.Printf("Error: %s\n", s) - ethrUsage() + fmt.Printf("Please use \"ethr -h\" for ethr command line arguments.\n") + // ethrUsage() os.Exit(1) } diff --git a/log.go b/log.go index da5acca..810b7df 100644 --- a/log.go +++ b/log.go @@ -14,6 +14,7 @@ import ( ) type logMessage struct { + Time string Type string Message string } @@ -35,6 +36,7 @@ type logLatencyData struct { } type logTestResults struct { + Time string Type string RemoteAddr string Protocol string @@ -79,6 +81,7 @@ func runLogger(logFile *os.File) { func _log(prefix, msg string) { if loggingActive { logData := logMessage{} + logData.Time = time.Now().UTC().Format(time.RFC3339) logData.Type = prefix logData.Message = msg logJSON, _ := json.Marshal(logData) @@ -103,6 +106,7 @@ func logDbg(msg string) { func logResults(s []string) { if loggingActive { logData := logTestResults{} + logData.Time = time.Now().UTC().Format(time.RFC3339) logData.Type = "TestResult" logData.RemoteAddr = s[0] logData.Protocol = s[1] diff --git a/server.go b/server.go index 7d372a2..4a4360d 100644 --- a/server.go +++ b/server.go @@ -120,13 +120,15 @@ func handleRequest(conn net.Conn) { return } } - ethrMsg = createAckMsg(gCert) + delay := timeToNextTick() + ethrMsg = createAckMsg(gCert, delay) err = sendSessionMsg(enc, ethrMsg) if err != nil { ui.printErr("send session message: %v", err) cleanupFunc() return } + time.Sleep(delay) // TODO: Enable this in future, right now there is not much value coming // from this. /** @@ -140,8 +142,8 @@ func handleRequest(conn net.Conn) { waitForChannelStop := make(chan bool, 1) serverWatchControlChannel(test, waitForChannelStop) <-waitForChannelStop - ui.printMsg("Ending " + testToString(testParam.TestID.Type) + " test from " + server) test.isActive = false + ui.printMsg("Ending " + testToString(testParam.TestID.Type) + " test from " + server) cleanupFunc() if len(gSessionKeys) > 0 { ui.emitTestHdr() @@ -190,16 +192,24 @@ func closeConn(conn net.Conn) { func runTCPBandwidthHandler(conn net.Conn, test *ethrTest) { defer closeConn(conn) size := test.testParam.BufferSize - bytes := make([]byte, size) + buff := make([]byte, size) + for i := uint32(0); i < test.testParam.BufferSize; i++ { + buff[i] = byte(i) + } ExitForLoop: for { select { case <-test.done: break ExitForLoop default: - _, err := io.ReadFull(conn, bytes) + var err error + if test.testParam.Reverse { + _, err = conn.Write(buff) + } else { + _, err = io.ReadFull(conn, buff) + } if err != nil { - ui.printDbg("Error receiving data on a connection for bandwidth test: %v", err) + ui.printDbg("Error sending/receiving data on a connection for bandwidth test: %v", err) continue } atomic.AddUint64(&test.testResult.data, uint64(size)) diff --git a/serverui.go b/serverui.go index ea59390..e8945d1 100644 --- a/serverui.go +++ b/serverui.go @@ -179,8 +179,8 @@ func (u *serverTui) emitTestResultBegin() { u.results = nil } -func (u *serverTui) emitTestResult(s *ethrSession, proto EthrProtocol) { - str := getTestResults(s, proto) +func (u *serverTui) emitTestResult(s *ethrSession, proto EthrProtocol, seconds uint64) { + str := getTestResults(s, proto, seconds) if len(str) > 0 { ui.printTestResults(str) } @@ -209,7 +209,7 @@ func (u *serverTui) emitLatencyResults(remote, proto string, avg, min, max, p50, logLatency(remote, proto, avg, min, max, p50, p90, p95, p99, p999, p9999) } -func (u *serverTui) paint() { +func (u *serverTui) paint(seconds uint64) { tm.Clear(tm.ColorDefault, tm.ColorDefault) defer tm.Flush() printCenterText(0, 0, u.w, "Ethr v0.1", tm.ColorBlack, tm.ColorWhite) @@ -253,7 +253,7 @@ func (u *serverTui) paint() { w := u.statW y := u.statY for _, ns := range gCurNetStats.netDevStats { - nsDiff := getNetDevStatDiff(ns, gPrevNetStats) + nsDiff := getNetDevStatDiff(ns, gPrevNetStats, seconds) // TODO: Log the network adapter stats in file as well. printText(x, y, w, fmt.Sprintf("if: %s", ns.interfaceName), tm.ColorWhite, tm.ColorBlack) y++ @@ -275,7 +275,8 @@ func (u *serverTui) paint() { y++ } printText(x, y, w, - fmt.Sprintf("Tcp Retrans: %s", numberToUnit(gCurNetStats.tcpStats.segRetrans-gPrevNetStats.tcpStats.segRetrans)), + fmt.Sprintf("Tcp Retrans: %s", + numberToUnit((gCurNetStats.tcpStats.segRetrans-gPrevNetStats.tcpStats.segRetrans)/seconds)), tm.ColorDefault, tm.ColorDefault) } @@ -321,7 +322,7 @@ func (u *serverCli) printErr(format string, a ...interface{}) { logErr(s) } -func (u *serverCli) paint() { +func (u *serverCli) paint(seconds uint64) { } func (u *serverCli) emitTestResultBegin() { @@ -333,8 +334,8 @@ func (u *serverCli) emitTestResultBegin() { } } -func (u *serverCli) emitTestResult(s *ethrSession, proto EthrProtocol) { - str := getTestResults(s, proto) +func (u *serverCli) emitTestResult(s *ethrSession, proto EthrProtocol, seconds uint64) { + str := getTestResults(s, proto, seconds) if len(str) > 0 { ui.printTestResults(str) } @@ -394,7 +395,7 @@ func emitAggregate(proto EthrProtocol) { } } -func getTestResults(s *ethrSession, proto EthrProtocol) []string { +func getTestResults(s *ethrSession, proto EthrProtocol, seconds uint64) []string { var bwTestOn, cpsTestOn, ppsTestOn, latTestOn bool var bw, cps, pps, latency uint64 aggTestResult, _ := gAggregateTestResults[proto] @@ -402,6 +403,7 @@ func getTestResults(s *ethrSession, proto EthrProtocol) []string { if found && test.isActive { bwTestOn = true bw = atomic.SwapUint64(&test.testResult.data, 0) + bw /= seconds aggTestResult.bw += bw aggTestResult.cbw++ } @@ -409,6 +411,7 @@ func getTestResults(s *ethrSession, proto EthrProtocol) []string { if found && test.isActive { cpsTestOn = true cps = atomic.SwapUint64(&test.testResult.data, 0) + cps /= seconds aggTestResult.cps += cps aggTestResult.ccps++ } @@ -416,6 +419,7 @@ func getTestResults(s *ethrSession, proto EthrProtocol) []string { if found && test.isActive { ppsTestOn = true pps = atomic.SwapUint64(&test.testResult.data, 0) + pps /= seconds aggTestResult.pps += pps aggTestResult.cpps++ } diff --git a/session.go b/session.go index 259cf96..bce9a27 100644 --- a/session.go +++ b/session.go @@ -125,7 +125,8 @@ type EthrMsgSyn struct { // EthrMsgAck represents the Ack entity. type EthrMsgAck struct { - Cert []byte + Cert []byte + NapDuration time.Duration } // EthrMsgFin represents the Fin entity. @@ -159,6 +160,9 @@ type EthrTestParam struct { // RttCount represents the rtt count. RttCount uint32 + + // Reverse mode for bandwidth tests. + Reverse bool } type ethrTestResult struct { @@ -418,10 +422,11 @@ func sendSessionMsg(enc *gob.Encoder, ethrMsg *EthrMsg) error { return err } -func createAckMsg(cert []byte) (ethrMsg *EthrMsg) { +func createAckMsg(cert []byte, d time.Duration) (ethrMsg *EthrMsg) { ethrMsg = &EthrMsg{Version: 0, Type: EthrAck} ethrMsg.Ack = &EthrMsgAck{} ethrMsg.Ack.Cert = cert + ethrMsg.Ack.NapDuration = d return } diff --git a/stats.go b/stats.go index dc684aa..30321c2 100644 --- a/stats.go +++ b/stats.go @@ -39,7 +39,7 @@ func getNetworkStats() ethrNetStat { return *stats } -func getNetDevStatDiff(curStats ethrNetDevStat, prevNetStats ethrNetStat) ethrNetDevStat { +func getNetDevStatDiff(curStats ethrNetDevStat, prevNetStats ethrNetStat, seconds uint64) ethrNetDevStat { for _, prevStats := range prevNetStats.netDevStats { if prevStats.interfaceName != curStats.interfaceName { continue @@ -71,6 +71,10 @@ func getNetDevStatDiff(curStats ethrNetDevStat, prevNetStats ethrNetStat) ethrNe break } + curStats.rxBytes /= seconds + curStats.txBytes /= seconds + curStats.rxPkts /= seconds + curStats.txPkts /= seconds return curStats } @@ -98,23 +102,36 @@ func stopStatsTimer() { statsEnabled = false } -func emitStats() { - ui.emitTestResultBegin() - emitTestResults() - ui.emitTestResultEnd() - ui.emitStats(getNetworkStats()) - ui.paint() +var lastStatsTime time.Time = time.Now() + +func timeToNextTick() time.Duration { + nextTick := lastStatsTime.Add(time.Second) + return time.Until(nextTick) } -func emitTestResults() { +func emitStats() { + d := time.Since(lastStatsTime) + lastStatsTime = time.Now() + seconds := int64(d.Seconds()) + if seconds < 1 { + seconds = 1 + } + ui.emitTestResultBegin() + emitTestResults(uint64(seconds)) + ui.emitTestResultEnd() + ui.emitStats(getNetworkStats()) + ui.paint(uint64(seconds)) +} + +func emitTestResults(s uint64) { gSessionLock.RLock() defer gSessionLock.RUnlock() for _, k := range gSessionKeys { v := gSessions[k] - ui.emitTestResult(v, TCP) - ui.emitTestResult(v, UDP) - ui.emitTestResult(v, HTTP) - ui.emitTestResult(v, HTTPS) - ui.emitTestResult(v, ICMP) + ui.emitTestResult(v, TCP, s) + ui.emitTestResult(v, UDP, s) + ui.emitTestResult(v, HTTP, s) + ui.emitTestResult(v, HTTPS, s) + ui.emitTestResult(v, ICMP, s) } } diff --git a/ui.go b/ui.go index 4226fc1..0d8192f 100644 --- a/ui.go +++ b/ui.go @@ -214,13 +214,13 @@ type ethrUI interface { printMsg(format string, a ...interface{}) printErr(format string, a ...interface{}) printDbg(format string, a ...interface{}) - paint() + paint(uint64) emitTestHdr() emitLatencyHdr() emitLatencyResults(remote, proto string, avg, min, max, p50, p90, p95, p99, p999, p9999 time.Duration) emitTestResultBegin() - emitTestResult(s *ethrSession, proto EthrProtocol) - printTestResults(s []string) + emitTestResult(*ethrSession, EthrProtocol, uint64) + printTestResults([]string) emitTestResultEnd() emitStats(ethrNetStat) }