Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
66d8352502 | ||
|
|
da9ca139af | ||
|
|
a58f5b5018 |
223
client/client.go
Normal file → Executable file
223
client/client.go
Normal file → Executable file
@@ -7,13 +7,28 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type SubscriberFunc func(*Task) error
|
||||
|
||||
// Client wraps HTTP calls to the task queue service.
|
||||
type Client struct {
|
||||
BaseURL string
|
||||
HTTPClient *http.Client
|
||||
BaseURL string
|
||||
HTTPClient *http.Client
|
||||
ApplicationID string
|
||||
WorkerID string
|
||||
|
||||
mu sync.Mutex
|
||||
subscribers map[string]map[int]SubscriberFunc
|
||||
nextID int
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
running bool
|
||||
|
||||
leaseSeconds int
|
||||
}
|
||||
|
||||
type TaskFilter struct {
|
||||
@@ -23,6 +38,7 @@ type TaskFilter struct {
|
||||
type Task struct {
|
||||
ID string `json:"id"`
|
||||
ApplicationId string `json:"applicationId"`
|
||||
EventID string `json:"eventId"`
|
||||
Payload interface{} `json:"payload"`
|
||||
Priority int `json:"priority"`
|
||||
AvailableAt time.Time `json:"availableAt"`
|
||||
@@ -52,12 +68,17 @@ func (t *Task) ParsePayload(v interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type EnqueueOptions struct {
|
||||
Priority int `json:"priority"`
|
||||
DelaySec int `json:"delaySec"`
|
||||
MaxAttempts int `json:"maxAttempts"`
|
||||
}
|
||||
|
||||
type EnqueueRequest struct {
|
||||
ApplicationId string `json:"applicationId"`
|
||||
EventID string `json:"eventId"`
|
||||
Payload interface{} `json:"payload"`
|
||||
Priority int `json:"priority"`
|
||||
DelaySec int `json:"delaySec"`
|
||||
MaxAttempts int `json:"maxAttempts"`
|
||||
EnqueueOptions
|
||||
}
|
||||
|
||||
type PopRequest struct {
|
||||
@@ -79,22 +100,31 @@ type ExtendRequest struct {
|
||||
}
|
||||
|
||||
type RequeueRequest struct {
|
||||
WorkerId string `json:"workerId"`
|
||||
TaskId string `json:"taskId"`
|
||||
DelaySeconds int `json:"delaySec"`
|
||||
Error string `json:"error"`
|
||||
WorkerId string `json:"workerId"`
|
||||
TaskId string `json:"taskId"`
|
||||
DelaySeconds int `json:"delaySec"`
|
||||
Errors []string `json:"errors"`
|
||||
}
|
||||
|
||||
// New creates a new client.
|
||||
func New(baseURL string) *Client {
|
||||
func New(baseURL string, ApplicationID string, WorkerId string) *Client {
|
||||
return &Client{
|
||||
BaseURL: baseURL,
|
||||
HTTPClient: &http.Client{Timeout: 10 * time.Second},
|
||||
BaseURL: baseURL,
|
||||
HTTPClient: &http.Client{Timeout: 10 * time.Second},
|
||||
subscribers: make(map[string]map[int]SubscriberFunc),
|
||||
ApplicationID: ApplicationID,
|
||||
WorkerID: WorkerId,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) SetLeaseSeconds(s int) {
|
||||
c.leaseSeconds = s
|
||||
}
|
||||
|
||||
// Enqueue adds a new task.
|
||||
func (c *Client) Enqueue(ctx context.Context, request *EnqueueRequest) (*Task, error) {
|
||||
func (c *Client) enqueue(ctx context.Context, request *EnqueueRequest) (*Task, error) {
|
||||
request.ApplicationId = c.ApplicationID
|
||||
|
||||
var t Task
|
||||
if err := c.do(ctx, "POST", "/enqueue", request, &t); err != nil {
|
||||
return nil, err
|
||||
@@ -103,21 +133,27 @@ func (c *Client) Enqueue(ctx context.Context, request *EnqueueRequest) (*Task, e
|
||||
}
|
||||
|
||||
// Pop leases a task.
|
||||
func (c *Client) Pop(ctx context.Context, request *PopRequest) (*Task, bool, error) {
|
||||
func (c *Client) pop(ctx context.Context, request *PopRequest) (*Task, bool, error) {
|
||||
|
||||
var t Task
|
||||
status, err := c.doStatus(ctx, "POST", "/pop", request, &t)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
if status == http.StatusNoContent {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
return &t, true, nil
|
||||
}
|
||||
|
||||
// Complete marks a task as done.
|
||||
func (c *Client) Complete(ctx context.Context, request *CompleteRequest) error {
|
||||
func (c *Client) complete(ctx context.Context, TaskId string) error {
|
||||
request := &CompleteRequest{
|
||||
WorkerId: c.WorkerID,
|
||||
TaskId: TaskId,
|
||||
}
|
||||
return c.doNoResp(ctx, "POST", "/complete", request)
|
||||
}
|
||||
|
||||
@@ -127,7 +163,7 @@ func (c *Client) Extend(ctx context.Context, request *ExtendRequest) error {
|
||||
}
|
||||
|
||||
// Requeue returns a task to the queue.
|
||||
func (c *Client) Requeue(ctx context.Context, request *RequeueRequest) error {
|
||||
func (c *Client) requeue(ctx context.Context, request *RequeueRequest) error {
|
||||
return c.doNoResp(ctx, "POST", "/requeue", request)
|
||||
}
|
||||
|
||||
@@ -183,3 +219,152 @@ func (c *Client) doStatus(ctx context.Context, method, path string, body interfa
|
||||
}
|
||||
return resp.StatusCode, nil
|
||||
}
|
||||
|
||||
func (c *Client) Publish(eventId string, payload interface{}, opts *EnqueueOptions) error {
|
||||
|
||||
_, err := c.enqueue(context.Background(), &EnqueueRequest{
|
||||
EventID: eventId,
|
||||
Payload: payload,
|
||||
EnqueueOptions: *opts,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error enqueuing task with error: %s", err.Error())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) Subscribe(event string, fn SubscriberFunc) int {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.subscribers[event] == nil {
|
||||
c.subscribers[event] = make(map[int]SubscriberFunc)
|
||||
}
|
||||
|
||||
id := c.nextID
|
||||
c.nextID++
|
||||
c.subscribers[event][id] = fn
|
||||
|
||||
if !c.running {
|
||||
c.startPolling()
|
||||
}
|
||||
|
||||
return id
|
||||
}
|
||||
|
||||
func (c *Client) Unsubscribe(event string, id int) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if subs, ok := c.subscribers[event]; ok {
|
||||
delete(subs, id)
|
||||
if len(subs) == 0 {
|
||||
delete(c.subscribers, event)
|
||||
}
|
||||
}
|
||||
|
||||
if len(c.subscribers) == 0 && c.running {
|
||||
c.stopPolling()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) startPolling() {
|
||||
c.ctx, c.cancel = context.WithCancel(context.Background())
|
||||
c.running = true
|
||||
|
||||
go c.pollLoop()
|
||||
}
|
||||
|
||||
func (c *Client) stopPolling() {
|
||||
c.cancel()
|
||||
c.running = false
|
||||
}
|
||||
|
||||
func (c *Client) pollLoop() {
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
c.pollServer()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) pollServer() {
|
||||
|
||||
leaseSeconds := c.leaseSeconds
|
||||
if leaseSeconds == 0 {
|
||||
leaseSeconds = 60
|
||||
}
|
||||
|
||||
task, ok, err := c.pop(c.ctx, &PopRequest{
|
||||
WorkerId: c.WorkerID,
|
||||
LeaseSeconds: leaseSeconds,
|
||||
Filter: &TaskFilter{
|
||||
ApplicationId: c.ApplicationID,
|
||||
},
|
||||
})
|
||||
if err != nil || !ok {
|
||||
if err != nil {
|
||||
fmt.Println(err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// snapshot subscribers
|
||||
c.mu.Lock()
|
||||
subscribers := c.subscribers[task.EventID]
|
||||
c.mu.Unlock()
|
||||
|
||||
if len(subscribers) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
errs []error
|
||||
mu sync.Mutex
|
||||
)
|
||||
|
||||
for _, fn := range subscribers {
|
||||
wg.Add(1)
|
||||
|
||||
go func(fn SubscriberFunc) {
|
||||
defer wg.Done()
|
||||
|
||||
if err := fn(task); err != nil {
|
||||
mu.Lock()
|
||||
errs = append(errs, err)
|
||||
mu.Unlock()
|
||||
}
|
||||
}(fn)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
if len(errs) > 0 {
|
||||
errors := make([]string, 0)
|
||||
for _, err := range errs {
|
||||
if err != nil {
|
||||
errors = append(errors, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
c.requeue(ctx, &RequeueRequest{
|
||||
TaskId: task.ID,
|
||||
WorkerId: c.WorkerID,
|
||||
DelaySeconds: 5,
|
||||
Errors: errors,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
c.complete(ctx, task.ID)
|
||||
}
|
||||
|
||||
74
client/client_test.go
Normal file → Executable file
74
client/client_test.go
Normal file → Executable file
@@ -1,10 +1,12 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"testing"
|
||||
)
|
||||
|
||||
@@ -18,8 +20,26 @@ func TestClient(t *testing.T) {
|
||||
|
||||
workerId, _ := os.Hostname()
|
||||
|
||||
client := New("http://localhost:10101")
|
||||
ctx := context.Background()
|
||||
client := New("http://10.10.10.2:10101", "email", workerId)
|
||||
|
||||
subID := client.Subscribe("send.email", func(task *Task) error {
|
||||
|
||||
// Parse payload into struct
|
||||
var email EmailPayload
|
||||
if err := task.ParsePayload(&email); err != nil {
|
||||
log.Fatal("parse payload:", err)
|
||||
}
|
||||
fmt.Printf("Got email task: %+v\n", email)
|
||||
|
||||
fmt.Println("Task 1 completed")
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
subID2 := client.Subscribe("send.email", func(task *Task) error {
|
||||
|
||||
return errors.New("Test Error")
|
||||
})
|
||||
|
||||
newEmailPayload := EmailPayload{
|
||||
To: "example@example.com",
|
||||
@@ -28,43 +48,27 @@ func TestClient(t *testing.T) {
|
||||
}
|
||||
|
||||
// Enqueue a task
|
||||
task, err := client.Enqueue(ctx, &EnqueueRequest{
|
||||
ApplicationId: "email",
|
||||
Payload: newEmailPayload,
|
||||
Priority: 10,
|
||||
DelaySec: 0,
|
||||
MaxAttempts: 5,
|
||||
err := client.Publish("send.email", newEmailPayload, &EnqueueOptions{
|
||||
Priority: 10,
|
||||
DelaySec: 0,
|
||||
MaxAttempts: 5,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal("enqueue:", err)
|
||||
}
|
||||
fmt.Println("Enqueued task:", task.ID)
|
||||
fmt.Println("Enqueued task")
|
||||
|
||||
// Pop a task
|
||||
task, ok, err := client.Pop(ctx, &PopRequest{
|
||||
WorkerId: workerId,
|
||||
LeaseSeconds: 60,
|
||||
Filter: &TaskFilter{ApplicationId: "email"},
|
||||
MinPriority: 0,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal("pop:", err)
|
||||
}
|
||||
if !ok {
|
||||
fmt.Println("No tasks available")
|
||||
return
|
||||
}
|
||||
// Block until SIGINT or SIGTERM
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
// Parse payload into struct
|
||||
var email EmailPayload
|
||||
if err := task.ParsePayload(&email); err != nil {
|
||||
log.Fatal("parse payload:", err)
|
||||
}
|
||||
fmt.Printf("Got email task: %+v\n", email)
|
||||
<-sigChan
|
||||
|
||||
// Mark task as complete
|
||||
if err := client.Complete(ctx, &CompleteRequest{TaskId: task.ID, WorkerId: workerId}); err != nil {
|
||||
log.Fatal("complete:", err)
|
||||
}
|
||||
fmt.Println("Task completed")
|
||||
fmt.Println("\nShutting down...")
|
||||
|
||||
// Unsubscribe so polling stops
|
||||
client.Unsubscribe("send.email", subID)
|
||||
client.Unsubscribe("send.email", subID2)
|
||||
|
||||
fmt.Println("Clean exit.")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user