updates
This commit is contained in:
@@ -0,0 +1,76 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/mrhid6/keymanager/server/internal/grpc/pb"
|
||||
)
|
||||
|
||||
type commandDispatcher struct {
|
||||
mu sync.RWMutex
|
||||
channels map[string]chan *pb.ServerCommand
|
||||
}
|
||||
|
||||
// Dispatcher is the singleton command dispatcher used by both the gRPC server
|
||||
// and the REST API to push commands to connected agents.
|
||||
var Dispatcher = &commandDispatcher{
|
||||
channels: make(map[string]chan *pb.ServerCommand),
|
||||
}
|
||||
|
||||
// Connect registers an agent's command channel. Returns the channel to drain.
|
||||
func (d *commandDispatcher) Connect(serverID string) chan *pb.ServerCommand {
|
||||
ch := make(chan *pb.ServerCommand, 16)
|
||||
d.mu.Lock()
|
||||
d.channels[serverID] = ch
|
||||
d.mu.Unlock()
|
||||
return ch
|
||||
}
|
||||
|
||||
// Disconnect removes the agent's channel on stream close.
|
||||
func (d *commandDispatcher) Disconnect(serverID string) {
|
||||
d.mu.Lock()
|
||||
delete(d.channels, serverID)
|
||||
d.mu.Unlock()
|
||||
}
|
||||
|
||||
// IsConnected reports whether an agent is currently holding a CommandStream.
|
||||
func (d *commandDispatcher) IsConnected(serverID string) bool {
|
||||
d.mu.RLock()
|
||||
_, ok := d.channels[serverID]
|
||||
d.mu.RUnlock()
|
||||
return ok
|
||||
}
|
||||
|
||||
func (d *commandDispatcher) dispatch(serverID string, cmd *pb.ServerCommand) error {
|
||||
d.mu.RLock()
|
||||
ch, ok := d.channels[serverID]
|
||||
d.mu.RUnlock()
|
||||
if !ok {
|
||||
return fmt.Errorf("agent for server %s is not connected", serverID)
|
||||
}
|
||||
select {
|
||||
case ch <- cmd:
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("command queue full for server %s", serverID)
|
||||
}
|
||||
}
|
||||
|
||||
// DispatchGenerateKey sends a generate-key command to the named server's agent.
|
||||
// Returns the command ID that can be used to correlate the agent's result.
|
||||
func DispatchGenerateKey(serverID, label string) (string, error) {
|
||||
if !Dispatcher.IsConnected(serverID) {
|
||||
return "", fmt.Errorf("agent is not connected to the command stream")
|
||||
}
|
||||
cmdID := uuid.New().String()
|
||||
cmd := &pb.ServerCommand{
|
||||
CommandId: cmdID,
|
||||
GenerateKey: &pb.GenerateKeyCmd{Label: label},
|
||||
}
|
||||
if err := Dispatcher.dispatch(serverID, cmd); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return cmdID, nil
|
||||
}
|
||||
Reference in New Issue
Block a user