feat: inital commit
This commit is contained in:
185
client/client.go
Normal file
185
client/client.go
Normal file
@@ -0,0 +1,185 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Client wraps HTTP calls to the task queue service.
|
||||
type Client struct {
|
||||
BaseURL string
|
||||
HTTPClient *http.Client
|
||||
}
|
||||
|
||||
type TaskFilter struct {
|
||||
ApplicationId string `json:"applicationId"`
|
||||
}
|
||||
|
||||
type Task struct {
|
||||
ID string `json:"id"`
|
||||
ApplicationId string `json:"applicationId"`
|
||||
Payload interface{} `json:"payload"`
|
||||
Priority int `json:"priority"`
|
||||
AvailableAt time.Time `json:"availableAt"`
|
||||
Status string `json:"status"`
|
||||
LeaseOwner string `json:"leaseOwner"`
|
||||
LeasedUntil time.Time `json:"leasedUntil"`
|
||||
Attempts int `json:"attempts"`
|
||||
MaxAttempts int `json:"maxAttempts"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
UpdatedAt time.Time `json:"updatedAt"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
// ParsePayload decodes the payload into the provided struct pointer.
|
||||
func (t *Task) ParsePayload(v interface{}) error {
|
||||
// Marshal the payload map back to JSON, then unmarshal into v.
|
||||
// Marshal the interface{} (map or slice) back to JSON
|
||||
data, err := json.Marshal(t.Payload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal payload: %w", err)
|
||||
}
|
||||
|
||||
// Unmarshal into the target struct
|
||||
if err := json.Unmarshal(data, v); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal payload: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type EnqueueRequest struct {
|
||||
ApplicationId string `json:"applicationId"`
|
||||
Payload interface{} `json:"payload"`
|
||||
Priority int `json:"priority"`
|
||||
DelaySec int `json:"delaySec"`
|
||||
MaxAttempts int `json:"maxAttempts"`
|
||||
}
|
||||
|
||||
type PopRequest struct {
|
||||
WorkerId string `json:"workerId"`
|
||||
LeaseSeconds int `json:"leaseSeconds"`
|
||||
Filter *TaskFilter `json:"filters"`
|
||||
MinPriority int `json:"minPriority,omitempty"`
|
||||
}
|
||||
|
||||
type CompleteRequest struct {
|
||||
WorkerId string `json:"workerId"`
|
||||
TaskId string `json:"taskId"`
|
||||
}
|
||||
|
||||
type ExtendRequest struct {
|
||||
WorkerId string `json:"workerId"`
|
||||
TaskId string `json:"taskId"`
|
||||
ExtraSeconds int `json:"extraSecond"`
|
||||
}
|
||||
|
||||
type RequeueRequest struct {
|
||||
WorkerId string `json:"workerId"`
|
||||
TaskId string `json:"taskId"`
|
||||
DelaySeconds int `json:"delaySec"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
// New creates a new client.
|
||||
func New(baseURL string) *Client {
|
||||
return &Client{
|
||||
BaseURL: baseURL,
|
||||
HTTPClient: &http.Client{Timeout: 10 * time.Second},
|
||||
}
|
||||
}
|
||||
|
||||
// Enqueue adds a new task.
|
||||
func (c *Client) Enqueue(ctx context.Context, request *EnqueueRequest) (*Task, error) {
|
||||
var t Task
|
||||
if err := c.do(ctx, "POST", "/enqueue", request, &t); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &t, nil
|
||||
}
|
||||
|
||||
// Pop leases a task.
|
||||
func (c *Client) Pop(ctx context.Context, request *PopRequest) (*Task, bool, error) {
|
||||
|
||||
var t Task
|
||||
status, err := c.doStatus(ctx, "POST", "/pop", request, &t)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
if status == http.StatusNoContent {
|
||||
return nil, false, nil
|
||||
}
|
||||
return &t, true, nil
|
||||
}
|
||||
|
||||
// Complete marks a task as done.
|
||||
func (c *Client) Complete(ctx context.Context, request *CompleteRequest) error {
|
||||
return c.doNoResp(ctx, "POST", "/complete", request)
|
||||
}
|
||||
|
||||
// Extend extends a lease on a task.
|
||||
func (c *Client) Extend(ctx context.Context, request *ExtendRequest) error {
|
||||
return c.doNoResp(ctx, "POST", "/extend", request)
|
||||
}
|
||||
|
||||
// Requeue returns a task to the queue.
|
||||
func (c *Client) Requeue(ctx context.Context, request *RequeueRequest) error {
|
||||
return c.doNoResp(ctx, "POST", "/requeue", request)
|
||||
}
|
||||
|
||||
// ----------- internal helpers -----------
|
||||
func (c *Client) do(ctx context.Context, method, path string, body interface{}, out interface{}) error {
|
||||
status, err := c.doStatus(ctx, method, path, body, out)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if status < 200 || status >= 300 {
|
||||
return fmt.Errorf("unexpected status %d", status)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) doNoResp(ctx context.Context, method, path string, body interface{}) error {
|
||||
status, err := c.doStatus(ctx, method, path, body, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if status < 200 || status >= 300 {
|
||||
return fmt.Errorf("unexpected status %d", status)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) doStatus(ctx context.Context, method, path string, body interface{}, out interface{}) (int, error) {
|
||||
var reader io.Reader
|
||||
if body != nil {
|
||||
buf, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
reader = bytes.NewBuffer(buf)
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, method, c.BaseURL+path, reader)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if body != nil {
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
}
|
||||
resp, err := c.HTTPClient.Do(req)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if out != nil && resp.StatusCode >= 200 && resp.StatusCode < 300 {
|
||||
if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
|
||||
return resp.StatusCode, err
|
||||
}
|
||||
}
|
||||
return resp.StatusCode, nil
|
||||
}
|
||||
70
client/client_test.go
Normal file
70
client/client_test.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type EmailPayload struct {
|
||||
To string `json:"to"`
|
||||
Subject string `json:"subject"`
|
||||
Body string `json:"body"`
|
||||
}
|
||||
|
||||
func TestClient(t *testing.T) {
|
||||
|
||||
workerId, _ := os.Hostname()
|
||||
|
||||
client := New("http://localhost:10101")
|
||||
ctx := context.Background()
|
||||
|
||||
newEmailPayload := EmailPayload{
|
||||
To: "example@example.com",
|
||||
Subject: "Test",
|
||||
Body: "Hello World",
|
||||
}
|
||||
|
||||
// Enqueue a task
|
||||
task, err := client.Enqueue(ctx, &EnqueueRequest{
|
||||
ApplicationId: "email",
|
||||
Payload: newEmailPayload,
|
||||
Priority: 10,
|
||||
DelaySec: 0,
|
||||
MaxAttempts: 5,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal("enqueue:", err)
|
||||
}
|
||||
fmt.Println("Enqueued task:", task.ID)
|
||||
|
||||
// Pop a task
|
||||
task, ok, err := client.Pop(ctx, &PopRequest{
|
||||
WorkerId: workerId,
|
||||
LeaseSeconds: 60,
|
||||
Filter: &TaskFilter{ApplicationId: "email"},
|
||||
MinPriority: 0,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal("pop:", err)
|
||||
}
|
||||
if !ok {
|
||||
fmt.Println("No tasks available")
|
||||
return
|
||||
}
|
||||
|
||||
// Parse payload into struct
|
||||
var email EmailPayload
|
||||
if err := task.ParsePayload(&email); err != nil {
|
||||
log.Fatal("parse payload:", err)
|
||||
}
|
||||
fmt.Printf("Got email task: %+v\n", email)
|
||||
|
||||
// Mark task as complete
|
||||
if err := client.Complete(ctx, &CompleteRequest{TaskId: task.ID, WorkerId: workerId}); err != nil {
|
||||
log.Fatal("complete:", err)
|
||||
}
|
||||
fmt.Println("Task completed")
|
||||
}
|
||||
Reference in New Issue
Block a user