package client import ( "bytes" "context" "encoding/json" "fmt" "io" "net/http" "time" ) // Client wraps HTTP calls to the task queue service. type Client struct { BaseURL string HTTPClient *http.Client } type TaskFilter struct { ApplicationId string `json:"applicationId"` } type Task struct { ID string `json:"id"` ApplicationId string `json:"applicationId"` Payload interface{} `json:"payload"` Priority int `json:"priority"` AvailableAt time.Time `json:"availableAt"` Status string `json:"status"` LeaseOwner string `json:"leaseOwner"` LeasedUntil time.Time `json:"leasedUntil"` Attempts int `json:"attempts"` MaxAttempts int `json:"maxAttempts"` CreatedAt time.Time `json:"createdAt"` UpdatedAt time.Time `json:"updatedAt"` Error string `json:"error"` } // ParsePayload decodes the payload into the provided struct pointer. func (t *Task) ParsePayload(v interface{}) error { // Marshal the payload map back to JSON, then unmarshal into v. // Marshal the interface{} (map or slice) back to JSON data, err := json.Marshal(t.Payload) if err != nil { return fmt.Errorf("failed to marshal payload: %w", err) } // Unmarshal into the target struct if err := json.Unmarshal(data, v); err != nil { return fmt.Errorf("failed to unmarshal payload: %w", err) } return nil } type EnqueueRequest struct { ApplicationId string `json:"applicationId"` Payload interface{} `json:"payload"` Priority int `json:"priority"` DelaySec int `json:"delaySec"` MaxAttempts int `json:"maxAttempts"` } type PopRequest struct { WorkerId string `json:"workerId"` LeaseSeconds int `json:"leaseSeconds"` Filter *TaskFilter `json:"filters"` MinPriority int `json:"minPriority,omitempty"` } type CompleteRequest struct { WorkerId string `json:"workerId"` TaskId string `json:"taskId"` } type ExtendRequest struct { WorkerId string `json:"workerId"` TaskId string `json:"taskId"` ExtraSeconds int `json:"extraSecond"` } type RequeueRequest struct { WorkerId string `json:"workerId"` TaskId string `json:"taskId"` DelaySeconds int `json:"delaySec"` Error string `json:"error"` } // New creates a new client. func New(baseURL string) *Client { return &Client{ BaseURL: baseURL, HTTPClient: &http.Client{Timeout: 10 * time.Second}, } } // Enqueue adds a new task. func (c *Client) Enqueue(ctx context.Context, request *EnqueueRequest) (*Task, error) { var t Task if err := c.do(ctx, "POST", "/enqueue", request, &t); err != nil { return nil, err } return &t, nil } // Pop leases a task. func (c *Client) Pop(ctx context.Context, request *PopRequest) (*Task, bool, error) { var t Task status, err := c.doStatus(ctx, "POST", "/pop", request, &t) if err != nil { return nil, false, err } if status == http.StatusNoContent { return nil, false, nil } return &t, true, nil } // Complete marks a task as done. func (c *Client) Complete(ctx context.Context, request *CompleteRequest) error { return c.doNoResp(ctx, "POST", "/complete", request) } // Extend extends a lease on a task. func (c *Client) Extend(ctx context.Context, request *ExtendRequest) error { return c.doNoResp(ctx, "POST", "/extend", request) } // Requeue returns a task to the queue. func (c *Client) Requeue(ctx context.Context, request *RequeueRequest) error { return c.doNoResp(ctx, "POST", "/requeue", request) } // ----------- internal helpers ----------- func (c *Client) do(ctx context.Context, method, path string, body interface{}, out interface{}) error { status, err := c.doStatus(ctx, method, path, body, out) if err != nil { return err } if status < 200 || status >= 300 { return fmt.Errorf("unexpected status %d", status) } return nil } func (c *Client) doNoResp(ctx context.Context, method, path string, body interface{}) error { status, err := c.doStatus(ctx, method, path, body, nil) if err != nil { return err } if status < 200 || status >= 300 { return fmt.Errorf("unexpected status %d", status) } return nil } func (c *Client) doStatus(ctx context.Context, method, path string, body interface{}, out interface{}) (int, error) { var reader io.Reader if body != nil { buf, err := json.Marshal(body) if err != nil { return 0, err } reader = bytes.NewBuffer(buf) } req, err := http.NewRequestWithContext(ctx, method, c.BaseURL+path, reader) if err != nil { return 0, err } if body != nil { req.Header.Set("Content-Type", "application/json") } resp, err := c.HTTPClient.Do(req) if err != nil { return 0, err } defer resp.Body.Close() if out != nil && resp.StatusCode >= 200 && resp.StatusCode < 300 { if err := json.NewDecoder(resp.Body).Decode(out); err != nil { return resp.StatusCode, err } } return resp.StatusCode, nil }