Files
taskqueue-client/client/client.go
2026-01-29 09:22:42 +00:00

371 lines
8.0 KiB
Go
Executable File

package client
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"time"
)
type SubscriberFunc func(*Task) error
// Client wraps HTTP calls to the task queue service.
type Client struct {
BaseURL string
HTTPClient *http.Client
ApplicationID string
WorkerID string
mu sync.Mutex
subscribers map[string]map[int]SubscriberFunc
nextID int
ctx context.Context
cancel context.CancelFunc
running bool
leaseSeconds int
}
type TaskFilter struct {
ApplicationId string `json:"applicationId"`
}
type Task struct {
ID string `json:"id"`
ApplicationId string `json:"applicationId"`
EventID string `json:"eventId"`
Payload interface{} `json:"payload"`
Priority int `json:"priority"`
AvailableAt time.Time `json:"availableAt"`
Status string `json:"status"`
LeaseOwner string `json:"leaseOwner"`
LeasedUntil time.Time `json:"leasedUntil"`
Attempts int `json:"attempts"`
MaxAttempts int `json:"maxAttempts"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
Error string `json:"error"`
}
// ParsePayload decodes the payload into the provided struct pointer.
func (t *Task) ParsePayload(v interface{}) error {
// Marshal the payload map back to JSON, then unmarshal into v.
// Marshal the interface{} (map or slice) back to JSON
data, err := json.Marshal(t.Payload)
if err != nil {
return fmt.Errorf("failed to marshal payload: %w", err)
}
// Unmarshal into the target struct
if err := json.Unmarshal(data, v); err != nil {
return fmt.Errorf("failed to unmarshal payload: %w", err)
}
return nil
}
type EnqueueOptions struct {
Priority int `json:"priority"`
DelaySec int `json:"delaySec"`
MaxAttempts int `json:"maxAttempts"`
}
type EnqueueRequest struct {
ApplicationId string `json:"applicationId"`
EventID string `json:"eventId"`
Payload interface{} `json:"payload"`
EnqueueOptions
}
type PopRequest struct {
WorkerId string `json:"workerId"`
LeaseSeconds int `json:"leaseSeconds"`
Filter *TaskFilter `json:"filters"`
MinPriority int `json:"minPriority,omitempty"`
}
type CompleteRequest struct {
WorkerId string `json:"workerId"`
TaskId string `json:"taskId"`
}
type ExtendRequest struct {
WorkerId string `json:"workerId"`
TaskId string `json:"taskId"`
ExtraSeconds int `json:"extraSecond"`
}
type RequeueRequest struct {
WorkerId string `json:"workerId"`
TaskId string `json:"taskId"`
DelaySeconds int `json:"delaySec"`
Errors []string `json:"errors"`
}
// New creates a new client.
func New(baseURL string, ApplicationID string, WorkerId string) *Client {
return &Client{
BaseURL: baseURL,
HTTPClient: &http.Client{Timeout: 10 * time.Second},
subscribers: make(map[string]map[int]SubscriberFunc),
ApplicationID: ApplicationID,
WorkerID: WorkerId,
}
}
func (c *Client) SetLeaseSeconds(s int) {
c.leaseSeconds = s
}
// Enqueue adds a new task.
func (c *Client) enqueue(ctx context.Context, request *EnqueueRequest) (*Task, error) {
request.ApplicationId = c.ApplicationID
var t Task
if err := c.do(ctx, "POST", "/enqueue", request, &t); err != nil {
return nil, err
}
return &t, nil
}
// Pop leases a task.
func (c *Client) pop(ctx context.Context, request *PopRequest) (*Task, bool, error) {
var t Task
status, err := c.doStatus(ctx, "POST", "/pop", request, &t)
if status == http.StatusNoContent {
return nil, false, nil
}
if err != nil {
return nil, false, err
}
return &t, true, nil
}
// Complete marks a task as done.
func (c *Client) complete(ctx context.Context, TaskId string) error {
request := &CompleteRequest{
WorkerId: c.WorkerID,
TaskId: TaskId,
}
return c.doNoResp(ctx, "POST", "/complete", request)
}
// Extend extends a lease on a task.
func (c *Client) Extend(ctx context.Context, request *ExtendRequest) error {
return c.doNoResp(ctx, "POST", "/extend", request)
}
// Requeue returns a task to the queue.
func (c *Client) requeue(ctx context.Context, request *RequeueRequest) error {
return c.doNoResp(ctx, "POST", "/requeue", request)
}
// ----------- internal helpers -----------
func (c *Client) do(ctx context.Context, method, path string, body interface{}, out interface{}) error {
status, err := c.doStatus(ctx, method, path, body, out)
if err != nil {
return err
}
if status < 200 || status >= 300 {
return fmt.Errorf("unexpected status %d", status)
}
return nil
}
func (c *Client) doNoResp(ctx context.Context, method, path string, body interface{}) error {
status, err := c.doStatus(ctx, method, path, body, nil)
if err != nil {
return err
}
if status < 200 || status >= 300 {
return fmt.Errorf("unexpected status %d", status)
}
return nil
}
func (c *Client) doStatus(ctx context.Context, method, path string, body interface{}, out interface{}) (int, error) {
var reader io.Reader
if body != nil {
buf, err := json.Marshal(body)
if err != nil {
return 0, err
}
reader = bytes.NewBuffer(buf)
}
req, err := http.NewRequestWithContext(ctx, method, c.BaseURL+path, reader)
if err != nil {
return 0, err
}
if body != nil {
req.Header.Set("Content-Type", "application/json")
}
resp, err := c.HTTPClient.Do(req)
if err != nil {
return 0, err
}
defer resp.Body.Close()
if out != nil && resp.StatusCode >= 200 && resp.StatusCode < 300 {
if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
return resp.StatusCode, err
}
}
return resp.StatusCode, nil
}
func (c *Client) Publish(eventId string, payload interface{}, opts *EnqueueOptions) error {
_, err := c.enqueue(context.Background(), &EnqueueRequest{
EventID: eventId,
Payload: payload,
EnqueueOptions: *opts,
})
if err != nil {
return fmt.Errorf("error enqueuing task with error: %s", err.Error())
}
return nil
}
func (c *Client) Subscribe(event string, fn SubscriberFunc) int {
c.mu.Lock()
defer c.mu.Unlock()
if c.subscribers[event] == nil {
c.subscribers[event] = make(map[int]SubscriberFunc)
}
id := c.nextID
c.nextID++
c.subscribers[event][id] = fn
if !c.running {
c.startPolling()
}
return id
}
func (c *Client) Unsubscribe(event string, id int) {
c.mu.Lock()
defer c.mu.Unlock()
if subs, ok := c.subscribers[event]; ok {
delete(subs, id)
if len(subs) == 0 {
delete(c.subscribers, event)
}
}
if len(c.subscribers) == 0 && c.running {
c.stopPolling()
}
}
func (c *Client) startPolling() {
c.ctx, c.cancel = context.WithCancel(context.Background())
c.running = true
go c.pollLoop()
}
func (c *Client) stopPolling() {
c.cancel()
c.running = false
}
func (c *Client) pollLoop() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
c.pollServer()
}
}
}
func (c *Client) pollServer() {
leaseSeconds := c.leaseSeconds
if leaseSeconds == 0 {
leaseSeconds = 60
}
task, ok, err := c.pop(c.ctx, &PopRequest{
WorkerId: c.WorkerID,
LeaseSeconds: leaseSeconds,
Filter: &TaskFilter{
ApplicationId: c.ApplicationID,
},
})
if err != nil || !ok {
if err != nil {
fmt.Println(err.Error())
}
return
}
// snapshot subscribers
c.mu.Lock()
subscribers := c.subscribers[task.EventID]
c.mu.Unlock()
if len(subscribers) == 0 {
return
}
var (
wg sync.WaitGroup
errs []error
mu sync.Mutex
)
for _, fn := range subscribers {
wg.Add(1)
go func(fn SubscriberFunc) {
defer wg.Done()
if err := fn(task); err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
}(fn)
}
wg.Wait()
ctx := context.Background()
if len(errs) > 0 {
errors := make([]string, 0)
for _, err := range errs {
if err != nil {
errors = append(errors, err.Error())
}
}
c.requeue(ctx, &RequeueRequest{
TaskId: task.ID,
WorkerId: c.WorkerID,
DelaySeconds: 5,
Errors: errors,
})
return
}
c.complete(ctx, task.ID)
}