From 5d273dacd00e7250ef6cdcf4b32be5e231658558 Mon Sep 17 00:00:00 2001 From: mrhid6 Date: Fri, 12 Sep 2025 11:35:25 +0000 Subject: [PATCH] feat: Initial commit --- .gitignore | 3 + Dockerfile | 16 ++ README.md | 13 ++ docker-compose.yaml | 22 +++ entry.sh | 33 +++++ go.mod | 23 +++ go.sum | 56 +++++++ handler.go | 349 ++++++++++++++++++++++++++++++++++++++++++++ main.go | 51 +++++++ makefile | 45 ++++++ models/task.go | 23 +++ server.go | 46 ++++++ 12 files changed, 680 insertions(+) create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 docker-compose.yaml create mode 100644 entry.sh create mode 100644 go.mod create mode 100644 go.sum create mode 100644 handler.go create mode 100644 main.go create mode 100644 makefile create mode 100644 models/task.go create mode 100644 server.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0609413 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.env +*.pid +bin/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..9c1c873 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,16 @@ +# syntax=docker/dockerfile:1 +FROM golang:1.25 + +ENV DEBIAN_FRONTEND noninteractive + +RUN apt-get -qq update -y &&\ + apt-get -qq install apt-utils wget curl -y &&\ + mkdir /opt/app &&\ + useradd -m -s /bin/bash appuser + +COPY entry.sh / +COPY bin/* /opt/app/ + +RUN chmod 777 /entry.sh && chown -R appuser:appuser /opt/app && chmod -R 777 /opt/app + +ENTRYPOINT [ "/entry.sh" ] \ No newline at end of file diff --git a/README.md b/README.md index e69de29..83ffb80 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,13 @@ +# Task Queue Server + +## Introduction + +Server to queue tasks in MongoDB for use in workflow scenario. + +## Features: + +- Enqueue tasks to queue +- Pop tasks from queue +- Complete tasks +- Extend leased tasks +- Requeue task on error diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..d6b493d --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,22 @@ +services: + taskqueue-server: + image: gitea.hostxtra.co.uk/hostxtra/taskqueue-server:latest + container_name: taskqueue-server + restart: unless-stopped + environment: + DB_HOST: ${DB_HOST} + DB_PORT: ${DB_PORT} + DB_USER: ${DB_USER} + DB_PASSWORD: ${DB_PASSWORD} + DB_DATABASE: ${DB_DATABASE} + DB_AUTHSOURCE: ${DB_AUTHSOURCE} + DB_DEBUG: ${DB_DEBUG} + HTTP_BIND: ${HTTP_BIND} + ports: + - "10101:10101" + networks: + - taskqueue-net + +networks: + taskqueue-net: + driver: bridge diff --git a/entry.sh b/entry.sh new file mode 100644 index 0000000..7679065 --- /dev/null +++ b/entry.sh @@ -0,0 +1,33 @@ +#!/bin/bash + +EXE="/opt/app/taskqueue-server" + +chown -R appuser:appuser /opt/app +chmod -R 777 /opt/app + +#Define cleanup procedure +cleanup() { + echo "Container stopped, performing cleanup..." + pid=$(ps -ef | awk '$8=="'${EXE}'" {print $2}') + kill -INT $pid + + while true; do + echo "Waiting for process to finish" + pid=$(ps -ef | awk '$8=="'${EXE}'" {print $2}') + if [ "$pid" == "" ]; then + break + fi + sleep 5 + done + exit 0 +} + +#Trap SIGTERM +trap 'cleanup' SIGTERM + +hostname + +su appuser -c "cd /opt/app && ${EXE}" & + +wait $! +sleep 40 \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..4097366 --- /dev/null +++ b/go.mod @@ -0,0 +1,23 @@ +module taskqueue + +go 1.25.0 + +require ( + github.com/joho/godotenv v1.5.1 + github.com/mrhid6/go-mongoose v0.0.17 + go.mongodb.org/mongo-driver v1.17.4 +) + +require ( + github.com/gertd/go-pluralize v0.2.1 // indirect + github.com/golang/snappy v1.0.0 // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/montanaflynn/stats v0.7.1 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect + golang.org/x/crypto v0.42.0 // indirect + golang.org/x/sync v0.17.0 // indirect + golang.org/x/text v0.29.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..27e9c34 --- /dev/null +++ b/go.sum @@ -0,0 +1,56 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gertd/go-pluralize v0.2.1 h1:M3uASbVjMnTsPb0PNqg+E/24Vwigyo/tvyMTtAlLgiA= +github.com/gertd/go-pluralize v0.2.1/go.mod h1:rbYaKDbsXxmRfr8uygAEKhOWsjyrrqrkHVpZvoOp8zk= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= +github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/mrhid6/go-mongoose v0.0.17 h1:K/HL4PIS2Le5I5s4U85b/R9zYdX9PwnBnQqJLCdukOA= +github.com/mrhid6/go-mongoose v0.0.17/go.mod h1:RowhE7wnLlPzZ8f/Mgfkh1ErIpM0SOpNU4C7ixgYoZc= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.mongodb.org/mongo-driver v1.17.4 h1:jUorfmVzljjr0FLzYQsGP8cgN/qzzxlY9Vh0C9KFXVw= +go.mongodb.org/mongo-driver v1.17.4/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI= +golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/handler.go b/handler.go new file mode 100644 index 0000000..3e55b74 --- /dev/null +++ b/handler.go @@ -0,0 +1,349 @@ +package main + +import ( + "encoding/json" + "errors" + "net/http" + "taskqueue/models" + "time" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +// ---------- Handlers ---------- + +func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + httpError(w, http.StatusMethodNotAllowed, "POST required") + return + } + var req struct { + ApplicationId string `json:"applicationId"` + Payload map[string]interface{} `json:"payload"` + Priority int `json:"priority"` + DelaySec int `json:"delaySec"` // optional delay until available + MaxAttempts int `json:"maxAttempts"` // optional + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + httpError(w, http.StatusBadRequest, "invalid json: "+err.Error()) + return + } + now := time.Now().UTC() + availableAt := now + if req.DelaySec > 0 { + availableAt = now.Add(time.Duration(req.DelaySec) * time.Second) + } + if req.MaxAttempts <= 0 { + req.MaxAttempts = 5 + } + + task := models.Task{ + ApplicationId: req.ApplicationId, + Payload: req.Payload, + Priority: req.Priority, + AvailableAt: availableAt, + Status: StatusPending, + Attempts: 0, + MaxAttempts: req.MaxAttempts, + CreatedAt: now, + UpdatedAt: now, + } + + ctx := r.Context() + res, err := s.col.InsertOne(ctx, task) + if err != nil { + httpError(w, http.StatusInternalServerError, "insert: "+err.Error()) + return + } + task.ID = res.InsertedID.(primitive.ObjectID) + writeJSON(w, http.StatusCreated, task) +} + +func (s *Server) handlePop(w http.ResponseWriter, r *http.Request) { + // Atomically find a task matching filters and claim it (lease) + if r.Method != http.MethodPost { + httpError(w, http.StatusMethodNotAllowed, "POST required") + return + } + + var req struct { + WorkerID string `json:"workerId"` // required - identity of popper + LeaseSeconds int `json:"leaseSeconds"` // optional, defaults to 60 + Filters map[string]interface{} `json:"filters"` // e.g. {"type":"email"} + MinPriority *int `json:"minPriority"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + httpError(w, http.StatusBadRequest, "invalid json: "+err.Error()) + return + } + if req.WorkerID == "" { + httpError(w, http.StatusBadRequest, "workerId required") + return + } + leaseSeconds := req.LeaseSeconds + if leaseSeconds <= 0 { + leaseSeconds = 60 + } + + now := time.Now().UTC() + leaseUntil := now.Add(time.Duration(leaseSeconds) * time.Second) + + // Build filter: + // - task is pending and availableAt <= now + // OR - task is leased but leasedUntil <= now (expired) + filter := bson.A{} + + // user filters (e.g. type) + userFilter := bson.D{} + for k, v := range req.Filters { + userFilter = append(userFilter, bson.E{Key: k, Value: v}) + } + if req.MinPriority != nil { + userFilter = append(userFilter, bson.E{Key: "priority", Value: bson.D{{"$gte", *req.MinPriority}}}) + } + + criteria := bson.D{ + {"$and", bson.A{ + userFilter, + bson.D{{"availableAt", bson.D{{"$lte", now}}}}, + bson.D{{"$or", bson.A{ + bson.D{{"status", StatusPending}}, + bson.D{{"status", StatusLeased}, {"leasedUntil", bson.D{{"$lte", now}}}}, // expired + }}}, + }}, + } + + filter = append(filter, criteria) + + findFilter := filter[0] + + // Sort by priority desc, createdAt asc + sort := bson.D{{"priority", -1}, {"createdAt", 1}} + + update := bson.D{ + {"$set", bson.D{ + {"status", StatusLeased}, + {"leaseOwner", req.WorkerID}, + {"leasedUntil", leaseUntil}, + {"updatedAt", now}, + }}, + {"$inc", bson.D{{"attempts", 1}}}, + } + + opts := options.FindOneAndUpdate().SetSort(sort).SetReturnDocument(options.After) + ctx := r.Context() + + var task models.Task + err := s.col.FindOneAndUpdate(ctx, findFilter.(bson.D), update, opts).Decode(&task) + if err != nil { + if errors.Is(err, mongo.ErrNoDocuments) { + // nothing to pop + writeJSON(w, http.StatusNoContent, nil) + return + } + httpError(w, http.StatusInternalServerError, "pop error: "+err.Error()) + return + } + + // If attempts > maxAttempts, mark done with error note instead of returning + if task.MaxAttempts > 0 && task.Attempts > task.MaxAttempts { + _, _ = s.col.UpdateByID(ctx, task.ID, bson.D{ + {"$set", bson.D{ + {"status", StatusDone}, + {"error", "max attempts exceeded"}, + {"updatedAt", time.Now().UTC()}, + }}, + }) + writeJSON(w, http.StatusNoContent, nil) + return + } + + writeJSON(w, http.StatusOK, task) +} + +func (s *Server) handleComplete(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + httpError(w, http.StatusMethodNotAllowed, "POST required") + return + } + var req struct { + TaskID string `json:"taskId"` + WorkerID string `json:"workerId"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + httpError(w, http.StatusBadRequest, "invalid json: "+err.Error()) + return + } + if req.TaskID == "" || req.WorkerID == "" { + httpError(w, http.StatusBadRequest, "taskId and workerId required") + return + } + oid, err := primitive.ObjectIDFromHex(req.TaskID) + if err != nil { + httpError(w, http.StatusBadRequest, "invalid taskId") + return + } + ctx := r.Context() + now := time.Now().UTC() + // Only allow completion if worker matches leaseOwner (or lease expired) + filter := bson.D{ + {"_id", oid}, + {"$or", bson.A{ + bson.D{{"leaseOwner", req.WorkerID}}, + bson.D{{"leasedUntil", bson.D{{"$lte", now}}}}, // lease expired -> allow completion as cleanup + }}, + } + update := bson.D{ + {"$set", bson.D{ + {"status", StatusDone}, + {"updatedAt", now}, + }}, + } + res, err := s.col.UpdateOne(ctx, filter, update) + if err != nil { + httpError(w, http.StatusInternalServerError, "complete update: "+err.Error()) + return + } + if res.MatchedCount == 0 { + httpError(w, http.StatusNotFound, "task not found or worker mismatch") + return + } + writeJSON(w, http.StatusOK, map[string]string{"result": "ok"}) +} + +func (s *Server) handleExtend(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + httpError(w, http.StatusMethodNotAllowed, "POST required") + return + } + var req struct { + TaskID string `json:"taskId"` + WorkerID string `json:"workerId"` + ExtraSecond int `json:"extraSecond"` // how many more seconds to add + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + httpError(w, http.StatusBadRequest, "invalid json: "+err.Error()) + return + } + if req.TaskID == "" || req.WorkerID == "" || req.ExtraSecond <= 0 { + httpError(w, http.StatusBadRequest, "taskId, workerId and extraSecond required") + return + } + oid, err := primitive.ObjectIDFromHex(req.TaskID) + if err != nil { + httpError(w, http.StatusBadRequest, "invalid taskId") + return + } + ctx := r.Context() + now := time.Now().UTC() + + filter := bson.D{ + {"_id", oid}, + {"leaseOwner", req.WorkerID}, + {"leasedUntil", bson.D{{"$gt", now}}}, // lease must still be valid + } + + // do proper update: set leasedUntil to old + extra + var task models.Task + err = s.col.FindOne(ctx, filter).Decode(&task) + if err != nil { + httpError(w, http.StatusInternalServerError, "find task: "+err.Error()) + return + } + if task.LeaseOwner != req.WorkerID { + httpError(w, http.StatusForbidden, "worker does not own lease") + return + } + newUntil := task.LeasedUntil.Add(time.Duration(req.ExtraSecond) * time.Second) + _, err = s.col.UpdateOne(ctx, filter, bson.D{ + {"$set", bson.D{ + {"leasedUntil", newUntil}, + {"updatedAt", now}, + }}, + }) + if err != nil { + httpError(w, http.StatusInternalServerError, "extend update: "+err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]string{"result": "ok", "newLeasedUntil": newUntil.Format(time.RFC3339)}) +} + +func (s *Server) handleRequeue(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + httpError(w, http.StatusMethodNotAllowed, "POST required") + return + } + var req struct { + TaskID string `json:"taskId"` + WorkerID string `json:"workerId"` + DelaySec int `json:"delaySec"` // optional + Error string `json:"error"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + httpError(w, http.StatusBadRequest, "invalid json: "+err.Error()) + return + } + if req.TaskID == "" || req.WorkerID == "" { + httpError(w, http.StatusBadRequest, "taskId and workerId required") + return + } + oid, err := primitive.ObjectIDFromHex(req.TaskID) + if err != nil { + httpError(w, http.StatusBadRequest, "invalid taskId") + return + } + now := time.Now().UTC() + availableAt := now + if req.DelaySec > 0 { + availableAt = now.Add(time.Duration(req.DelaySec) * time.Second) + } + ctx := r.Context() + + // Only allow requeue if worker owns lease OR lease expired. + // We'll increment attempts only when popped, so here we don't change attempts. + filter := bson.D{ + {"_id", oid}, + {"$or", bson.A{ + bson.D{{"leaseOwner", req.WorkerID}}, + bson.D{{"leasedUntil", bson.D{{"$lte", now}}}}, + }}, + } + update := bson.D{ + {"$set", bson.D{ + {"status", StatusPending}, + {"availableAt", availableAt}, + {"leaseOwner", ""}, + {"leasedUntil", time.Time{}}, + {"updatedAt", now}, + {"error", req.Error}, + }}, + } + res, err := s.col.UpdateOne(ctx, filter, update) + if err != nil { + httpError(w, http.StatusInternalServerError, "requeue update: "+err.Error()) + return + } + if res.MatchedCount == 0 { + httpError(w, http.StatusNotFound, "task not found or worker mismatch") + return + } + writeJSON(w, http.StatusOK, map[string]string{"result": "ok"}) +} + +// ---------- Helpers ---------- +func httpError(w http.ResponseWriter, status int, msg string) { + w.WriteHeader(status) + writeJSON(w, status, map[string]string{"error": msg}) +} + +func writeJSON(w http.ResponseWriter, status int, v interface{}) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + if v == nil { + return + } + _ = json.NewEncoder(w).Encode(v) +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..0b94702 --- /dev/null +++ b/main.go @@ -0,0 +1,51 @@ +package main + +import ( + "context" + "log" + "net/http" + "os" + + "github.com/joho/godotenv" + "github.com/mrhid6/go-mongoose/mongoose" +) + +const ( + collectionName = "tasks" +) + +// Task statuses +const ( + StatusPending = "pending" + StatusLeased = "leased" + StatusDone = "done" +) + +func main() { + + godotenv.Load() + + ctx := context.Background() + + mc, err := mongoose.NewMongoClient(ctx, mongoose.GetConnectionOptionsFromEnv()) + if err != nil { + log.Fatalf("mongo connect: %v", err) + } + + col := mc.GetCollection(collectionName) + s := &Server{mc: mc, col: col} + + if err := s.ensureIndexes(ctx); err != nil { + log.Fatalf("ensure indexes: %v", err) + } + + http.HandleFunc("/enqueue", s.handleEnqueue) + http.HandleFunc("/pop", s.handlePop) + http.HandleFunc("/complete", s.handleComplete) + http.HandleFunc("/extend", s.handleExtend) + http.HandleFunc("/requeue", s.handleRequeue) + + addr := os.Getenv("HTTP_BIND") + log.Printf("task queue API listening on %s", addr) + log.Fatal(http.ListenAndServe(addr, nil)) +} diff --git a/makefile b/makefile new file mode 100644 index 0000000..9e44b50 --- /dev/null +++ b/makefile @@ -0,0 +1,45 @@ +# Makefile for Go app with ngrok + +APP_NAME := taskqueue-server +PORT := 10101 +GO_BUILD := go build -o bin/$(APP_NAME) +NGROK_BIN := ngrok + +.PHONY: build run serve ngrok kill clean + +build: + @echo "๐Ÿ”จ Building Go app..." + @if [ ! -d bin ]; then mkdir bin; fi + $(GO_BUILD) . + +run: build + @echo "๐Ÿš€ Running Go app..." + ./bin/$(APP_NAME) + +serve: kill build + @echo "๐Ÿš€ Running Go app in background..." + ./bin/$(APP_NAME) & echo $$! > app.pid + sleep 2 + @echo "๐ŸŒ Starting ngrok on port $(PORT)..." + $(NGROK_BIN) http --domain=handy-outgoing-finch.ngrok-free.app $(PORT) > /dev/null & echo $$! > ngrok.pid + +ngrok: + @echo "๐ŸŒ Starting ngrok on port $(PORT)..." + $(NGROK_BIN) http --domain=handy-outgoing-finch.ngrok-free.app $(PORT) > /dev/null & echo $$! > ngrok.pid + +kill: + @echo "๐Ÿ›‘ Killing Go app (if running)..." + @if [ -f app.pid ]; then kill `cat app.pid` && rm app.pid; fi + @if [ -f ngrok.pid ]; then kill `cat ngrok.pid` && rm ngrok.pid; fi + +clean: kill + @echo "๐Ÿงน Cleaning up..." + rm -f $(APP_NAME) + +docker-build: kill build + @echo "Building Docker Image" + docker build -t gitea.hostxtra.co.uk/hostxtra/taskqueue-server:latest . + +docker-push: docker-build + @echo "Pushing docker image" + docker push gitea.hostxtra.co.uk/hostxtra/taskqueue-server:latest \ No newline at end of file diff --git a/models/task.go b/models/task.go new file mode 100644 index 0000000..db4701d --- /dev/null +++ b/models/task.go @@ -0,0 +1,23 @@ +package models + +import ( + "time" + + "go.mongodb.org/mongo-driver/bson/primitive" +) + +type Task struct { + ID primitive.ObjectID `bson:"_id,omitempty" json:"id"` + ApplicationId string `bson:"applicationId,omitempty" json:"applicationId"` + Payload interface{} `bson:"payload,omitempty" json:"payload"` + Priority int `bson:"priority,omitempty" json:"priority"` // higher first + AvailableAt time.Time `bson:"availableAt,omitempty" json:"availableAt"` + Status string `bson:"status,omitempty" json:"status"` + LeaseOwner string `bson:"leaseOwner,omitempty" json:"leaseOwner"` + LeasedUntil time.Time `bson:"leasedUntil,omitempty" json:"leasedUntil"` + Attempts int `bson:"attempts,omitempty" json:"attempts"` + MaxAttempts int `bson:"maxAttempts,omitempty" json:"maxAttempts"` + CreatedAt time.Time `bson:"createdAt,omitempty" json:"createdAt"` + UpdatedAt time.Time `bson:"updatedAt,omitempty" json:"updatedAt"` + Error string `bson:"error,omitempty" json:"error,omitempty"` +} diff --git a/server.go b/server.go new file mode 100644 index 0000000..d8679fb --- /dev/null +++ b/server.go @@ -0,0 +1,46 @@ +package main + +import ( + "context" + + "github.com/mrhid6/go-mongoose/mongoose" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +type Server struct { + mc *mongoose.MongooseClient + col *mongo.Collection +} + +// ---------- Indexes ---------- +func (s *Server) ensureIndexes(ctx context.Context) error { + idxs := s.col.Indexes() + models := []mongo.IndexModel{ + // frequently query by status + availableAt + priority + { + Keys: bson.D{ + {"status", 1}, + {"availableAt", 1}, + {"priority", -1}, + }, + Options: options.Index().SetName("status_available_priority"), + }, + // lease expiration useful for queries + { + Keys: bson.D{ + {"leasedUntil", 1}, + }, + Options: options.Index().SetName("leased_until_idx"), + }, + // TTL index on done tasks if you want automatic removal after X seconds. + // Uncomment or adjust the seconds as you need. Here we do not create by default. + // { + // Keys: bson.D{{"updatedAt", 1}}, + // Options: options.Index().SetExpireAfterSeconds(60 * 60 * 24), // e.g. 24h + // }, + } + _, err := idxs.CreateMany(ctx, models) + return err +}