350 lines
9.7 KiB
Go
350 lines
9.7 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"net/http"
|
|
"taskqueue/models"
|
|
"time"
|
|
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
)
|
|
|
|
// ---------- Handlers ----------
|
|
|
|
func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
httpError(w, http.StatusMethodNotAllowed, "POST required")
|
|
return
|
|
}
|
|
var req struct {
|
|
ApplicationId string `json:"applicationId"`
|
|
Payload map[string]interface{} `json:"payload"`
|
|
Priority int `json:"priority"`
|
|
DelaySec int `json:"delaySec"` // optional delay until available
|
|
MaxAttempts int `json:"maxAttempts"` // optional
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
httpError(w, http.StatusBadRequest, "invalid json: "+err.Error())
|
|
return
|
|
}
|
|
now := time.Now().UTC()
|
|
availableAt := now
|
|
if req.DelaySec > 0 {
|
|
availableAt = now.Add(time.Duration(req.DelaySec) * time.Second)
|
|
}
|
|
if req.MaxAttempts <= 0 {
|
|
req.MaxAttempts = 5
|
|
}
|
|
|
|
task := models.Task{
|
|
ApplicationId: req.ApplicationId,
|
|
Payload: req.Payload,
|
|
Priority: req.Priority,
|
|
AvailableAt: availableAt,
|
|
Status: StatusPending,
|
|
Attempts: 0,
|
|
MaxAttempts: req.MaxAttempts,
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
}
|
|
|
|
ctx := r.Context()
|
|
res, err := s.col.InsertOne(ctx, task)
|
|
if err != nil {
|
|
httpError(w, http.StatusInternalServerError, "insert: "+err.Error())
|
|
return
|
|
}
|
|
task.ID = res.InsertedID.(primitive.ObjectID)
|
|
writeJSON(w, http.StatusCreated, task)
|
|
}
|
|
|
|
func (s *Server) handlePop(w http.ResponseWriter, r *http.Request) {
|
|
// Atomically find a task matching filters and claim it (lease)
|
|
if r.Method != http.MethodPost {
|
|
httpError(w, http.StatusMethodNotAllowed, "POST required")
|
|
return
|
|
}
|
|
|
|
var req struct {
|
|
WorkerID string `json:"workerId"` // required - identity of popper
|
|
LeaseSeconds int `json:"leaseSeconds"` // optional, defaults to 60
|
|
Filters map[string]interface{} `json:"filters"` // e.g. {"type":"email"}
|
|
MinPriority *int `json:"minPriority"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
httpError(w, http.StatusBadRequest, "invalid json: "+err.Error())
|
|
return
|
|
}
|
|
if req.WorkerID == "" {
|
|
httpError(w, http.StatusBadRequest, "workerId required")
|
|
return
|
|
}
|
|
leaseSeconds := req.LeaseSeconds
|
|
if leaseSeconds <= 0 {
|
|
leaseSeconds = 60
|
|
}
|
|
|
|
now := time.Now().UTC()
|
|
leaseUntil := now.Add(time.Duration(leaseSeconds) * time.Second)
|
|
|
|
// Build filter:
|
|
// - task is pending and availableAt <= now
|
|
// OR - task is leased but leasedUntil <= now (expired)
|
|
filter := bson.A{}
|
|
|
|
// user filters (e.g. type)
|
|
userFilter := bson.D{}
|
|
for k, v := range req.Filters {
|
|
userFilter = append(userFilter, bson.E{Key: k, Value: v})
|
|
}
|
|
if req.MinPriority != nil {
|
|
userFilter = append(userFilter, bson.E{Key: "priority", Value: bson.D{{"$gte", *req.MinPriority}}})
|
|
}
|
|
|
|
criteria := bson.D{
|
|
{"$and", bson.A{
|
|
userFilter,
|
|
bson.D{{"availableAt", bson.D{{"$lte", now}}}},
|
|
bson.D{{"$or", bson.A{
|
|
bson.D{{"status", StatusPending}},
|
|
bson.D{{"status", StatusLeased}, {"leasedUntil", bson.D{{"$lte", now}}}}, // expired
|
|
}}},
|
|
}},
|
|
}
|
|
|
|
filter = append(filter, criteria)
|
|
|
|
findFilter := filter[0]
|
|
|
|
// Sort by priority desc, createdAt asc
|
|
sort := bson.D{{"priority", -1}, {"createdAt", 1}}
|
|
|
|
update := bson.D{
|
|
{"$set", bson.D{
|
|
{"status", StatusLeased},
|
|
{"leaseOwner", req.WorkerID},
|
|
{"leasedUntil", leaseUntil},
|
|
{"updatedAt", now},
|
|
}},
|
|
{"$inc", bson.D{{"attempts", 1}}},
|
|
}
|
|
|
|
opts := options.FindOneAndUpdate().SetSort(sort).SetReturnDocument(options.After)
|
|
ctx := r.Context()
|
|
|
|
var task models.Task
|
|
err := s.col.FindOneAndUpdate(ctx, findFilter.(bson.D), update, opts).Decode(&task)
|
|
if err != nil {
|
|
if errors.Is(err, mongo.ErrNoDocuments) {
|
|
// nothing to pop
|
|
writeJSON(w, http.StatusNoContent, nil)
|
|
return
|
|
}
|
|
httpError(w, http.StatusInternalServerError, "pop error: "+err.Error())
|
|
return
|
|
}
|
|
|
|
// If attempts > maxAttempts, mark done with error note instead of returning
|
|
if task.MaxAttempts > 0 && task.Attempts > task.MaxAttempts {
|
|
_, _ = s.col.UpdateByID(ctx, task.ID, bson.D{
|
|
{"$set", bson.D{
|
|
{"status", StatusDone},
|
|
{"error", "max attempts exceeded"},
|
|
{"updatedAt", time.Now().UTC()},
|
|
}},
|
|
})
|
|
writeJSON(w, http.StatusNoContent, nil)
|
|
return
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, task)
|
|
}
|
|
|
|
func (s *Server) handleComplete(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
httpError(w, http.StatusMethodNotAllowed, "POST required")
|
|
return
|
|
}
|
|
var req struct {
|
|
TaskID string `json:"taskId"`
|
|
WorkerID string `json:"workerId"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
httpError(w, http.StatusBadRequest, "invalid json: "+err.Error())
|
|
return
|
|
}
|
|
if req.TaskID == "" || req.WorkerID == "" {
|
|
httpError(w, http.StatusBadRequest, "taskId and workerId required")
|
|
return
|
|
}
|
|
oid, err := primitive.ObjectIDFromHex(req.TaskID)
|
|
if err != nil {
|
|
httpError(w, http.StatusBadRequest, "invalid taskId")
|
|
return
|
|
}
|
|
ctx := r.Context()
|
|
now := time.Now().UTC()
|
|
// Only allow completion if worker matches leaseOwner (or lease expired)
|
|
filter := bson.D{
|
|
{"_id", oid},
|
|
{"$or", bson.A{
|
|
bson.D{{"leaseOwner", req.WorkerID}},
|
|
bson.D{{"leasedUntil", bson.D{{"$lte", now}}}}, // lease expired -> allow completion as cleanup
|
|
}},
|
|
}
|
|
update := bson.D{
|
|
{"$set", bson.D{
|
|
{"status", StatusDone},
|
|
{"updatedAt", now},
|
|
}},
|
|
}
|
|
res, err := s.col.UpdateOne(ctx, filter, update)
|
|
if err != nil {
|
|
httpError(w, http.StatusInternalServerError, "complete update: "+err.Error())
|
|
return
|
|
}
|
|
if res.MatchedCount == 0 {
|
|
httpError(w, http.StatusNotFound, "task not found or worker mismatch")
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]string{"result": "ok"})
|
|
}
|
|
|
|
func (s *Server) handleExtend(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
httpError(w, http.StatusMethodNotAllowed, "POST required")
|
|
return
|
|
}
|
|
var req struct {
|
|
TaskID string `json:"taskId"`
|
|
WorkerID string `json:"workerId"`
|
|
ExtraSecond int `json:"extraSecond"` // how many more seconds to add
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
httpError(w, http.StatusBadRequest, "invalid json: "+err.Error())
|
|
return
|
|
}
|
|
if req.TaskID == "" || req.WorkerID == "" || req.ExtraSecond <= 0 {
|
|
httpError(w, http.StatusBadRequest, "taskId, workerId and extraSecond required")
|
|
return
|
|
}
|
|
oid, err := primitive.ObjectIDFromHex(req.TaskID)
|
|
if err != nil {
|
|
httpError(w, http.StatusBadRequest, "invalid taskId")
|
|
return
|
|
}
|
|
ctx := r.Context()
|
|
now := time.Now().UTC()
|
|
|
|
filter := bson.D{
|
|
{"_id", oid},
|
|
{"leaseOwner", req.WorkerID},
|
|
{"leasedUntil", bson.D{{"$gt", now}}}, // lease must still be valid
|
|
}
|
|
|
|
// do proper update: set leasedUntil to old + extra
|
|
var task models.Task
|
|
err = s.col.FindOne(ctx, filter).Decode(&task)
|
|
if err != nil {
|
|
httpError(w, http.StatusInternalServerError, "find task: "+err.Error())
|
|
return
|
|
}
|
|
if task.LeaseOwner != req.WorkerID {
|
|
httpError(w, http.StatusForbidden, "worker does not own lease")
|
|
return
|
|
}
|
|
newUntil := task.LeasedUntil.Add(time.Duration(req.ExtraSecond) * time.Second)
|
|
_, err = s.col.UpdateOne(ctx, filter, bson.D{
|
|
{"$set", bson.D{
|
|
{"leasedUntil", newUntil},
|
|
{"updatedAt", now},
|
|
}},
|
|
})
|
|
if err != nil {
|
|
httpError(w, http.StatusInternalServerError, "extend update: "+err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]string{"result": "ok", "newLeasedUntil": newUntil.Format(time.RFC3339)})
|
|
}
|
|
|
|
func (s *Server) handleRequeue(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
httpError(w, http.StatusMethodNotAllowed, "POST required")
|
|
return
|
|
}
|
|
var req struct {
|
|
TaskID string `json:"taskId"`
|
|
WorkerID string `json:"workerId"`
|
|
DelaySec int `json:"delaySec"` // optional
|
|
Error string `json:"error"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
httpError(w, http.StatusBadRequest, "invalid json: "+err.Error())
|
|
return
|
|
}
|
|
if req.TaskID == "" || req.WorkerID == "" {
|
|
httpError(w, http.StatusBadRequest, "taskId and workerId required")
|
|
return
|
|
}
|
|
oid, err := primitive.ObjectIDFromHex(req.TaskID)
|
|
if err != nil {
|
|
httpError(w, http.StatusBadRequest, "invalid taskId")
|
|
return
|
|
}
|
|
now := time.Now().UTC()
|
|
availableAt := now
|
|
if req.DelaySec > 0 {
|
|
availableAt = now.Add(time.Duration(req.DelaySec) * time.Second)
|
|
}
|
|
ctx := r.Context()
|
|
|
|
// Only allow requeue if worker owns lease OR lease expired.
|
|
// We'll increment attempts only when popped, so here we don't change attempts.
|
|
filter := bson.D{
|
|
{"_id", oid},
|
|
{"$or", bson.A{
|
|
bson.D{{"leaseOwner", req.WorkerID}},
|
|
bson.D{{"leasedUntil", bson.D{{"$lte", now}}}},
|
|
}},
|
|
}
|
|
update := bson.D{
|
|
{"$set", bson.D{
|
|
{"status", StatusPending},
|
|
{"availableAt", availableAt},
|
|
{"leaseOwner", ""},
|
|
{"leasedUntil", time.Time{}},
|
|
{"updatedAt", now},
|
|
{"error", req.Error},
|
|
}},
|
|
}
|
|
res, err := s.col.UpdateOne(ctx, filter, update)
|
|
if err != nil {
|
|
httpError(w, http.StatusInternalServerError, "requeue update: "+err.Error())
|
|
return
|
|
}
|
|
if res.MatchedCount == 0 {
|
|
httpError(w, http.StatusNotFound, "task not found or worker mismatch")
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]string{"result": "ok"})
|
|
}
|
|
|
|
// ---------- Helpers ----------
|
|
func httpError(w http.ResponseWriter, status int, msg string) {
|
|
w.WriteHeader(status)
|
|
writeJSON(w, status, map[string]string{"error": msg})
|
|
}
|
|
|
|
// writeJSON responds with the given status code and a JSON Content-Type,
// followed by v encoded as JSON. When v is nil only the headers are sent.
func writeJSON(w http.ResponseWriter, status int, v interface{}) {
	hdr := w.Header()
	hdr.Set("Content-Type", "application/json")
	w.WriteHeader(status)

	if v != nil {
		// The status line is already committed at this point, so an encoding
		// failure cannot be turned into an error response.
		enc := json.NewEncoder(w)
		_ = enc.Encode(v)
	}
}
|