Add support for reverse mode (for TCP). Fixes (#71) (#73)

Initial version of -r for TCP bandwidth tests + few other enhancements:
- Add timestamp in log messages.
- Handle the case where timer tick is delayed on client side. Server side still
needs work.
- Use high value of GOMAXPROCS to fix starvation of some goroutines.
- Support option to disable per connection statistics. If large number
of sessions are used, then per connection stats were making it hard
to read the results on console.
- Synchronize stats timer between client/server. This makes server and
client print similar test results. Earlier, under varying TCP throuhgput,
results in server and client side were different due to different time
periods for calculating throughput.
This commit is contained in:
Pankaj Garg 2019-01-20 09:13:47 -08:00 committed by Pankaj Garg
parent a8036a44fd
commit 7f38d662d2
9 changed files with 143 additions and 78 deletions

View file

@ -72,6 +72,8 @@ func establishSession(testParam EthrTestParam, server string) (test *ethrTest, e
deleteTest(test)
}
gCert = ethrMsg.Ack.Cert
napDuration := ethrMsg.Ack.NapDuration
time.Sleep(napDuration)
// TODO: Enable this in future, right now there is not much value coming
// from this.
/**
@ -157,6 +159,7 @@ func runTest(test *ethrTest, d time.Duration) {
handleCtrlC(toStop)
reason := <-toStop
close(test.done)
sendSessionMsg(test.enc, &EthrMsg{})
test.ctrlConn.Close()
stopStatsTimer()
switch reason {
@ -197,17 +200,14 @@ func runTCPBandwidthTest(test *ethrTest) {
case <-test.done:
break ExitForLoop
default:
n, err := conn.Write(buff)
if err != nil {
// ui.printErr(err)
// test.ctrlConn.Close()
// return
continue
n := 0
if test.testParam.Reverse {
n, err = io.ReadFull(conn, buff)
} else {
n, err = conn.Write(buff)
}
if n < blen {
// ui.printErr("Partial write: " + strconv.Itoa(n))
// test.ctrlConn.Close()
// return
if err != nil || n < blen {
ui.printDbg("Error sending/receiving data on a connection for bandwidth test: %v", err)
continue
}
atomic.AddUint64(&ec.data, uint64(blen))

View file

@ -37,7 +37,7 @@ func (u *clientUI) printDbg(format string, a ...interface{}) {
}
}
func (u *clientUI) paint() {
func (u *clientUI) paint(seconds uint64) {
}
func (u *clientUI) emitTestResultBegin() {
@ -80,8 +80,9 @@ func initClientUI() {
}
var gInterval uint64
var noConnectionStats bool
func printTestResult(test *ethrTest, value uint64) {
func printTestResult(test *ethrTest, value uint64, seconds uint64) {
if test.testParam.TestID.Type == Bandwidth && (test.testParam.TestID.Protocol == TCP ||
test.testParam.TestID.Protocol == UDP) {
if gInterval == 0 {
@ -91,18 +92,23 @@ func printTestResult(test *ethrTest, value uint64) {
cvalue := uint64(0)
ccount := 0
test.connListDo(func(ec *ethrConn) {
value = atomic.SwapUint64(&ec.data, 0)
ui.printMsg("[%3d] %-5s %03d-%03d sec %7s", ec.fd,
protoToString(test.testParam.TestID.Protocol),
gInterval, gInterval+1, bytesToRate(value))
cvalue += value
val := atomic.SwapUint64(&ec.data, 0)
val /= seconds
if !noConnectionStats {
ui.printMsg("[%3d] %-5s %03d-%03d sec %7s", ec.fd,
protoToString(test.testParam.TestID.Protocol),
gInterval, gInterval+1, bytesToRate(val))
}
cvalue += val
ccount++
})
if ccount > 1 {
if ccount > 1 || noConnectionStats {
ui.printMsg("[SUM] %-5s %03d-%03d sec %7s",
protoToString(test.testParam.TestID.Protocol),
gInterval, gInterval+1, bytesToRate(cvalue))
ui.printMsg("- - - - - - - - - - - - - - - - - - - - - - -")
if !noConnectionStats {
ui.printMsg("- - - - - - - - - - - - - - - - - - - - - - -")
}
}
logResults([]string{test.session.remoteAddr, protoToString(test.testParam.TestID.Protocol),
bytesToRate(cvalue), "", "", ""})
@ -141,7 +147,7 @@ func printTestResult(test *ethrTest, value uint64) {
gInterval++
}
func (u *clientUI) emitTestResult(s *ethrSession, proto EthrProtocol) {
func (u *clientUI) emitTestResult(s *ethrSession, proto EthrProtocol, seconds uint64) {
var data uint64
var testList = []EthrTestType{Bandwidth, Cps, Pps}
@ -149,7 +155,8 @@ func (u *clientUI) emitTestResult(s *ethrSession, proto EthrProtocol) {
test, found := s.tests[EthrTestID{proto, testType}]
if found && test.isActive {
data = atomic.SwapUint64(&test.testResult.data, 0)
printTestResult(test, data)
data /= seconds
printTestResult(test, data, seconds)
}
}
}

68
ethr.go
View file

@ -74,7 +74,7 @@ func printExtTestType() {
}
func printThreadUsage() {
printFlagUsage("n", "<number>", "Number of Threads",
printFlagUsage("n", "<number>", "Number of Parallel Sessions (and Threads).",
"0: Equal to number of CPUs",
"Default: 1")
}
@ -123,6 +123,13 @@ func printModeUsage() {
"'-m x' MUST be specified for external mode.")
}
func printNoConnStatUsage() {
printFlagUsage("ncs", "",
"No Connection Stats would be printed if this flag is specified.",
"This is useful for running with large number of connections as",
"specified by -n option.")
}
func ethrUsage() {
fmt.Println("\nEthr - Tool for comprehensive network performance measurements.")
fmt.Println("It supports 4 modes, usage of each mode is described below:")
@ -149,8 +156,10 @@ func ethrUsage() {
fmt.Println("\nMode: Client")
fmt.Println("================================================================================")
printClientUsage()
printFlagUsage("r", "", "For Bandwidth tests, send data from server to client.")
printDurationUsage()
printThreadUsage()
printNoConnStatUsage()
printBufLenUsage()
printProtocolUsage()
printPortUsage()
@ -169,6 +178,7 @@ func ethrUsage() {
printExtClientUsage()
printDurationUsage()
printThreadUsage()
printNoConnStatUsage()
printBufLenUsage()
printExtProtocolUsage()
printExtTestType()
@ -176,6 +186,15 @@ func ethrUsage() {
}
func main() {
//
// Set GOMAXPROCS to 1024 as running large number of goroutines in a loop
// to send network traffic results in timer starvation, as well as unfair
// processing time across goroutines resulting in starvation of many TCP
// connections. Using a higher number of threads via GOMAXPROCS solves this
// problem.
//
runtime.GOMAXPROCS(1024)
flag.Usage = ethrUsage
isServer := flag.Bool("s", false, "")
clientDest := flag.String("c", "", "")
@ -194,6 +213,8 @@ func main() {
use4 := flag.Bool("4", false, "")
use6 := flag.Bool("6", false, "")
gap := flag.Duration("g", 0, "")
reverse := flag.Bool("r", false, "")
ncs := flag.Bool("ncs", false, "")
flag.Parse()
@ -202,6 +223,12 @@ func main() {
// fmt.Println("Number of incorrect arguments: " + strconv.Itoa(flag.NArg()))
//
//
// Only used in client mode, to control whether to display per connection
// statistics or not.
//
noConnectionStats = *ncs
xMode := false
switch *modeStr {
case "":
@ -231,6 +258,10 @@ func main() {
printUsageError("Invalid arguments, use either \"-s\" or \"-c\".")
}
if *reverse && mode != ethrModeClient {
printUsageError("Invalid arguments, \"-r\" can only be used in client mode.")
}
if *use4 && !*use6 {
ipVer = ethrIPv4
} else if *use6 && !*use4 {
@ -239,15 +270,11 @@ func main() {
bufLen := unitToNumber(*bufLenStr)
if bufLen == 0 {
fmt.Println("Invalid length specified: " + *bufLenStr)
ethrUsage()
os.Exit(1)
printUsageError(fmt.Sprintf("Invalid length specified: %s" + *bufLenStr))
}
if *rttCount <= 0 {
fmt.Println("Invalid RTT count for latency test:", *rttCount)
ethrUsage()
os.Exit(1)
printUsageError(fmt.Sprintf("Invalid RTT count for latency test: %d", *rttCount))
}
var testType EthrTestType
@ -312,10 +339,9 @@ func main() {
testParam := EthrTestParam{EthrTestID{EthrProtocol(proto), testType},
uint32(*thCount),
uint32(bufLen),
uint32(*rttCount)}
if !validateTestParam(mode, testParam) {
os.Exit(1)
}
uint32(*rttCount),
*reverse}
validateTestParam(mode, testParam)
generatePortNumbers(*portStr)
@ -352,61 +378,53 @@ func main() {
}
func emitUnsupportedTest(testParam EthrTestParam) {
fmt.Printf("Error: \"%s\" test for \"%s\" is not supported.\n",
testToString(testParam.TestID.Type), protoToString(testParam.TestID.Protocol))
printUsageError(fmt.Sprintf("\"%s\" test for \"%s\" is not supported.\n",
testToString(testParam.TestID.Type), protoToString(testParam.TestID.Protocol)))
}
func validateTestParam(mode ethrMode, testParam EthrTestParam) bool {
func validateTestParam(mode ethrMode, testParam EthrTestParam) {
testType := testParam.TestID.Type
protocol := testParam.TestID.Protocol
if mode == ethrModeServer {
if testType != All || protocol != TCP {
emitUnsupportedTest(testParam)
return false
}
} else if mode == ethrModeClient {
switch protocol {
case TCP:
if testType != Bandwidth && testType != Cps && testType != Latency {
emitUnsupportedTest(testParam)
return false
}
case UDP:
if testType != Bandwidth && testType != Pps {
emitUnsupportedTest(testParam)
return false
}
if testType == Bandwidth {
if testParam.BufferSize > (64 * 1024) {
fmt.Printf("Error: Maximum supported buffer size for UDP is 64K\n")
return false
printUsageError("Maximum supported buffer size for UDP is 64K\n")
}
}
case HTTP:
if testType != Bandwidth {
emitUnsupportedTest(testParam)
return false
}
case HTTPS:
if testType != Bandwidth {
emitUnsupportedTest(testParam)
return false
}
default:
emitUnsupportedTest(testParam)
return false
}
} else if mode == ethrModeExtClient {
if (protocol != TCP) || (testType != ConnLatency && testType != Bandwidth) {
emitUnsupportedTest(testParam)
return false
}
}
return true
}
func printUsageError(s string) {
fmt.Printf("Error: %s\n", s)
ethrUsage()
fmt.Printf("Please use \"ethr -h\" for ethr command line arguments.\n")
// ethrUsage()
os.Exit(1)
}

4
log.go
View file

@ -14,6 +14,7 @@ import (
)
type logMessage struct {
Time string
Type string
Message string
}
@ -35,6 +36,7 @@ type logLatencyData struct {
}
type logTestResults struct {
Time string
Type string
RemoteAddr string
Protocol string
@ -79,6 +81,7 @@ func runLogger(logFile *os.File) {
func _log(prefix, msg string) {
if loggingActive {
logData := logMessage{}
logData.Time = time.Now().UTC().Format(time.RFC3339)
logData.Type = prefix
logData.Message = msg
logJSON, _ := json.Marshal(logData)
@ -103,6 +106,7 @@ func logDbg(msg string) {
func logResults(s []string) {
if loggingActive {
logData := logTestResults{}
logData.Time = time.Now().UTC().Format(time.RFC3339)
logData.Type = "TestResult"
logData.RemoteAddr = s[0]
logData.Protocol = s[1]

View file

@ -120,13 +120,15 @@ func handleRequest(conn net.Conn) {
return
}
}
ethrMsg = createAckMsg(gCert)
delay := timeToNextTick()
ethrMsg = createAckMsg(gCert, delay)
err = sendSessionMsg(enc, ethrMsg)
if err != nil {
ui.printErr("send session message: %v", err)
cleanupFunc()
return
}
time.Sleep(delay)
// TODO: Enable this in future, right now there is not much value coming
// from this.
/**
@ -140,8 +142,8 @@ func handleRequest(conn net.Conn) {
waitForChannelStop := make(chan bool, 1)
serverWatchControlChannel(test, waitForChannelStop)
<-waitForChannelStop
ui.printMsg("Ending " + testToString(testParam.TestID.Type) + " test from " + server)
test.isActive = false
ui.printMsg("Ending " + testToString(testParam.TestID.Type) + " test from " + server)
cleanupFunc()
if len(gSessionKeys) > 0 {
ui.emitTestHdr()
@ -190,16 +192,24 @@ func closeConn(conn net.Conn) {
func runTCPBandwidthHandler(conn net.Conn, test *ethrTest) {
defer closeConn(conn)
size := test.testParam.BufferSize
bytes := make([]byte, size)
buff := make([]byte, size)
for i := uint32(0); i < test.testParam.BufferSize; i++ {
buff[i] = byte(i)
}
ExitForLoop:
for {
select {
case <-test.done:
break ExitForLoop
default:
_, err := io.ReadFull(conn, bytes)
var err error
if test.testParam.Reverse {
_, err = conn.Write(buff)
} else {
_, err = io.ReadFull(conn, buff)
}
if err != nil {
ui.printDbg("Error receiving data on a connection for bandwidth test: %v", err)
ui.printDbg("Error sending/receiving data on a connection for bandwidth test: %v", err)
continue
}
atomic.AddUint64(&test.testResult.data, uint64(size))

View file

@ -179,8 +179,8 @@ func (u *serverTui) emitTestResultBegin() {
u.results = nil
}
func (u *serverTui) emitTestResult(s *ethrSession, proto EthrProtocol) {
str := getTestResults(s, proto)
func (u *serverTui) emitTestResult(s *ethrSession, proto EthrProtocol, seconds uint64) {
str := getTestResults(s, proto, seconds)
if len(str) > 0 {
ui.printTestResults(str)
}
@ -209,7 +209,7 @@ func (u *serverTui) emitLatencyResults(remote, proto string, avg, min, max, p50,
logLatency(remote, proto, avg, min, max, p50, p90, p95, p99, p999, p9999)
}
func (u *serverTui) paint() {
func (u *serverTui) paint(seconds uint64) {
tm.Clear(tm.ColorDefault, tm.ColorDefault)
defer tm.Flush()
printCenterText(0, 0, u.w, "Ethr v0.1", tm.ColorBlack, tm.ColorWhite)
@ -253,7 +253,7 @@ func (u *serverTui) paint() {
w := u.statW
y := u.statY
for _, ns := range gCurNetStats.netDevStats {
nsDiff := getNetDevStatDiff(ns, gPrevNetStats)
nsDiff := getNetDevStatDiff(ns, gPrevNetStats, seconds)
// TODO: Log the network adapter stats in file as well.
printText(x, y, w, fmt.Sprintf("if: %s", ns.interfaceName), tm.ColorWhite, tm.ColorBlack)
y++
@ -275,7 +275,8 @@ func (u *serverTui) paint() {
y++
}
printText(x, y, w,
fmt.Sprintf("Tcp Retrans: %s", numberToUnit(gCurNetStats.tcpStats.segRetrans-gPrevNetStats.tcpStats.segRetrans)),
fmt.Sprintf("Tcp Retrans: %s",
numberToUnit((gCurNetStats.tcpStats.segRetrans-gPrevNetStats.tcpStats.segRetrans)/seconds)),
tm.ColorDefault, tm.ColorDefault)
}
@ -321,7 +322,7 @@ func (u *serverCli) printErr(format string, a ...interface{}) {
logErr(s)
}
func (u *serverCli) paint() {
func (u *serverCli) paint(seconds uint64) {
}
func (u *serverCli) emitTestResultBegin() {
@ -333,8 +334,8 @@ func (u *serverCli) emitTestResultBegin() {
}
}
func (u *serverCli) emitTestResult(s *ethrSession, proto EthrProtocol) {
str := getTestResults(s, proto)
func (u *serverCli) emitTestResult(s *ethrSession, proto EthrProtocol, seconds uint64) {
str := getTestResults(s, proto, seconds)
if len(str) > 0 {
ui.printTestResults(str)
}
@ -394,7 +395,7 @@ func emitAggregate(proto EthrProtocol) {
}
}
func getTestResults(s *ethrSession, proto EthrProtocol) []string {
func getTestResults(s *ethrSession, proto EthrProtocol, seconds uint64) []string {
var bwTestOn, cpsTestOn, ppsTestOn, latTestOn bool
var bw, cps, pps, latency uint64
aggTestResult, _ := gAggregateTestResults[proto]
@ -402,6 +403,7 @@ func getTestResults(s *ethrSession, proto EthrProtocol) []string {
if found && test.isActive {
bwTestOn = true
bw = atomic.SwapUint64(&test.testResult.data, 0)
bw /= seconds
aggTestResult.bw += bw
aggTestResult.cbw++
}
@ -409,6 +411,7 @@ func getTestResults(s *ethrSession, proto EthrProtocol) []string {
if found && test.isActive {
cpsTestOn = true
cps = atomic.SwapUint64(&test.testResult.data, 0)
cps /= seconds
aggTestResult.cps += cps
aggTestResult.ccps++
}
@ -416,6 +419,7 @@ func getTestResults(s *ethrSession, proto EthrProtocol) []string {
if found && test.isActive {
ppsTestOn = true
pps = atomic.SwapUint64(&test.testResult.data, 0)
pps /= seconds
aggTestResult.pps += pps
aggTestResult.cpps++
}

View file

@ -125,7 +125,8 @@ type EthrMsgSyn struct {
// EthrMsgAck represents the Ack entity.
type EthrMsgAck struct {
Cert []byte
Cert []byte
NapDuration time.Duration
}
// EthrMsgFin represents the Fin entity.
@ -159,6 +160,9 @@ type EthrTestParam struct {
// RttCount represents the rtt count.
RttCount uint32
// Reverse mode for bandwidth tests.
Reverse bool
}
type ethrTestResult struct {
@ -418,10 +422,11 @@ func sendSessionMsg(enc *gob.Encoder, ethrMsg *EthrMsg) error {
return err
}
func createAckMsg(cert []byte) (ethrMsg *EthrMsg) {
func createAckMsg(cert []byte, d time.Duration) (ethrMsg *EthrMsg) {
ethrMsg = &EthrMsg{Version: 0, Type: EthrAck}
ethrMsg.Ack = &EthrMsgAck{}
ethrMsg.Ack.Cert = cert
ethrMsg.Ack.NapDuration = d
return
}

View file

@ -39,7 +39,7 @@ func getNetworkStats() ethrNetStat {
return *stats
}
func getNetDevStatDiff(curStats ethrNetDevStat, prevNetStats ethrNetStat) ethrNetDevStat {
func getNetDevStatDiff(curStats ethrNetDevStat, prevNetStats ethrNetStat, seconds uint64) ethrNetDevStat {
for _, prevStats := range prevNetStats.netDevStats {
if prevStats.interfaceName != curStats.interfaceName {
continue
@ -71,6 +71,10 @@ func getNetDevStatDiff(curStats ethrNetDevStat, prevNetStats ethrNetStat) ethrNe
break
}
curStats.rxBytes /= seconds
curStats.txBytes /= seconds
curStats.rxPkts /= seconds
curStats.txPkts /= seconds
return curStats
}
@ -98,23 +102,36 @@ func stopStatsTimer() {
statsEnabled = false
}
func emitStats() {
ui.emitTestResultBegin()
emitTestResults()
ui.emitTestResultEnd()
ui.emitStats(getNetworkStats())
ui.paint()
var lastStatsTime time.Time = time.Now()
func timeToNextTick() time.Duration {
nextTick := lastStatsTime.Add(time.Second)
return time.Until(nextTick)
}
func emitTestResults() {
func emitStats() {
d := time.Since(lastStatsTime)
lastStatsTime = time.Now()
seconds := int64(d.Seconds())
if seconds < 1 {
seconds = 1
}
ui.emitTestResultBegin()
emitTestResults(uint64(seconds))
ui.emitTestResultEnd()
ui.emitStats(getNetworkStats())
ui.paint(uint64(seconds))
}
func emitTestResults(s uint64) {
gSessionLock.RLock()
defer gSessionLock.RUnlock()
for _, k := range gSessionKeys {
v := gSessions[k]
ui.emitTestResult(v, TCP)
ui.emitTestResult(v, UDP)
ui.emitTestResult(v, HTTP)
ui.emitTestResult(v, HTTPS)
ui.emitTestResult(v, ICMP)
ui.emitTestResult(v, TCP, s)
ui.emitTestResult(v, UDP, s)
ui.emitTestResult(v, HTTP, s)
ui.emitTestResult(v, HTTPS, s)
ui.emitTestResult(v, ICMP, s)
}
}

6
ui.go
View file

@ -214,13 +214,13 @@ type ethrUI interface {
printMsg(format string, a ...interface{})
printErr(format string, a ...interface{})
printDbg(format string, a ...interface{})
paint()
paint(uint64)
emitTestHdr()
emitLatencyHdr()
emitLatencyResults(remote, proto string, avg, min, max, p50, p90, p95, p99, p999, p9999 time.Duration)
emitTestResultBegin()
emitTestResult(s *ethrSession, proto EthrProtocol)
printTestResults(s []string)
emitTestResult(*ethrSession, EthrProtocol, uint64)
printTestResults([]string)
emitTestResultEnd()
emitStats(ethrNetStat)
}