refactor proxy code

This commit is contained in:
Abhishek Kondur 2022-12-01 10:39:43 +05:30
parent 6feb9916cc
commit 1d4b915f2d
11 changed files with 289 additions and 256 deletions

View file

@ -18,7 +18,7 @@ var BehindNAT bool
var WgIfaceMap = models.WgIfaceConf{
Iface: nil,
PeerMap: make(map[string]*models.ConnConfig),
PeerMap: make(map[string]*models.Conn),
}
var PeerKeyHashMap = make(map[string]models.RemotePeer)

View file

@ -0,0 +1,22 @@
package common
import (
"github.com/gravitl/netmaker/nm-proxy/models"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
)
func GetPeer(peerKey wgtypes.Key) (*models.Conn, bool) {
var peerInfo *models.Conn
var found bool
peerInfo, found = WgIfaceMap.PeerMap[peerKey.String()]
peerInfo.Mutex.RLock()
defer peerInfo.Mutex.RUnlock()
return peerInfo, found
}
func UpdatePeer(peer *models.Conn) {
peer.Mutex.Lock()
defer peer.Mutex.Unlock()
WgIfaceMap.PeerMap[peer.Key.String()] = peer
}

View file

@ -141,9 +141,12 @@ func (m *ManagerAction) RelayPeers() {
func cleanUpInterface() {
log.Println("########------------> CLEANING UP: ", common.WgIfaceMap.Iface.Name)
for _, peerI := range common.WgIfaceMap.PeerMap {
peerI.Mutex.Lock()
peerI.StopConn()
peerI.Mutex.Unlock()
delete(common.WgIfaceMap.PeerMap, peerI.Key.String())
}
common.WgIfaceMap.PeerMap = make(map[string]*models.ConnConfig)
common.WgIfaceMap.PeerMap = make(map[string]*models.Conn)
}
func (m *ManagerAction) processPayload() (*wg.WGIface, error) {
@ -209,6 +212,7 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) {
for _, currPeerI := range wgProxyConf.Iface.Peers {
if _, ok := m.Payload.PeerMap[currPeerI.PublicKey.String()]; !ok {
if val, ok := wgProxyConf.PeerMap[currPeerI.PublicKey.String()]; ok {
val.Mutex.Lock()
if val.IsAttachedExtClient {
log.Println("------> Deleting ExtClient Watch Thread: ", currPeerI.PublicKey.String())
if val, ok := common.ExtClientsWaitTh[currPeerI.PublicKey.String()]; ok {
@ -216,9 +220,11 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) {
delete(common.ExtClientsWaitTh, currPeerI.PublicKey.String())
}
log.Println("-----> Deleting Ext Client from Src Ip Map: ", currPeerI.PublicKey.String())
delete(common.ExtSourceIpMap, val.PeerConf.Endpoint.String())
delete(common.ExtSourceIpMap, val.Config.PeerConf.Endpoint.String())
}
val.StopConn()
val.Mutex.Unlock()
delete(wgProxyConf.PeerMap, currPeerI.PublicKey.String())
}
// delete peer from interface
@ -226,14 +232,15 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) {
if err := wgIface.RemovePeer(currPeerI.PublicKey.String()); err != nil {
log.Println("failed to remove peer: ", currPeerI.PublicKey.String(), err)
}
delete(common.PeerKeyHashMap, fmt.Sprintf("%x", md5.Sum([]byte(currPeerI.PublicKey.String()))))
delete(wgProxyConf.PeerMap, currPeerI.PublicKey.String())
}
}
for i := len(m.Payload.Peers) - 1; i >= 0; i-- {
if currentPeer, ok := wgProxyConf.PeerMap[m.Payload.Peers[i].PublicKey.String()]; ok {
currentPeer.Mutex.Lock()
if currentPeer.IsAttachedExtClient {
m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
continue
@ -255,8 +262,8 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) {
// check if peer is not connected to proxy
devPeer, err := wg.GetPeer(m.Payload.InterfaceName, currentPeer.Key.String())
if err == nil {
log.Printf("---------> COMAPRING ENDPOINT: DEV: %s, Proxy: %s", devPeer.Endpoint.String(), currentPeer.LocalConnAddr.String())
if devPeer.Endpoint.String() != currentPeer.LocalConnAddr.String() {
log.Printf("---------> COMAPRING ENDPOINT: DEV: %s, Proxy: %s", devPeer.Endpoint.String(), currentPeer.Config.LocalConnAddr.String())
if devPeer.Endpoint.String() != currentPeer.Config.LocalConnAddr.String() {
log.Println("---------> endpoint is not set to proxy: ", currentPeer.Key)
currentPeer.StopConn()
delete(wgProxyConf.PeerMap, currentPeer.Key.String())
@ -279,24 +286,25 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) {
delete(wgProxyConf.PeerMap, currentPeer.Key.String())
continue
}
if !reflect.DeepEqual(m.Payload.Peers[i], *currentPeer.PeerConf) {
if currentPeer.RemoteConnAddr.IP.String() != m.Payload.Peers[i].Endpoint.IP.String() {
if !reflect.DeepEqual(m.Payload.Peers[i], *currentPeer.Config.PeerConf) {
if currentPeer.Config.RemoteConnAddr.IP.String() != m.Payload.Peers[i].Endpoint.IP.String() {
log.Println("----------> Resetting proxy for Peer: ", currentPeer.Key, m.Payload.InterfaceName)
currentPeer.StopConn()
currentPeer.Mutex.Unlock()
delete(wgProxyConf.PeerMap, currentPeer.Key.String())
continue
} else {
log.Println("----->##### Updating Peer on Interface: ", m.Payload.InterfaceName, currentPeer.Key)
updatePeerConf := m.Payload.Peers[i]
localUdpAddr, err := net.ResolveUDPAddr("udp", currentPeer.LocalConnAddr.String())
localUdpAddr, err := net.ResolveUDPAddr("udp", currentPeer.Config.LocalConnAddr.String())
if err == nil {
updatePeerConf.Endpoint = localUdpAddr
}
if err := wgIface.Update(updatePeerConf, true); err != nil {
log.Println("failed to update peer: ", currentPeer.Key, err)
}
currentPeer.PeerConf = &m.Payload.Peers[i]
currentPeer.Config.PeerConf = &m.Payload.Peers[i]
wgProxyConf.PeerMap[currentPeer.Key.String()] = currentPeer
// delete the peer from the list
log.Println("-----------> deleting peer from list: ", m.Payload.Peers[i].PublicKey)
@ -309,6 +317,7 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) {
log.Println("-----------> No updates observed so deleting peer: ", m.Payload.Peers[i].PublicKey)
m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
}
currentPeer.Mutex.Unlock()
} else if !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].Proxy && !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].IsAttachedExtClient {
log.Println("-----------> skipping peer, proxy is off: ", m.Payload.Peers[i].PublicKey)
@ -373,13 +382,6 @@ func (m *ManagerAction) AddInterfaceToProxy() error {
defer func() {
if addExtClient {
log.Println("GOT ENDPOINT for Extclient adding peer...")
common.PeerKeyHashMap[fmt.Sprintf("%x", md5.Sum([]byte(peer.PublicKey.String())))] = models.RemotePeer{
Interface: wgInterface.Name,
PeerKey: peer.PublicKey.String(),
IsExtClient: peerConf.IsExtClient,
IsAttachedExtClient: peerConf.IsAttachedExtClient,
Endpoint: peer.Endpoint,
}
common.ExtSourceIpMap[peer.Endpoint.String()] = models.RemotePeer{
Interface: wgInterface.Name,
@ -413,18 +415,10 @@ func (m *ManagerAction) AddInterfaceToProxy() error {
}(wgInterface, &peerI, isRelayed, relayedTo, peerConf, m.Payload.WgAddr)
continue
}
common.PeerKeyHashMap[fmt.Sprintf("%x", md5.Sum([]byte(peerI.PublicKey.String())))] = models.RemotePeer{
Interface: m.Payload.InterfaceName,
PeerKey: peerI.PublicKey.String(),
IsExtClient: peerConf.IsExtClient,
Endpoint: peerI.Endpoint,
IsAttachedExtClient: peerConf.IsAttachedExtClient,
}
peerpkg.AddNewPeer(wgInterface, &peerI, peerConf.Address, isRelayed,
peerConf.IsExtClient, peerConf.IsAttachedExtClient, relayedTo)
}
log.Printf("------> PEERHASHMAP: %+v\n", common.PeerKeyHashMap)
log.Printf("-------> WgIFaceMap: %+v\n", common.WgIfaceMap)
return nil
}

View file

@ -32,7 +32,7 @@ const (
TrafficRecievedUpdate MetricsUpdateType = 3
)
var MetricsMapLock = &sync.Mutex{}
var MetricsMapLock = &sync.RWMutex{}
var MetricsMap = make(map[string]Metric)

View file

@ -3,7 +3,10 @@ package models
import (
"context"
"net"
"sync"
"time"
"github.com/gravitl/netmaker/nm-proxy/wg"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
)
@ -12,8 +15,21 @@ const (
DefaultCIDR = "127.0.0.1/8"
)
// ConnConfig is a peer Connection configuration
type ConnConfig struct {
type ProxyConfig struct {
RemoteKey wgtypes.Key
LocalKey wgtypes.Key
WgInterface *wg.WGIface
IsExtClient bool
PersistentKeepalive *time.Duration
RecieverChan chan []byte
PeerConf *wgtypes.PeerConfig
PeerEndpoint *net.UDPAddr
RemoteConnAddr *net.UDPAddr
LocalConnAddr *net.UDPAddr
}
// Conn is a peer Connection configuration
type Conn struct {
// Key is a public key of a remote peer
Key wgtypes.Key
@ -21,13 +37,11 @@ type ConnConfig struct {
IsRelayed bool
RelayedEndpoint *net.UDPAddr
IsAttachedExtClient bool
PeerConf *wgtypes.PeerConfig
Config ProxyConfig
StopConn func()
ResetConn func()
PeerListenPort uint32
RemoteConnAddr *net.UDPAddr
LocalConnAddr *net.UDPAddr
RecieverChan chan []byte
LocalConn net.Conn
Mutex *sync.RWMutex
}
type RemotePeer struct {
@ -36,6 +50,7 @@ type RemotePeer struct {
Endpoint *net.UDPAddr
IsExtClient bool
IsAttachedExtClient bool
LocalConn net.Conn
}
type ExtClientPeer struct {
@ -46,5 +61,5 @@ type ExtClientPeer struct {
type WgIfaceConf struct {
Iface *wgtypes.Device
IfaceKeyHash string
PeerMap map[string]*ConnConfig
PeerMap map[string]*Conn
}

View file

@ -12,6 +12,13 @@ import (
"github.com/gravitl/netmaker/nm-proxy/stun"
)
/*
TODO:
1. Mutex locks for maps
2. CRUD funcs on Maps
3. Comments
*/
func Start(ctx context.Context, mgmChan chan *manager.ManagerAction, apiServerAddr string) {
log.Println("Starting Proxy...")
common.IsHostNetwork = (os.Getenv("HOST_NETWORK") == "" || os.Getenv("HOST_NETWORK") == "on")

View file

@ -154,8 +154,8 @@ func ProcessPacketBeforeSending(buf []byte, n int, srckey, dstKey string) ([]byt
func ExtractInfo(buffer []byte, n int) (int, string, string, error) {
data := buffer[:n]
if len(data) < 36 {
return 0, "", "", errors.New("proxy message not found")
if len(data) < MessageProxySize {
return n, "", "", errors.New("proxy message not found")
}
var msg ProxyMessage
var err error

View file

@ -1,13 +1,15 @@
package peer
import (
"crypto/md5"
"errors"
"fmt"
"log"
"net"
"sync"
"time"
"github.com/gravitl/netmaker/nm-proxy/common"
"github.com/gravitl/netmaker/nm-proxy/metrics"
"github.com/gravitl/netmaker/nm-proxy/models"
"github.com/gravitl/netmaker/nm-proxy/proxy"
"github.com/gravitl/netmaker/nm-proxy/wg"
@ -20,15 +22,14 @@ func AddNewPeer(wgInterface *wg.WGIface, peer *wgtypes.PeerConfig, peerAddr stri
d := time.Second * 25
peer.PersistentKeepaliveInterval = &d
}
c := proxy.Config{
c := models.ProxyConfig{
LocalKey: wgInterface.Device.PublicKey,
RemoteKey: peer.PublicKey,
WgInterface: wgInterface,
IsExtClient: isExtClient,
PeerConf: peer,
PersistentKeepalive: peer.PersistentKeepaliveInterval,
RecieverChan: make(chan []byte, 100),
MetricsCh: make(chan metrics.MetricsPayload, 30),
RecieverChan: make(chan []byte, 1000),
}
p := proxy.NewProxy(c)
peerPort := models.NmProxyPort
@ -36,38 +37,47 @@ func AddNewPeer(wgInterface *wg.WGIface, peer *wgtypes.PeerConfig, peerAddr stri
peerPort = peer.Endpoint.Port
}
peerEndpoint := peer.Endpoint.IP
peerEndpointIP := peer.Endpoint.IP
if isRelayed {
//go server.NmProxyServer.KeepAlive(peer.Endpoint.IP.String(), common.NmProxyPort)
if relayTo == nil {
return errors.New("relay endpoint is nil")
}
peerEndpoint = relayTo.IP
peerEndpointIP = relayTo.IP
}
p.Config.PeerIp = peerEndpoint
p.Config.PeerPort = uint32(peerPort)
peerEndpoint, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", peerEndpointIP, peerPort))
if err != nil {
return err
}
p.Config.PeerEndpoint = peerEndpoint
log.Printf("Starting proxy for Peer: %s\n", peer.PublicKey.String())
lAddr, rAddr, err := p.Start()
err = p.Start()
if err != nil {
return err
}
connConf := models.ConnConfig{
connConf := models.Conn{
Mutex: &sync.RWMutex{},
Key: peer.PublicKey,
IsRelayed: isRelayed,
RelayedEndpoint: relayTo,
IsAttachedExtClient: isAttachedExtClient,
PeerConf: peer,
Config: p.Config,
StopConn: p.Close,
ResetConn: p.Reset,
RemoteConnAddr: rAddr,
LocalConnAddr: lAddr,
RecieverChan: p.Config.RecieverChan,
PeerListenPort: p.Config.PeerPort,
LocalConn: p.LocalConn,
}
common.WgIfaceMap.PeerMap[peer.PublicKey.String()] = &connConf
common.PeerKeyHashMap[fmt.Sprintf("%x", md5.Sum([]byte(peer.PublicKey.String())))] = models.RemotePeer{
Interface: wgInterface.Name,
PeerKey: peer.PublicKey.String(),
IsExtClient: isExtClient,
Endpoint: peerEndpoint,
IsAttachedExtClient: isAttachedExtClient,
LocalConn: p.LocalConn,
}
return nil
}

View file

@ -7,91 +7,63 @@ import (
"log"
"net"
"runtime"
"time"
"github.com/gravitl/netmaker/nm-proxy/common"
"github.com/gravitl/netmaker/nm-proxy/metrics"
"github.com/gravitl/netmaker/nm-proxy/models"
"github.com/gravitl/netmaker/nm-proxy/wg"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
)
const (
defaultBodySize = 10000
defaultPort = 51722
)
type Config struct {
BodySize int
Addr string
RemoteKey wgtypes.Key
LocalKey wgtypes.Key
WgInterface *wg.WGIface
IsExtClient bool
PersistentKeepalive *time.Duration
RecieverChan chan []byte
MetricsCh chan metrics.MetricsPayload
PeerConf *wgtypes.PeerConfig
PeerIp net.IP
PeerPort uint32
}
// Proxy - WireguardProxy proxies
type Proxy struct {
Ctx context.Context
Cancel context.CancelFunc
Config Config
Config models.ProxyConfig
RemoteConn *net.UDPAddr
LocalConn net.Conn
}
func (p *Proxy) Start() (*net.UDPAddr, *net.UDPAddr, error) {
func (p *Proxy) Start() error {
var err error
remoteConn, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", p.Config.PeerIp.String(), p.Config.PeerPort))
if err != nil {
return nil, nil, err
}
p.RemoteConn = remoteConn
log.Printf("----> Established Remote Conn with RPeer: %s, ----> RAddr: %s", p.Config.RemoteKey.String(), remoteConn.String())
//log.Printf("----> WGIFACE: %+v\n", p.Config.WgInterface)
p.RemoteConn = p.Config.PeerEndpoint
log.Printf("----> Established Remote Conn with RPeer: %s, ----> RAddr: %s", p.Config.RemoteKey.String(), p.RemoteConn.String())
addr, err := GetFreeIp(models.DefaultCIDR, p.Config.WgInterface.Port)
if err != nil {
log.Println("Failed to get freeIp: ", err)
return nil, nil, err
return err
}
wgListenAddr, err := GetInterfaceListenAddr(p.Config.WgInterface.Port)
if err != nil {
log.Println("failed to get wg listen addr: ", err)
return nil, nil, err
return err
}
if runtime.GOOS == "darwin" {
wgListenAddr.IP = net.ParseIP(addr)
}
//log.Println("--------->#### Wg Listen Addr: ", wgListenAddr.String())
p.LocalConn, err = net.DialUDP("udp", &net.UDPAddr{
IP: net.ParseIP(addr),
Port: models.NmProxyPort,
}, wgListenAddr)
if err != nil {
log.Printf("failed dialing to local Wireguard port,Err: %v\n", err)
return nil, nil, err
return err
}
log.Printf("Dialing to local Wireguard port %s --> %s\n", p.LocalConn.LocalAddr().String(), p.LocalConn.RemoteAddr().String())
err = p.updateEndpoint()
if err != nil {
log.Printf("error while updating Wireguard peer endpoint [%s] %v\n", p.Config.RemoteKey, err)
return nil, nil, err
return err
}
localAddr, err := net.ResolveUDPAddr("udp", p.LocalConn.LocalAddr().String())
if err != nil {
log.Println("failed to resolve local addr: ", err)
return nil, nil, err
return err
}
p.Config.LocalConnAddr = localAddr
p.Config.RemoteConnAddr = p.RemoteConn
go p.ProxyPeer()
return localAddr, p.RemoteConn, nil
return nil
}
func (p *Proxy) Close() {

View file

@ -22,33 +22,12 @@ import (
"github.com/gravitl/netmaker/nm-proxy/wg"
)
func NewProxy(config Config) *Proxy {
func NewProxy(config models.ProxyConfig) *Proxy {
p := &Proxy{Config: config}
p.Ctx, p.Cancel = context.WithCancel(context.Background())
return p
}
func (p *Proxy) proxyToLocal(wg *sync.WaitGroup, ticker *time.Ticker) {
defer wg.Done()
for {
select {
case <-p.Ctx.Done():
return
case buffer := <-p.Config.RecieverChan:
ticker.Reset(*p.Config.PersistentKeepalive + time.Second*5)
log.Printf("PROXING TO LOCAL!!!---> %s <<<< %s \n",
p.LocalConn.RemoteAddr(), p.LocalConn.LocalAddr())
_, err := p.LocalConn.Write(buffer[:])
if err != nil {
log.Println("Failed to proxy to Wg local interface: ", err)
}
}
}
}
func (p *Proxy) proxyToRemote(wg *sync.WaitGroup) {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
@ -58,21 +37,6 @@ func (p *Proxy) proxyToRemote(wg *sync.WaitGroup) {
select {
case <-p.Ctx.Done():
return
case <-ticker.C:
metrics.MetricsMapLock.Lock()
metric := metrics.MetricsMap[p.Config.RemoteKey.String()]
metric.ConnectionStatus = false
metrics.MetricsMap[p.Config.RemoteKey.String()] = metric
metrics.MetricsMapLock.Unlock()
pkt, err := packet.CreateMetricPacket(uuid.New().ID(), p.Config.LocalKey, p.Config.RemoteKey)
if err == nil {
log.Printf("-----------> ##### $$$$$ SENDING METRIC PACKET TO: %s\n", p.RemoteConn.String())
_, err = server.NmProxyServer.Server.WriteToUDP(pkt, p.RemoteConn)
if err != nil {
log.Println("Failed to send to metric pkt: ", err)
}
}
default:
n, err := p.LocalConn.Read(buf)
@ -81,29 +45,29 @@ func (p *Proxy) proxyToRemote(wg *sync.WaitGroup) {
continue
}
if _, ok := common.WgIfaceMap.PeerMap[p.Config.RemoteKey.String()]; ok {
// if _, found := common.GetPeer(p.Config.RemoteKey); !found {
// log.Printf("Peer: %s not found in config\n", p.Config.RemoteKey)
// p.Close()
// return
// }
go func(n int, peerKey string) {
metrics.MetricsMapLock.Lock()
metric := metrics.MetricsMap[p.Config.RemoteKey.String()]
metric := metrics.MetricsMap[peerKey]
metric.TrafficSent += uint64(n)
metrics.MetricsMap[p.Config.RemoteKey.String()] = metric
metrics.MetricsMap[peerKey] = metric
metrics.MetricsMapLock.Unlock()
}(n, p.Config.RemoteKey.String())
var srcPeerKeyHash, dstPeerKeyHash string
//var srcPeerKeyHash, dstPeerKeyHash string
if !p.Config.IsExtClient {
buf, n, srcPeerKeyHash, dstPeerKeyHash = packet.ProcessPacketBeforeSending(buf, n, common.WgIfaceMap.Iface.PublicKey.String(), p.Config.RemoteKey.String())
buf, n, _, _ = packet.ProcessPacketBeforeSending(buf, n, p.Config.WgInterface.Device.PublicKey.String(), p.Config.RemoteKey.String())
if err != nil {
log.Println("failed to process pkt before sending: ", err)
}
}
log.Printf("PROXING TO REMOTE!!!---> %s >>>>> %s >>>>> %s [[ SrcPeerHash: %s, DstPeerHash: %s ]]\n",
p.LocalConn.LocalAddr(), server.NmProxyServer.Server.LocalAddr().String(), p.RemoteConn.String(), srcPeerKeyHash, dstPeerKeyHash)
} else {
log.Printf("Peer: %s not found in config\n", p.Config.RemoteKey)
p.Close()
return
}
// log.Printf("PROXING TO REMOTE!!!---> %s >>>>> %s >>>>> %s [[ SrcPeerHash: %s, DstPeerHash: %s ]]\n",
// p.LocalConn.LocalAddr(), server.NmProxyServer.Server.LocalAddr().String(), p.RemoteConn.String(), srcPeerKeyHash, dstPeerKeyHash)
_, err = server.NmProxyServer.Server.WriteToUDP(buf[:n], p.RemoteConn)
if err != nil {
@ -117,14 +81,52 @@ func (p *Proxy) proxyToRemote(wg *sync.WaitGroup) {
func (p *Proxy) Reset() {
p.Close()
p.pullLatestConfig()
if err := p.pullLatestConfig(); err != nil {
log.Println("couldn't perform reset: ", err)
return
}
p.Start()
}
func (p *Proxy) pullLatestConfig() {
if peer, ok := common.WgIfaceMap.PeerMap[p.Config.RemoteKey.String()]; ok {
p.Config.PeerPort = peer.PeerListenPort
func (p *Proxy) pullLatestConfig() error {
peer, found := common.GetPeer(p.Config.RemoteKey)
if found {
p.Config.PeerEndpoint.Port = peer.Config.PeerEndpoint.Port
} else {
return errors.New("peer not found")
}
return nil
}
func (p *Proxy) startMetricsThread(wg *sync.WaitGroup, rTicker *time.Ticker) {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
defer wg.Done()
for {
select {
case <-p.Ctx.Done():
return
case <-ticker.C:
metrics.MetricsMapLock.Lock()
metric := metrics.MetricsMap[p.Config.RemoteKey.String()]
if metric.ConnectionStatus {
rTicker.Reset(*p.Config.PersistentKeepalive)
}
metric.ConnectionStatus = false
metrics.MetricsMap[p.Config.RemoteKey.String()] = metric
metrics.MetricsMapLock.Unlock()
pkt, err := packet.CreateMetricPacket(uuid.New().ID(), p.Config.LocalKey, p.Config.RemoteKey)
if err == nil {
log.Printf("-----------> ##### $$$$$ SENDING METRIC PACKET TO: %s\n", p.RemoteConn.String())
_, err = server.NmProxyServer.Server.WriteToUDP(pkt, p.RemoteConn)
if err != nil {
log.Println("Failed to send to metric pkt: ", err)
}
}
}
}
}
@ -162,11 +164,11 @@ func (p *Proxy) ProxyPeer() {
defer ticker.Stop()
wg := &sync.WaitGroup{}
wg.Add(1)
go p.proxyToLocal(wg, ticker)
wg.Add(1)
go p.proxyToRemote(wg)
// if common.BehindNAT {
wg.Add(1)
go p.startMetricsThread(wg, ticker)
wg.Add(1)
go p.peerUpdates(wg, ticker)
// }
wg.Wait()

View file

@ -3,6 +3,7 @@ package server
import (
"context"
"encoding/binary"
"fmt"
"log"
"net"
"time"
@ -34,36 +35,38 @@ type ProxyServer struct {
Server *net.UDPConn
}
func (p *ProxyServer) Close() {
log.Println("--------->### Shutting down Proxy.....")
// clean up proxy connections
for _, peerI := range common.WgIfaceMap.PeerMap {
peerI.Mutex.Lock()
peerI.StopConn()
peerI.Mutex.Unlock()
}
// close server connection
NmProxyServer.Server.Close()
}
// Proxy.Listen - begins listening for packets
func (p *ProxyServer) Listen(ctx context.Context) {
// Buffer with indicated body size
buffer := make([]byte, 65032)
buffer := make([]byte, 65036)
for {
select {
case <-ctx.Done():
log.Println("--------->### Shutting down Proxy.....")
// clean up proxy connections
log.Println("########------------> CLEANING UP Interface ")
for _, peerI := range common.WgIfaceMap.PeerMap {
peerI.StopConn()
}
// close server connection
NmProxyServer.Server.Close()
p.Close()
return
default:
// Read Packet
n, source, err := p.Server.ReadFromUDP(buffer)
if err != nil { // in future log errors?
if err != nil || source == nil { // in future log errors?
log.Println("RECV ERROR: ", err)
continue
}
//go func(buffer []byte, source *net.UDPAddr, n int) {
origBufferLen := n
proxyTransportMsg := true
var srcPeerKeyHash, dstPeerKeyHash string
n, srcPeerKeyHash, dstPeerKeyHash, err = packet.ExtractInfo(buffer, n)
@ -74,37 +77,25 @@ func (p *ProxyServer) Listen(ctx context.Context) {
if proxyTransportMsg {
proxyIncomingPacket(buffer[:], source, n, srcPeerKeyHash, dstPeerKeyHash)
continue
} else {
// unknown peer to proxy -> check if extclient and handle it
if peerInfo, ok := common.ExtSourceIpMap[source.String()]; ok {
if peerI, ok := common.WgIfaceMap.PeerMap[peerInfo.PeerKey]; ok {
metrics.MetricsMapLock.Lock()
metric := metrics.MetricsMap[peerInfo.PeerKey]
metric.TrafficRecieved += uint64(n)
metric.ConnectionStatus = true
metrics.MetricsMap[peerInfo.PeerKey] = metric
metrics.MetricsMapLock.Unlock()
peerI.RecieverChan <- buffer[:n]
// log.Printf("PROXING TO LOCAL!!!---> %s <<<< %s <<<<<<<< %s [[ RECV PKT [SRCKEYHASH: %s], [DSTKEYHASH: %s], SourceIP: [%s] ]]\n",
// peerI.LocalConn.RemoteAddr(), peerI.LocalConn.LocalAddr(),
// fmt.Sprintf("%s:%d", source.IP.String(), source.Port), srcPeerKeyHash, dstPeerKeyHash, source.IP.String())
// _, err = peerI.LocalConn.Write(buffer[:n])
// if err != nil {
// log.Println("Failed to proxy to Wg local interface: ", err)
// //continue
// }
if handleExtClients(buffer[:], n, source) {
continue
}
}
handleMsgs(buffer, n, source)
}
}
}
func handleMsgs(buffer []byte, n int, source *net.UDPAddr) {
msgType := binary.LittleEndian.Uint32(buffer[:4])
switch packet.MessageType(msgType) {
case packet.MessageMetricsType:
metricMsg, err := packet.ConsumeMetricPacket(buffer[:origBufferLen])
metricMsg, err := packet.ConsumeMetricPacket(buffer[:n])
// calc latency
if err == nil {
log.Printf("------->$$$$$ Recieved Metric Pkt: %+v, FROM:%s\n", metricMsg, source.String())
@ -115,37 +106,41 @@ func (p *ProxyServer) Listen(ctx context.Context) {
metric := metrics.MetricsMap[metricMsg.Reciever.String()]
metric.LastRecordedLatency = uint64(latency)
metric.ConnectionStatus = true
metric.TrafficRecieved += uint64(origBufferLen)
metric.TrafficRecieved += uint64(n)
metrics.MetricsMap[metricMsg.Reciever.String()] = metric
metrics.MetricsMapLock.Unlock()
} else if metricMsg.Reciever == common.WgIfaceMap.Iface.PublicKey {
// proxy it back to the sender
log.Println("------------> $$$ SENDING back the metric pkt to the source: ", source.String())
_, err = NmProxyServer.Server.WriteToUDP(buffer[:origBufferLen], source)
_, err = NmProxyServer.Server.WriteToUDP(buffer[:n], source)
if err != nil {
log.Println("Failed to send metric packet to remote: ", err)
}
metrics.MetricsMapLock.Lock()
metric := metrics.MetricsMap[metricMsg.Sender.String()]
metric.ConnectionStatus = true
metric.TrafficRecieved += uint64(origBufferLen)
metric.TrafficRecieved += uint64(n)
metrics.MetricsMap[metricMsg.Sender.String()] = metric
metrics.MetricsMapLock.Unlock()
}
}
case packet.MessageProxyUpdateType:
msg, err := packet.ConsumeProxyUpdateMsg(buffer[:origBufferLen])
msg, err := packet.ConsumeProxyUpdateMsg(buffer[:n])
if err == nil {
switch msg.Action {
case packet.UpdateListenPort:
if peer, ok := common.WgIfaceMap.PeerMap[msg.Sender.String()]; ok {
if peer.PeerListenPort != msg.ListenPort {
peer.Mutex.Lock()
if peer.Config.PeerEndpoint.Port != int(msg.ListenPort) {
// update peer conn
peer.PeerListenPort = msg.ListenPort
peer.Config.PeerEndpoint.Port = int(msg.ListenPort)
common.WgIfaceMap.PeerMap[msg.Sender.String()] = peer
log.Println("--------> Resetting Proxy Conn For Peer ", msg.Sender.String())
peer.Mutex.Unlock()
peer.ResetConn()
return
}
peer.Mutex.Unlock()
}
}
@ -153,17 +148,32 @@ func (p *ProxyServer) Listen(ctx context.Context) {
// consume handshake message for ext clients
case packet.MessageInitiationType:
err := packet.ConsumeHandshakeInitiationMsg(false, buffer[:origBufferLen], source,
err := packet.ConsumeHandshakeInitiationMsg(false, buffer[:n], source,
packet.NoisePublicKey(common.WgIfaceMap.Iface.PublicKey), packet.NoisePrivateKey(common.WgIfaceMap.Iface.PrivateKey))
if err != nil {
log.Println("---------> @@@ failed to decode HS: ", err)
}
}
}
func handleExtClients(buffer []byte, n int, source *net.UDPAddr) bool {
isExtClient := false
if peerInfo, ok := common.ExtSourceIpMap[source.String()]; ok {
if peerI, ok := common.WgIfaceMap.PeerMap[peerInfo.PeerKey]; ok {
peerI.Mutex.RLock()
peerI.Config.RecieverChan <- buffer[:n]
metrics.MetricsMapLock.Lock()
metric := metrics.MetricsMap[peerInfo.PeerKey]
metric.TrafficRecieved += uint64(n)
metric.ConnectionStatus = true
metrics.MetricsMap[peerInfo.PeerKey] = metric
metrics.MetricsMapLock.Unlock()
peerI.Mutex.RUnlock()
isExtClient = true
}
}
}
return isExtClient
}
func proxyIncomingPacket(buffer []byte, source *net.UDPAddr, n int, srcPeerKeyHash, dstPeerKeyHash string) {
@ -201,24 +211,25 @@ func proxyIncomingPacket(buffer []byte, source *net.UDPAddr, n int, srcPeerKeyHa
}
if peerInfo, ok := common.PeerKeyHashMap[srcPeerKeyHash]; ok {
if peerI, ok := common.WgIfaceMap.PeerMap[peerInfo.PeerKey]; ok {
log.Printf("PROXING TO LOCAL!!!---> %s <<<< %s <<<<<<<< %s [[ RECV PKT [SRCKEYHASH: %s], [DSTKEYHASH: %s], SourceIP: [%s] ]]\n",
peerInfo.LocalConn.RemoteAddr(), peerInfo.LocalConn.LocalAddr(),
fmt.Sprintf("%s:%d", source.IP.String(), source.Port), srcPeerKeyHash, dstPeerKeyHash, source.IP.String())
_, err = peerInfo.LocalConn.Write(buffer[:n])
if err != nil {
log.Println("Failed to proxy to Wg local interface: ", err)
//continue
}
go func(n int, peerKey string) {
metrics.MetricsMapLock.Lock()
metric := metrics.MetricsMap[peerInfo.PeerKey]
metric := metrics.MetricsMap[peerKey]
metric.TrafficRecieved += uint64(n)
metric.ConnectionStatus = true
metrics.MetricsMap[peerInfo.PeerKey] = metric
metrics.MetricsMap[peerKey] = metric
metrics.MetricsMapLock.Unlock()
peerI.RecieverChan <- buffer[:n]
// log.Printf("PROXING TO LOCAL!!!---> %s <<<< %s <<<<<<<< %s [[ RECV PKT [SRCKEYHASH: %s], [DSTKEYHASH: %s], SourceIP: [%s] ]]\n",
// peerI.LocalConn.RemoteAddr(), peerI.LocalConn.LocalAddr(),
// fmt.Sprintf("%s:%d", source.IP.String(), source.Port), srcPeerKeyHash, dstPeerKeyHash, source.IP.String())
// _, err = peerI.LocalConn.Write(buffer[:n])
// if err != nil {
// log.Println("Failed to proxy to Wg local interface: ", err)
// //continue
// }
}(n, peerInfo.PeerKey)
return
}
}