diff --git a/README.md b/README.md old mode 100644 new mode 100755 diff --git a/client/client.go b/client/client.go old mode 100644 new mode 100755 index 6c7aabf..b56ba2c --- a/client/client.go +++ b/client/client.go @@ -7,13 +7,26 @@ import ( "fmt" "io" "net/http" + "sync" "time" ) +type SubscriberFunc func(*Task) error + // Client wraps HTTP calls to the task queue service. type Client struct { - BaseURL string - HTTPClient *http.Client + BaseURL string + HTTPClient *http.Client + ApplicationID string + WorkerID string + + mu sync.Mutex + subscribers map[string]map[int]SubscriberFunc + nextID int + + ctx context.Context + cancel context.CancelFunc + running bool } type TaskFilter struct { @@ -23,6 +36,7 @@ type TaskFilter struct { type Task struct { ID string `json:"id"` ApplicationId string `json:"applicationId"` + EventID string `json:"eventId"` Payload interface{} `json:"payload"` Priority int `json:"priority"` AvailableAt time.Time `json:"availableAt"` @@ -52,12 +66,17 @@ func (t *Task) ParsePayload(v interface{}) error { return nil } +type EnqueueOptions struct { + Priority int `json:"priority"` + DelaySec int `json:"delaySec"` + MaxAttempts int `json:"maxAttempts"` +} + type EnqueueRequest struct { ApplicationId string `json:"applicationId"` + EventID string `json:"eventId"` Payload interface{} `json:"payload"` - Priority int `json:"priority"` - DelaySec int `json:"delaySec"` - MaxAttempts int `json:"maxAttempts"` + EnqueueOptions } type PopRequest struct { @@ -79,22 +98,27 @@ type ExtendRequest struct { } type RequeueRequest struct { - WorkerId string `json:"workerId"` - TaskId string `json:"taskId"` - DelaySeconds int `json:"delaySec"` - Error string `json:"error"` + WorkerId string `json:"workerId"` + TaskId string `json:"taskId"` + DelaySeconds int `json:"delaySec"` + Errors []string `json:"errors"` } // New creates a new client. -func New(baseURL string) *Client { +func New(baseURL string, ApplicationID string, WorkerId string) *Client { return &Client{ - BaseURL: baseURL, - HTTPClient: &http.Client{Timeout: 10 * time.Second}, + BaseURL: baseURL, + HTTPClient: &http.Client{Timeout: 10 * time.Second}, + subscribers: make(map[string]map[int]SubscriberFunc), + ApplicationID: ApplicationID, + WorkerID: WorkerId, } } // Enqueue adds a new task. -func (c *Client) Enqueue(ctx context.Context, request *EnqueueRequest) (*Task, error) { +func (c *Client) enqueue(ctx context.Context, request *EnqueueRequest) (*Task, error) { + request.ApplicationId = c.ApplicationID + var t Task if err := c.do(ctx, "POST", "/enqueue", request, &t); err != nil { return nil, err @@ -103,23 +127,27 @@ func (c *Client) Enqueue(ctx context.Context, request *EnqueueRequest) (*Task, e } // Pop leases a task. -func (c *Client) Pop(ctx context.Context, request *PopRequest) (*Task, bool, error) { +func (c *Client) pop(ctx context.Context, request *PopRequest) (*Task, bool, error) { var t Task status, err := c.doStatus(ctx, "POST", "/pop", request, &t) - if status == http.StatusNoContent { + if status == http.StatusNoContent { return nil, false, nil } - + if err != nil { return nil, false, err } - + return &t, true, nil } // Complete marks a task as done. -func (c *Client) Complete(ctx context.Context, request *CompleteRequest) error { +func (c *Client) complete(ctx context.Context, TaskId string) error { + request := &CompleteRequest{ + WorkerId: c.WorkerID, + TaskId: TaskId, + } return c.doNoResp(ctx, "POST", "/complete", request) } @@ -129,7 +157,7 @@ func (c *Client) Extend(ctx context.Context, request *ExtendRequest) error { } // Requeue returns a task to the queue. -func (c *Client) Requeue(ctx context.Context, request *RequeueRequest) error { +func (c *Client) requeue(ctx context.Context, request *RequeueRequest) error { return c.doNoResp(ctx, "POST", "/requeue", request) } @@ -185,3 +213,146 @@ func (c *Client) doStatus(ctx context.Context, method, path string, body interfa } return resp.StatusCode, nil } + +func (c *Client) Publish(eventId string, payload interface{}, opts *EnqueueOptions) error { + + _, err := c.enqueue(context.Background(), &EnqueueRequest{ + EventID: eventId, + Payload: payload, + EnqueueOptions: *opts, + }) + if err != nil { + return fmt.Errorf("error enqueuing task with error: %s", err.Error()) + } + + return nil +} + +func (c *Client) Subscribe(event string, fn SubscriberFunc) int { + c.mu.Lock() + defer c.mu.Unlock() + + if c.subscribers[event] == nil { + c.subscribers[event] = make(map[int]SubscriberFunc) + } + + id := c.nextID + c.nextID++ + c.subscribers[event][id] = fn + + if !c.running { + c.startPolling() + } + + return id +} + +func (c *Client) Unsubscribe(event string, id int) { + c.mu.Lock() + defer c.mu.Unlock() + + if subs, ok := c.subscribers[event]; ok { + delete(subs, id) + if len(subs) == 0 { + delete(c.subscribers, event) + } + } + + if len(c.subscribers) == 0 && c.running { + c.stopPolling() + } +} + +func (c *Client) startPolling() { + c.ctx, c.cancel = context.WithCancel(context.Background()) + c.running = true + + go c.pollLoop() +} + +func (c *Client) stopPolling() { + c.cancel() + c.running = false +} + +func (c *Client) pollLoop() { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + return + case <-ticker.C: + c.pollServer() + } + } +} + +func (c *Client) pollServer() { + task, ok, err := c.pop(c.ctx, &PopRequest{ + WorkerId: c.WorkerID, + LeaseSeconds: 60, + Filter: &TaskFilter{ + ApplicationId: c.ApplicationID, + }, + }) + if err != nil || !ok { + if err != nil { + fmt.Println(err.Error()) + } + return + } + + // snapshot subscribers + c.mu.Lock() + subscribers := c.subscribers[task.EventID] + c.mu.Unlock() + + if len(subscribers) == 0 { + return + } + + var ( + wg sync.WaitGroup + errs []error + mu sync.Mutex + ) + + for _, fn := range subscribers { + wg.Add(1) + + go func(fn SubscriberFunc) { + defer wg.Done() + + if err := fn(task); err != nil { + mu.Lock() + errs = append(errs, err) + mu.Unlock() + } + }(fn) + } + + wg.Wait() + + ctx := context.Background() + + if len(errs) > 0 { + errors := make([]string, 0) + for _, err := range errs { + if err != nil { + errors = append(errors, err.Error()) + } + } + + c.requeue(ctx, &RequeueRequest{ + TaskId: task.ID, + WorkerId: c.WorkerID, + DelaySeconds: 5, + Errors: errors, + }) + return + } + + c.complete(ctx, task.ID) +} diff --git a/client/client_test.go b/client/client_test.go old mode 100644 new mode 100755 index fd208f5..18347ae --- a/client/client_test.go +++ b/client/client_test.go @@ -1,10 +1,12 @@ package client import ( - "context" + "errors" "fmt" "log" "os" + "os/signal" + "syscall" "testing" ) @@ -18,8 +20,26 @@ func TestClient(t *testing.T) { workerId, _ := os.Hostname() - client := New("http://localhost:10101") - ctx := context.Background() + client := New("http://10.10.10.2:10101", "email", workerId) + + subID := client.Subscribe("send.email", func(task *Task) error { + + // Parse payload into struct + var email EmailPayload + if err := task.ParsePayload(&email); err != nil { + log.Fatal("parse payload:", err) + } + fmt.Printf("Got email task: %+v\n", email) + + fmt.Println("Task 1 completed") + + return nil + }) + + subID2 := client.Subscribe("send.email", func(task *Task) error { + + return errors.New("Test Error") + }) newEmailPayload := EmailPayload{ To: "example@example.com", @@ -28,43 +48,27 @@ func TestClient(t *testing.T) { } // Enqueue a task - task, err := client.Enqueue(ctx, &EnqueueRequest{ - ApplicationId: "email", - Payload: newEmailPayload, - Priority: 10, - DelaySec: 0, - MaxAttempts: 5, + err := client.Publish("send.email", newEmailPayload, &EnqueueOptions{ + Priority: 10, + DelaySec: 0, + MaxAttempts: 5, }) if err != nil { log.Fatal("enqueue:", err) } - fmt.Println("Enqueued task:", task.ID) + fmt.Println("Enqueued task") - // Pop a task - task, ok, err := client.Pop(ctx, &PopRequest{ - WorkerId: workerId, - LeaseSeconds: 60, - Filter: &TaskFilter{ApplicationId: "email"}, - MinPriority: 0, - }) - if err != nil { - log.Fatal("pop:", err) - } - if !ok { - fmt.Println("No tasks available") - return - } + // Block until SIGINT or SIGTERM + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - // Parse payload into struct - var email EmailPayload - if err := task.ParsePayload(&email); err != nil { - log.Fatal("parse payload:", err) - } - fmt.Printf("Got email task: %+v\n", email) + <-sigChan - // Mark task as complete - if err := client.Complete(ctx, &CompleteRequest{TaskId: task.ID, WorkerId: workerId}); err != nil { - log.Fatal("complete:", err) - } - fmt.Println("Task completed") + fmt.Println("\nShutting down...") + + // Unsubscribe so polling stops + client.Unsubscribe("send.email", subID) + client.Unsubscribe("send.email", subID2) + + fmt.Println("Clean exit.") } diff --git a/go.mod b/go.mod old mode 100644 new mode 100755