diff --git a/commands/ppreviewPush.go b/commands/ppreviewPush.go
index 5a9413e23..059a030c1 100644
--- a/commands/ppreviewPush.go
+++ b/commands/ppreviewPush.go
@@ -20,6 +20,7 @@ import (
 	"github.com/StackExchange/dnscontrol/v4/pkg/rfc4183"
 	"github.com/StackExchange/dnscontrol/v4/pkg/zonerecs"
 	"github.com/StackExchange/dnscontrol/v4/providers"
+	"github.com/nozzle/throttler"
 	"github.com/urfave/cli/v2"
 	"golang.org/x/exp/slices"
 	"golang.org/x/net/idna"
@@ -50,6 +51,7 @@ type PPreviewArgs struct {
 	Notify            bool
 	WarnChanges       bool
 	ConcurMode        string
+	ConcurMax         int
 	NoPopulate        bool
 	DePopulate        bool
 	PopulateOnPreview bool
@@ -92,6 +94,12 @@ func (args *PPreviewArgs) flags() []cli.Flag {
 			return nil
 		},
 	})
+	flags = append(flags, &cli.IntFlag{
+		Name:        "cmax",
+		Destination: &args.ConcurMax,
+		Value:       999,
+		Usage:       `Maximum number of concurrent connections`,
+	})
 	flags = append(flags, &cli.BoolFlag{
 		Name:        "no-populate",
 		Destination: &args.NoPopulate,
@@ -223,18 +231,24 @@ func prun(args PPreviewArgs, push bool, interactive bool, out printer.CLI, repor
 	// Populate the zones (if desired/needed/able):
 	if !args.NoPopulate {
 		out.PrintfIf(fullMode, "PHASE 1: CHECKING for missing zones\n")
-		var wg sync.WaitGroup
-		wg.Add(len(zonesConcurrent))
+		t := throttler.New(args.ConcurMax, len(zonesConcurrent))
 		out.PrintfIf(fullMode, "CONCURRENTLY checking for %d zone(s)\n", len(zonesConcurrent))
 		for _, zone := range optimizeOrder(zonesConcurrent) {
 			out.PrintfIf(fullMode, "Concurrently checking for zone: %q\n", zone.Name)
 			go func(zone *models.DomainConfig) {
-				defer wg.Done()
-				if err := oneZonePopulate(zone, zcache); err != nil {
+				err := oneZonePopulate(zone, zcache)
+				if err != nil {
 					concurrentErrors.Store(true)
 				}
+				t.Done(err)
 			}(zone)
+			errorCount := t.Throttle()
+			if errorCount > 0 {
+				anyErrors = true
+			}
 		}
+		//out.PrintfIf(fullMode && len(zonesConcurrent) > 0, "Waiting for concurrent checking(s) to complete...")
+		//out.PrintfIf(fullMode && len(zonesConcurrent) > 0, "DONE\n")
 		out.PrintfIf(fullMode, "SERIALLY checking for %d zone(s)\n", len(zonesSerial))
 		for _, zone := range zonesSerial {
 			out.PrintfIf(fullMode, "Serially checking for zone: %q\n", zone.Name)
@@ -242,9 +256,6 @@ func prun(args PPreviewArgs, push bool, interactive bool, out printer.CLI, repor
 				anyErrors = true
 			}
 		}
-		out.PrintfIf(fullMode && len(zonesConcurrent) > 0, "Waiting for concurrent checking(s) to complete...")
-		wg.Wait()
-		out.PrintfIf(fullMode && len(zonesConcurrent) > 0, "DONE\n")
 
 		for _, zone := range zonesToProcess {
 			started := false // Do not emit noise when no provider has corrections.
@@ -271,18 +282,24 @@ func prun(args PPreviewArgs, push bool, interactive bool, out printer.CLI, repor
 	}
 
 	out.PrintfIf(fullMode, "PHASE 2: GATHERING data\n")
-	var wg sync.WaitGroup
-	wg.Add(len(zonesConcurrent))
+	t := throttler.New(args.ConcurMax, len(zonesConcurrent))
 	out.Printf("CONCURRENTLY gathering %d zone(s)\n", len(zonesConcurrent))
 	for _, zone := range optimizeOrder(zonesConcurrent) {
 		out.PrintfIf(fullMode, "Concurrently gathering: %q\n", zone.Name)
 		go func(zone *models.DomainConfig, args PPreviewArgs, zcache *cmdZoneCache) {
-			defer wg.Done()
-			if err := oneZone(zone, args); err != nil {
+			err := oneZone(zone, args)
+			if err != nil {
 				concurrentErrors.Store(true)
 			}
+			t.Done(err)
 		}(zone, args, zcache)
+		errorCount := t.Throttle()
+		if errorCount > 0 {
+			anyErrors = true
+		}
 	}
+	// TODO(tlim): It would be nice if the concurrent gathering overlapped with the serial gathering.
+	// This could be achieved by delaying the final call to t.Throttle() until after the serial gathering.
 	out.Printf("SERIALLY gathering %d zone(s)\n", len(zonesSerial))
 	for _, zone := range zonesSerial {
 		out.Printf("Serially Gathering: %q\n", zone.Name)
@@ -290,9 +307,8 @@ func prun(args PPreviewArgs, push bool, interactive bool, out printer.CLI, repor
 			anyErrors = true
 		}
 	}
-	out.PrintfIf(len(zonesConcurrent) > 0, "Waiting for concurrent gathering(s) to complete...")
-	wg.Wait()
-	out.PrintfIf(len(zonesConcurrent) > 0, "DONE\n")
+	//out.PrintfIf(len(zonesConcurrent) > 0, "Waiting for concurrent gathering(s) to complete...")
+	//out.PrintfIf(len(zonesConcurrent) > 0, "DONE\n")
 	anyErrors = cmp.Or(anyErrors, concurrentErrors.Load())
 
 	// Now we know what to do, print or do the tasks.
@@ -382,7 +398,8 @@ func whichZonesToProcess(domains []*models.DomainConfig, filter string) []*model
 func splitConcurrent(domains []*models.DomainConfig, filter string) (serial []*models.DomainConfig, concurrent []*models.DomainConfig) {
 	if filter == "none" {
 		return domains, nil
-	} else if filter == "all" {
+	}
+	if filter == "all" {
 		return nil, domains
 	}
 	for _, dc := range domains {
diff --git a/go.mod b/go.mod
index 2734d183c..998e9c14e 100644
--- a/go.mod
+++ b/go.mod
@@ -70,6 +70,7 @@ require (
 	github.com/kylelemons/godebug v1.1.0
 	github.com/luadns/luadns-go v0.3.0
 	github.com/mattn/go-isatty v0.0.20
+	github.com/nozzle/throttler v0.0.0-20180817012639-2ea982251481
 	github.com/oracle/oci-go-sdk/v65 v65.99.1
 	github.com/vultr/govultr/v2 v2.17.2
 	golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b
diff --git a/go.sum b/go.sum
index 570d8a30b..93764ca3d 100644
--- a/go.sum
+++ b/go.sum
@@ -306,6 +306,8 @@ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJ
 github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04 h1:o6uBwrhM5C8Ll3MAAxrQxRHEu7FkapwTuI2WmL1rw4g=
 github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
 github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms=
+github.com/nozzle/throttler v0.0.0-20180817012639-2ea982251481 h1:Up6+btDp321ZG5/zdSLo48H9Iaq0UQGthrhWC6pCxzE=
+github.com/nozzle/throttler v0.0.0-20180817012639-2ea982251481/go.mod h1:yKZQO8QE2bHlgozqWDiRVqTFlLQSj30K/6SAK8EeYFw=
 github.com/nrdcg/goinwx v0.11.0 h1:GER0SE3POub7rxARt3Y3jRy1OON1hwF1LRxHz5xsFBw=
 github.com/nrdcg/goinwx v0.11.0/go.mod h1:0BXSC0FxVtU4aTjX0Zw3x0DK32tjugLzeNIAGtwXvPQ=
 github.com/onsi/ginkgo/v2 v2.23.3 h1:edHxnszytJ4lD9D5Jjc4tiDkPBZ3siDeJJkUZJJVkp0=
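Reviewer note: for anyone unfamiliar with nozzle/throttler, it combines sync.WaitGroup semantics with a concurrency cap. Below is a minimal, self-contained sketch of the pattern this diff adopts, mirroring the calls used above (New, Done, Throttle). The zone list and the checkZone helper are illustrative stand-ins for oneZonePopulate/oneZone, not code from this PR:

package main

import (
	"fmt"

	"github.com/nozzle/throttler"
)

// checkZone is a hypothetical stand-in for the real per-zone work
// (oneZonePopulate / oneZone in the diff above).
func checkZone(name string) error {
	// ...provider API calls would happen here...
	return nil
}

func main() {
	zones := []string{"example.com", "example.net", "example.org"}
	maxConcurrent := 2 // exposed by this diff as the --cmax flag (default 999)

	// New(maxWorkers, totalJobs) sizes the pool: at most maxWorkers
	// goroutines are in flight at any moment.
	t := throttler.New(maxConcurrent, len(zones))
	for _, zone := range zones {
		go func(zone string) {
			// Done must be called once per job; a non-nil error adds to
			// the count that Throttle() returns.
			t.Done(checkZone(zone))
		}(zone)

		// Throttle() blocks while maxWorkers jobs are running; on the
		// last iteration it waits for all outstanding jobs, so no
		// separate wg.Wait() is needed. It returns the cumulative
		// error count seen so far.
		if errorCount := t.Throttle(); errorCount > 0 {
			fmt.Printf("%d zone check(s) failed so far\n", errorCount)
		}
	}
}

Calling t.Throttle() inside the loop is what provides the back-pressure: the removed wg.Wait() only waited at the end, so every zone's goroutine (and its provider connections) could previously be launched at once. It is also why the TODO above suggests that delaying the final Throttle() call would let the serial gathering overlap with the tail of the concurrent work.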