mirror of
https://github.com/slackhq/nebula.git
synced 2024-09-20 14:56:12 +08:00
9af242dc47
These new helpers make the code a lot cleaner. I confirmed that the simple helpers like `atomic.Int64` don't add any extra overhead as they get inlined by the compiler. `atomic.Pointer` adds an extra method call as it no longer gets inlined, but we aren't using these on the hot path so it is probably okay.
399 lines
10 KiB
Go
399 lines
10 KiB
Go
package nebula
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"os"
|
|
"runtime"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/rcrowley/go-metrics"
|
|
"github.com/sirupsen/logrus"
|
|
"github.com/slackhq/nebula/cert"
|
|
"github.com/slackhq/nebula/config"
|
|
"github.com/slackhq/nebula/firewall"
|
|
"github.com/slackhq/nebula/iputil"
|
|
"github.com/slackhq/nebula/overlay"
|
|
"github.com/slackhq/nebula/udp"
|
|
)
|
|
|
|
const mtu = 9001
|
|
|
|
type InterfaceConfig struct {
|
|
HostMap *HostMap
|
|
Outside *udp.Conn
|
|
Inside overlay.Device
|
|
certState *CertState
|
|
Cipher string
|
|
Firewall *Firewall
|
|
ServeDns bool
|
|
HandshakeManager *HandshakeManager
|
|
lightHouse *LightHouse
|
|
checkInterval int
|
|
pendingDeletionInterval int
|
|
DropLocalBroadcast bool
|
|
DropMulticast bool
|
|
routines int
|
|
MessageMetrics *MessageMetrics
|
|
version string
|
|
caPool *cert.NebulaCAPool
|
|
disconnectInvalid bool
|
|
relayManager *relayManager
|
|
|
|
ConntrackCacheTimeout time.Duration
|
|
l *logrus.Logger
|
|
}
|
|
|
|
type Interface struct {
|
|
hostMap *HostMap
|
|
outside *udp.Conn
|
|
inside overlay.Device
|
|
certState *CertState
|
|
cipher string
|
|
firewall *Firewall
|
|
connectionManager *connectionManager
|
|
handshakeManager *HandshakeManager
|
|
serveDns bool
|
|
createTime time.Time
|
|
lightHouse *LightHouse
|
|
localBroadcast iputil.VpnIp
|
|
myVpnIp iputil.VpnIp
|
|
dropLocalBroadcast bool
|
|
dropMulticast bool
|
|
routines int
|
|
caPool *cert.NebulaCAPool
|
|
disconnectInvalid bool
|
|
closed atomic.Bool
|
|
relayManager *relayManager
|
|
|
|
sendRecvErrorConfig sendRecvErrorConfig
|
|
|
|
// rebindCount is used to decide if an active tunnel should trigger a punch notification through a lighthouse
|
|
rebindCount int8
|
|
version string
|
|
|
|
conntrackCacheTimeout time.Duration
|
|
|
|
writers []*udp.Conn
|
|
readers []io.ReadWriteCloser
|
|
|
|
metricHandshakes metrics.Histogram
|
|
messageMetrics *MessageMetrics
|
|
cachedPacketMetrics *cachedPacketMetrics
|
|
|
|
l *logrus.Logger
|
|
}
|
|
|
|
type sendRecvErrorConfig uint8
|
|
|
|
const (
|
|
sendRecvErrorAlways sendRecvErrorConfig = iota
|
|
sendRecvErrorNever
|
|
sendRecvErrorPrivate
|
|
)
|
|
|
|
func (s sendRecvErrorConfig) ShouldSendRecvError(ip net.IP) bool {
|
|
switch s {
|
|
case sendRecvErrorPrivate:
|
|
return ip.IsPrivate()
|
|
case sendRecvErrorAlways:
|
|
return true
|
|
case sendRecvErrorNever:
|
|
return false
|
|
default:
|
|
panic(fmt.Errorf("invalid sendRecvErrorConfig value: %d", s))
|
|
}
|
|
}
|
|
|
|
func (s sendRecvErrorConfig) String() string {
|
|
switch s {
|
|
case sendRecvErrorAlways:
|
|
return "always"
|
|
case sendRecvErrorNever:
|
|
return "never"
|
|
case sendRecvErrorPrivate:
|
|
return "private"
|
|
default:
|
|
return fmt.Sprintf("invalid(%d)", s)
|
|
}
|
|
}
|
|
|
|
func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
|
|
if c.Outside == nil {
|
|
return nil, errors.New("no outside connection")
|
|
}
|
|
if c.Inside == nil {
|
|
return nil, errors.New("no inside interface (tun)")
|
|
}
|
|
if c.certState == nil {
|
|
return nil, errors.New("no certificate state")
|
|
}
|
|
if c.Firewall == nil {
|
|
return nil, errors.New("no firewall rules")
|
|
}
|
|
|
|
myVpnIp := iputil.Ip2VpnIp(c.certState.certificate.Details.Ips[0].IP)
|
|
ifce := &Interface{
|
|
hostMap: c.HostMap,
|
|
outside: c.Outside,
|
|
inside: c.Inside,
|
|
certState: c.certState,
|
|
cipher: c.Cipher,
|
|
firewall: c.Firewall,
|
|
serveDns: c.ServeDns,
|
|
handshakeManager: c.HandshakeManager,
|
|
createTime: time.Now(),
|
|
lightHouse: c.lightHouse,
|
|
localBroadcast: myVpnIp | ^iputil.Ip2VpnIp(c.certState.certificate.Details.Ips[0].Mask),
|
|
dropLocalBroadcast: c.DropLocalBroadcast,
|
|
dropMulticast: c.DropMulticast,
|
|
routines: c.routines,
|
|
version: c.version,
|
|
writers: make([]*udp.Conn, c.routines),
|
|
readers: make([]io.ReadWriteCloser, c.routines),
|
|
caPool: c.caPool,
|
|
disconnectInvalid: c.disconnectInvalid,
|
|
myVpnIp: myVpnIp,
|
|
relayManager: c.relayManager,
|
|
|
|
conntrackCacheTimeout: c.ConntrackCacheTimeout,
|
|
|
|
metricHandshakes: metrics.GetOrRegisterHistogram("handshakes", nil, metrics.NewExpDecaySample(1028, 0.015)),
|
|
messageMetrics: c.MessageMetrics,
|
|
cachedPacketMetrics: &cachedPacketMetrics{
|
|
sent: metrics.GetOrRegisterCounter("hostinfo.cached_packets.sent", nil),
|
|
dropped: metrics.GetOrRegisterCounter("hostinfo.cached_packets.dropped", nil),
|
|
},
|
|
|
|
l: c.l,
|
|
}
|
|
|
|
ifce.connectionManager = newConnectionManager(ctx, c.l, ifce, c.checkInterval, c.pendingDeletionInterval)
|
|
|
|
return ifce, nil
|
|
}
|
|
|
|
// activate creates the interface on the host. After the interface is created, any
|
|
// other services that want to bind listeners to its IP may do so successfully. However,
|
|
// the interface isn't going to process anything until run() is called.
|
|
func (f *Interface) activate() {
|
|
// actually turn on tun dev
|
|
|
|
addr, err := f.outside.LocalAddr()
|
|
if err != nil {
|
|
f.l.WithError(err).Error("Failed to get udp listen address")
|
|
}
|
|
|
|
f.l.WithField("interface", f.inside.Name()).WithField("network", f.inside.Cidr().String()).
|
|
WithField("build", f.version).WithField("udpAddr", addr).
|
|
Info("Nebula interface is active")
|
|
|
|
metrics.GetOrRegisterGauge("routines", nil).Update(int64(f.routines))
|
|
|
|
// Prepare n tun queues
|
|
var reader io.ReadWriteCloser = f.inside
|
|
for i := 0; i < f.routines; i++ {
|
|
if i > 0 {
|
|
reader, err = f.inside.NewMultiQueueReader()
|
|
if err != nil {
|
|
f.l.Fatal(err)
|
|
}
|
|
}
|
|
f.readers[i] = reader
|
|
}
|
|
|
|
if err := f.inside.Activate(); err != nil {
|
|
f.inside.Close()
|
|
f.l.Fatal(err)
|
|
}
|
|
}
|
|
|
|
func (f *Interface) run() {
|
|
// Launch n queues to read packets from udp
|
|
for i := 0; i < f.routines; i++ {
|
|
go f.listenOut(i)
|
|
}
|
|
|
|
// Launch n queues to read packets from tun dev
|
|
for i := 0; i < f.routines; i++ {
|
|
go f.listenIn(f.readers[i], i)
|
|
}
|
|
}
|
|
|
|
func (f *Interface) listenOut(i int) {
|
|
runtime.LockOSThread()
|
|
|
|
var li *udp.Conn
|
|
// TODO clean this up with a coherent interface for each outside connection
|
|
if i > 0 {
|
|
li = f.writers[i]
|
|
} else {
|
|
li = f.outside
|
|
}
|
|
|
|
lhh := f.lightHouse.NewRequestHandler()
|
|
conntrackCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
|
|
li.ListenOut(f.readOutsidePackets, lhh.HandleRequest, conntrackCache, i)
|
|
}
|
|
|
|
func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
|
|
runtime.LockOSThread()
|
|
|
|
packet := make([]byte, mtu)
|
|
out := make([]byte, mtu)
|
|
fwPacket := &firewall.Packet{}
|
|
nb := make([]byte, 12, 12)
|
|
|
|
conntrackCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
|
|
|
|
for {
|
|
n, err := reader.Read(packet)
|
|
if err != nil {
|
|
if errors.Is(err, os.ErrClosed) && f.closed.Load() {
|
|
return
|
|
}
|
|
|
|
f.l.WithError(err).Error("Error while reading outbound packet")
|
|
// This only seems to happen when something fatal happens to the fd, so exit.
|
|
os.Exit(2)
|
|
}
|
|
|
|
f.consumeInsidePacket(packet[:n], fwPacket, nb, out, i, conntrackCache.Get(f.l))
|
|
}
|
|
}
|
|
|
|
func (f *Interface) RegisterConfigChangeCallbacks(c *config.C) {
|
|
c.RegisterReloadCallback(f.reloadCA)
|
|
c.RegisterReloadCallback(f.reloadCertKey)
|
|
c.RegisterReloadCallback(f.reloadFirewall)
|
|
c.RegisterReloadCallback(f.reloadSendRecvError)
|
|
for _, udpConn := range f.writers {
|
|
c.RegisterReloadCallback(udpConn.ReloadConfig)
|
|
}
|
|
}
|
|
|
|
func (f *Interface) reloadCA(c *config.C) {
|
|
// reload and check regardless
|
|
// todo: need mutex?
|
|
newCAs, err := loadCAFromConfig(f.l, c)
|
|
if err != nil {
|
|
f.l.WithError(err).Error("Could not refresh trusted CA certificates")
|
|
return
|
|
}
|
|
|
|
f.caPool = newCAs
|
|
f.l.WithField("fingerprints", f.caPool.GetFingerprints()).Info("Trusted CA certificates refreshed")
|
|
}
|
|
|
|
func (f *Interface) reloadCertKey(c *config.C) {
|
|
// reload and check in all cases
|
|
cs, err := NewCertStateFromConfig(c)
|
|
if err != nil {
|
|
f.l.WithError(err).Error("Could not refresh client cert")
|
|
return
|
|
}
|
|
|
|
// did IP in cert change? if so, don't set
|
|
oldIPs := f.certState.certificate.Details.Ips
|
|
newIPs := cs.certificate.Details.Ips
|
|
if len(oldIPs) > 0 && len(newIPs) > 0 && oldIPs[0].String() != newIPs[0].String() {
|
|
f.l.WithField("new_ip", newIPs[0]).WithField("old_ip", oldIPs[0]).Error("IP in new cert was different from old")
|
|
return
|
|
}
|
|
|
|
f.certState = cs
|
|
f.l.WithField("cert", cs.certificate).Info("Client cert refreshed from disk")
|
|
}
|
|
|
|
func (f *Interface) reloadFirewall(c *config.C) {
|
|
//TODO: need to trigger/detect if the certificate changed too
|
|
if c.HasChanged("firewall") == false {
|
|
f.l.Debug("No firewall config change detected")
|
|
return
|
|
}
|
|
|
|
fw, err := NewFirewallFromConfig(f.l, f.certState.certificate, c)
|
|
if err != nil {
|
|
f.l.WithError(err).Error("Error while creating firewall during reload")
|
|
return
|
|
}
|
|
|
|
oldFw := f.firewall
|
|
conntrack := oldFw.Conntrack
|
|
conntrack.Lock()
|
|
defer conntrack.Unlock()
|
|
|
|
fw.rulesVersion = oldFw.rulesVersion + 1
|
|
// If rulesVersion is back to zero, we have wrapped all the way around. Be
|
|
// safe and just reset conntrack in this case.
|
|
if fw.rulesVersion == 0 {
|
|
f.l.WithField("firewallHash", fw.GetRuleHash()).
|
|
WithField("oldFirewallHash", oldFw.GetRuleHash()).
|
|
WithField("rulesVersion", fw.rulesVersion).
|
|
Warn("firewall rulesVersion has overflowed, resetting conntrack")
|
|
} else {
|
|
fw.Conntrack = conntrack
|
|
}
|
|
|
|
f.firewall = fw
|
|
|
|
oldFw.Destroy()
|
|
f.l.WithField("firewallHash", fw.GetRuleHash()).
|
|
WithField("oldFirewallHash", oldFw.GetRuleHash()).
|
|
WithField("rulesVersion", fw.rulesVersion).
|
|
Info("New firewall has been installed")
|
|
}
|
|
|
|
func (f *Interface) reloadSendRecvError(c *config.C) {
|
|
if c.InitialLoad() || c.HasChanged("listen.send_recv_error") {
|
|
stringValue := c.GetString("listen.send_recv_error", "always")
|
|
|
|
switch stringValue {
|
|
case "always":
|
|
f.sendRecvErrorConfig = sendRecvErrorAlways
|
|
case "never":
|
|
f.sendRecvErrorConfig = sendRecvErrorNever
|
|
case "private":
|
|
f.sendRecvErrorConfig = sendRecvErrorPrivate
|
|
default:
|
|
if c.GetBool("listen.send_recv_error", true) {
|
|
f.sendRecvErrorConfig = sendRecvErrorAlways
|
|
} else {
|
|
f.sendRecvErrorConfig = sendRecvErrorNever
|
|
}
|
|
}
|
|
|
|
f.l.WithField("sendRecvError", f.sendRecvErrorConfig.String()).
|
|
Info("Loaded send_recv_error config")
|
|
}
|
|
}
|
|
|
|
func (f *Interface) emitStats(ctx context.Context, i time.Duration) {
|
|
ticker := time.NewTicker(i)
|
|
defer ticker.Stop()
|
|
|
|
udpStats := udp.NewUDPStatsEmitter(f.writers)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
f.firewall.EmitStats()
|
|
f.handshakeManager.EmitStats()
|
|
udpStats()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (f *Interface) Close() error {
|
|
f.closed.Store(true)
|
|
|
|
// Release the tun device
|
|
return f.inside.Close()
|
|
}
|