package main

import (
	"encoding/json"
	"errors"
	"net/http"
	"time"

	"taskqueue/models"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// ---------- Handlers ----------

// handleEnqueue inserts a new pending task built from the JSON request body.
//
// Only POST is accepted. delaySec > 0 postpones availableAt by that many
// seconds; maxAttempts <= 0 is replaced by 5. On success the stored task is
// returned with 201 Created.
func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) {
	var req struct {
		ApplicationID string                 `json:"applicationId"`
		Payload       map[string]interface{} `json:"payload"`
		Priority      int                    `json:"priority"`
		DelaySec      int                    `json:"delaySec"`    // optional delay until available
		MaxAttempts   int                    `json:"maxAttempts"` // optional, defaults to 5
	}
	if !decodePostJSON(w, r, &req) {
		return
	}

	now := time.Now().UTC()
	availableAt := now
	if req.DelaySec > 0 {
		availableAt = now.Add(time.Duration(req.DelaySec) * time.Second)
	}
	if req.MaxAttempts <= 0 {
		req.MaxAttempts = 5
	}

	task := models.Task{
		ApplicationId: req.ApplicationID,
		Payload:       req.Payload,
		Priority:      req.Priority,
		AvailableAt:   availableAt,
		Status:        StatusPending,
		Attempts:      0,
		MaxAttempts:   req.MaxAttempts,
		CreatedAt:     now,
		UpdatedAt:     now,
	}

	res, err := s.col.InsertOne(r.Context(), task)
	if err != nil {
		httpError(w, http.StatusInternalServerError, "insert: "+err.Error())
		return
	}
	// Checked assertion: an inserted ID of any other type must not panic the
	// handler; in that case task.ID keeps whatever value it already had.
	if oid, ok := res.InsertedID.(primitive.ObjectID); ok {
		task.ID = oid
	}
	writeJSON(w, http.StatusCreated, task)
}

// handlePop atomically claims (leases) the best matching task for a worker.
//
// Only POST is accepted and workerId is required. leaseSeconds <= 0 falls
// back to 60. A task is eligible when availableAt <= now and it is either
// pending or leased with an expired lease; candidates are ordered by priority
// descending, then createdAt ascending. Claiming a task increments its
// attempts counter. Responds 200 with the claimed task, or 204 when nothing
// is eligible.
func (s *Server) handlePop(w http.ResponseWriter, r *http.Request) {
	var req struct {
		WorkerID     string                 `json:"workerId"`     // required - identity of popper
		LeaseSeconds int                    `json:"leaseSeconds"` // optional, defaults to 60
		Filters      map[string]interface{} `json:"filters"`      // e.g. {"type":"email"}
		MinPriority  *int                   `json:"minPriority"`
	}
	if !decodePostJSON(w, r, &req) {
		return
	}
	if req.WorkerID == "" {
		httpError(w, http.StatusBadRequest, "workerId required")
		return
	}

	leaseSeconds := req.LeaseSeconds
	if leaseSeconds <= 0 {
		leaseSeconds = 60
	}
	now := time.Now().UTC()
	leaseUntil := now.Add(time.Duration(leaseSeconds) * time.Second)

	// Caller-supplied equality filters. Keys come straight from the request
	// body, so top-level query operators ($where, $or, ...) are rejected
	// rather than being passed through to the database.
	userFilter := make(bson.D, 0, len(req.Filters))
	for k, v := range req.Filters {
		if len(k) > 0 && k[0] == '$' {
			httpError(w, http.StatusBadRequest, "invalid filter key: "+k)
			return
		}
		userFilter = append(userFilter, bson.E{Key: k, Value: v})
	}

	clauses := bson.A{
		bson.D{{"availableAt", bson.D{{"$lte", now}}}},
		bson.D{{"$or", bson.A{
			bson.D{{"status", StatusPending}},
			bson.D{{"status", StatusLeased}, {"leasedUntil", bson.D{{"$lte", now}}}}, // expired lease
		}}},
	}
	if len(userFilter) > 0 {
		clauses = append(clauses, userFilter)
	}
	if req.MinPriority != nil {
		// Kept as its own clause so it cannot collide with a user-supplied
		// "priority" key inside the same document.
		clauses = append(clauses, bson.D{{"priority", bson.D{{"$gte", *req.MinPriority}}}})
	}
	filter := bson.D{{"$and", clauses}}

	update := bson.D{
		{"$set", bson.D{
			{"status", StatusLeased},
			{"leaseOwner", req.WorkerID},
			{"leasedUntil", leaseUntil},
			{"updatedAt", now},
		}},
		{"$inc", bson.D{{"attempts", 1}}},
	}
	opts := options.FindOneAndUpdate().
		SetSort(bson.D{{"priority", -1}, {"createdAt", 1}}).
		SetReturnDocument(options.After)

	ctx := r.Context()
	for {
		var task models.Task
		err := s.col.FindOneAndUpdate(ctx, filter, update, opts).Decode(&task)
		if errors.Is(err, mongo.ErrNoDocuments) {
			// nothing to pop
			writeJSON(w, http.StatusNoContent, nil)
			return
		}
		if err != nil {
			httpError(w, http.StatusInternalServerError, "pop error: "+err.Error())
			return
		}
		if task.MaxAttempts <= 0 || task.Attempts <= task.MaxAttempts {
			writeJSON(w, http.StatusOK, task)
			return
		}

		// The task has exceeded its attempt limit: retire it and look for the
		// next candidate instead of reporting an empty queue. The update is
		// best-effort; if it fails the task simply stays leased to this
		// worker until leaseUntil, so the loop cannot select it again.
		_, _ = s.col.UpdateByID(ctx, task.ID, bson.D{
			{"$set", bson.D{
				{"status", StatusDone},
				{"error", "max attempts exceeded"},
				{"updatedAt", time.Now().UTC()},
			}},
		})
	}
}

// handleComplete marks a task as done.
//
// Only POST is accepted; taskId (hex ObjectID) and workerId are required.
// The update matches when the worker is the lease owner or when leasedUntil
// is not in the future. Responds 404 when nothing matched.
//
// NOTE(review): the filter does not constrain status, so a requeued task
// (leasedUntil reset to the zero time) or an already-done task also matches.
// Confirm this is intended.
func (s *Server) handleComplete(w http.ResponseWriter, r *http.Request) {
	var req struct {
		TaskID   string `json:"taskId"`
		WorkerID string `json:"workerId"`
	}
	if !decodePostJSON(w, r, &req) {
		return
	}
	if req.TaskID == "" || req.WorkerID == "" {
		httpError(w, http.StatusBadRequest, "taskId and workerId required")
		return
	}
	oid, err := primitive.ObjectIDFromHex(req.TaskID)
	if err != nil {
		httpError(w, http.StatusBadRequest, "invalid taskId")
		return
	}

	now := time.Now().UTC()
	update := bson.D{
		{"$set", bson.D{
			{"status", StatusDone},
			{"updatedAt", now},
		}},
	}
	res, err := s.col.UpdateOne(r.Context(), ownerOrExpiredFilter(oid, req.WorkerID, now), update)
	if err != nil {
		httpError(w, http.StatusInternalServerError, "complete update: "+err.Error())
		return
	}
	if res.MatchedCount == 0 {
		httpError(w, http.StatusNotFound, "task not found or worker mismatch")
		return
	}
	writeJSON(w, http.StatusOK, map[string]string{"result": "ok"})
}

// handleExtend pushes a still-valid lease further into the future.
//
// Only POST is accepted; taskId, workerId and a positive extraSecond are
// required. The new deadline is the stored leasedUntil plus extraSecond
// seconds. Responds 404 when the task does not exist, is not leased by this
// worker, or the lease has already expired.
func (s *Server) handleExtend(w http.ResponseWriter, r *http.Request) {
	var req struct {
		TaskID      string `json:"taskId"`
		WorkerID    string `json:"workerId"`
		ExtraSecond int    `json:"extraSecond"` // how many more seconds to add
	}
	if !decodePostJSON(w, r, &req) {
		return
	}
	if req.TaskID == "" || req.WorkerID == "" || req.ExtraSecond <= 0 {
		httpError(w, http.StatusBadRequest, "taskId, workerId and extraSecond required")
		return
	}
	oid, err := primitive.ObjectIDFromHex(req.TaskID)
	if err != nil {
		httpError(w, http.StatusBadRequest, "invalid taskId")
		return
	}

	ctx := r.Context()
	now := time.Now().UTC()
	// The filter itself enforces ownership and lease validity, so no separate
	// owner check is needed on the decoded document.
	filter := bson.D{
		{"_id", oid},
		{"leaseOwner", req.WorkerID},
		{"leasedUntil", bson.D{{"$gt", now}}}, // lease must still be valid
	}

	var task models.Task
	if err := s.col.FindOne(ctx, filter).Decode(&task); err != nil {
		if errors.Is(err, mongo.ErrNoDocuments) {
			httpError(w, http.StatusNotFound, "task not found or lease not held")
			return
		}
		httpError(w, http.StatusInternalServerError, "find task: "+err.Error())
		return
	}

	newUntil := task.LeasedUntil.Add(time.Duration(req.ExtraSecond) * time.Second)
	res, err := s.col.UpdateOne(ctx, filter, bson.D{
		{"$set", bson.D{
			{"leasedUntil", newUntil},
			{"updatedAt", now},
		}},
	})
	if err != nil {
		httpError(w, http.StatusInternalServerError, "extend update: "+err.Error())
		return
	}
	if res.MatchedCount == 0 {
		// The lease was lost between the read and the write; do not report
		// an extension that did not happen.
		httpError(w, http.StatusNotFound, "task not found or lease not held")
		return
	}
	writeJSON(w, http.StatusOK, map[string]string{
		"result":         "ok",
		"newLeasedUntil": newUntil.Format(time.RFC3339),
	})
}

// handleRequeue returns a task to the pending state, optionally delayed.
//
// Only POST is accepted; taskId (hex ObjectID) and workerId are required.
// delaySec > 0 postpones availableAt; error is stored on the task. The lease
// owner and deadline are cleared. attempts is left untouched because it is
// only incremented when a task is popped. Responds 404 when nothing matched.
//
// NOTE(review): the filter does not constrain status, so tasks that are not
// currently leased may also match. Confirm this is intended.
func (s *Server) handleRequeue(w http.ResponseWriter, r *http.Request) {
	var req struct {
		TaskID   string `json:"taskId"`
		WorkerID string `json:"workerId"`
		DelaySec int    `json:"delaySec"` // optional
		Error    string `json:"error"`
	}
	if !decodePostJSON(w, r, &req) {
		return
	}
	if req.TaskID == "" || req.WorkerID == "" {
		httpError(w, http.StatusBadRequest, "taskId and workerId required")
		return
	}
	oid, err := primitive.ObjectIDFromHex(req.TaskID)
	if err != nil {
		httpError(w, http.StatusBadRequest, "invalid taskId")
		return
	}

	now := time.Now().UTC()
	availableAt := now
	if req.DelaySec > 0 {
		availableAt = now.Add(time.Duration(req.DelaySec) * time.Second)
	}

	update := bson.D{
		{"$set", bson.D{
			{"status", StatusPending},
			{"availableAt", availableAt},
			{"leaseOwner", ""},
			{"leasedUntil", time.Time{}},
			{"updatedAt", now},
			{"error", req.Error},
		}},
	}
	res, err := s.col.UpdateOne(r.Context(), ownerOrExpiredFilter(oid, req.WorkerID, now), update)
	if err != nil {
		httpError(w, http.StatusInternalServerError, "requeue update: "+err.Error())
		return
	}
	if res.MatchedCount == 0 {
		httpError(w, http.StatusNotFound, "task not found or worker mismatch")
		return
	}
	writeJSON(w, http.StatusOK, map[string]string{"result": "ok"})
}

// ---------- Helpers ----------

// decodePostJSON checks that r is a POST and decodes its JSON body into dst.
// On failure it writes the error response (405 or 400) itself and returns
// false; the caller should simply return.
func decodePostJSON(w http.ResponseWriter, r *http.Request, dst interface{}) bool {
	if r.Method != http.MethodPost {
		httpError(w, http.StatusMethodNotAllowed, "POST required")
		return false
	}
	if err := json.NewDecoder(r.Body).Decode(dst); err != nil {
		httpError(w, http.StatusBadRequest, "invalid json: "+err.Error())
		return false
	}
	return true
}

// ownerOrExpiredFilter matches the task with the given id when workerID is
// its lease owner or when its leasedUntil is at or before now.
func ownerOrExpiredFilter(oid primitive.ObjectID, workerID string, now time.Time) bson.D {
	return bson.D{
		{"_id", oid},
		{"$or", bson.A{
			bson.D{{"leaseOwner", workerID}},
			bson.D{{"leasedUntil", bson.D{{"$lte", now}}}}, // lease expired
		}},
	}
}

// httpError writes a JSON body of the form {"error": msg} with the given
// status code.
func httpError(w http.ResponseWriter, status int, msg string) {
	// writeJSON sets the headers and the status. Calling WriteHeader here
	// first would send the headers before Content-Type is set and trigger a
	// second, superfluous WriteHeader call.
	writeJSON(w, status, map[string]string{"error": msg})
}

// writeJSON sends status with a JSON Content-Type header and, when v is not
// nil, v encoded as the JSON body.
func writeJSON(w http.ResponseWriter, status int, v interface{}) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	if v == nil {
		return
	}
	// The status line is already written, so an encoding or write failure
	// cannot be reported to the client; the error is intentionally dropped.
	_ = json.NewEncoder(w).Encode(v)
}