From aa636603a7179e83d5a3dca7083ff8dd01e2c2a8 Mon Sep 17 00:00:00 2001 From: Pankaj Garg <590987+ipankajg@users.noreply.github.com> Date: Fri, 4 Dec 2020 16:02:14 -0800 Subject: [PATCH] Improve throttling and add Title support (#141) --- client.go | 37 +++++++++++++++++++++---------------- ethr.go | 7 ------- server.go | 23 +++++++++++++++++------ utils.go | 46 ++++++++++++++++++++++++++++++++++++---------- 4 files changed, 74 insertions(+), 39 deletions(-) diff --git a/client.go b/client.go index f0dee44..cb01298 100644 --- a/client.go +++ b/client.go @@ -263,7 +263,10 @@ func runTCPBandwidthTestHandler(test *ethrTest, conn net.Conn, wg *sync.WaitGrou for i := uint32(0); i < size; i++ { buff[i] = byte(i) } - start, waitTime, sendRate := beginThrottle() + bufferLen := len(buff) + totalBytesToSend := test.clientParam.BwRate + sentBytes := uint64(0) + start, waitTime, bytesToSend := beginThrottle(totalBytesToSend, bufferLen) ExitForLoop: for { select { @@ -273,19 +276,19 @@ ExitForLoop: n := 0 var err error = nil if test.clientParam.Reverse { - n, err = io.ReadFull(conn, buff) + n, err = conn.Read(buff) } else { - n, err = conn.Write(buff) + n, err = conn.Write(buff[:bytesToSend]) } - if err != nil || n < int(size) { + if err != nil { ui.printDbg("Error sending/receiving data on a connection for bandwidth test: %v", err) break ExitForLoop } - atomic.AddUint64(&ec.bw, uint64(size)) - atomic.AddUint64(&test.testResult.bw, uint64(size)) - sendRate += uint64(size) - if test.clientParam.BwRate > 0 && !test.clientParam.Reverse && sendRate >= test.clientParam.BwRate { - start, waitTime, sendRate = enforceThrottle(start, waitTime) + atomic.AddUint64(&ec.bw, uint64(n)) + atomic.AddUint64(&test.testResult.bw, uint64(n)) + if !test.clientParam.Reverse { + sentBytes += uint64(n) + start, waitTime, sentBytes, bytesToSend = enforceThrottle(start, waitTime, totalBytesToSend, sentBytes, bufferLen) } } } @@ -968,20 +971,22 @@ func runUDPBandwidthAndPpsTest(test *ethrTest) { lserver, lport, _ := net.SplitHostPort(conn.LocalAddr().String()) ui.printMsg("[%3d] local %s port %s connected to %s port %s", ec.fd, lserver, lport, rserver, rport) - blen := len(buff) - start, waitTime, sendRate := beginThrottle() + bufferLen := len(buff) + totalBytesToSend := test.clientParam.BwRate + sentBytes := uint64(0) + start, waitTime, bytesToSend := beginThrottle(totalBytesToSend, bufferLen) ExitForLoop: for { select { case <-test.done: break ExitForLoop default: - n, err := conn.Write(buff) + n, err := conn.Write(buff[:bytesToSend]) if err != nil { ui.printDbg("%v", err) continue } - if n < blen { + if n < bytesToSend { ui.printDbg("Partial write: %d", n) continue } @@ -989,9 +994,9 @@ func runUDPBandwidthAndPpsTest(test *ethrTest) { atomic.AddUint64(&ec.pps, 1) atomic.AddUint64(&test.testResult.bw, uint64(n)) atomic.AddUint64(&test.testResult.pps, 1) - sendRate += uint64(size) - if test.clientParam.BwRate > 0 && !test.clientParam.Reverse && sendRate >= test.clientParam.BwRate { - start, waitTime, sendRate = enforceThrottle(start, waitTime) + if !test.clientParam.Reverse { + sentBytes += uint64(n) + start, waitTime, sentBytes, bytesToSend = enforceThrottle(start, waitTime, totalBytesToSend, sentBytes, bufferLen) } } } diff --git a/ethr.go b/ethr.go index 8c201bc..71eeb62 100644 --- a/ethr.go +++ b/ethr.go @@ -208,13 +208,6 @@ func main() { bwRate /= 8 } - // Adjust the numbers so that data can be transfered in equal units. - if bwRate > 0 { - factor := (bwRate + bufLen - 1) / bufLen - bufLen = bwRate / factor - bwRate = bufLen * factor - } - // // For Pkt/s, we always override the buffer size to be just 1 byte. // TODO: Evaluate in future, if we need to support > 1 byte packets for diff --git a/server.go b/server.go index b17c1a1..265b34f 100644 --- a/server.go +++ b/server.go @@ -149,22 +149,26 @@ func srvrRunTCPBandwidthTest(test *ethrTest, clientParam EthrClientParam, conn n for i := uint32(0); i < size; i++ { buff[i] = byte(i) } - start, waitTime, sendRate := beginThrottle() + bufferLen := len(buff) + totalBytesToSend := test.clientParam.BwRate + sentBytes := uint64(0) + start, waitTime, bytesToSend := beginThrottle(totalBytesToSend, bufferLen) for { + n := 0 var err error if clientParam.Reverse { - _, err = conn.Write(buff) + n, err = conn.Write(buff[:bytesToSend]) } else { - _, err = io.ReadFull(conn, buff) + n, err = conn.Read(buff) } if err != nil { ui.printDbg("Error sending/receiving data on a connection for bandwidth test: %v", err) break } - sendRate += uint64(size) atomic.AddUint64(&test.testResult.bw, uint64(size)) - if clientParam.BwRate > 0 && clientParam.Reverse && sendRate >= clientParam.BwRate { - start, waitTime, sendRate = enforceThrottle(start, waitTime) + if clientParam.Reverse { + sentBytes += uint64(n) + start, waitTime, sentBytes, bytesToSend = enforceThrottle(start, waitTime, totalBytesToSend, sentBytes, bufferLen) } } } @@ -239,6 +243,12 @@ func srvrRunUDPServer() error { ui.printDbg("Error listening on %s for UDP pkt/s tests: %v", gEthrPortStr, err) return err } + // Set socket buffer to 4MB per CPU so we can queue 4MB per CPU in case Ethr is not + // able to keep up temporarily. + err = l.SetReadBuffer(runtime.NumCPU() * 4 * 1024 * 1024) + if err != nil { + ui.printDbg("Failed to set ReadBuffer on UDP socket: %v", err) + } // // We use NumCPU here instead of NumThreads passed from client. The // reason is that for UDP, there is no connection, so all packets come @@ -289,6 +299,7 @@ func srvrRunUDPPacketHandler(conn *net.UDPConn) { ui.printDbg("Error receiving data from UDP for bandwidth test: %v", err) continue } + ethrUnused(remoteIP) ethrUnused(n) server, port, _ := net.SplitHostPort(remoteIP.String()) test, found := tests[server] diff --git a/utils.go b/utils.go index a64344a..2fd835c 100644 --- a/utils.go +++ b/utils.go @@ -397,6 +397,13 @@ func ethrDialEx(p EthrProtocol, dialAddr, localIP string, localPortNum uint16, t if ok { tcpconn.SetLinger(0) } + udpconn, ok := conn.(*net.UDPConn) + if ok { + err = udpconn.SetWriteBuffer(4 * 1024 * 1024) + if err != nil { + ui.printDbg("Failed to set ReadBuffer on UDP socket: %v", err) + } + } } return } @@ -432,20 +439,39 @@ func ethrLookupIP(server string) (net.IPAddr, string, error) { // This is a workaround to ensure we generate traffic at certain rate // and stats are printed correctly. We ensure that current interval lasts // 100ms after stats are printed, not perfect but workable. -func beginThrottle() (start time.Time, waitTime time.Duration, sendRate uint64) { +func beginThrottle(totalBytesToSend uint64, bufferLen int) (start time.Time, waitTime time.Duration, bytesToSend int) { start = time.Now() - waitTime = time.Until(lastStatsTime.Add(time.Second + 100*time.Millisecond)) - sendRate = uint64(0) + waitTime = time.Until(lastStatsTime.Add(time.Second + 50*time.Millisecond)) + bytesToSend = bufferLen + if totalBytesToSend > 0 && totalBytesToSend < uint64(bufferLen) { + bytesToSend = int(totalBytesToSend) + } return } -func enforceThrottle(s time.Time, wt time.Duration) (start time.Time, waitTime time.Duration, sendRate uint64) { - timeTaken := time.Since(s) - if timeTaken < wt { - time.Sleep(wt - timeTaken) +func enforceThrottle(s time.Time, wt time.Duration, totalBytesToSend, oldSentBytes uint64, bufferLen int) (start time.Time, waitTime time.Duration, newSentBytes uint64, bytesToSend int) { + start = s + waitTime = wt + newSentBytes = oldSentBytes + bytesToSend = bufferLen + if totalBytesToSend > 0 { + remainingBytes := totalBytesToSend - oldSentBytes + if remainingBytes > 0 { + if remainingBytes < uint64(bufferLen) { + bytesToSend = int(remainingBytes) + } + } else { + timeTaken := time.Since(s) + if timeTaken < wt { + time.Sleep(wt - timeTaken) + } + start = time.Now() + waitTime = time.Until(lastStatsTime.Add(time.Second + 50*time.Millisecond)) + newSentBytes = 0 + if totalBytesToSend < uint64(bufferLen) { + bytesToSend = int(totalBytesToSend) + } + } } - start = time.Now() - waitTime = time.Until(lastStatsTime.Add(time.Second + 100*time.Millisecond)) - sendRate = 0 return }