From 493f3aefd1c364790bf11e12ee584de97e870d30 Mon Sep 17 00:00:00 2001 From: mrhid6 Date: Fri, 12 Sep 2025 12:01:39 +0000 Subject: [PATCH] feat: inital commit --- client/client.go | 185 ++++++++++++++++++++++++++++++++++++++++++ client/client_test.go | 70 ++++++++++++++++ go.mod | 3 + 3 files changed, 258 insertions(+) create mode 100644 client/client.go create mode 100644 client/client_test.go create mode 100644 go.mod diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..c2e8e5b --- /dev/null +++ b/client/client.go @@ -0,0 +1,185 @@ +package client + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +// Client wraps HTTP calls to the task queue service. +type Client struct { + BaseURL string + HTTPClient *http.Client +} + +type TaskFilter struct { + ApplicationId string `json:"applicationId"` +} + +type Task struct { + ID string `json:"id"` + ApplicationId string `json:"applicationId"` + Payload interface{} `json:"payload"` + Priority int `json:"priority"` + AvailableAt time.Time `json:"availableAt"` + Status string `json:"status"` + LeaseOwner string `json:"leaseOwner"` + LeasedUntil time.Time `json:"leasedUntil"` + Attempts int `json:"attempts"` + MaxAttempts int `json:"maxAttempts"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` + Error string `json:"error"` +} + +// ParsePayload decodes the payload into the provided struct pointer. +func (t *Task) ParsePayload(v interface{}) error { + // Marshal the payload map back to JSON, then unmarshal into v. + // Marshal the interface{} (map or slice) back to JSON + data, err := json.Marshal(t.Payload) + if err != nil { + return fmt.Errorf("failed to marshal payload: %w", err) + } + + // Unmarshal into the target struct + if err := json.Unmarshal(data, v); err != nil { + return fmt.Errorf("failed to unmarshal payload: %w", err) + } + return nil +} + +type EnqueueRequest struct { + ApplicationId string `json:"applicationId"` + Payload interface{} `json:"payload"` + Priority int `json:"priority"` + DelaySec int `json:"delaySec"` + MaxAttempts int `json:"maxAttempts"` +} + +type PopRequest struct { + WorkerId string `json:"workerId"` + LeaseSeconds int `json:"leaseSeconds"` + Filter *TaskFilter `json:"filters"` + MinPriority int `json:"minPriority,omitempty"` +} + +type CompleteRequest struct { + WorkerId string `json:"workerId"` + TaskId string `json:"taskId"` +} + +type ExtendRequest struct { + WorkerId string `json:"workerId"` + TaskId string `json:"taskId"` + ExtraSeconds int `json:"extraSecond"` +} + +type RequeueRequest struct { + WorkerId string `json:"workerId"` + TaskId string `json:"taskId"` + DelaySeconds int `json:"delaySec"` + Error string `json:"error"` +} + +// New creates a new client. +func New(baseURL string) *Client { + return &Client{ + BaseURL: baseURL, + HTTPClient: &http.Client{Timeout: 10 * time.Second}, + } +} + +// Enqueue adds a new task. +func (c *Client) Enqueue(ctx context.Context, request *EnqueueRequest) (*Task, error) { + var t Task + if err := c.do(ctx, "POST", "/enqueue", request, &t); err != nil { + return nil, err + } + return &t, nil +} + +// Pop leases a task. +func (c *Client) Pop(ctx context.Context, request *PopRequest) (*Task, bool, error) { + + var t Task + status, err := c.doStatus(ctx, "POST", "/pop", request, &t) + if err != nil { + return nil, false, err + } + if status == http.StatusNoContent { + return nil, false, nil + } + return &t, true, nil +} + +// Complete marks a task as done. +func (c *Client) Complete(ctx context.Context, request *CompleteRequest) error { + return c.doNoResp(ctx, "POST", "/complete", request) +} + +// Extend extends a lease on a task. +func (c *Client) Extend(ctx context.Context, request *ExtendRequest) error { + return c.doNoResp(ctx, "POST", "/extend", request) +} + +// Requeue returns a task to the queue. +func (c *Client) Requeue(ctx context.Context, request *RequeueRequest) error { + return c.doNoResp(ctx, "POST", "/requeue", request) +} + +// ----------- internal helpers ----------- +func (c *Client) do(ctx context.Context, method, path string, body interface{}, out interface{}) error { + status, err := c.doStatus(ctx, method, path, body, out) + if err != nil { + return err + } + if status < 200 || status >= 300 { + return fmt.Errorf("unexpected status %d", status) + } + return nil +} + +func (c *Client) doNoResp(ctx context.Context, method, path string, body interface{}) error { + status, err := c.doStatus(ctx, method, path, body, nil) + if err != nil { + return err + } + if status < 200 || status >= 300 { + return fmt.Errorf("unexpected status %d", status) + } + return nil +} + +func (c *Client) doStatus(ctx context.Context, method, path string, body interface{}, out interface{}) (int, error) { + var reader io.Reader + if body != nil { + buf, err := json.Marshal(body) + if err != nil { + return 0, err + } + reader = bytes.NewBuffer(buf) + } + req, err := http.NewRequestWithContext(ctx, method, c.BaseURL+path, reader) + if err != nil { + return 0, err + } + if body != nil { + req.Header.Set("Content-Type", "application/json") + } + resp, err := c.HTTPClient.Do(req) + if err != nil { + return 0, err + } + defer resp.Body.Close() + + if out != nil && resp.StatusCode >= 200 && resp.StatusCode < 300 { + if err := json.NewDecoder(resp.Body).Decode(out); err != nil { + return resp.StatusCode, err + } + } + return resp.StatusCode, nil +} diff --git a/client/client_test.go b/client/client_test.go new file mode 100644 index 0000000..fd208f5 --- /dev/null +++ b/client/client_test.go @@ -0,0 +1,70 @@ +package client + +import ( + "context" + "fmt" + "log" + "os" + "testing" +) + +type EmailPayload struct { + To string `json:"to"` + Subject string `json:"subject"` + Body string `json:"body"` +} + +func TestClient(t *testing.T) { + + workerId, _ := os.Hostname() + + client := New("http://localhost:10101") + ctx := context.Background() + + newEmailPayload := EmailPayload{ + To: "example@example.com", + Subject: "Test", + Body: "Hello World", + } + + // Enqueue a task + task, err := client.Enqueue(ctx, &EnqueueRequest{ + ApplicationId: "email", + Payload: newEmailPayload, + Priority: 10, + DelaySec: 0, + MaxAttempts: 5, + }) + if err != nil { + log.Fatal("enqueue:", err) + } + fmt.Println("Enqueued task:", task.ID) + + // Pop a task + task, ok, err := client.Pop(ctx, &PopRequest{ + WorkerId: workerId, + LeaseSeconds: 60, + Filter: &TaskFilter{ApplicationId: "email"}, + MinPriority: 0, + }) + if err != nil { + log.Fatal("pop:", err) + } + if !ok { + fmt.Println("No tasks available") + return + } + + // Parse payload into struct + var email EmailPayload + if err := task.ParsePayload(&email); err != nil { + log.Fatal("parse payload:", err) + } + fmt.Printf("Got email task: %+v\n", email) + + // Mark task as complete + if err := client.Complete(ctx, &CompleteRequest{TaskId: task.ID, WorkerId: workerId}); err != nil { + log.Fatal("complete:", err) + } + fmt.Println("Task completed") +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..4924e4b --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module taskqueue-client + +go 1.23.5