package client import ( "bytes" "context" "encoding/json" "fmt" "io" "net/http" "sync" "time" ) type SubscriberFunc func(*Task) error // Client wraps HTTP calls to the task queue service. type Client struct { BaseURL string HTTPClient *http.Client ApplicationID string WorkerID string mu sync.Mutex subscribers map[string]map[int]SubscriberFunc nextID int ctx context.Context cancel context.CancelFunc running bool } type TaskFilter struct { ApplicationId string `json:"applicationId"` } type Task struct { ID string `json:"id"` ApplicationId string `json:"applicationId"` EventID string `json:"eventId"` Payload interface{} `json:"payload"` Priority int `json:"priority"` AvailableAt time.Time `json:"availableAt"` Status string `json:"status"` LeaseOwner string `json:"leaseOwner"` LeasedUntil time.Time `json:"leasedUntil"` Attempts int `json:"attempts"` MaxAttempts int `json:"maxAttempts"` CreatedAt time.Time `json:"createdAt"` UpdatedAt time.Time `json:"updatedAt"` Error string `json:"error"` } // ParsePayload decodes the payload into the provided struct pointer. func (t *Task) ParsePayload(v interface{}) error { // Marshal the payload map back to JSON, then unmarshal into v. // Marshal the interface{} (map or slice) back to JSON data, err := json.Marshal(t.Payload) if err != nil { return fmt.Errorf("failed to marshal payload: %w", err) } // Unmarshal into the target struct if err := json.Unmarshal(data, v); err != nil { return fmt.Errorf("failed to unmarshal payload: %w", err) } return nil } type EnqueueOptions struct { Priority int `json:"priority"` DelaySec int `json:"delaySec"` MaxAttempts int `json:"maxAttempts"` } type EnqueueRequest struct { ApplicationId string `json:"applicationId"` EventID string `json:"eventId"` Payload interface{} `json:"payload"` EnqueueOptions } type PopRequest struct { WorkerId string `json:"workerId"` LeaseSeconds int `json:"leaseSeconds"` Filter *TaskFilter `json:"filters"` MinPriority int `json:"minPriority,omitempty"` } type CompleteRequest struct { WorkerId string `json:"workerId"` TaskId string `json:"taskId"` } type ExtendRequest struct { WorkerId string `json:"workerId"` TaskId string `json:"taskId"` ExtraSeconds int `json:"extraSecond"` } type RequeueRequest struct { WorkerId string `json:"workerId"` TaskId string `json:"taskId"` DelaySeconds int `json:"delaySec"` Errors []string `json:"errors"` } // New creates a new client. func New(baseURL string, ApplicationID string, WorkerId string) *Client { return &Client{ BaseURL: baseURL, HTTPClient: &http.Client{Timeout: 10 * time.Second}, subscribers: make(map[string]map[int]SubscriberFunc), ApplicationID: ApplicationID, WorkerID: WorkerId, } } // Enqueue adds a new task. func (c *Client) enqueue(ctx context.Context, request *EnqueueRequest) (*Task, error) { request.ApplicationId = c.ApplicationID var t Task if err := c.do(ctx, "POST", "/enqueue", request, &t); err != nil { return nil, err } return &t, nil } // Pop leases a task. func (c *Client) pop(ctx context.Context, request *PopRequest) (*Task, bool, error) { var t Task status, err := c.doStatus(ctx, "POST", "/pop", request, &t) if status == http.StatusNoContent { return nil, false, nil } if err != nil { return nil, false, err } return &t, true, nil } // Complete marks a task as done. func (c *Client) complete(ctx context.Context, TaskId string) error { request := &CompleteRequest{ WorkerId: c.WorkerID, TaskId: TaskId, } return c.doNoResp(ctx, "POST", "/complete", request) } // Extend extends a lease on a task. func (c *Client) Extend(ctx context.Context, request *ExtendRequest) error { return c.doNoResp(ctx, "POST", "/extend", request) } // Requeue returns a task to the queue. func (c *Client) requeue(ctx context.Context, request *RequeueRequest) error { return c.doNoResp(ctx, "POST", "/requeue", request) } // ----------- internal helpers ----------- func (c *Client) do(ctx context.Context, method, path string, body interface{}, out interface{}) error { status, err := c.doStatus(ctx, method, path, body, out) if err != nil { return err } if status < 200 || status >= 300 { return fmt.Errorf("unexpected status %d", status) } return nil } func (c *Client) doNoResp(ctx context.Context, method, path string, body interface{}) error { status, err := c.doStatus(ctx, method, path, body, nil) if err != nil { return err } if status < 200 || status >= 300 { return fmt.Errorf("unexpected status %d", status) } return nil } func (c *Client) doStatus(ctx context.Context, method, path string, body interface{}, out interface{}) (int, error) { var reader io.Reader if body != nil { buf, err := json.Marshal(body) if err != nil { return 0, err } reader = bytes.NewBuffer(buf) } req, err := http.NewRequestWithContext(ctx, method, c.BaseURL+path, reader) if err != nil { return 0, err } if body != nil { req.Header.Set("Content-Type", "application/json") } resp, err := c.HTTPClient.Do(req) if err != nil { return 0, err } defer resp.Body.Close() if out != nil && resp.StatusCode >= 200 && resp.StatusCode < 300 { if err := json.NewDecoder(resp.Body).Decode(out); err != nil { return resp.StatusCode, err } } return resp.StatusCode, nil } func (c *Client) Publish(eventId string, payload interface{}, opts *EnqueueOptions) error { _, err := c.enqueue(context.Background(), &EnqueueRequest{ EventID: eventId, Payload: payload, EnqueueOptions: *opts, }) if err != nil { return fmt.Errorf("error enqueuing task with error: %s", err.Error()) } return nil } func (c *Client) Subscribe(event string, fn SubscriberFunc) int { c.mu.Lock() defer c.mu.Unlock() if c.subscribers[event] == nil { c.subscribers[event] = make(map[int]SubscriberFunc) } id := c.nextID c.nextID++ c.subscribers[event][id] = fn if !c.running { c.startPolling() } return id } func (c *Client) Unsubscribe(event string, id int) { c.mu.Lock() defer c.mu.Unlock() if subs, ok := c.subscribers[event]; ok { delete(subs, id) if len(subs) == 0 { delete(c.subscribers, event) } } if len(c.subscribers) == 0 && c.running { c.stopPolling() } } func (c *Client) startPolling() { c.ctx, c.cancel = context.WithCancel(context.Background()) c.running = true go c.pollLoop() } func (c *Client) stopPolling() { c.cancel() c.running = false } func (c *Client) pollLoop() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-c.ctx.Done(): return case <-ticker.C: c.pollServer() } } } func (c *Client) pollServer() { task, ok, err := c.pop(c.ctx, &PopRequest{ WorkerId: c.WorkerID, LeaseSeconds: 60, Filter: &TaskFilter{ ApplicationId: c.ApplicationID, }, }) if err != nil || !ok { if err != nil { fmt.Println(err.Error()) } return } // snapshot subscribers c.mu.Lock() subscribers := c.subscribers[task.EventID] c.mu.Unlock() if len(subscribers) == 0 { return } var ( wg sync.WaitGroup errs []error mu sync.Mutex ) for _, fn := range subscribers { wg.Add(1) go func(fn SubscriberFunc) { defer wg.Done() if err := fn(task); err != nil { mu.Lock() errs = append(errs, err) mu.Unlock() } }(fn) } wg.Wait() ctx := context.Background() if len(errs) > 0 { errors := make([]string, 0) for _, err := range errs { if err != nil { errors = append(errors, err.Error()) } } c.requeue(ctx, &RequeueRequest{ TaskId: task.ID, WorkerId: c.WorkerID, DelaySeconds: 5, Errors: errors, }) return } c.complete(ctx, task.ID) }