1 Commits

Author SHA1 Message Date
mrhid6
da9ca139af feat: use publish/subscribe for tasks 2025-12-29 11:03:30 +00:00
4 changed files with 229 additions and 54 deletions

0
README.md Normal file → Executable file
View File

189
client/client.go Normal file → Executable file
View File

@@ -7,13 +7,26 @@ import (
"fmt"
"io"
"net/http"
"sync"
"time"
)
type SubscriberFunc func(*Task) error
// Client wraps HTTP calls to the task queue service.
type Client struct {
BaseURL string
HTTPClient *http.Client
ApplicationID string
WorkerID string
mu sync.Mutex
subscribers map[string]map[int]SubscriberFunc
nextID int
ctx context.Context
cancel context.CancelFunc
running bool
}
type TaskFilter struct {
@@ -23,6 +36,7 @@ type TaskFilter struct {
type Task struct {
ID string `json:"id"`
ApplicationId string `json:"applicationId"`
EventID string `json:"eventId"`
Payload interface{} `json:"payload"`
Priority int `json:"priority"`
AvailableAt time.Time `json:"availableAt"`
@@ -52,14 +66,19 @@ func (t *Task) ParsePayload(v interface{}) error {
return nil
}
type EnqueueRequest struct {
ApplicationId string `json:"applicationId"`
Payload interface{} `json:"payload"`
type EnqueueOptions struct {
Priority int `json:"priority"`
DelaySec int `json:"delaySec"`
MaxAttempts int `json:"maxAttempts"`
}
type EnqueueRequest struct {
ApplicationId string `json:"applicationId"`
EventID string `json:"eventId"`
Payload interface{} `json:"payload"`
EnqueueOptions
}
type PopRequest struct {
WorkerId string `json:"workerId"`
LeaseSeconds int `json:"leaseSeconds"`
@@ -82,19 +101,24 @@ type RequeueRequest struct {
WorkerId string `json:"workerId"`
TaskId string `json:"taskId"`
DelaySeconds int `json:"delaySec"`
Error string `json:"error"`
Errors []string `json:"errors"`
}
// New creates a new client.
func New(baseURL string) *Client {
func New(baseURL string, ApplicationID string, WorkerId string) *Client {
return &Client{
BaseURL: baseURL,
HTTPClient: &http.Client{Timeout: 10 * time.Second},
subscribers: make(map[string]map[int]SubscriberFunc),
ApplicationID: ApplicationID,
WorkerID: WorkerId,
}
}
// Enqueue adds a new task.
func (c *Client) Enqueue(ctx context.Context, request *EnqueueRequest) (*Task, error) {
func (c *Client) enqueue(ctx context.Context, request *EnqueueRequest) (*Task, error) {
request.ApplicationId = c.ApplicationID
var t Task
if err := c.do(ctx, "POST", "/enqueue", request, &t); err != nil {
return nil, err
@@ -103,7 +127,7 @@ func (c *Client) Enqueue(ctx context.Context, request *EnqueueRequest) (*Task, e
}
// Pop leases a task.
func (c *Client) Pop(ctx context.Context, request *PopRequest) (*Task, bool, error) {
func (c *Client) pop(ctx context.Context, request *PopRequest) (*Task, bool, error) {
var t Task
status, err := c.doStatus(ctx, "POST", "/pop", request, &t)
@@ -119,7 +143,11 @@ func (c *Client) Pop(ctx context.Context, request *PopRequest) (*Task, bool, err
}
// Complete marks a task as done.
func (c *Client) Complete(ctx context.Context, request *CompleteRequest) error {
func (c *Client) complete(ctx context.Context, TaskId string) error {
request := &CompleteRequest{
WorkerId: c.WorkerID,
TaskId: TaskId,
}
return c.doNoResp(ctx, "POST", "/complete", request)
}
@@ -129,7 +157,7 @@ func (c *Client) Extend(ctx context.Context, request *ExtendRequest) error {
}
// Requeue returns a task to the queue.
func (c *Client) Requeue(ctx context.Context, request *RequeueRequest) error {
func (c *Client) requeue(ctx context.Context, request *RequeueRequest) error {
return c.doNoResp(ctx, "POST", "/requeue", request)
}
@@ -185,3 +213,146 @@ func (c *Client) doStatus(ctx context.Context, method, path string, body interfa
}
return resp.StatusCode, nil
}
func (c *Client) Publish(eventId string, payload interface{}, opts *EnqueueOptions) error {
_, err := c.enqueue(context.Background(), &EnqueueRequest{
EventID: eventId,
Payload: payload,
EnqueueOptions: *opts,
})
if err != nil {
return fmt.Errorf("error enqueuing task with error: %s", err.Error())
}
return nil
}
func (c *Client) Subscribe(event string, fn SubscriberFunc) int {
c.mu.Lock()
defer c.mu.Unlock()
if c.subscribers[event] == nil {
c.subscribers[event] = make(map[int]SubscriberFunc)
}
id := c.nextID
c.nextID++
c.subscribers[event][id] = fn
if !c.running {
c.startPolling()
}
return id
}
func (c *Client) Unsubscribe(event string, id int) {
c.mu.Lock()
defer c.mu.Unlock()
if subs, ok := c.subscribers[event]; ok {
delete(subs, id)
if len(subs) == 0 {
delete(c.subscribers, event)
}
}
if len(c.subscribers) == 0 && c.running {
c.stopPolling()
}
}
func (c *Client) startPolling() {
c.ctx, c.cancel = context.WithCancel(context.Background())
c.running = true
go c.pollLoop()
}
func (c *Client) stopPolling() {
c.cancel()
c.running = false
}
func (c *Client) pollLoop() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
c.pollServer()
}
}
}
func (c *Client) pollServer() {
task, ok, err := c.pop(c.ctx, &PopRequest{
WorkerId: c.WorkerID,
LeaseSeconds: 60,
Filter: &TaskFilter{
ApplicationId: c.ApplicationID,
},
})
if err != nil || !ok {
if err != nil {
fmt.Println(err.Error())
}
return
}
// snapshot subscribers
c.mu.Lock()
subscribers := c.subscribers[task.EventID]
c.mu.Unlock()
if len(subscribers) == 0 {
return
}
var (
wg sync.WaitGroup
errs []error
mu sync.Mutex
)
for _, fn := range subscribers {
wg.Add(1)
go func(fn SubscriberFunc) {
defer wg.Done()
if err := fn(task); err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
}(fn)
}
wg.Wait()
ctx := context.Background()
if len(errs) > 0 {
errors := make([]string, 0)
for _, err := range errs {
if err != nil {
errors = append(errors, err.Error())
}
}
c.requeue(ctx, &RequeueRequest{
TaskId: task.ID,
WorkerId: c.WorkerID,
DelaySeconds: 5,
Errors: errors,
})
return
}
c.complete(ctx, task.ID)
}

84
client/client_test.go Normal file → Executable file
View File

@@ -1,10 +1,12 @@
package client
import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"testing"
)
@@ -18,42 +20,9 @@ func TestClient(t *testing.T) {
workerId, _ := os.Hostname()
client := New("http://localhost:10101")
ctx := context.Background()
client := New("http://10.10.10.2:10101", "email", workerId)
newEmailPayload := EmailPayload{
To: "example@example.com",
Subject: "Test",
Body: "Hello World",
}
// Enqueue a task
task, err := client.Enqueue(ctx, &EnqueueRequest{
ApplicationId: "email",
Payload: newEmailPayload,
Priority: 10,
DelaySec: 0,
MaxAttempts: 5,
})
if err != nil {
log.Fatal("enqueue:", err)
}
fmt.Println("Enqueued task:", task.ID)
// Pop a task
task, ok, err := client.Pop(ctx, &PopRequest{
WorkerId: workerId,
LeaseSeconds: 60,
Filter: &TaskFilter{ApplicationId: "email"},
MinPriority: 0,
})
if err != nil {
log.Fatal("pop:", err)
}
if !ok {
fmt.Println("No tasks available")
return
}
subID := client.Subscribe("send.email", func(task *Task) error {
// Parse payload into struct
var email EmailPayload
@@ -62,9 +31,44 @@ func TestClient(t *testing.T) {
}
fmt.Printf("Got email task: %+v\n", email)
// Mark task as complete
if err := client.Complete(ctx, &CompleteRequest{TaskId: task.ID, WorkerId: workerId}); err != nil {
log.Fatal("complete:", err)
fmt.Println("Task 1 completed")
return nil
})
subID2 := client.Subscribe("send.email", func(task *Task) error {
return errors.New("Test Error")
})
newEmailPayload := EmailPayload{
To: "example@example.com",
Subject: "Test",
Body: "Hello World",
}
fmt.Println("Task completed")
// Enqueue a task
err := client.Publish("send.email", newEmailPayload, &EnqueueOptions{
Priority: 10,
DelaySec: 0,
MaxAttempts: 5,
})
if err != nil {
log.Fatal("enqueue:", err)
}
fmt.Println("Enqueued task")
// Block until SIGINT or SIGTERM
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
fmt.Println("\nShutting down...")
// Unsubscribe so polling stops
client.Unsubscribe("send.email", subID)
client.Unsubscribe("send.email", subID2)
fmt.Println("Clean exit.")
}

0
go.mod Normal file → Executable file
View File