dnscontrol/pkg/cloudflare-go/queue.go

379 lines
11 KiB
Go
Raw Normal View History

package cloudflare
import (
"context"
"errors"
"fmt"
"net/http"
"time"
"github.com/goccy/go-json"
)
// Sentinel errors returned by the queue methods when a required name
// argument is empty.
var (
	// ErrMissingQueueName is returned when a queue name is required but empty.
	ErrMissingQueueName = errors.New("required queue name is missing")

	// ErrMissingQueueConsumerName is returned when a queue consumer name is
	// required but empty.
	ErrMissingQueueConsumerName = errors.New("required queue consumer name is missing")
)
// Queue is the JSON representation of a queue as exchanged with the
// /accounts/{account}/workers/queues endpoints used in this file.
//
// Every field is tagged omitempty, so zero values are left out when the
// struct is marshalled.
type Queue struct {
	// ID is serialized under the "queue_id" key.
	ID string `json:"queue_id,omitempty"`
	// Name is serialized under the "queue_name" key.
	Name string `json:"queue_name,omitempty"`
	// CreatedOn and ModifiedOn are pointers so that they stay nil when the
	// corresponding key is absent from the payload.
	CreatedOn  *time.Time `json:"created_on,omitempty"`
	ModifiedOn *time.Time `json:"modified_on,omitempty"`
	// ProducersTotalCount and Producers mirror the "producers_total_count"
	// and "producers" keys.
	ProducersTotalCount int             `json:"producers_total_count,omitempty"`
	Producers           []QueueProducer `json:"producers,omitempty"`
	// ConsumersTotalCount and Consumers mirror the "consumers_total_count"
	// and "consumers" keys.
	ConsumersTotalCount int             `json:"consumers_total_count,omitempty"`
	Consumers           []QueueConsumer `json:"consumers,omitempty"`
}
// QueueProducer is one entry of a Queue's "producers" list.
type QueueProducer struct {
	// Service is serialized under the "service" key.
	Service string `json:"service,omitempty"`
	// Environment is serialized under the "environment" key.
	Environment string `json:"environment,omitempty"`
}
// QueueConsumer is the JSON representation of a consumer attached to a queue.
type QueueConsumer struct {
	// Name is never marshalled or unmarshalled (json:"-"); UpdateQueueConsumer
	// places it in the request path to select the consumer.
	Name string `json:"-"`
	// Service, ScriptName and Environment mirror the "service",
	// "script_name" and "environment" keys.
	Service     string `json:"service,omitempty"`
	ScriptName  string `json:"script_name,omitempty"`
	Environment string `json:"environment,omitempty"`
	// Settings holds the consumer's batching and retry configuration.
	Settings QueueConsumerSettings `json:"settings,omitempty"`
	// QueueName is serialized under the "queue_name" key.
	QueueName string `json:"queue_name,omitempty"`
	// CreatedOn is a pointer so that it stays nil when "created_on" is absent.
	CreatedOn *time.Time `json:"created_on,omitempty"`
	// DeadLetterQueue is serialized under the "dead_letter_queue" key.
	DeadLetterQueue string `json:"dead_letter_queue,omitempty"`
}
// QueueConsumerSettings is the "settings" object of a QueueConsumer.
type QueueConsumerSettings struct {
	// BatchSize is serialized under the "batch_size" key.
	BatchSize int `json:"batch_size,omitempty"`
	// MaxRetires is serialized under the "max_retries" key. The Go field name
	// is a misspelling of "MaxRetries"; it is exported, so it is kept as-is
	// for backward compatibility.
	MaxRetires int `json:"max_retries,omitempty"`
	// MaxWaitTime is serialized under the "max_wait_time_ms" key, i.e. the
	// value is expressed in milliseconds according to the wire name.
	MaxWaitTime int `json:"max_wait_time_ms,omitempty"`
}
// QueueListResponse is the envelope decoded by ListQueues: the embedded
// Response, the paging data under "result_info", and the queues under
// "result".
type QueueListResponse struct {
	Response
	ResultInfo `json:"result_info"`
	Result     []Queue `json:"result"`
}
// CreateQueueParams is the request body sent by CreateQueue.
type CreateQueueParams struct {
	// Name is sent as "queue_name". CreateQueue rejects an empty value with
	// ErrMissingQueueName.
	Name string `json:"queue_name"`
}
// QueueResponse is the envelope decoded by the single-queue methods
// (CreateQueue, GetQueue, UpdateQueue): the embedded Response plus the queue
// under "result".
type QueueResponse struct {
	Response
	Result Queue `json:"result"`
}
// ListQueueConsumersResponse is the envelope decoded by ListQueueConsumers:
// the embedded Response, the paging data under "result_info", and the
// consumers under "result".
type ListQueueConsumersResponse struct {
	Response
	ResultInfo `json:"result_info"`
	Result     []QueueConsumer `json:"result"`
}
// ListQueuesParams carries the paging options for ListQueues. The embedded
// ResultInfo supplies Page and PerPage; leaving both below 1 makes
// ListQueues fetch every page.
type ListQueuesParams struct {
	ResultInfo
}
// QueueConsumerResponse is the envelope decoded by the single-consumer
// methods (CreateQueueConsumer, UpdateQueueConsumer): the embedded Response
// plus the consumer under "result".
type QueueConsumerResponse struct {
	Response
	Result QueueConsumer `json:"result"`
}
// UpdateQueueParams is the input to UpdateQueue.
type UpdateQueueParams struct {
	// Name is the queue's current name. It is excluded from the JSON body
	// (json:"-") and used by UpdateQueue in the request path.
	Name string `json:"-"`
	// UpdatedName is sent in the body as "queue_name".
	UpdatedName string `json:"queue_name,omitempty"`
}
// ListQueueConsumersParams is the input to ListQueueConsumers.
type ListQueueConsumersParams struct {
	// QueueName selects the queue and is placed in the request path. The
	// url:"-" tag presumably keeps it out of the query string built by
	// buildURI (confirm against buildURI's encoder).
	QueueName string `url:"-"`
	// ResultInfo supplies Page and PerPage; leaving both below 1 makes
	// ListQueueConsumers fetch every page.
	ResultInfo
}
// CreateQueueConsumerParams is the input to CreateQueueConsumer.
type CreateQueueConsumerParams struct {
	// QueueName selects the queue and is used in the request path.
	QueueName string `json:"-"`
	// Consumer is what CreateQueueConsumer sends as the request body.
	Consumer QueueConsumer
}
// UpdateQueueConsumerParams is the input to UpdateQueueConsumer.
type UpdateQueueConsumerParams struct {
	// QueueName selects the queue and is used in the request path.
	QueueName string `json:"-"`
	// Consumer is what UpdateQueueConsumer sends as the request body; its
	// Name field selects the consumer in the request path.
	Consumer QueueConsumer
}
// DeleteQueueConsumerParams is the input to DeleteQueueConsumer. Both fields
// are required and are used in the request path.
type DeleteQueueConsumerParams struct {
	// QueueName is the queue the consumer is attached to.
	QueueName string
	// ConsumerName is the consumer to delete.
	ConsumerName string
}
// ListQueues returns the queues owned by an account.
//
// Pagination: when both params.Page and params.PerPage are below 1, every
// page is fetched and the accumulated queues are returned. If either is set,
// only that single page is requested. Unset values default to Page 1 and
// PerPage 50.
//
// The returned ResultInfo is the one from the last response received. On
// failure an empty slice and an empty ResultInfo accompany the error;
// ErrMissingAccountID is returned when rc.Identifier is empty.
//
// API reference: https://api.cloudflare.com/#queue-list-queues
func (api *API) ListQueues(ctx context.Context, rc *ResourceContainer, params ListQueuesParams) ([]Queue, *ResultInfo, error) {
	if rc.Identifier == "" {
		return []Queue{}, &ResultInfo{}, ErrMissingAccountID
	}

	// Walk every page only when the caller expressed no paging preference.
	// This must be decided before the defaults below are applied.
	autoPaginate := params.PerPage < 1 && params.Page < 1

	if params.PerPage < 1 {
		params.PerPage = 50
	}
	if params.Page < 1 {
		params.Page = 1
	}

	var queues []Queue
	var qResponse QueueListResponse
	for {
		// Start from a clean envelope so nothing from the previous page
		// survives into this iteration.
		qResponse = QueueListResponse{}
		uri := buildURI(fmt.Sprintf("/accounts/%s/workers/queues", rc.Identifier), params)

		res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil)
		if err != nil {
			// Returned unwrapped, exactly as before, so existing callers
			// observe the same error value.
			return []Queue{}, &ResultInfo{}, err
		}

		if err := json.Unmarshal(res, &qResponse); err != nil {
			return []Queue{}, &ResultInfo{}, fmt.Errorf("failed to unmarshal queues JSON data: %w", err)
		}

		queues = append(queues, qResponse.Result...)
		params.ResultInfo = qResponse.ResultInfo.Next()
		if params.ResultInfo.Done() || !autoPaginate {
			break
		}
	}

	return queues, &qResponse.ResultInfo, nil
}
// CreateQueue creates a new queue in the account identified by rc, sending
// queue as the POST body, and returns the queue decoded from the response.
//
// ErrMissingAccountID is returned when rc.Identifier is empty and
// ErrMissingQueueName when queue.Name is empty. Request and decoding
// failures are returned prefixed with errMakeRequestError and
// errUnmarshalError respectively.
//
// API reference: https://api.cloudflare.com/#queue-create-queue
func (api *API) CreateQueue(ctx context.Context, rc *ResourceContainer, queue CreateQueueParams) (Queue, error) {
	switch {
	case rc.Identifier == "":
		return Queue{}, ErrMissingAccountID
	case queue.Name == "":
		return Queue{}, ErrMissingQueueName
	}

	endpoint := fmt.Sprintf("/accounts/%s/workers/queues", rc.Identifier)
	body, err := api.makeRequestContext(ctx, http.MethodPost, endpoint, queue)
	if err != nil {
		return Queue{}, fmt.Errorf("%s: %w", errMakeRequestError, err)
	}

	var decoded QueueResponse
	if err := json.Unmarshal(body, &decoded); err != nil {
		return Queue{}, fmt.Errorf("%s: %w", errUnmarshalError, err)
	}
	return decoded.Result, nil
}
// DeleteQueue deletes the queue named queueName from the account identified
// by rc. The response body is discarded.
//
// ErrMissingAccountID is returned when rc.Identifier is empty and
// ErrMissingQueueName when queueName is empty. A request failure is returned
// prefixed with errMakeRequestError.
//
// API reference: https://api.cloudflare.com/#queue-delete-queue
func (api *API) DeleteQueue(ctx context.Context, rc *ResourceContainer, queueName string) error {
	switch {
	case rc.Identifier == "":
		return ErrMissingAccountID
	case queueName == "":
		return ErrMissingQueueName
	}

	endpoint := fmt.Sprintf("/accounts/%s/workers/queues/%s", rc.Identifier, queueName)
	if _, err := api.makeRequestContext(ctx, http.MethodDelete, endpoint, nil); err != nil {
		return fmt.Errorf("%s: %w", errMakeRequestError, err)
	}
	return nil
}
// GetQueue fetches the queue named queueName from the account identified by
// rc and returns it as decoded from the response.
//
// ErrMissingAccountID is returned when rc.Identifier is empty and
// ErrMissingQueueName when queueName is empty. Request and decoding
// failures are returned prefixed with errMakeRequestError and
// errUnmarshalError respectively.
//
// API reference: https://api.cloudflare.com/#queue-get-queue
func (api *API) GetQueue(ctx context.Context, rc *ResourceContainer, queueName string) (Queue, error) {
	switch {
	case rc.Identifier == "":
		return Queue{}, ErrMissingAccountID
	case queueName == "":
		return Queue{}, ErrMissingQueueName
	}

	endpoint := fmt.Sprintf("/accounts/%s/workers/queues/%s", rc.Identifier, queueName)
	body, err := api.makeRequestContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return Queue{}, fmt.Errorf("%s: %w", errMakeRequestError, err)
	}

	var decoded QueueResponse
	if err := json.Unmarshal(body, &decoded); err != nil {
		return Queue{}, fmt.Errorf("%s: %w", errUnmarshalError, err)
	}
	return decoded.Result, nil
}
// UpdateQueue issues a PUT for the queue currently named params.Name, sending
// params as the body, and returns the queue decoded from the response.
//
// ErrMissingAccountID is returned when rc.Identifier is empty.
// ErrMissingQueueName is returned when either params.Name or
// params.UpdatedName is empty. Request and decoding failures are returned
// prefixed with errMakeRequestError and errUnmarshalError respectively.
//
// API reference: https://api.cloudflare.com/#queue-update-queue
func (api *API) UpdateQueue(ctx context.Context, rc *ResourceContainer, params UpdateQueueParams) (Queue, error) {
	switch {
	case rc.Identifier == "":
		return Queue{}, ErrMissingAccountID
	case params.Name == "", params.UpdatedName == "":
		// Both the current name (request path) and the new name (request
		// body) are required.
		return Queue{}, ErrMissingQueueName
	}

	endpoint := fmt.Sprintf("/accounts/%s/workers/queues/%s", rc.Identifier, params.Name)
	body, err := api.makeRequestContext(ctx, http.MethodPut, endpoint, params)
	if err != nil {
		return Queue{}, fmt.Errorf("%s: %w", errMakeRequestError, err)
	}

	var decoded QueueResponse
	if err := json.Unmarshal(body, &decoded); err != nil {
		return Queue{}, fmt.Errorf("%s: %w", errUnmarshalError, err)
	}
	return decoded.Result, nil
}
// ListQueueConsumers returns the consumers of a queue.
//
// Pagination: when both params.Page and params.PerPage are below 1, every
// page is fetched and the accumulated consumers are returned. If either is
// set, only that single page is requested. Unset values default to Page 1
// and PerPage 50.
//
// The returned ResultInfo is the one from the last response received. On
// failure an empty slice and an empty ResultInfo accompany the error;
// ErrMissingAccountID is returned when rc.Identifier is empty and
// ErrMissingQueueName when params.QueueName is empty.
//
// API reference: https://api.cloudflare.com/#queue-list-queue-consumers
func (api *API) ListQueueConsumers(ctx context.Context, rc *ResourceContainer, params ListQueueConsumersParams) ([]QueueConsumer, *ResultInfo, error) {
	if rc.Identifier == "" {
		return []QueueConsumer{}, &ResultInfo{}, ErrMissingAccountID
	}
	if params.QueueName == "" {
		return []QueueConsumer{}, &ResultInfo{}, ErrMissingQueueName
	}

	// Walk every page only when the caller expressed no paging preference.
	// This must be decided before the defaults below are applied.
	autoPaginate := params.PerPage < 1 && params.Page < 1

	if params.PerPage < 1 {
		params.PerPage = 50
	}
	if params.Page < 1 {
		params.Page = 1
	}

	var queuesConsumers []QueueConsumer
	var qResponse ListQueueConsumersResponse
	for {
		// Start from a clean envelope so nothing from the previous page
		// survives into this iteration.
		qResponse = ListQueueConsumersResponse{}
		uri := buildURI(fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", rc.Identifier, params.QueueName), params)

		res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil)
		if err != nil {
			// Returned unwrapped, exactly as before, so existing callers
			// observe the same error value.
			return []QueueConsumer{}, &ResultInfo{}, err
		}

		if err := json.Unmarshal(res, &qResponse); err != nil {
			return []QueueConsumer{}, &ResultInfo{}, fmt.Errorf("failed to unmarshal queue consumers JSON data: %w", err)
		}

		queuesConsumers = append(queuesConsumers, qResponse.Result...)
		params.ResultInfo = qResponse.ResultInfo.Next()
		if params.ResultInfo.Done() || !autoPaginate {
			break
		}
	}

	return queuesConsumers, &qResponse.ResultInfo, nil
}
// CreateQueueConsumer attaches a new consumer to the queue named
// params.QueueName, sending params.Consumer as the POST body, and returns the
// consumer decoded from the response.
//
// ErrMissingAccountID is returned when rc.Identifier is empty and
// ErrMissingQueueName when params.QueueName is empty. Request and decoding
// failures are returned prefixed with errMakeRequestError and
// errUnmarshalError respectively.
//
// API reference: https://api.cloudflare.com/#queue-create-queue-consumer
func (api *API) CreateQueueConsumer(ctx context.Context, rc *ResourceContainer, params CreateQueueConsumerParams) (QueueConsumer, error) {
	switch {
	case rc.Identifier == "":
		return QueueConsumer{}, ErrMissingAccountID
	case params.QueueName == "":
		return QueueConsumer{}, ErrMissingQueueName
	}

	endpoint := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", rc.Identifier, params.QueueName)
	body, err := api.makeRequestContext(ctx, http.MethodPost, endpoint, params.Consumer)
	if err != nil {
		return QueueConsumer{}, fmt.Errorf("%s: %w", errMakeRequestError, err)
	}

	var decoded QueueConsumerResponse
	if err := json.Unmarshal(body, &decoded); err != nil {
		return QueueConsumer{}, fmt.Errorf("%s: %w", errUnmarshalError, err)
	}
	return decoded.Result, nil
}
// DeleteQueueConsumer deletes the consumer named params.ConsumerName from the
// queue named params.QueueName. The response body is discarded.
//
// ErrMissingAccountID is returned when rc.Identifier is empty,
// ErrMissingQueueName when params.QueueName is empty, and
// ErrMissingQueueConsumerName when params.ConsumerName is empty, checked in
// that order. A request failure is returned prefixed with
// errMakeRequestError.
//
// API reference: https://api.cloudflare.com/#queue-delete-queue-consumer
func (api *API) DeleteQueueConsumer(ctx context.Context, rc *ResourceContainer, params DeleteQueueConsumerParams) error {
	switch {
	case rc.Identifier == "":
		return ErrMissingAccountID
	case params.QueueName == "":
		return ErrMissingQueueName
	case params.ConsumerName == "":
		return ErrMissingQueueConsumerName
	}

	endpoint := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers/%s", rc.Identifier, params.QueueName, params.ConsumerName)
	if _, err := api.makeRequestContext(ctx, http.MethodDelete, endpoint, nil); err != nil {
		return fmt.Errorf("%s: %w", errMakeRequestError, err)
	}
	return nil
}
// UpdateQueueConsumer updates the consumer for a queue, or creates one if it
// does not exist. It issues a PUT addressed by params.QueueName and
// params.Consumer.Name with params.Consumer as the body, and returns the
// consumer decoded from the response.
//
// ErrMissingAccountID is returned when rc.Identifier is empty,
// ErrMissingQueueName when params.QueueName is empty, and
// ErrMissingQueueConsumerName when params.Consumer.Name is empty, checked in
// that order. Request and decoding failures are returned prefixed with
// errMakeRequestError and errUnmarshalError respectively.
//
// API reference: https://api.cloudflare.com/#queue-update-queue-consumer
func (api *API) UpdateQueueConsumer(ctx context.Context, rc *ResourceContainer, params UpdateQueueConsumerParams) (QueueConsumer, error) {
	switch {
	case rc.Identifier == "":
		return QueueConsumer{}, ErrMissingAccountID
	case params.QueueName == "":
		return QueueConsumer{}, ErrMissingQueueName
	case params.Consumer.Name == "":
		return QueueConsumer{}, ErrMissingQueueConsumerName
	}

	endpoint := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers/%s", rc.Identifier, params.QueueName, params.Consumer.Name)
	body, err := api.makeRequestContext(ctx, http.MethodPut, endpoint, params.Consumer)
	if err != nil {
		return QueueConsumer{}, fmt.Errorf("%s: %w", errMakeRequestError, err)
	}

	var decoded QueueConsumerResponse
	if err := json.Unmarshal(body, &decoded); err != nil {
		return QueueConsumer{}, fmt.Errorf("%s: %w", errUnmarshalError, err)
	}
	return decoded.Result, nil
}