ethr/server.go
Jianhui Zhao eb6df4d72c
Fix count bandwidth on server. (#162)
Signed-off-by: Jianhui Zhao <zhaojh329@gmail.com>
2022-03-13 18:34:34 -07:00

325 lines
9.5 KiB
Go

//-----------------------------------------------------------------------------
// Copyright (C) Microsoft. All rights reserved.
// Licensed under the MIT license.
// See LICENSE.txt file in the project root for full license information.
//-----------------------------------------------------------------------------
package main
import (
"fmt"
"io"
"net"
"os"
"runtime"
"sort"
"sync/atomic"
"time"
)
var gCert []byte
func initServer(showUI bool) {
initServerUI(showUI)
}
func finiServer() {
ui.fini()
logFini()
}
func showAcceptedIPVersion() {
var ipVerString = "ipv4, ipv6"
if gIPVersion == ethrIPv4 {
ipVerString = "ipv4"
} else if gIPVersion == ethrIPv6 {
ipVerString = "ipv6"
}
ui.printMsg("Accepting IP version: %s", ipVerString)
}
func runServer(serverParam ethrServerParam) {
defer stopStatsTimer()
initServer(serverParam.showUI)
startStatsTimer()
fmt.Println("-----------------------------------------------------------")
showAcceptedIPVersion()
ui.printMsg("Listening on port %d for TCP & UDP", gEthrPort)
srvrRunUDPServer()
err := srvrRunTCPServer()
if err != nil {
finiServer()
fmt.Printf("Fatal error running TCP server: %v\n", err)
os.Exit(1)
}
}
func handshakeWithClient(test *ethrTest, conn net.Conn) (testID EthrTestID, clientParam EthrClientParam, err error) {
ethrMsg := recvSessionMsg(conn)
if ethrMsg.Type != EthrSyn {
ui.printDbg("Failed to receive SYN message from client.")
err = os.ErrInvalid
return
}
testID = ethrMsg.Syn.TestID
clientParam = ethrMsg.Syn.ClientParam
ethrMsg = createAckMsg()
err = sendSessionMsg(conn, ethrMsg)
return
}
func srvrRunTCPServer() error {
l, err := net.Listen(Tcp(), gLocalIP+":"+gEthrPortStr)
if err != nil {
return err
}
defer l.Close()
for {
conn, err := l.Accept()
if err != nil {
ui.printErr("Error accepting new TCP connection: %v", err)
continue
}
go srvrHandleNewTcpConn(conn)
}
}
func srvrHandleNewTcpConn(conn net.Conn) {
defer conn.Close()
server, port, err := net.SplitHostPort(conn.RemoteAddr().String())
ethrUnused(server, port)
if err != nil {
ui.printDbg("RemoteAddr: Split host port failed: %v", err)
return
}
lserver, lport, err := net.SplitHostPort(conn.LocalAddr().String())
if err != nil {
ui.printDbg("LocalAddr: Split host port failed: %v", err)
return
}
ethrUnused(lserver, lport)
ui.printDbg("New connection from %v, port %v to %v, port %v", server, port, lserver, lport)
test, isNew := createOrGetTest(server, TCP, All)
if test == nil {
return
}
if isNew {
ui.emitTestHdr()
}
isCPSorPing := true
// For CPS and Ping tests, there is no deterministic way to know when the test starts
// from the client side and when it ends. This defer function ensures that test is not
// created/deleted repeatedly by doing a deferred deletion. If another connection
// comes with-in 2s, then another reference would be taken on existing test object
// and it won't be deleted by safeDeleteTest call. This also ensures, test header is
// not printed repeatedly via emitTestHdr.
// Note: Similar mechanism is used in UDP tests to handle test lifetime as well.
defer func() {
if isCPSorPing {
time.Sleep(2 * time.Second)
}
safeDeleteTest(test)
}()
// Always increment CPS count and then check if the test is Bandwidth etc. and handle
// those cases as well.
atomic.AddUint64(&test.testResult.cps, 1)
testID, clientParam, err := handshakeWithClient(test, conn)
if err != nil {
ui.printDbg("Failed in handshake with the client. Error: %v", err)
return
}
isCPSorPing = false
if testID.Protocol == TCP {
if testID.Type == Bandwidth {
srvrRunTCPBandwidthTest(test, clientParam, conn)
} else if testID.Type == Latency {
ui.emitLatencyHdr()
srvrRunTCPLatencyTest(test, clientParam, conn)
}
}
}
func srvrRunTCPBandwidthTest(test *ethrTest, clientParam EthrClientParam, conn net.Conn) {
size := clientParam.BufferSize
buff := make([]byte, size)
for i := uint32(0); i < size; i++ {
buff[i] = byte(i)
}
bufferLen := len(buff)
totalBytesToSend := test.clientParam.BwRate
sentBytes := uint64(0)
start, waitTime, bytesToSend := beginThrottle(totalBytesToSend, bufferLen)
for {
n := 0
var err error
if clientParam.Reverse {
n, err = conn.Write(buff[:bytesToSend])
} else {
n, err = conn.Read(buff)
}
if err != nil {
ui.printDbg("Error sending/receiving data on a connection for bandwidth test: %v", err)
break
}
atomic.AddUint64(&test.testResult.bw, uint64(n))
if clientParam.Reverse {
sentBytes += uint64(n)
start, waitTime, sentBytes, bytesToSend = enforceThrottle(start, waitTime, totalBytesToSend, sentBytes, bufferLen)
}
}
}
func srvrRunTCPLatencyTest(test *ethrTest, clientParam EthrClientParam, conn net.Conn) {
bytes := make([]byte, clientParam.BufferSize)
rttCount := clientParam.RttCount
latencyNumbers := make([]time.Duration, rttCount)
for {
_, err := io.ReadFull(conn, bytes)
if err != nil {
ui.printDbg("Error receiving data for latency test: %v", err)
return
}
for i := uint32(0); i < rttCount; i++ {
s1 := time.Now()
_, err = conn.Write(bytes)
if err != nil {
ui.printDbg("Error sending data for latency test: %v", err)
return
}
_, err = io.ReadFull(conn, bytes)
if err != nil {
ui.printDbg("Error receiving data for latency test: %v", err)
return
}
e2 := time.Since(s1)
latencyNumbers[i] = e2
}
sum := int64(0)
for _, d := range latencyNumbers {
sum += d.Nanoseconds()
}
elapsed := time.Duration(sum / int64(rttCount))
sort.SliceStable(latencyNumbers, func(i, j int) bool {
return latencyNumbers[i] < latencyNumbers[j]
})
//
// Special handling for rttCount == 1. This prevents negative index
// in the latencyNumber index. The other option is to use
// roundUpToZero() but that is more expensive.
//
rttCountFixed := rttCount
if rttCountFixed == 1 {
rttCountFixed = 2
}
atomic.SwapUint64(&test.testResult.latency, uint64(elapsed.Nanoseconds()))
avg := elapsed
min := latencyNumbers[0]
max := latencyNumbers[rttCount-1]
p50 := latencyNumbers[((rttCountFixed*50)/100)-1]
p90 := latencyNumbers[((rttCountFixed*90)/100)-1]
p95 := latencyNumbers[((rttCountFixed*95)/100)-1]
p99 := latencyNumbers[((rttCountFixed*99)/100)-1]
p999 := latencyNumbers[uint64(((float64(rttCountFixed)*99.9)/100)-1)]
p9999 := latencyNumbers[uint64(((float64(rttCountFixed)*99.99)/100)-1)]
ui.emitLatencyResults(
test.session.remoteIP,
protoToString(test.testID.Protocol),
avg, min, max, p50, p90, p95, p99, p999, p9999)
}
}
func srvrRunUDPServer() error {
udpAddr, err := net.ResolveUDPAddr(Udp(), gLocalIP+":"+gEthrPortStr)
if err != nil {
ui.printDbg("Unable to resolve UDP address: %v", err)
return err
}
l, err := net.ListenUDP(Udp(), udpAddr)
if err != nil {
ui.printDbg("Error listening on %s for UDP pkt/s tests: %v", gEthrPortStr, err)
return err
}
// Set socket buffer to 4MB per CPU so we can queue 4MB per CPU in case Ethr is not
// able to keep up temporarily.
err = l.SetReadBuffer(runtime.NumCPU() * 4 * 1024 * 1024)
if err != nil {
ui.printDbg("Failed to set ReadBuffer on UDP socket: %v", err)
}
//
// We use NumCPU here instead of NumThreads passed from client. The
// reason is that for UDP, there is no connection, so all packets come
// on same CPU, so it isn't clear if there are any benefits to running
// more threads than NumCPU(). TODO: Evaluate this in future.
//
for i := 0; i < runtime.NumCPU(); i++ {
go srvrRunUDPPacketHandler(l)
}
return nil
}
func srvrRunUDPPacketHandler(conn *net.UDPConn) {
// This local map aids in efficiency to look up a test based on client's IP
// address. We could use createOrGetTest but that takes a global lock.
tests := make(map[string]*ethrTest)
// For UDP, allocate buffer that can accomodate largest UDP datagram.
readBuffer := make([]byte, 64*1024)
n, remoteIP, err := 0, new(net.UDPAddr), error(nil)
// This function handles UDP tests that came from clients that are no longer
// sending any traffic. This is poor man's garbage collection to ensure the
// server doesn't end up printing dormant client related statistics as UDP
// has no reliable way to detect if client is active or not.
go func() {
for {
time.Sleep(100 * time.Millisecond)
for k, v := range tests {
ui.printDbg("Found Test from server: %v, time: %v", k, v.lastAccess)
// At 200ms of no activity, mark the test in-active so stats stop
// printing.
if time.Since(v.lastAccess) > (200 * time.Millisecond) {
v.isDormant = true
}
// At 2s of no activity, delete the test by assuming that client
// has stopped.
if time.Since(v.lastAccess) > (2 * time.Second) {
ui.printDbg("Deleting UDP test from server: %v, lastAccess: %v", k, v.lastAccess)
safeDeleteTest(v)
delete(tests, k)
}
}
}
}()
for err == nil {
n, remoteIP, err = conn.ReadFromUDP(readBuffer)
if err != nil {
ui.printDbg("Error receiving data from UDP for bandwidth test: %v", err)
continue
}
ethrUnused(remoteIP)
ethrUnused(n)
server, port, _ := net.SplitHostPort(remoteIP.String())
test, found := tests[server]
if !found {
test, isNew := createOrGetTest(server, UDP, All)
if test != nil {
tests[server] = test
}
if isNew {
ui.printDbg("Creating UDP test from server: %v, lastAccess: %v", server, time.Now())
ui.emitTestHdr()
}
}
if test != nil {
test.isDormant = false
test.lastAccess = time.Now()
atomic.AddUint64(&test.testResult.pps, 1)
atomic.AddUint64(&test.testResult.bw, uint64(n))
} else {
ui.printDbg("Unable to create test for UDP traffic on port %s from %s port %s", gEthrPortStr, server, port)
}
}
}