This commit is contained in:
enoperm 2024-09-18 17:49:53 +00:00 committed by GitHub
commit e037407215
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 277 additions and 33 deletions

View file

@ -41,6 +41,7 @@ jobs:
- TestResolveMagicDNS
- TestValidateResolvConf
- TestDERPServerScenario
- TestDERPServerWebsocketScenario
- TestPingAllByIP
- TestPingAllByIPPublicDERP
- TestAuthKeyLogoutAndRelogin

2
go.mod
View file

@ -4,6 +4,7 @@ go 1.23.0
require (
github.com/AlecAivazis/survey/v2 v2.3.7
github.com/coder/websocket v1.8.12
github.com/coreos/go-oidc/v3 v3.11.0
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/deckarep/golang-set/v2 v2.6.0
@ -79,7 +80,6 @@ require (
github.com/bits-and-blooms/bitset v1.13.0 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/coder/websocket v1.8.12 // indirect
github.com/containerd/console v1.0.4 // indirect
github.com/containerd/continuity v0.4.3 // indirect
github.com/coreos/go-iptables v0.7.1-0.20240112124308-65c67c9f46e6 // indirect

View file

@ -425,7 +425,7 @@ func (h *Headscale) createRouter(grpcMux *grpcRuntime.ServeMux) *mux.Router {
router := mux.NewRouter()
router.Use(prometheusMiddleware)
router.HandleFunc(ts2021UpgradePath, h.NoiseUpgradeHandler).Methods(http.MethodPost)
router.HandleFunc(ts2021UpgradePath, h.NoiseUpgradeHandler).Methods(http.MethodPost, http.MethodGet)
router.HandleFunc("/health", h.HealthHandler).Methods(http.MethodGet)
router.HandleFunc("/key", h.KeyHandler).Methods(http.MethodGet)

View file

@ -1,6 +1,7 @@
package server
import (
"bufio"
"context"
"encoding/json"
"fmt"
@ -12,11 +13,13 @@ import (
"strings"
"time"
"github.com/coder/websocket"
"github.com/juanfont/headscale/hscontrol/types"
"github.com/juanfont/headscale/hscontrol/util"
"github.com/rs/zerolog/log"
"tailscale.com/derp"
"tailscale.com/net/stun"
"tailscale.com/net/wsconn"
"tailscale.com/tailcfg"
"tailscale.com/types/key"
)
@ -132,6 +135,56 @@ func (d *DERPServer) DERPHandler(
return
}
if strings.Contains(req.Header.Get("Sec-Websocket-Protocol"), "derp") {
d.serveWebsocket(writer, req)
} else {
d.servePlain(writer, req)
}
}
func (d *DERPServer) serveWebsocket(writer http.ResponseWriter, req *http.Request) {
websocketConn, err := websocket.Accept(writer, req, &websocket.AcceptOptions{
Subprotocols: []string{"derp"},
OriginPatterns: []string{"*"},
// Disable compression because DERP transmits WireGuard messages that
// are not compressible.
// Additionally, Safari has a broken implementation of compression
// (see https://github.com/nhooyr/websocket/issues/218) that makes
// enabling it actively harmful.
CompressionMode: websocket.CompressionDisabled,
})
if err != nil {
log.Error().
Caller().
Err(err).
Msg("Failed to upgrade websocket request")
writer.Header().Set("Content-Type", "text/plain")
writer.WriteHeader(http.StatusInternalServerError)
_, err = writer.Write([]byte("Failed to upgrade websocket request"))
if err != nil {
log.Error().
Caller().
Err(err).
Msg("Failed to write response")
}
return
}
defer websocketConn.Close(websocket.StatusInternalError, "closing")
if websocketConn.Subprotocol() != "derp" {
websocketConn.Close(websocket.StatusPolicyViolation, "client must speak the derp subprotocol")
return
}
wc := wsconn.NetConn(req.Context(), websocketConn, websocket.MessageBinary, req.RemoteAddr)
brw := bufio.NewReadWriter(bufio.NewReader(wc), bufio.NewWriter(wc))
d.tailscaleDERP.Accept(req.Context(), wc, brw, req.RemoteAddr)
}
func (d *DERPServer) servePlain(writer http.ResponseWriter, req *http.Request) {
fastStart := req.Header.Get(fastStartHeader) == "1"
hijacker, ok := writer.(http.Hijacker)

View file

@ -19,7 +19,7 @@ type User struct {
Name string `gorm:"unique"`
}
// TODO(kradalby): See if we can fill in Gravatar here
// TODO(kradalby): See if we can fill in Gravatar here.
func (u *User) profilePicURL() string {
return ""
}

View file

@ -13,7 +13,6 @@ func GrpcSocketDialer(ctx context.Context, addr string) (net.Conn, error) {
return d.DialContext(ctx, "unix", addr)
}
// TODO(kradalby): Remove after go 1.24, will be in stdlib.
// Compare returns an integer comparing two prefixes.
// The result will be 0 if p == p2, -1 if p < p2, and +1 if p > p2.

View file

@ -242,5 +242,4 @@ func TestValidateResolvConf(t *testing.T) {
}
})
}
}

View file

@ -3,6 +3,7 @@ package dockertestutil
import (
"bytes"
"context"
"io"
"log"
"os"
"path"
@ -13,6 +14,28 @@ import (
const filePerm = 0o644
func WriteLog(
pool *dockertest.Pool,
resource *dockertest.Resource,
stdout io.Writer,
stderr io.Writer,
) error {
return pool.Client.Logs(
docker.LogsOptions{
Context: context.TODO(),
Container: resource.Container.ID,
OutputStream: stdout,
ErrorStream: stderr,
Tail: "all",
RawTerminal: false,
Stdout: true,
Stderr: true,
Follow: false,
Timestamps: false,
},
)
}
func SaveLog(
pool *dockertest.Pool,
resource *dockertest.Resource,
@ -23,23 +46,13 @@ func SaveLog(
return "", "", err
}
// Wouldn't it be simpler to
// open and wrap the destination files in a
// bufio.Writer, and pass those in docker.LogsOptions?
var stdout bytes.Buffer
var stderr bytes.Buffer
err = pool.Client.Logs(
docker.LogsOptions{
Context: context.TODO(),
Container: resource.Container.ID,
OutputStream: &stdout,
ErrorStream: &stderr,
Tail: "all",
RawTerminal: false,
Stdout: true,
Stderr: true,
Follow: false,
Timestamps: false,
},
)
err = WriteLog(pool, resource, &stdout, &stderr)
if err != nil {
return "", "", err
}

View file

@ -15,6 +15,11 @@ import (
"github.com/ory/dockertest/v3"
)
type ClientsSpec struct {
Plain int
WebsocketDERP int
}
type EmbeddedDERPServerScenario struct {
*Scenario
@ -22,6 +27,65 @@ type EmbeddedDERPServerScenario struct {
}
func TestDERPServerScenario(t *testing.T) {
spec := map[string]ClientsSpec{
"user1": {
Plain: len(MustTestVersions),
WebsocketDERP: 0,
},
}
derpServerScenario(t, spec, func(scenario *EmbeddedDERPServerScenario) {
allClients, err := scenario.ListTailscaleClients()
assertNoErrListClients(t, err)
t.Logf("checking %d clients for websocket connections", len(allClients))
for _, client := range allClients {
if didClientUseWebsocketForDERP(t, client) {
t.Logf(
"client %q used websocket a connection, but was not expected to",
client.Hostname(),
)
t.Fail()
}
}
})
}
func TestDERPServerWebsocketScenario(t *testing.T) {
spec := map[string]ClientsSpec{
"user1": {
Plain: 0,
WebsocketDERP: len(MustTestVersions),
},
}
derpServerScenario(t, spec, func(scenario *EmbeddedDERPServerScenario) {
allClients, err := scenario.ListTailscaleClients()
assertNoErrListClients(t, err)
t.Logf("checking %d clients for websocket connections", len(allClients))
for _, client := range allClients {
if !didClientUseWebsocketForDERP(t, client) {
t.Logf(
"client %q does not seem to have used a websocket connection, even though it was expected to do so",
client.Hostname(),
)
t.Fail()
}
}
})
}
// This function implements the common parts of a DERP scenario,
// we *want* it to show up in stacktraces,
// so marking it as a test helper would be counterproductive.
//
//nolint:thelper
func derpServerScenario(
t *testing.T,
spec map[string]ClientsSpec,
furtherAssertions ...func(*EmbeddedDERPServerScenario),
) {
IntegrationSkip(t)
// t.Parallel()
@ -34,20 +98,18 @@ func TestDERPServerScenario(t *testing.T) {
}
defer scenario.ShutdownAssertNoPanics(t)
spec := map[string]int{
"user1": len(MustTestVersions),
}
err = scenario.CreateHeadscaleEnv(
spec,
hsic.WithTestName("derpserver"),
hsic.WithExtraPorts([]string{"3478/udp"}),
hsic.WithEmbeddedDERPServerOnly(),
hsic.WithPort(443),
hsic.WithTLS(),
hsic.WithHostnameAsServerURL(),
hsic.WithConfigEnv(map[string]string{
"HEADSCALE_DERP_AUTO_UPDATE_ENABLED": "true",
"HEADSCALE_DERP_UPDATE_FREQUENCY": "10s",
"HEADSCALE_LISTEN_ADDR": "0.0.0.0:443",
}),
)
assertNoErrHeadscaleEnv(t, err)
@ -76,6 +138,11 @@ func TestDERPServerScenario(t *testing.T) {
}
success := pingDerpAllHelper(t, allClients, allHostnames)
if len(allHostnames)*len(allClients) > success {
t.FailNow()
return
}
for _, client := range allClients {
status, err := client.Status()
@ -98,6 +165,9 @@ func TestDERPServerScenario(t *testing.T) {
time.Sleep(30 * time.Second)
success = pingDerpAllHelper(t, allClients, allHostnames)
if len(allHostnames)*len(allClients) > success {
t.Fail()
}
for _, client := range allClients {
status, err := client.Status()
@ -114,10 +184,14 @@ func TestDERPServerScenario(t *testing.T) {
}
t.Logf("Run2: %d successful pings out of %d", success, len(allClients)*len(allHostnames))
for _, check := range furtherAssertions {
check(&scenario)
}
}
func (s *EmbeddedDERPServerScenario) CreateHeadscaleEnv(
users map[string]int,
users map[string]ClientsSpec,
opts ...hsic.Option,
) error {
hsServer, err := s.Headscale(opts...)
@ -137,6 +211,7 @@ func (s *EmbeddedDERPServerScenario) CreateHeadscaleEnv(
if err != nil {
return err
}
log.Printf("headscale server ip address: %s", hsServer.GetIP())
hash, err := util.GenerateRandomStringDNSSafe(scenarioHashLength)
if err != nil {
@ -149,14 +224,31 @@ func (s *EmbeddedDERPServerScenario) CreateHeadscaleEnv(
return err
}
err = s.CreateTailscaleIsolatedNodesInUser(
hash,
userName,
"all",
clientCount,
)
if err != nil {
return err
if clientCount.Plain > 0 {
// Containers that use default DERP config
err = s.CreateTailscaleIsolatedNodesInUser(
hash,
userName,
"all",
clientCount.Plain,
)
if err != nil {
return err
}
}
if clientCount.WebsocketDERP > 0 {
// Containers that use DERP-over-WebSocket
err = s.CreateTailscaleIsolatedNodesInUser(
hash,
userName,
"all",
clientCount.WebsocketDERP,
tsic.WithWebsocketDERP(true),
)
if err != nil {
return err
}
}
key, err := s.CreatePreAuthKey(userName, true, false)

View file

@ -461,6 +461,12 @@ func (t *HeadscaleInContainer) Shutdown() (string, string, error) {
return stdoutPath, stderrPath, t.pool.Purge(t.container)
}
// WriteLogs writes the current stdout/stderr log of the container to
// the given io.Writers.
func (t *HeadscaleInContainer) WriteLogs(stdout, stderr io.Writer) error {
return dockertestutil.WriteLog(t.pool, t.container, stdout, stderr)
}
// SaveLog saves the current stdout log of the container to a path
// on the host system.
func (t *HeadscaleInContainer) SaveLog(path string) (string, string, error) {

View file

@ -1,6 +1,7 @@
package integration
import (
"io"
"net/netip"
"net/url"
@ -41,4 +42,6 @@ type TailscaleClient interface {
// FailingPeersAsString returns a formatted-ish multi-line-string of peers in the client
// and a bool indicating if the clients online count and peer count is equal.
FailingPeersAsString() (string, bool, error)
WriteLogs(stdout, stderr io.Writer) error
}

View file

@ -67,6 +67,7 @@ type TailscaleInContainer struct {
// optional config
headscaleCert []byte
headscaleHostname string
withWebsocketDERP bool
withSSH bool
withTags []string
withEntrypoint []string
@ -126,6 +127,14 @@ func WithTags(tags []string) Option {
}
}
// WithWebsocketDERP toggles a development knob to
// force enable DERP connection through the new websocket protocol.
func WithWebsocketDERP(enabled bool) Option {
return func(tsic *TailscaleInContainer) {
tsic.withWebsocketDERP = enabled
}
}
// WithSSH enables SSH for the Tailscale instance.
func WithSSH() Option {
return func(tsic *TailscaleInContainer) {
@ -206,6 +215,14 @@ func New(
// },
Entrypoint: tsic.withEntrypoint,
ExtraHosts: tsic.withExtraHosts,
Env: []string{},
}
if tsic.withWebsocketDERP {
tailscaleOptions.Env = append(
tailscaleOptions.Env,
fmt.Sprintf("TS_DEBUG_DERP_WS_CLIENT=%t", tsic.withWebsocketDERP),
)
}
if tsic.headscaleHostname != "" {
@ -351,6 +368,15 @@ func (t *TailscaleInContainer) Execute(
return stdout, stderr, nil
}
// Retrieve container logs.
func (t *TailscaleInContainer) Logs(stdout, stderr io.Writer) error {
return dockertestutil.WriteLog(
t.pool,
t.container,
stdout, stderr,
)
}
// Up runs the login routine on the given Tailscale instance.
// This login mechanism uses the authorised key for authentication.
func (t *TailscaleInContainer) Login(
@ -999,10 +1025,21 @@ func (t *TailscaleInContainer) WriteFile(path string, data []byte) error {
// on the host system.
func (t *TailscaleInContainer) SaveLog(path string) error {
// TODO(kradalby): Assert if tailscale logs contains panics.
// NOTE(enoperm): `t.WriteLog | countMatchingLines`
// is probably most of what is for that,
// but I'd rather not change the behaviour here,
// as it may affect all the other tests
// I have not otherwise touched.
_, _, err := dockertestutil.SaveLog(t.pool, t.container, path)
return err
}
// WriteLogs writes the current stdout/stderr log of the container to
// the given io.Writers.
func (t *TailscaleInContainer) WriteLogs(stdout, stderr io.Writer) error {
return dockertestutil.WriteLog(t.pool, t.container, stdout, stderr)
}
// ReadFile reads a file from the Tailscale container.
// It returns the content of the file as a byte slice.
func (t *TailscaleInContainer) ReadFile(path string) ([]byte, error) {

View file

@ -1,6 +1,9 @@
package integration
import (
"bufio"
"bytes"
"io"
"os"
"strings"
"sync"
@ -78,6 +81,25 @@ func assertContains(t *testing.T, str, subStr string) {
}
}
func didClientUseWebsocketForDERP(t *testing.T, client TailscaleClient) bool {
t.Helper()
buf := &bytes.Buffer{}
err := client.WriteLogs(buf, buf)
if err != nil {
t.Fatalf("failed to fetch client logs: %s: %s", client.Hostname(), err)
}
count, err := countMatchingLines(buf, func(line string) bool {
return strings.Contains(line, "websocket: connected to ")
})
if err != nil {
t.Fatalf("failed to process client logs: %s: %s", client.Hostname(), err)
}
return count > 0
}
func pingAllHelper(t *testing.T, clients []TailscaleClient, addrs []string, opts ...tsic.PingOption) int {
t.Helper()
success := 0
@ -113,7 +135,7 @@ func pingDerpAllHelper(t *testing.T, clients []TailscaleClient, addrs []string)
tsic.WithPingUntilDirect(false),
)
if err != nil {
t.Fatalf("failed to ping %s from %s: %s", addr, client.Hostname(), err)
t.Logf("failed to ping %s from %s: %s", addr, client.Hostname(), err)
} else {
success++
}
@ -321,6 +343,25 @@ func dockertestMaxWait() time.Duration {
return wait
}
func countMatchingLines(in io.Reader, predicate func(string) bool) (int, error) {
count := 0
scanner := bufio.NewScanner(in)
{
const logBufferInitialSize = 1024 << 10 // preallocate 1 MiB
buff := make([]byte, logBufferInitialSize)
scanner.Buffer(buff, len(buff))
scanner.Split(bufio.ScanLines)
}
for scanner.Scan() {
if predicate(scanner.Text()) {
count += 1
}
}
return count, scanner.Err()
}
// func dockertestCommandTimeout() time.Duration {
// timeout := 10 * time.Second //nolint
//