Misc/codecleanup (#29)

* Intermediate changes for Connected UDP.

* Add more changes to support connected UDP.

There is an issue with this approach, as multiple sockets are not able
to listen on connected UDP. We need to add SO_REUSEPORT to make this a
viable solution. This would be done in a later commit but before this
change is merged into master.

* Minor code cleanup.
This commit is contained in:
Pankaj Garg 2018-12-10 08:06:37 -08:00 committed by GitHub
parent 6fcc37915d
commit 0c9674d15d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 184 additions and 162 deletions

View file

@ -8,7 +8,7 @@ package main
import ( import (
"bytes" "bytes"
"encoding/gob" "encoding/gob"
"fmt" // "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net" "net"
@ -57,15 +57,24 @@ func establishSession(testParam EthrTestParam, server string) (err error, test *
sendSessionMsg(enc, ethrMsg) sendSessionMsg(enc, ethrMsg)
return return
} }
ethrMsg = recvSessionMsg(test.dec) // TODO: Enable this in future, right now there is not much value coming
if ethrMsg.Type != EthrAck { // from this.
if ethrMsg.Type == EthrFin { /**
err = fmt.Errorf("%s", ethrMsg.Fin.Message) ethrMsg = recvSessionMsg(test.dec)
} else { if ethrMsg.Type != EthrAck {
err = fmt.Errorf("Unexpected control message received. %v", ethrMsg) if ethrMsg.Type == EthrFin {
err = fmt.Errorf("%s", ethrMsg.Fin.Message)
} else {
err = fmt.Errorf("Unexpected control message received. %v", ethrMsg)
}
deleteTest(test)
} }
deleteTest(test) ethrMsg = createAckMsg()
} err = sendSessionMsg(test.enc, ethrMsg)
if err != nil {
os.Exit(1)
}
**/
return return
} }
@ -100,10 +109,11 @@ func runDurationTimer(d time.Duration, toStop chan int) {
}() }()
} }
func monitorControlChannel(test *ethrTest, toStop chan int) { func clientWatchControlChannel(test *ethrTest, toStop chan int) {
go func() { go func() {
var b [1]byte waitForChannelStop := make(chan bool, 1)
_, _ = test.ctrlConn.Read(b[0:]) watchControlChannel(test, waitForChannelStop)
<-waitForChannelStop
toStop <- serverDone toStop <- serverDone
}() }()
} }
@ -127,14 +137,9 @@ func runTest(test *ethrTest, d time.Duration) {
go runHttpTest(test) go runHttpTest(test)
} }
test.isActive = true test.isActive = true
ethrMsg := createAckMsg()
err := sendSessionMsg(test.enc, ethrMsg)
if err != nil {
os.Exit(1)
}
toStop := make(chan int, 1) toStop := make(chan int, 1)
runDurationTimer(d, toStop) runDurationTimer(d, toStop)
monitorControlChannel(test, toStop) clientWatchControlChannel(test, toStop)
handleCtrlC(toStop) handleCtrlC(toStop)
reason := <-toStop reason := <-toStop
close(test.done) close(test.done)
@ -231,8 +236,7 @@ func runPpsTest(test *ethrTest) {
buff := make([]byte, test.testParam.BufferSize) buff := make([]byte, test.testParam.BufferSize)
conn, err := net.Dial(protoUDP, server+":"+udpPpsPort) conn, err := net.Dial(protoUDP, server+":"+udpPpsPort)
if err != nil { if err != nil {
ui.printErr("%v", err) ui.printDbg("Unable to dial UDP, error: %v", err)
os.Exit(1)
return return
} }
defer conn.Close() defer conn.Close()
@ -240,11 +244,6 @@ func runPpsTest(test *ethrTest) {
lserver, lport, _ := net.SplitHostPort(conn.LocalAddr().String()) lserver, lport, _ := net.SplitHostPort(conn.LocalAddr().String())
ui.printMsg("[udp] local %s port %s connected to %s port %s", ui.printMsg("[udp] local %s port %s connected to %s port %s",
lserver, lport, rserver, rport) lserver, lport, rserver, rport)
/*
ethrMsg := createBgnMsg(lport)
sendSessionMsg(test.enc, ethrMsg)
sendSessionMsg(test.enc, ethrMsg)
*/
blen := len(buff) blen := len(buff)
ExitForLoop: ExitForLoop:
for { for {
@ -254,13 +253,11 @@ func runPpsTest(test *ethrTest) {
default: default:
n, err := conn.Write(buff) n, err := conn.Write(buff)
if err != nil { if err != nil {
// ui.printErr(err) ui.printDbg("%v", err)
// return
continue continue
} }
if n < blen { if n < blen {
// ui.printErr("Partial write: " + strconv.Itoa(n)) ui.printDbg("Partial write: %d", n)
// return
continue continue
} }
atomic.AddUint64(&test.testResult.data, 1) atomic.AddUint64(&test.testResult.data, 1)

View file

@ -30,9 +30,11 @@ func (u *clientUi) printErr(format string, a ...interface{}) {
} }
func (u *clientUi) printDbg(format string, a ...interface{}) { func (u *clientUi) printDbg(format string, a ...interface{}) {
s := fmt.Sprintf(format, a...) if logDebug {
logDbg(s) s := fmt.Sprintf(format, a...)
fmt.Println(s) logDbg(s)
fmt.Println(s)
}
} }
func (u *clientUi) paint() { func (u *clientUi) paint() {

208
server.go
View file

@ -23,10 +23,10 @@ func runServer(testParam EthrTestParam, showUi bool) {
initServer(showUi) initServer(showUi)
l := runControlChannel() l := runControlChannel()
defer l.Close() defer l.Close()
runServerLatencyTest() runTcpBandwidthServer()
runServerCpsTest() runTcpCpsServer()
runServerBandwidthTest() runTcpLatencyServer()
go runHttpServer() go runHttpBandwidthServer()
startStatsTimer() startStatsTimer()
for { for {
conn, err := l.Accept() conn, err := l.Accept()
@ -70,19 +70,19 @@ func handleRequest(conn net.Conn) {
testParam := ethrMsg.Syn.TestParam testParam := ethrMsg.Syn.TestParam
server, port, err := net.SplitHostPort(conn.RemoteAddr().String()) server, port, err := net.SplitHostPort(conn.RemoteAddr().String())
if err != nil { if err != nil {
ui.printErr("remote: split host port: %v",err) ui.printDbg("RemoteAddr: Split host port failed: %v", err)
return return
} }
ethrUnused(port) ethrUnused(port)
lserver, lport, err := net.SplitHostPort(conn.LocalAddr().String()) lserver, lport, err := net.SplitHostPort(conn.LocalAddr().String())
if err != nil { if err != nil {
ui.printErr("local: split host port: %v",err) ui.printDbg("LocalAddr: Split host port failed: %v", err)
return return
} }
ethrUnused(lserver, lport) ethrUnused(lserver, lport)
ui.printMsg("New control connection from " + server + ", port " + port) ui.printMsg("New control connection from " + server + ", port " + port)
ui.printMsg("Starting " + protoToString(testParam.TestId.Protocol) + " " + ui.printMsg("Starting " + protoToString(testParam.TestId.Protocol) + " " +
testToString(testParam.TestId.Type) + " test from " + server) testToString(testParam.TestId.Type) + " test from " + server)
test, err := newTest(server, conn, testParam, enc, dec) test, err := newTest(server, conn, testParam, enc, dec)
if err != nil { if err != nil {
msg := "Rejected duplicate " + protoToString(testParam.TestId.Protocol) + " " + msg := "Rejected duplicate " + protoToString(testParam.TestId.Protocol) + " " +
@ -99,28 +99,33 @@ func handleRequest(conn net.Conn) {
} }
ui.emitTestHdr() ui.emitTestHdr()
if test.testParam.TestId.Type == Pps { if test.testParam.TestId.Type == Pps {
err = runServerPpsTest(test) err = runUdpPpsServer(test)
if err != nil { if err != nil {
ui.printErr("run server pps test: %v",err) ui.printDbg("Error encounterd in running Pkt/s test: %v", err)
cleanupFunc() cleanupFunc()
return return
} }
} }
ethrMsg = createAckMsg() // TODO: Enable this in future, right now there is not much value coming
err = sendSessionMsg(enc, ethrMsg) // from this.
if err != nil { /**
ui.printErr("send session message: %v",err) ethrMsg = createAckMsg()
cleanupFunc() err = sendSessionMsg(enc, ethrMsg)
return if err != nil {
} ui.printErr("send session message: %v",err)
ethrMsg = recvSessionMsg(dec) cleanupFunc()
if ethrMsg.Type != EthrAck { return
cleanupFunc() }
return ethrMsg = recvSessionMsg(dec)
} if ethrMsg.Type != EthrAck {
cleanupFunc()
return
}
**/
test.isActive = true test.isActive = true
var b [1]byte waitForChannelStop := make(chan bool, 1)
_, err = test.ctrlConn.Read(b[0:]) serverWatchControlChannel(test, waitForChannelStop)
<-waitForChannelStop
ui.printMsg("Ending " + testToString(testParam.TestId.Type) + " test from " + server) ui.printMsg("Ending " + testToString(testParam.TestId.Type) + " test from " + server)
test.isActive = false test.isActive = false
cleanupFunc() cleanupFunc()
@ -129,7 +134,11 @@ func handleRequest(conn net.Conn) {
} }
} }
func runServerBandwidthTest() { func serverWatchControlChannel(test *ethrTest, waitForChannelStop chan bool) {
watchControlChannel(test, waitForChannelStop)
}
func runTcpBandwidthServer() {
l, err := net.Listen(protoTCP, hostAddr+":"+tcpBandwidthPort) l, err := net.Listen(protoTCP, hostAddr+":"+tcpBandwidthPort)
if err != nil { if err != nil {
finiServer() finiServer()
@ -152,7 +161,7 @@ func runServerBandwidthTest() {
conn.Close() conn.Close()
continue continue
} }
go runBandwidthHandler(conn, test) go runTcpBandwidthHandler(conn, test)
} }
}(l) }(l)
} }
@ -165,7 +174,7 @@ func closeConn(conn net.Conn) {
} }
} }
func runBandwidthHandler(conn net.Conn, test *ethrTest) { func runTcpBandwidthHandler(conn net.Conn, test *ethrTest) {
defer closeConn(conn) defer closeConn(conn)
size := test.testParam.BufferSize size := test.testParam.BufferSize
bytes := make([]byte, size) bytes := make([]byte, size)
@ -185,7 +194,7 @@ ExitForLoop:
} }
} }
func runServerCpsTest() { func runTcpCpsServer() {
l, err := net.Listen(protoTCP, hostAddr+":"+tcpCpsPort) l, err := net.Listen(protoTCP, hostAddr+":"+tcpCpsPort)
if err != nil { if err != nil {
finiServer() finiServer()
@ -203,12 +212,12 @@ func runServerCpsTest() {
ui.printDbg("Error accepting new conn/s connection: %v", err) ui.printDbg("Error accepting new conn/s connection: %v", err)
continue continue
} }
go runCPSHandler(conn) go runTcpCpsHandler(conn)
} }
}(l) }(l)
} }
func runCPSHandler(conn net.Conn) { func runTcpCpsHandler(conn net.Conn) {
defer conn.Close() defer conn.Close()
server, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) server, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
test := getTest(server, Tcp, Cps) test := getTest(server, Tcp, Cps)
@ -217,77 +226,7 @@ func runCPSHandler(conn net.Conn) {
} }
} }
func runServerPpsTest(test *ethrTest) error { func runTcpLatencyServer() {
udpAddr, err := net.ResolveUDPAddr(protoUDP, hostAddr+":"+udpPpsPort)
if err != nil {
ui.printDbg("Unable to resolve UDP address: %v", err)
return err
}
l, err := net.ListenUDP(protoUDP, udpAddr)
if err != nil {
ui.printDbg("Error listening on %s for UDP pkt/s tests: %v", udpPpsPort, err)
return err
}
go func(l *net.UDPConn) {
defer l.Close()
for i := 0; i < runtime.NumCPU(); i++ {
go runPPSHandler(test, l)
}
<-test.done
}(l)
return nil
/*
ludpAddr, err := net.ResolveUDPAddr(protoUDP, hostAddr+":"+udpPpsPort)
if err != nil {
ui.printErr("%v", err)
os.Exit(1)
}
for i := 0; i < int(test.testParam.NumThreads); i++ {
ui.printMsg("Running PPS test")
ethrMsg := recvSessionMsg(test.dec)
if ethrMsg.Type != EthrBgn {
ui.printErr("%v", ethrMsg)
continue
}
rudpPort := ethrMsg.Bgn.UdpPort
// rudpAddr, err := net.ResolveUDPAddr(protoUDP, test.session.remoteAddr+":"+rudpPort)
rudpAddr, err := net.ResolveUDPAddr(protoUDP, "localhost"+":"+rudpPort)
if err != nil {
ui.printErr("%v", err)
os.Exit(1)
}
conn, err := net.DialUDP(protoUDP, ludpAddr, rudpAddr)
if err != nil {
ui.printErr("%v", err)
os.Exit(1)
}
go runPPSHandler(test, conn)
}
<-test.done
*/
}
func runPPSHandler(test *ethrTest, conn *net.UDPConn) {
buffer := make([]byte, 1)
n, remoteAddr, err := 0, new(net.UDPAddr), error(nil)
for err == nil {
n, remoteAddr, err = conn.ReadFromUDP(buffer)
if err != nil {
ui.printDbg("Error receiving data from UDP for pkt/s test: %v", err)
continue
}
ethrUnused(n)
server, port, _ := net.SplitHostPort(remoteAddr.String())
test := getTest(server, Udp, Pps)
if test != nil {
atomic.AddUint64(&test.testResult.data, 1)
} else {
ui.printDbg("Received unsolicited UDP traffic on port %s from %s port %s", udpPpsPort, server, port)
}
}
}
func runServerLatencyTest() {
l, err := net.Listen(protoTCP, hostAddr+":"+tcpLatencyPort) l, err := net.Listen(protoTCP, hostAddr+":"+tcpLatencyPort)
if err != nil { if err != nil {
finiServer() finiServer()
@ -310,12 +249,12 @@ func runServerLatencyTest() {
continue continue
} }
ui.emitLatencyHdr() ui.emitLatencyHdr()
go runLatencyHandler(conn, test) go runTcpLatencyHandler(conn, test)
} }
}(l) }(l)
} }
func runLatencyHandler(conn net.Conn, test *ethrTest) { func runTcpLatencyHandler(conn net.Conn, test *ethrTest) {
defer conn.Close() defer conn.Close()
bytes := make([]byte, test.testParam.BufferSize) bytes := make([]byte, test.testParam.BufferSize)
// TODO Override buffer size to 1 for now. Evaluate if we need to allow // TODO Override buffer size to 1 for now. Evaluate if we need to allow
@ -378,7 +317,62 @@ func runLatencyHandler(conn net.Conn, test *ethrTest) {
} }
} }
func handleHttpRequest(w http.ResponseWriter, r *http.Request) { func runUdpPpsServer(test *ethrTest) error {
udpAddr, err := net.ResolveUDPAddr(protoUDP, hostAddr+":"+udpPpsPort)
if err != nil {
ui.printDbg("Unable to resolve UDP address: %v", err)
return err
}
l, err := net.ListenUDP(protoUDP, udpAddr)
if err != nil {
ui.printDbg("Error listening on %s for UDP pkt/s tests: %v", udpPpsPort, err)
return err
}
go func(l *net.UDPConn) {
defer l.Close()
//
// We use NumCPU here instead of NumThreads passed from client. The
// reason is that for UDP, there is no connection, so all packets come
// on same CPU, so it isn't clear if there are any benefits to running
// more threads than NumCPU(). TODO: Evaluate this in future.
//
for i := 0; i < runtime.NumCPU(); i++ {
go runUdpPpsHandler(test, l)
}
<-test.done
}(l)
return nil
}
func runUdpPpsHandler(test *ethrTest, conn *net.UDPConn) {
buffer := make([]byte, test.testParam.BufferSize)
n, remoteAddr, err := 0, new(net.UDPAddr), error(nil)
for err == nil {
n, remoteAddr, err = conn.ReadFromUDP(buffer)
if err != nil {
ui.printDbg("Error receiving data from UDP for pkt/s test: %v", err)
continue
}
ethrUnused(n)
server, port, _ := net.SplitHostPort(remoteAddr.String())
test := getTest(server, Udp, Pps)
if test != nil {
atomic.AddUint64(&test.testResult.data, 1)
} else {
ui.printDbg("Received unsolicited UDP traffic on port %s from %s port %s", udpPpsPort, server, port)
}
}
}
func runHttpBandwidthServer() {
http.HandleFunc("/", runHttpBandwidthHandler)
err := http.ListenAndServe(":"+httpBandwidthPort, nil)
if err != nil {
ui.printErr("Unable to start HTTP server, so HTTP tests cannot be run: %v", err)
}
}
func runHttpBandwidthHandler(w http.ResponseWriter, r *http.Request) {
_, err := ioutil.ReadAll(r.Body) _, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
ui.printDbg("Error reading HTTP body: %v", err) ui.printDbg("Error reading HTTP body: %v", err)
@ -406,11 +400,3 @@ func handleHttpRequest(w http.ResponseWriter, r *http.Request) {
atomic.AddUint64(&test.testResult.data, uint64(r.ContentLength)) atomic.AddUint64(&test.testResult.data, uint64(r.ContentLength))
} }
} }
func runHttpServer() {
http.HandleFunc("/", handleHttpRequest)
err := http.ListenAndServe(":"+httpBandwidthPort, nil)
if err != nil {
ui.printErr("Unable to start HTTP server, so HTTP tests cannot be run: %v", err)
}
}

View file

@ -164,13 +164,15 @@ func (u *serverTui) printErr(format string, a ...interface{}) {
} }
func (u *serverTui) printDbg(format string, a ...interface{}) { func (u *serverTui) printDbg(format string, a ...interface{}) {
s := fmt.Sprintf(format, a...) if logDebug {
logDbg(s) s := fmt.Sprintf(format, a...)
ss := splitString(s, u.errW) logDbg(s)
u.ringLock.Lock() ss := splitString(s, u.errW)
u.errRing = u.errRing[len(ss):] u.ringLock.Lock()
u.errRing = append(u.errRing, ss...) u.errRing = u.errRing[len(ss):]
u.ringLock.Unlock() u.errRing = append(u.errRing, ss...)
u.ringLock.Unlock()
}
} }
func (u *serverTui) emitTestResultBegin() { func (u *serverTui) emitTestResultBegin() {
@ -306,8 +308,11 @@ func (u *serverCli) printMsg(format string, a ...interface{}) {
} }
func (u *serverCli) printDbg(format string, a ...interface{}) { func (u *serverCli) printDbg(format string, a ...interface{}) {
s := fmt.Sprintf(format, a...) if logDebug {
logDbg(s) s := fmt.Sprintf(format, a...)
fmt.Println(s)
logDbg(s)
}
} }
func (u *serverCli) printErr(format string, a ...interface{}) { func (u *serverCli) printErr(format string, a ...interface{}) {

View file

@ -8,6 +8,7 @@ package main
import ( import (
"container/list" "container/list"
"encoding/gob" "encoding/gob"
"fmt"
"net" "net"
"os" "os"
"sync" "sync"
@ -96,6 +97,7 @@ type ethrTest struct {
ctrlConn net.Conn ctrlConn net.Conn
enc *gob.Encoder enc *gob.Encoder
dec *gob.Decoder dec *gob.Decoder
rcvdMsgs chan *EthrMsg
testParam EthrTestParam testParam EthrTestParam
testResult ethrTestResult testResult ethrTestResult
done chan struct{} done chan struct{}
@ -155,6 +157,7 @@ func newTest(remoteAddr string, conn net.Conn, testParam EthrTestParam, enc *gob
test.ctrlConn = conn test.ctrlConn = conn
test.enc = enc test.enc = enc
test.dec = dec test.dec = dec
test.rcvdMsgs = make(chan *EthrMsg)
test.testParam = testParam test.testParam = testParam
test.done = make(chan struct{}) test.done = make(chan struct{})
test.connList = list.New() test.connList = list.New()
@ -234,6 +237,20 @@ func (test *ethrTest) connListDo(f func(*ethrConn)) {
} }
} }
func watchControlChannel(test *ethrTest, waitForChannelStop chan bool) {
go func() {
for {
ethrMsg := recvSessionMsg(test.dec)
if ethrMsg.Type == EthrInv {
break
}
test.rcvdMsgs <- ethrMsg
fmt.Println(ethrMsg)
}
waitForChannelStop <- true
}()
}
func recvSessionMsg(dec *gob.Decoder) (ethrMsg *EthrMsg) { func recvSessionMsg(dec *gob.Decoder) (ethrMsg *EthrMsg) {
ethrMsg = &EthrMsg{} ethrMsg = &EthrMsg{}
err := dec.Decode(ethrMsg) err := dec.Decode(ethrMsg)

View file

@ -15,17 +15,32 @@ import (
"unicode/utf8" "unicode/utf8"
) )
//
// TODO: Use a better way to define ports. The core logic is:
// Find a base port, such as 9999, and the Bandwidth is: base - 0,
// Cps is base - 1, Pps is base - 2 and Latency is base - 3
//
const ( const (
hostAddr = "" hostAddr = ""
ctrlPort = "9991" ctrlPort = "8888"
tcpBandwidthPort = "9999" tcpBandwidthPort = "9999"
tcpCpsPort = "9998" tcpCpsPort = "9998"
tcpPpsPort = "9997" tcpPpsPort = "9997"
tcpLatencyPort = "9996" tcpLatencyPort = "9996"
udpPpsPort = "9997" udpBandwidthPort = "9999"
httpBandwidthPort = "8080" udpCpsPort = "9998"
protoTCP = "tcp" udpPpsPort = "9997"
protoUDP = "udp" udpLatencyPort = "9996"
httpBandwidthPort = "9899"
httpCpsPort = "9898"
httpPpsPort = "9897"
httpLatencyPort = "9896"
httpsBandwidthPort = "9799"
httpsCpsPort = "9798"
httpsPpsPort = "9797"
httpsLatencyPort = "9796"
protoTCP = "tcp"
protoUDP = "udp"
) )
var gDone = false var gDone = false