ethr/client.go

1006 lines
27 KiB
Go
Raw Permalink Normal View History

//-----------------------------------------------------------------------------
// Copyright (C) Microsoft. All rights reserved.
// Licensed under the MIT license.
// See LICENSE.txt file in the project root for full license information.
//-----------------------------------------------------------------------------
package main
import (
// "bytes"
// "crypto/tls"
// "crypto/x509"
"bytes"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"net/url"
"sort"
"strconv"
"sync"
"sync/atomic"
// "io"
// "io/ioutil"
"net"
// "net/http"
"os"
"os/signal"
// "sort"
// "sync/atomic"
"syscall"
"time"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)
var gIgnoreCert bool
const (
done = 0
timeout = 1
interrupt = 2
disconnect = 3
)
2020-09-23 07:28:41 +08:00
func handleInterrupt(toStop chan<- int) {
sigChan := make(chan os.Signal)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
2020-09-23 07:28:41 +08:00
<-sigChan
toStop <- interrupt
}()
}
func runDurationTimer(d time.Duration, toStop chan int) {
go func() {
dSeconds := uint64(d.Seconds())
if dSeconds == 0 {
return
}
time.Sleep(d)
// Sleep extra 200ms to ensure stats print for correct number of seconds.
time.Sleep(200 * time.Millisecond)
toStop <- timeout
}()
}
func initClient(title string) {
initClientUI(title)
}
func handshakeWithServer(test *ethrTest, conn net.Conn) (err error) {
ethrMsg := createSynMsg(test.testID, test.clientParam)
err = sendSessionMsg(conn, ethrMsg)
if err != nil {
ui.printDbg("Failed to send SYN message to Ethr server. Error: %v", err)
return
}
ethrMsg = recvSessionMsg(conn)
if ethrMsg.Type != EthrAck {
ui.printDbg("Failed to receive ACK message from Ethr server. Error: %v", err)
err = os.ErrInvalid
}
return
}
func getServerIPandPort(server string) (string, string, string, error) {
hostName := ""
hostIP := ""
port := ""
u, err := url.Parse(server)
if err == nil && u.Hostname() != "" {
hostName = u.Hostname()
if u.Port() != "" {
port = u.Port()
} else {
// Only implicitly derive port in External client mode.
if gIsExternalClient {
if u.Scheme == "http" {
port = "80"
} else if u.Scheme == "https" {
port = "443"
}
}
}
} else {
hostName, port, err = net.SplitHostPort(server)
if err != nil {
hostName = server
}
}
_, hostIP, err = ethrLookupIP(hostName)
return hostName, hostIP, port, err
}
func runClient(testID EthrTestID, title string, clientParam EthrClientParam, server string) {
initClient(title)
hostName, hostIP, port, err := getServerIPandPort(server)
if err != nil {
return
}
ip := net.ParseIP(hostIP)
if ip != nil {
if ip.To4() != nil {
gIPVersion = ethrIPv4
} else {
gIPVersion = ethrIPv6
}
} else {
return
}
if gIsExternalClient {
if testID.Protocol != ICMP && port == "" {
ui.printErr("In external mode, port cannot be empty for TCP tests.")
return
}
} else {
if port != "" {
ui.printErr("In client mode, port (%s) cannot be specified in destination (%s).", port, server)
ui.printMsg("Hint: Use external mode (-x).")
return
}
port = gEthrPortStr
}
ui.printMsg("Using destination: %s, ip: %s, port: %s", hostName, hostIP, port)
test, err := newTest(hostIP, testID, clientParam)
if err != nil {
ui.printErr("Failed to create the new test.")
return
}
test.remoteAddr = server
test.remoteIP = hostIP
test.remotePort = port
if testID.Protocol == ICMP {
test.dialAddr = hostIP
} else {
test.dialAddr = fmt.Sprintf("[%s]:%s", hostIP, port)
}
runTest(test)
}
func runTest(test *ethrTest) {
toStop := make(chan int, 16)
startStatsTimer()
gap := test.clientParam.Gap
duration := test.clientParam.Duration
runDurationTimer(duration, toStop)
test.isActive = true
if test.testID.Protocol == TCP {
if test.testID.Type == Bandwidth {
tcpRunBandwidthTest(test, toStop)
} else if test.testID.Type == Latency {
go runTCPLatencyTest(test, gap, toStop)
} else if test.testID.Type == Cps {
go tcpRunCpsTest(test)
} else if test.testID.Type == Ping {
go clientRunPingTest(test, gap, test.clientParam.WarmupCount)
} else if test.testID.Type == TraceRoute {
VerifyPermissionForTest(test.testID)
go tcpRunTraceRoute(test, gap, toStop)
} else if test.testID.Type == MyTraceRoute {
VerifyPermissionForTest(test.testID)
go tcpRunMyTraceRoute(test, gap, toStop)
}
} else if test.testID.Protocol == UDP {
if test.testID.Type == Bandwidth ||
test.testID.Type == Pps {
runUDPBandwidthAndPpsTest(test)
}
} else if test.testID.Protocol == ICMP {
VerifyPermissionForTest(test.testID)
if test.testID.Type == Ping {
go clientRunPingTest(test, gap, test.clientParam.WarmupCount)
} else if test.testID.Type == TraceRoute {
go icmpRunTraceRoute(test, gap, toStop)
} else if test.testID.Type == MyTraceRoute {
go icmpRunMyTraceRoute(test, gap, toStop)
}
}
2020-09-23 07:28:41 +08:00
handleInterrupt(toStop)
reason := <-toStop
stopStatsTimer()
close(test.done)
if test.testID.Type == Ping {
time.Sleep(2 * time.Second)
}
switch reason {
case done:
ui.printMsg("Ethr done, measurement complete.")
case timeout:
ui.printMsg("Ethr done, duration: " + duration.String() + ".")
ui.printMsg("Hint: Use -d parameter to change duration of the test.")
case interrupt:
ui.printMsg("Ethr done, received interrupt signal.")
case disconnect:
ui.printMsg("Ethr done, connection terminated.")
}
return
}
func tcpRunBandwidthTest(test *ethrTest, toStop chan int) {
var wg sync.WaitGroup
tcpRunBanwidthTestThreads(test, &wg)
go func(wg *sync.WaitGroup) {
wg.Wait()
toStop <- disconnect
}(&wg)
}
func tcpRunBanwidthTestThreads(test *ethrTest, wg *sync.WaitGroup) {
for th := uint32(0); th < test.clientParam.NumThreads; th++ {
conn, err := ethrDialInc(TCP, test.dialAddr, uint16(th))
if err != nil {
ui.printErr("Error dialing connection: %v", err)
continue
}
err = handshakeWithServer(test, conn)
if err != nil {
ui.printErr("Failed in handshake with the server. Error: %v", err)
conn.Close()
continue
}
wg.Add(1)
go runTCPBandwidthTestHandler(test, conn, wg)
}
}
func runTCPBandwidthTestHandler(test *ethrTest, conn net.Conn, wg *sync.WaitGroup) {
defer wg.Done()
defer conn.Close()
ec := test.newConn(conn)
rserver, rport, _ := net.SplitHostPort(conn.RemoteAddr().String())
lserver, lport, _ := net.SplitHostPort(conn.LocalAddr().String())
ui.printMsg("[%3d] local %s port %s connected to %s port %s",
ec.fd, lserver, lport, rserver, rport)
size := test.clientParam.BufferSize
buff := make([]byte, size)
for i := uint32(0); i < size; i++ {
buff[i] = byte(i)
}
bufferLen := len(buff)
totalBytesToSend := test.clientParam.BwRate
sentBytes := uint64(0)
start, waitTime, bytesToSend := beginThrottle(totalBytesToSend, bufferLen)
ExitForLoop:
for {
select {
case <-test.done:
break ExitForLoop
default:
n := 0
var err error = nil
if test.clientParam.Reverse {
n, err = conn.Read(buff)
} else {
n, err = conn.Write(buff[:bytesToSend])
}
if err != nil {
ui.printDbg("Error sending/receiving data on a connection for bandwidth test: %v", err)
break ExitForLoop
}
atomic.AddUint64(&ec.bw, uint64(n))
atomic.AddUint64(&test.testResult.bw, uint64(n))
if !test.clientParam.Reverse {
sentBytes += uint64(n)
start, waitTime, sentBytes, bytesToSend = enforceThrottle(start, waitTime, totalBytesToSend, sentBytes, bufferLen)
}
}
}
}
func runTCPLatencyTest(test *ethrTest, g time.Duration, toStop chan int) {
ui.printMsg("Running latency test: %v, %v", test.clientParam.RttCount, test.clientParam.BufferSize)
conn, err := ethrDial(TCP, test.dialAddr)
if err != nil {
ui.printErr("Error dialing the latency connection: %v", err)
return
}
defer conn.Close()
err = handshakeWithServer(test, conn)
if err != nil {
ui.printErr("Failed in handshake with the server. Error: %v", err)
return
}
ui.emitLatencyHdr()
buffSize := test.clientParam.BufferSize
buff := make([]byte, buffSize)
for i := uint32(0); i < buffSize; i++ {
buff[i] = byte(i)
}
blen := len(buff)
rttCount := test.clientParam.RttCount
latencyNumbers := make([]time.Duration, rttCount)
ExitForLoop:
for {
ExitSelect:
select {
case <-test.done:
break ExitForLoop
default:
t0 := time.Now()
for i := uint32(0); i < rttCount; i++ {
s1 := time.Now()
n, err := conn.Write(buff)
if err != nil || n < blen {
ui.printDbg("Error sending/receiving data on a connection for latency test: %v", err)
toStop <- disconnect
break ExitSelect
}
_, err = io.ReadFull(conn, buff)
if err != nil {
ui.printDbg("Error sending/receiving data on a connection for latency test: %v", err)
toStop <- disconnect
break ExitSelect
}
e2 := time.Since(s1)
latencyNumbers[i] = e2
}
// TODO temp code, fix it better, this is to allow server to do
// server side latency measurements as well.
_, _ = conn.Write(buff)
calcAndPrintLatency(test, rttCount, latencyNumbers)
t1 := time.Since(t0)
if t1 < g {
time.Sleep(g - t1)
}
}
}
}
func calcAndPrintLatency(test *ethrTest, rttCount uint32, latencyNumbers []time.Duration) {
sum := int64(0)
for _, d := range latencyNumbers {
sum += d.Nanoseconds()
}
elapsed := time.Duration(sum / int64(rttCount))
sort.SliceStable(latencyNumbers, func(i, j int) bool {
return latencyNumbers[i] < latencyNumbers[j]
})
//
// Special handling for rttCount == 1. This prevents negative index
// in the latencyNumber index. The other option is to use
// roundUpToZero() but that is more expensive.
//
rttCountFixed := rttCount
if rttCountFixed == 1 {
rttCountFixed = 2
}
avg := elapsed
min := latencyNumbers[0]
max := latencyNumbers[rttCount-1]
p50 := latencyNumbers[((rttCountFixed*50)/100)-1]
p90 := latencyNumbers[((rttCountFixed*90)/100)-1]
p95 := latencyNumbers[((rttCountFixed*95)/100)-1]
p99 := latencyNumbers[((rttCountFixed*99)/100)-1]
p999 := latencyNumbers[uint64(((float64(rttCountFixed)*99.9)/100)-1)]
p9999 := latencyNumbers[uint64(((float64(rttCountFixed)*99.99)/100)-1)]
ui.emitLatencyResults(
test.session.remoteIP,
protoToString(test.testID.Protocol),
avg, min, max, p50, p90, p95, p99, p999, p9999)
}
func tcpRunCpsTest(test *ethrTest) {
for th := uint32(0); th < test.clientParam.NumThreads; th++ {
go func(th uint32) {
ExitForLoop:
for {
select {
case <-test.done:
break ExitForLoop
default:
conn, err := ethrDialAll(TCP, test.dialAddr)
if err == nil {
atomic.AddUint64(&test.testResult.cps, 1)
tcpconn, ok := conn.(*net.TCPConn)
if ok {
tcpconn.SetLinger(0)
}
conn.Close()
} else {
ui.printDbg("Unable to dial TCP connection to %s, error: %v", test.dialAddr, err)
}
}
}
}(th)
}
}
func clientRunPingTest(test *ethrTest, g time.Duration, warmupCount uint32) {
// TODO: Override NumThreads for now, fix it later to support parallel
// threads.
test.clientParam.NumThreads = 1
for th := uint32(0); th < test.clientParam.NumThreads; th++ {
go func() {
var sent, rcvd, lost uint32
warmupText := "[warmup] "
latencyNumbers := make([]time.Duration, 0)
ExitForLoop:
for {
select {
case <-test.done:
printConnectionLatencyResults(test.dialAddr, test, sent, rcvd, lost, latencyNumbers)
break ExitForLoop
default:
t0 := time.Now()
if warmupCount > 0 {
warmupCount--
clientRunPing(test, warmupText)
} else {
sent++
latency, err := clientRunPing(test, "")
if err == nil {
rcvd++
latencyNumbers = append(latencyNumbers, latency)
} else {
lost++
}
}
if rcvd >= 1000 {
printConnectionLatencyResults(test.dialAddr, test, sent, rcvd, lost, latencyNumbers)
latencyNumbers = make([]time.Duration, 0)
sent, rcvd, lost = 0, 0, 0
}
t1 := time.Since(t0)
if t1 < g {
time.Sleep(g - t1)
}
}
}
}()
}
}
func clientRunPing(test *ethrTest, prefix string) (time.Duration, error) {
if test.testID.Protocol == TCP {
return tcpRunPing(test, prefix)
} else {
return icmpRunPing(test, prefix)
}
}
func tcpRunPing(test *ethrTest, prefix string) (timeTaken time.Duration, err error) {
t0 := time.Now()
conn, err := ethrDial(TCP, test.dialAddr)
if err != nil {
ui.printMsg("[tcp] %sConnection to %s: Timed out (%v)", prefix, test.dialAddr, err)
return
}
timeTaken = time.Since(t0)
rserver, rport, _ := net.SplitHostPort(conn.RemoteAddr().String())
lserver, lport, _ := net.SplitHostPort(conn.LocalAddr().String())
ui.printMsg("[tcp] %sConnection from [%s]:%s to [%s]:%s: %s",
prefix, lserver, lport, rserver, rport, durationToString(timeTaken))
tcpconn, ok := conn.(*net.TCPConn)
if ok {
tcpconn.SetLinger(0)
}
conn.Close()
return
}
func printConnectionLatencyResults(server string, test *ethrTest, sent, rcvd, lost uint32, latencyNumbers []time.Duration) {
fmt.Println("-----------------------------------------------------------------------------------------")
ui.printMsg("TCP connect statistics for %s:", server)
ui.printMsg(" Sent = %d, Received = %d, Lost = %d", sent, rcvd, lost)
if rcvd > 0 {
ui.emitLatencyHdr()
calcAndPrintLatency(test, rcvd, latencyNumbers)
fmt.Println("-----------------------------------------------------------------------------------------")
}
}
func tcpRunTraceRoute(test *ethrTest, gap time.Duration, toStop chan int) {
tcpRunTraceRouteInternal(test, gap, toStop, false)
}
func tcpRunMyTraceRoute(test *ethrTest, gap time.Duration, toStop chan int) {
tcpRunTraceRouteInternal(test, gap, toStop, true)
}
func tcpRunTraceRouteInternal(test *ethrTest, gap time.Duration, toStop chan int, mtrMode bool) {
gHop = make([]ethrHopData, gMaxHops)
err := tcpDiscoverHops(test, mtrMode)
if err != nil {
ui.printErr("Destination %s is not responding to TCP connection.", test.session.remoteIP)
ui.printErr("Terminating tracing...")
toStop <- interrupt
return
}
if !mtrMode {
toStop <- done
return
}
for i := 0; i < gCurHops; i++ {
if gHop[i].addr != "" {
go tcpProbeHop(test, gap, i)
}
}
}
func tcpProbeHop(test *ethrTest, gap time.Duration, hop int) {
seq := 0
ExitForLoop:
for {
select {
case <-test.done:
break ExitForLoop
default:
t0 := time.Now()
err, _ := tcpProbe(test, hop+1, gHop[hop].addr, &gHop[hop])
if err == nil {
}
seq++
t1 := time.Since(t0)
if t1 < gap {
time.Sleep(gap - t1)
}
}
}
}
func tcpDiscoverHops(test *ethrTest, mtrMode bool) error {
ui.printMsg("Tracing route to %s over %d hops:", test.session.remoteIP, gMaxHops)
for i := 0; i < gMaxHops; i++ {
var hopData ethrHopData
err, isLast := tcpProbe(test, i+1, "", &hopData)
if err == nil {
hopData.name, hopData.fullName = lookupHopName(hopData.addr)
}
if hopData.addr != "" {
if mtrMode {
ui.printMsg("%2d.|--%s", i+1, hopData.addr+" ["+hopData.fullName+"]")
} else {
ui.printMsg("%2d.|--%-70s %s", i+1, hopData.addr+" ["+hopData.fullName+"]", durationToString(hopData.last))
}
} else {
ui.printMsg("%2d.|--%s", i+1, "???")
}
copyInitialHopData(i, hopData)
if isLast {
gCurHops = i + 1
return nil
}
}
return os.ErrNotExist
}
func tcpProbe(test *ethrTest, hop int, hopIP string, hopData *ethrHopData) (error, bool) {
isLast := false
c, err := IcmpNewConn(test.remoteIP)
if err != nil {
ui.printErr("Failed to create ICMP connection. Error: %v", err)
return err, isLast
}
defer c.Close()
localPortNum := uint16(8888)
if gClientPort != 0 {
localPortNum = gClientPort
}
localPortNum += uint16(hop)
b := make([]byte, 4)
binary.BigEndian.PutUint16(b[0:], localPortNum)
remotePortNum, err := strconv.ParseUint(test.remotePort, 10, 16)
binary.BigEndian.PutUint16(b[2:], uint16(remotePortNum))
peerAddrChan := make(chan string)
endTimeChan := make(chan time.Time)
go func() {
peerAddr, _, _ := icmpRecvMsg(c, TCP, time.Second*2, hopIP, b, nil, 0)
endTimeChan <- time.Now()
peerAddrChan <- peerAddr
}()
startTime := time.Now()
conn, err := ethrDialEx(TCP, test.dialAddr, gLocalIP, localPortNum, hop, int(gTOS))
if err != nil {
ui.printDbg("Failed to Dial the connection. Error: %v", err)
} else {
conn.Close()
}
hopData.sent++
peerAddr := ""
endTime := time.Now()
if err == nil {
isLast = true
peerAddr = test.remoteIP
} else {
endTime = <-endTimeChan
peerAddr = <-peerAddrChan
}
elapsed := endTime.Sub(startTime)
if peerAddr == "" || (hopIP != "" && peerAddr != hopIP) {
hopData.lost++
ui.printDbg("Neither connection completed, nor ICMP TTL exceeded received.")
return os.ErrNotExist, isLast
}
genHopData(hopData, peerAddr, elapsed)
return nil, isLast
}
type ethrHopData struct {
addr string
sent uint32
rcvd uint32
lost uint32
last time.Duration
best time.Duration
worst time.Duration
total time.Duration
name string
fullName string
}
var gMaxHops int = 30
var gCurHops int
var gHop []ethrHopData
func icmpRunPing(test *ethrTest, prefix string) (time.Duration, error) {
dstIPAddr, _, err := ethrLookupIP(test.dialAddr)
if err != nil {
return time.Second, err
}
var hopData ethrHopData
err, isLast := icmpProbe(test, dstIPAddr, time.Second, "", &hopData, 254, 255)
if err != nil {
ui.printMsg("[icmp] %sPing to %s: %v", prefix, test.dialAddr, err)
return time.Second, err
}
if !isLast {
ui.printMsg("[icmp] %sPing to %s: %s",
prefix, test.dialAddr, "Non-EchoReply Received.")
return time.Second, os.ErrNotExist
}
ui.printMsg("[icmp] %sPing to %s: %s",
prefix, test.dialAddr, durationToString(hopData.last))
return hopData.last, nil
}
func icmpRunTraceRoute(test *ethrTest, gap time.Duration, toStop chan int) {
icmpRunTraceRouteInternal(test, gap, toStop, false)
}
func icmpRunMyTraceRoute(test *ethrTest, gap time.Duration, toStop chan int) {
icmpRunTraceRouteInternal(test, gap, toStop, true)
}
func icmpRunTraceRouteInternal(test *ethrTest, gap time.Duration, toStop chan int, mtrMode bool) {
gHop = make([]ethrHopData, gMaxHops)
dstIPAddr, _, err := ethrLookupIP(test.session.remoteIP)
if err != nil {
toStop <- interrupt
return
}
err = icmpDiscoverHops(test, dstIPAddr, mtrMode)
if err != nil {
ui.printErr("Destination %s is not responding to ICMP Echo.", test.session.remoteIP)
ui.printErr("Terminating tracing...")
toStop <- interrupt
return
}
if !mtrMode {
toStop <- done
return
}
for i := 0; i < gCurHops; i++ {
if gHop[i].addr != "" {
go icmpProbeHop(test, gap, i, dstIPAddr)
}
}
}
func copyInitialHopData(hop int, hopData ethrHopData) {
gHop[hop].addr = hopData.addr
gHop[hop].best = hopData.last
gHop[hop].name = hopData.name
gHop[hop].fullName = hopData.fullName
}
func genHopData(hopData *ethrHopData, peerAddr string, elapsed time.Duration) {
hopData.addr = peerAddr
hopData.last = elapsed
if hopData.best > elapsed {
hopData.best = elapsed
}
if hopData.worst < elapsed {
hopData.worst = elapsed
}
hopData.total += elapsed
hopData.rcvd++
}
func lookupHopName(addr string) (string, string) {
name := ""
tname := ""
if addr == "" {
return tname, name
}
names, err := net.LookupAddr(addr)
if err == nil && len(names) > 0 {
name = names[0]
sz := len(name)
if sz > 0 && name[sz-1] == '.' {
name = name[:sz-1]
}
tname = truncateStringFromEnd(name, 16)
}
return tname, name
}
func icmpDiscoverHops(test *ethrTest, dstIPAddr net.IPAddr, mtrMode bool) error {
if test.session.remoteIP == dstIPAddr.String() {
ui.printMsg("Tracing route to %s over %d hops:", test.session.remoteIP, gMaxHops)
} else {
ui.printMsg("Tracing route to %s (%s) over %d hops:", test.session.remoteIP, dstIPAddr.String(), gMaxHops)
}
for i := 0; i < gMaxHops; i++ {
var hopData ethrHopData
err, isLast := icmpProbe(test, dstIPAddr, time.Second*2, "", &hopData, i, 1)
if err == nil {
hopData.name, hopData.fullName = lookupHopName(hopData.addr)
}
if hopData.addr != "" {
if mtrMode {
ui.printMsg("%2d.|--%s", i+1, hopData.addr+" ["+hopData.fullName+"]")
} else {
ui.printMsg("%2d.|--%-70s %s", i+1, hopData.addr+" ["+hopData.fullName+"]", durationToString(hopData.last))
}
} else {
ui.printMsg("%2d.|--%s", i+1, "???")
}
copyInitialHopData(i, hopData)
if isLast {
gCurHops = i + 1
return nil
}
}
return os.ErrNotExist
}
func icmpProbeHop(test *ethrTest, gap time.Duration, hop int, dstIPAddr net.IPAddr) {
seq := 0
ExitForLoop:
for {
select {
case <-test.done:
break ExitForLoop
default:
t0 := time.Now()
err, _ := icmpProbe(test, dstIPAddr, time.Second, gHop[hop].addr, &gHop[hop], hop, seq)
if err == nil {
}
seq++
t1 := time.Since(t0)
if t1 < gap {
time.Sleep(gap - t1)
}
}
}
}
func icmpProbe(test *ethrTest, dstIPAddr net.IPAddr, icmpTimeout time.Duration, hopIP string, hopData *ethrHopData, hop, seq int) (error, bool) {
isLast := false
echoMsg := fmt.Sprintf("Hello: Ethr - %v", hop)
c, err := IcmpNewConn(test.remoteIP)
if err != nil {
ui.printErr("Failed to create ICMP connection. Error: %v", err)
return err, isLast
}
defer c.Close()
start, wb, err := icmpSendMsg(c, dstIPAddr, hop, seq, echoMsg, icmpTimeout)
if err != nil {
return err, isLast
}
hopData.sent++
neededSeq := hop<<8 | seq
peerAddr, isLast, err := icmpRecvMsg(c, ICMP, icmpTimeout, hopIP, wb[4:8], []byte(echoMsg), neededSeq)
if err != nil {
hopData.lost++
ui.printDbg("Failed to receive ICMP reply packet. Error: %v", err)
return err, isLast
}
elapsed := time.Since(start)
genHopData(hopData, peerAddr, elapsed)
return nil, isLast
}
func icmpSetTTL(c net.PacketConn, ttl int) error {
err := os.ErrInvalid
if gIPVersion == ethrIPv4 {
cIPv4 := ipv4.NewPacketConn(c)
err = cIPv4.SetTTL(ttl)
} else if gIPVersion == ethrIPv6 {
cIPv6 := ipv6.NewPacketConn(c)
err = cIPv6.SetHopLimit(ttl)
}
return err
}
func icmpSetTOS(c net.PacketConn, tos int) error {
if tos == 0 {
return nil
}
err := os.ErrInvalid
if gIPVersion == ethrIPv4 {
cIPv4 := ipv4.NewPacketConn(c)
err = cIPv4.SetTOS(tos)
} else if gIPVersion == ethrIPv6 {
cIPv6 := ipv6.NewPacketConn(c)
err = cIPv6.SetTrafficClass(tos)
}
return err
}
func icmpSendMsg(c net.PacketConn, dstIPAddr net.IPAddr, hop, seq int, body string, timeout time.Duration) (time.Time, []byte, error) {
start := time.Now()
err := icmpSetTTL(c, hop+1)
if err != nil {
ui.printErr("Failed to set TTL. Error: %v", err)
return start, nil, err
}
icmpSetTOS(c, int(gTOS))
err = c.SetDeadline(time.Now().Add(timeout))
if err != nil {
ui.printErr("Failed to set Deadline. Error: %v", err)
return start, nil, err
}
pid := os.Getpid() & 0xffff
pid = 9999
wm := icmp.Message{
Type: ipv4.ICMPTypeEcho, Code: 0,
Body: &icmp.Echo{
ID: pid, Seq: hop<<8 | seq,
Data: []byte(body),
},
}
if gIPVersion == ethrIPv6 {
wm.Type = ipv6.ICMPTypeEchoRequest
}
wb, err := wm.Marshal(nil)
if err != nil {
ui.printErr("Failed to Marshal data. Error: %v", err)
return start, nil, err
}
start = time.Now()
if _, err := c.WriteTo(wb, &dstIPAddr); err != nil {
ui.printErr("Failed to send ICMP data. Error: %v", err)
return start, nil, err
}
return start, wb, nil
}
func icmpRecvMsg(c net.PacketConn, proto EthrProtocol, timeout time.Duration, neededPeer string, neededSig []byte, neededIcmpBody []byte, neededIcmpSeq int) (string, bool, error) {
peerAddr := ""
isLast := false
err := c.SetDeadline(time.Now().Add(timeout))
if err != nil {
ui.printErr("Failed to set Deadline. Error: %v", err)
return peerAddr, isLast, err
}
for {
peerAddr = ""
b := make([]byte, 1500)
n, peer, err := c.ReadFrom(b)
if err != nil {
if proto == ICMP {
// In case of non-ICMP TraceRoute, it is expected that no packet is received
// in some case, e.g. when packet reach final hop and TCP connection establishes.
ui.printDbg("Failed to receive ICMP packet. Error: %v", err)
}
return peerAddr, isLast, err
}
if n == 0 {
continue
}
ui.printDbg("Packet:\n%s", hex.Dump(b[:n]))
ui.printDbg("Finding Pattern\n%v", hex.Dump(neededSig[:4]))
peerAddr = peer.String()
if neededPeer != "" && peerAddr != neededPeer {
ui.printDbg("Matching peer is not found.")
continue
}
icmpMsg, err := icmp.ParseMessage(IcmpProto(), b[:n])
if err != nil {
ui.printDbg("Failed to parse ICMP message: %v", err)
continue
}
if icmpMsg.Type == ipv4.ICMPTypeTimeExceeded || icmpMsg.Type == ipv6.ICMPTypeTimeExceeded {
body := icmpMsg.Body.(*icmp.TimeExceeded).Data
index := bytes.Index(body, neededSig[:4])
if index > 0 {
if proto == TCP {
ui.printDbg("Found correct ICMP error message. PeerAddr: %v", peerAddr)
return peerAddr, isLast, nil
} else if proto == ICMP {
if index < 4 {
ui.printDbg("Incorrect length of ICMP message.")
continue
}
innerIcmpMsg, _ := icmp.ParseMessage(IcmpProto(), body[index-4:])
switch innerIcmpMsg.Body.(type) {
case *icmp.Echo:
seq := innerIcmpMsg.Body.(*icmp.Echo).Seq
if seq == neededIcmpSeq {
return peerAddr, isLast, nil
}
default:
// Ignore as this is not the right ICMP packet.
ui.printDbg("Unable to recognize packet.")
}
}
} else {
ui.printDbg("Pattern %v not found.", hex.Dump(neededSig[:4]))
}
}
if proto == ICMP && (icmpMsg.Type == ipv4.ICMPTypeEchoReply || icmpMsg.Type == ipv6.ICMPTypeEchoReply) {
echo := icmpMsg.Body.(*icmp.Echo)
ethrUnused(echo)
b, _ := icmpMsg.Body.Marshal(1)
if string(b[4:]) != string(neededIcmpBody) {
continue
}
isLast = true
return peerAddr, isLast, nil
}
}
}
func runUDPBandwidthAndPpsTest(test *ethrTest) {
for th := uint32(0); th < test.clientParam.NumThreads; th++ {
go func(th uint32) {
size := test.clientParam.BufferSize
buff := make([]byte, size)
conn, err := ethrDialInc(UDP, test.dialAddr, uint16(th))
if err != nil {
ui.printDbg("Unable to dial UDP, error: %v", err)
return
}
defer conn.Close()
ec := test.newConn(conn)
rserver, rport, _ := net.SplitHostPort(conn.RemoteAddr().String())
lserver, lport, _ := net.SplitHostPort(conn.LocalAddr().String())
ui.printMsg("[%3d] local %s port %s connected to %s port %s",
ec.fd, lserver, lport, rserver, rport)
bufferLen := len(buff)
totalBytesToSend := test.clientParam.BwRate
sentBytes := uint64(0)
start, waitTime, bytesToSend := beginThrottle(totalBytesToSend, bufferLen)
ExitForLoop:
for {
select {
case <-test.done:
break ExitForLoop
default:
n, err := conn.Write(buff[:bytesToSend])
if err != nil {
ui.printDbg("%v", err)
continue
}
if n < bytesToSend {
ui.printDbg("Partial write: %d", n)
continue
}
atomic.AddUint64(&ec.bw, uint64(n))
atomic.AddUint64(&ec.pps, 1)
atomic.AddUint64(&test.testResult.bw, uint64(n))
atomic.AddUint64(&test.testResult.pps, 1)
if !test.clientParam.Reverse {
sentBytes += uint64(n)
start, waitTime, sentBytes, bytesToSend = enforceThrottle(start, waitTime, totalBytesToSend, sentBytes, bufferLen)
}
}
}
}(th)
}
}