Mirror of https://github.com/StackExchange/dnscontrol.git (synced 2025-10-06 20:05:50 +08:00)
FEATURE: "--cmax n" limits preview/push concurrency to n connections (#3764)
parent f2ff95a20e · commit 850a2bdc07
4 changed files with 108 additions and 24 deletions
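Usage note (illustrative, not from the commit message): `dnscontrol preview --cmax 10` or `dnscontrol push --cmax 10` caps a run at 10 simultaneous provider connections. The flag defaults to 999, and values below 1 are rejected at startup (see the flag's Action in the diff below).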
@@ -9,6 +9,7 @@ import (
 	"strings"
 	"sync"
 	"sync/atomic"
+	"time"

 	"github.com/StackExchange/dnscontrol/v4/models"
 	"github.com/StackExchange/dnscontrol/v4/pkg/bindserial"
@@ -20,6 +21,7 @@ import (
 	"github.com/StackExchange/dnscontrol/v4/pkg/rfc4183"
 	"github.com/StackExchange/dnscontrol/v4/pkg/zonerecs"
 	"github.com/StackExchange/dnscontrol/v4/providers"
+	"github.com/nozzle/throttler"
 	"github.com/urfave/cli/v2"
 	"golang.org/x/exp/slices"
 	"golang.org/x/net/idna"
@@ -50,6 +52,7 @@ type PPreviewArgs struct {
 	Notify            bool
 	WarnChanges       bool
 	ConcurMode        string
+	ConcurMax         int // Maximum number of concurrent connections
 	NoPopulate        bool
 	DePopulate        bool
 	PopulateOnPreview bool
@@ -92,6 +95,19 @@ func (args *PPreviewArgs) flags() []cli.Flag {
 			return nil
 		},
 	})
+	flags = append(flags, &cli.IntFlag{
+		Name:        "cmax",
+		Destination: &args.ConcurMax,
+		Value:       999,
+		Usage:       `Maximum number of concurrent connections`,
+		Action: func(c *cli.Context, v int) error {
+			if v < 1 {
+				fmt.Printf("%d is not a valid value for --cmax. Values must be 1 or greater\n", v)
+				os.Exit(1)
+			}
+			return nil
+		},
+	})
 	flags = append(flags, &cli.BoolFlag{
 		Name:        "no-populate",
 		Destination: &args.NoPopulate,
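Design note: validating in the flag's Action hook means a bad value such as `--cmax 0` fails fast, printing the message and exiting with status 1 at parse time, before any zones or provider connections are touched.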
@@ -115,7 +131,7 @@ func (args *PPreviewArgs) flags() []cli.Flag {
 	})
 	flags = append(flags, &cli.IntFlag{
 		Name:   "reportmax",
-		Hidden: true,
+		Hidden: false,
 		Usage:  `Limit the IGNORE/NO_PURGE report to this many lines (Expermental. Will change in the future.)`,
 		Action: func(ctx *cli.Context, maxreport int) error {
 			printer.MaxReport = maxreport
@@ -191,7 +207,7 @@ func prun(args PPreviewArgs, push bool, interactive bool, out printer.CLI, repor
 		return err
 	}

-	out.PrintfIf(fullMode, "Reading creds.json or equiv.\n")
+	out.PrintfIf(fullMode, "Reading creds: %q\n", args.CredsFile)
 	providerConfigs, err := credsfile.LoadProviderConfigs(args.CredsFile)
 	if err != nil {
 		return err
@@ -214,6 +230,7 @@ func prun(args PPreviewArgs, push bool, interactive bool, out printer.CLI, repor
 	// Loop over all (or some) zones:
 	zonesToProcess := whichZonesToProcess(cfg.Domains, args.Domains)
 	zonesSerial, zonesConcurrent := splitConcurrent(zonesToProcess, args.ConcurMode)
+	zonesConcurrent = optimizeOrder(zonesConcurrent)

 	var totalCorrections int
 	var reportItems []*ReportItem
@@ -223,28 +240,48 @@ func prun(args PPreviewArgs, push bool, interactive bool, out printer.CLI, repor
 	// Populate the zones (if desired/needed/able):
 	if !args.NoPopulate {
 		out.PrintfIf(fullMode, "PHASE 1: CHECKING for missing zones\n")
-		var wg sync.WaitGroup
-		wg.Add(len(zonesConcurrent))
-		out.PrintfIf(fullMode, "CONCURRENTLY checking for %d zone(s)\n", len(zonesConcurrent))
-		for _, zone := range optimizeOrder(zonesConcurrent) {
+		t := throttler.New(args.ConcurMax, len(zonesConcurrent))
+		out.Printf("CONCURRENTLY checking for %d zone(s)\n", len(zonesConcurrent))
+		for i, zone := range zonesConcurrent {
 			out.PrintfIf(fullMode, "Concurrently checking for zone: %q\n", zone.Name)
 			go func(zone *models.DomainConfig) {
-				defer wg.Done()
-				if err := oneZonePopulate(zone, zcache); err != nil {
+				start := time.Now()
+				err := oneZonePopulate(zone, zcache)
+				if err != nil {
 					concurrentErrors.Store(true)
 				}
+				out.Debugf("...DONE: %q (%.1fs)\n", zone.Name, time.Since(start).Seconds())
+				t.Done(err)
 			}(zone)
+			// Delay the last call to t.Throttle() until the serial processing is done.
+			if i != ultimate(zonesConcurrent) {
+				errorCount := t.Throttle()
+				if errorCount > 0 {
+					anyErrors = true
+				}
+			}
 		}
-		out.PrintfIf(fullMode, "SERIALLY checking for %d zone(s)\n", len(zonesSerial))
+		out.Printf("SERIALLY checking for %d zone(s)\n", len(zonesSerial))
 		for _, zone := range zonesSerial {
-			out.PrintfIf(fullMode, "Serially checking for zone: %q\n", zone.Name)
+			out.Printf("Serially checking for zone: %q\n", zone.Name)
 			if err := oneZonePopulate(zone, zcache); err != nil {
 				anyErrors = true
 			}
 		}
-		out.PrintfIf(fullMode && len(zonesConcurrent) > 0, "Waiting for concurrent checking(s) to complete...")
-		wg.Wait()
-		out.PrintfIf(fullMode && len(zonesConcurrent) > 0, "DONE\n")
+		if len(zonesConcurrent) > 0 {
+			if printer.DefaultPrinter.Verbose {
+				out.PrintfIf(true, "Waiting for concurrent checking(s) to complete...\n")
+			} else {
+				out.PrintfIf(true, "Waiting for concurrent checking(s) to complete...")
+			}
+			errorCount := t.Throttle()
+			if errorCount > 0 {
+				anyErrors = true
+			}
+			out.PrintfIf(true, "DONE\n")
+		}
 	}

 	for _, zone := range zonesToProcess {
 		started := false // Do not emit noise when no provider has corrections.
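For readers unfamiliar with github.com/nozzle/throttler: the hunk above swaps an unbounded sync.WaitGroup fan-out for a bounded one. A minimal, self-contained sketch of the same pattern, with hypothetical zone names and a sleep standing in for real work:

package main

import (
	"fmt"
	"time"

	"github.com/nozzle/throttler"
)

func main() {
	zones := []string{"a.example", "b.example", "c.example", "d.example"} // hypothetical

	// At most 2 goroutines run at once; the second arg is the total job count.
	t := throttler.New(2, len(zones))
	for _, z := range zones {
		go func(z string) {
			time.Sleep(50 * time.Millisecond) // stand-in for oneZonePopulate/oneZone
			t.Done(nil)                       // report this job's result (nil = no error)
		}(z)
		// Throttle() blocks until a worker slot frees up (and, on the final
		// iteration, until every job has finished); it returns the error count.
		if errorCount := t.Throttle(); errorCount > 0 {
			fmt.Println("errors so far:", errorCount)
		}
	}
}

The commit adds one wrinkle to this canonical usage: it skips Throttle() on the final iteration (the ultimate() check) and issues that last, blocking call only after the serial zones are processed, so serial work overlaps with the draining of the concurrent batch.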
@@ -271,28 +308,48 @@ func prun(args PPreviewArgs, push bool, interactive bool, out printer.CLI, repor
 	}

 	out.PrintfIf(fullMode, "PHASE 2: GATHERING data\n")
-	var wg sync.WaitGroup
-	wg.Add(len(zonesConcurrent))
-	out.Printf("CONCURRENTLY gathering %d zone(s)\n", len(zonesConcurrent))
-	for _, zone := range optimizeOrder(zonesConcurrent) {
+	t := throttler.New(args.ConcurMax, len(zonesConcurrent))
+	out.Printf("CONCURRENTLY gathering records of %d zone(s)\n", len(zonesConcurrent))
+	for i, zone := range zonesConcurrent {
 		out.PrintfIf(fullMode, "Concurrently gathering: %q\n", zone.Name)
 		go func(zone *models.DomainConfig, args PPreviewArgs, zcache *cmdZoneCache) {
-			defer wg.Done()
-			if err := oneZone(zone, args); err != nil {
+			start := time.Now()
+			err := oneZone(zone, args)
+			if err != nil {
 				concurrentErrors.Store(true)
 			}
+			out.Debugf("...DONE: %q (%.1fs)\n", zone.Name, time.Since(start).Seconds())
+			t.Done(err)
 		}(zone, args, zcache)
+		// Delay the last call to t.Throttle() until the serial processing is done.
+		if i != ultimate(zonesConcurrent) {
+			errorCount := t.Throttle()
+			if errorCount > 0 {
+				anyErrors = true
+			}
+		}
 	}
-	out.Printf("SERIALLY gathering %d zone(s)\n", len(zonesSerial))
+	out.Printf("SERIALLY gathering records of %d zone(s)\n", len(zonesSerial))
 	for _, zone := range zonesSerial {
 		out.Printf("Serially Gathering: %q\n", zone.Name)
 		if err := oneZone(zone, args); err != nil {
 			anyErrors = true
 		}
 	}
-	out.PrintfIf(len(zonesConcurrent) > 0, "Waiting for concurrent gathering(s) to complete...")
-	wg.Wait()
-	out.PrintfIf(len(zonesConcurrent) > 0, "DONE\n")
+	if len(zonesConcurrent) > 0 {
+		msg := "Waiting for concurrent gathering(s) to complete..."
+		if printer.DefaultPrinter.Verbose {
+			msg = "Waiting for concurrent gathering(s) to complete...\n"
+		}
+		out.PrintfIf(true, msg)
+		errorCount := t.Throttle()
+		if errorCount > 0 {
+			anyErrors = true
+		}
+		out.PrintfIf(true, "DONE\n")
+	}

 	anyErrors = cmp.Or(anyErrors, concurrentErrors.Load())

 	// Now we know what to do, print or do the tasks.
@@ -382,7 +439,8 @@ func whichZonesToProcess(domains []*models.DomainConfig, filter string) []*model
 func splitConcurrent(domains []*models.DomainConfig, filter string) (serial []*models.DomainConfig, concurrent []*models.DomainConfig) {
 	if filter == "none" {
 		return domains, nil
-	} else if filter == "all" {
+	}
+	if filter == "all" {
 		return nil, domains
 	}
 	for _, dc := range domains {

commands/ultimate.go (new file, 23 additions)
@@ -0,0 +1,23 @@
+package commands
+
+import "github.com/StackExchange/dnscontrol/v4/models"
+
+/*
+I proposed that Go add something like "len()" that returns the highest
+index. This would avoid off-by-one errors. The proposed names include
+ultimate(), ult(), high(), highest().
+
+Nay-sayers said I should implement this as a function and see if I
+actually used it. (I suspect the nay-sayers are perfect people that
+never make off-by-one errors.)
+
+That's what this file is about. It should be exactly the same (except
+the first line) anywhere this is needed. After a few years I'll be
+able to report if it actually helped.
+
+Go will in-line this function.
+*/
+
+func ultimate(s []*models.DomainConfig) int {
+	return len(s) - 1
+}
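A toy illustration of the idiom (an int-slice variant, since the committed helper is specialized to []*models.DomainConfig):

package main

import "fmt"

// ultimateInt is a hypothetical int-slice analog of commands/ultimate.go.
func ultimateInt(s []int) int { return len(s) - 1 }

func main() {
	nums := []int{10, 20, 30}
	for i, n := range nums {
		if i != ultimateInt(nums) {
			fmt.Println("not last:", n)
		} else {
			fmt.Println("last:", n) // mirrors the "skip the final Throttle()" check
		}
	}
}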

go.mod (1 addition)
@@ -70,6 +70,7 @@ require (
 	github.com/kylelemons/godebug v1.1.0
 	github.com/luadns/luadns-go v0.3.0
 	github.com/mattn/go-isatty v0.0.20
+	github.com/nozzle/throttler v0.0.0-20180817012639-2ea982251481
 	github.com/oracle/oci-go-sdk/v65 v65.99.1
 	github.com/vultr/govultr/v2 v2.17.2
 	golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b

go.sum (2 additions)
@@ -306,6 +306,8 @@ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJ
 github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04 h1:o6uBwrhM5C8Ll3MAAxrQxRHEu7FkapwTuI2WmL1rw4g=
 github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
 github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms=
+github.com/nozzle/throttler v0.0.0-20180817012639-2ea982251481 h1:Up6+btDp321ZG5/zdSLo48H9Iaq0UQGthrhWC6pCxzE=
+github.com/nozzle/throttler v0.0.0-20180817012639-2ea982251481/go.mod h1:yKZQO8QE2bHlgozqWDiRVqTFlLQSj30K/6SAK8EeYFw=
 github.com/nrdcg/goinwx v0.11.0 h1:GER0SE3POub7rxARt3Y3jRy1OON1hwF1LRxHz5xsFBw=
 github.com/nrdcg/goinwx v0.11.0/go.mod h1:0BXSC0FxVtU4aTjX0Zw3x0DK32tjugLzeNIAGtwXvPQ=
 github.com/onsi/ginkgo/v2 v2.23.3 h1:edHxnszytJ4lD9D5Jjc4tiDkPBZ3siDeJJkUZJJVkp0=