package services import ( "fmt" "sync" "github.com/google/uuid" "github.com/mrhid6/keymanager/server/internal/grpc/pb" ) type commandDispatcher struct { mu sync.RWMutex channels map[string]chan *pb.ServerCommand } // Dispatcher is the singleton command dispatcher used by both the gRPC server // and the REST API to push commands to connected agents. var Dispatcher = &commandDispatcher{ channels: make(map[string]chan *pb.ServerCommand), } // Connect registers an agent's command channel. Returns the channel to drain. func (d *commandDispatcher) Connect(serverID string) chan *pb.ServerCommand { ch := make(chan *pb.ServerCommand, 16) d.mu.Lock() d.channels[serverID] = ch d.mu.Unlock() return ch } // Disconnect removes the agent's channel on stream close. func (d *commandDispatcher) Disconnect(serverID string) { d.mu.Lock() delete(d.channels, serverID) d.mu.Unlock() } // IsConnected reports whether an agent is currently holding a CommandStream. func (d *commandDispatcher) IsConnected(serverID string) bool { d.mu.RLock() _, ok := d.channels[serverID] d.mu.RUnlock() return ok } func (d *commandDispatcher) dispatch(serverID string, cmd *pb.ServerCommand) error { d.mu.RLock() ch, ok := d.channels[serverID] d.mu.RUnlock() if !ok { return fmt.Errorf("agent for server %s is not connected", serverID) } select { case ch <- cmd: return nil default: return fmt.Errorf("command queue full for server %s", serverID) } } // DispatchGenerateKey sends a generate-key command to the named server's agent. // Returns the command ID that can be used to correlate the agent's result. func DispatchGenerateKey(serverID, label string) (string, error) { if !Dispatcher.IsConnected(serverID) { return "", fmt.Errorf("agent is not connected to the command stream") } cmdID := uuid.New().String() cmd := &pb.ServerCommand{ CommandId: cmdID, GenerateKey: &pb.GenerateKeyCmd{Label: label}, } if err := Dispatcher.dispatch(serverID, cmd); err != nil { return "", err } return cmdID, nil }