From 1dc3ad1215b009901228283d914a05b662f81152 Mon Sep 17 00:00:00 2001 From: Axodouble Date: Thu, 14 May 2026 00:25:43 +0000 Subject: [PATCH] Added default alerts, updated UI to show alerts with checks, added auto sync for manual file editing --- internal/alerts/dispatcher.go | 14 ++--- internal/cli/alert.go | 52 +++++++++++++++++-- internal/cli/status.go | 28 +++++++--- internal/config/cluster.go | 89 ++++++++++++++++++++++++++++++-- internal/daemon/daemon.go | 6 +++ internal/daemon/handlers.go | 8 +++ internal/daemon/watcher.go | 85 ++++++++++++++++++++++++++++++ internal/replicate/replicator.go | 10 ++++ internal/transport/messages.go | 11 ++++ 9 files changed, 282 insertions(+), 21 deletions(-) create mode 100644 internal/daemon/watcher.go diff --git a/internal/alerts/dispatcher.go b/internal/alerts/dispatcher.go index 71e606a..949aff7 100644 --- a/internal/alerts/dispatcher.go +++ b/internal/alerts/dispatcher.go @@ -31,13 +31,13 @@ func (d *Dispatcher) OnTransition(check *config.Check, from, to checks.State, sn return } msg := Render(d.selfID, check, from, to, snap) - for _, alertID := range check.AlertIDs { - alert := d.cluster.FindAlert(alertID) - if alert == nil { - d.logger.Printf("alerts: check %q references unknown alert %q", check.Name, alertID) - continue - } - if err := d.dispatchOne(alert, msg); err != nil { + alerts := d.cluster.EffectiveAlertsFor(check) + if len(alerts) == 0 && len(check.AlertIDs) > 0 { + d.logger.Printf("alerts: check %q references alerts but none resolved", check.Name) + } + for i := range alerts { + alert := alerts[i] + if err := d.dispatchOne(&alert, msg); err != nil { d.logger.Printf("alerts: %q via %s: %v", alert.Name, alert.Type, err) } } diff --git a/internal/cli/alert.go b/internal/cli/alert.go index aba4695..7376cb6 100644 --- a/internal/cli/alert.go +++ b/internal/cli/alert.go @@ -36,9 +36,9 @@ func addAlertCmd(root *cobra.Command) { return err } tw := tabwriter.NewWriter(cmd.OutOrStdout(), 0, 0, 2, ' ', 0) - fmt.Fprintln(tw, "ID\tTYPE\tNAME") + fmt.Fprintln(tw, "ID\tTYPE\tDEFAULT\tNAME") for _, a := range cluster.Alerts { - fmt.Fprintf(tw, "%s\t%s\t%s\n", a.ID, a.Type, a.Name) + fmt.Fprintf(tw, "%s\t%s\t%v\t%s\n", a.ID, a.Type, a.Default, a.Name) } return tw.Flush() }, @@ -80,7 +80,46 @@ func addAlertCmd(root *cobra.Command) { }, } - alert.AddCommand(addParent, listCmd, removeCmd, testCmd) + defaultCmd := &cobra.Command{ + Use: "default ", + Short: "Toggle whether an alert is attached to every check by default", + Args: cobra.ExactArgs(2), + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithTimeout(cmd.Context(), 10*time.Second) + defer cancel() + var on bool + switch args[1] { + case "on", "true", "yes", "1": + on = true + case "off", "false", "no", "0": + on = false + default: + return fmt.Errorf("second arg must be on/off, got %q", args[1]) + } + cluster, err := config.LoadClusterConfig() + if err != nil { + return err + } + existing := cluster.FindAlert(args[0]) + if existing == nil { + return fmt.Errorf("no alert named %q", args[0]) + } + existing.Default = on + payload, _ := json.Marshal(existing) + body := daemon.MutateBody{Kind: transport.MutationAddAlert, Payload: payload} + raw, err := callDaemon(ctx, daemon.CtrlMutate, body) + if err != nil { + return err + } + var res daemon.MutateResult + _ = json.Unmarshal(raw, &res) + fmt.Fprintf(cmd.OutOrStdout(), "alert %s default=%v — cluster version %d\n", + existing.Name, on, res.Version) + return nil + }, + } + + alert.AddCommand(addParent, listCmd, 
removeCmd, testCmd, defaultCmd) root.AddCommand(alert) } @@ -88,7 +127,7 @@ func buildSMTPAddCmd() *cobra.Command { var host, user, password, from string var port int var to []string - var startTLS bool + var startTLS, makeDefault bool cmd := &cobra.Command{ Use: "smtp ", @@ -101,6 +140,7 @@ func buildSMTPAddCmd() *cobra.Command { ID: uuid.NewString(), Name: args[0], Type: config.AlertSMTP, + Default: makeDefault, SMTPHost: host, SMTPPort: port, SMTPUser: user, @@ -129,6 +169,7 @@ func buildSMTPAddCmd() *cobra.Command { cmd.Flags().StringVar(&from, "from", "", "envelope From address") cmd.Flags().StringSliceVar(&to, "to", nil, "recipient address (repeat or comma-separate)") cmd.Flags().BoolVar(&startTLS, "starttls", true, "negotiate STARTTLS") + cmd.Flags().BoolVar(&makeDefault, "default", false, "attach this alert to every check automatically") _ = cmd.MarkFlagRequired("host") _ = cmd.MarkFlagRequired("from") _ = cmd.MarkFlagRequired("to") @@ -137,6 +178,7 @@ func buildSMTPAddCmd() *cobra.Command { func buildDiscordAddCmd() *cobra.Command { var webhook string + var makeDefault bool cmd := &cobra.Command{ Use: "discord ", Short: "Add a Discord webhook alert", @@ -148,6 +190,7 @@ func buildDiscordAddCmd() *cobra.Command { ID: uuid.NewString(), Name: args[0], Type: config.AlertDiscord, + Default: makeDefault, DiscordWebhook: webhook, } payload, _ := json.Marshal(a) @@ -164,6 +207,7 @@ func buildDiscordAddCmd() *cobra.Command { }, } cmd.Flags().StringVar(&webhook, "webhook", "", "discord webhook URL") + cmd.Flags().BoolVar(&makeDefault, "default", false, "attach this alert to every check automatically") _ = cmd.MarkFlagRequired("webhook") return cmd } diff --git a/internal/cli/status.go b/internal/cli/status.go index 2553d1b..d81568b 100644 --- a/internal/cli/status.go +++ b/internal/cli/status.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "text/tabwriter" "time" @@ -69,12 +70,24 @@ func runStatusPrint(ctx context.Context, cmd *cobra.Command, peersOnly bool) err fmt.Fprintln(out) fmt.Fprintln(out, "CHECKS") tw2 := tabwriter.NewWriter(out, 0, 0, 2, ' ', 0) - fmt.Fprintln(tw2, "ID\tNAME\tSTATE\tOK/TOTAL\tDETAIL") + fmt.Fprintln(tw2, "ID\tNAME\tSTATE\tOK/TOTAL\tALERTS\tDETAIL") for _, c := range st.Checks { - fmt.Fprintf(tw2, "%s\t%s\t%s\t%d/%d\t%s\n", - c.CheckID, c.Name, c.State, c.OKCount, c.Total, c.Detail) + fmt.Fprintf(tw2, "%s\t%s\t%s\t%d/%d\t%s\t%s\n", + c.CheckID, c.Name, c.State, c.OKCount, c.Total, + alertsCol(c.Alerts), c.Detail) } - return tw2.Flush() + if err := tw2.Flush(); err != nil { + return err + } + fmt.Fprintln(out, "(alerts marked * are attached as defaults)") + return nil +} + +func alertsCol(names []string) string { + if len(names) == 0 { + return "-" + } + return strings.Join(names, ",") } // runStatusPrintChecks renders only the checks block (used by @@ -90,10 +103,11 @@ func runStatusPrintChecks(ctx context.Context, cmd *cobra.Command) error { } out := cmd.OutOrStdout() tw := tabwriter.NewWriter(out, 0, 0, 2, ' ', 0) - fmt.Fprintln(tw, "ID\tNAME\tSTATE\tOK/TOTAL\tDETAIL") + fmt.Fprintln(tw, "ID\tNAME\tSTATE\tOK/TOTAL\tALERTS\tDETAIL") for _, c := range st.Checks { - fmt.Fprintf(tw, "%s\t%s\t%s\t%d/%d\t%s\n", - c.CheckID, c.Name, c.State, c.OKCount, c.Total, c.Detail) + fmt.Fprintf(tw, "%s\t%s\t%s\t%d/%d\t%s\t%s\n", + c.CheckID, c.Name, c.State, c.OKCount, c.Total, + alertsCol(c.Alerts), c.Detail) } return tw.Flush() } diff --git a/internal/config/cluster.go b/internal/config/cluster.go index 8608c15..4893113 100644 --- 
a/internal/config/cluster.go +++ b/internal/config/cluster.go @@ -1,6 +1,7 @@ package config import ( + "crypto/sha256" "fmt" "os" "sync" @@ -49,6 +50,9 @@ type Check struct { // AlertIDs lists which configured alerts fire when this check // transitions state. AlertIDs []string `yaml:"alert_ids,omitempty"` + + // SuppressAlertIDs lets a check opt out of specific default alerts. + SuppressAlertIDs []string `yaml:"suppress_alert_ids,omitempty"` } // AlertType enumerates supported notifier kinds. @@ -65,6 +69,11 @@ type Alert struct { Name string `yaml:"name"` Type AlertType `yaml:"type"` + // Default attaches this alert to every check automatically, on top + // of any explicit AlertIDs the check lists. A check that wants to + // opt out of a default alert can list it under SuppressAlertIDs. + Default bool `yaml:"default,omitempty"` + // SMTP options. SMTPHost string `yaml:"smtp_host,omitempty"` SMTPPort int `yaml:"smtp_port,omitempty"` @@ -92,6 +101,7 @@ type ClusterConfig struct { mu sync.RWMutex `yaml:"-"` onChange []func() // fired after any successful Mutate/Replace + lastSum [32]byte // sha256 of the bytes most recently written } // OnChange registers a callback fired after every successful Mutate @@ -128,19 +138,43 @@ func LoadClusterConfig() (*ClusterConfig, error) { if err := yaml.Unmarshal(raw, cfg); err != nil { return nil, fmt.Errorf("parse cluster.yaml: %w", err) } + cfg.lastSum = sha256.Sum256(raw) return cfg, nil } // Save writes cluster.yaml atomically. Caller is responsible for // having already taken any external locks. func (c *ClusterConfig) Save() error { - c.mu.RLock() - defer c.mu.RUnlock() + c.mu.Lock() + defer c.mu.Unlock() out, err := yaml.Marshal(c) if err != nil { return err } - return AtomicWrite(ClusterFilePath(), out, 0o600) + if err := AtomicWrite(ClusterFilePath(), out, 0o600); err != nil { + return err + } + c.lastSum = sha256.Sum256(out) + return nil +} + +// LastSavedSum returns the sha256 of the bytes most recently written +// to disk. The manual-edit watcher uses this to distinguish edits +// originating from this daemon (where the on-disk hash matches) from +// edits made externally by the operator. +func (c *ClusterConfig) LastSavedSum() [32]byte { + c.mu.RLock() + defer c.mu.RUnlock() + return c.lastSum +} + +// SetLastSavedSum lets the manual-edit watcher record that it has +// observed (and either applied or rejected) a specific on-disk hash, +// so the same edit isn't reprocessed on every poll. +func (c *ClusterConfig) SetLastSavedSum(sum [32]byte) { + c.mu.Lock() + c.lastSum = sum + c.mu.Unlock() } // Snapshot returns a deep-enough copy of the config that can be @@ -179,6 +213,7 @@ func (c *ClusterConfig) Mutate(byNode string, fn func(*ClusterConfig) error) err c.mu.Unlock() return err } + c.lastSum = sha256.Sum256(out) c.mu.Unlock() c.fireOnChange() return nil @@ -208,11 +243,59 @@ func (c *ClusterConfig) Replace(incoming *ClusterConfig) (bool, error) { c.mu.Unlock() return false, err } + c.lastSum = sha256.Sum256(out) c.mu.Unlock() c.fireOnChange() return true, nil } +// EffectiveAlertsFor returns the alerts that should fire when a check +// transitions: every alert explicitly listed in check.AlertIDs, plus +// every alert flagged Default=true, minus anything the check listed +// under SuppressAlertIDs. Result is de-duplicated by alert ID. 
+func (c *ClusterConfig) EffectiveAlertsFor(check *Check) []Alert { + c.mu.RLock() + defer c.mu.RUnlock() + if check == nil { + return nil + } + suppress := map[string]struct{}{} + for _, s := range check.SuppressAlertIDs { + suppress[s] = struct{}{} + } + seen := map[string]struct{}{} + var out []Alert + + add := func(a Alert) { + if _, dup := seen[a.ID]; dup { + return + } + if _, off := suppress[a.ID]; off { + return + } + if _, off := suppress[a.Name]; off { + return + } + seen[a.ID] = struct{}{} + out = append(out, a) + } + + for _, want := range check.AlertIDs { + for i := range c.Alerts { + if c.Alerts[i].ID == want || c.Alerts[i].Name == want { + add(c.Alerts[i]) + break + } + } + } + for i := range c.Alerts { + if c.Alerts[i].Default { + add(c.Alerts[i]) + } + } + return out +} + // FindAlert returns the alert with the given ID or name, or nil if // no entry matches. func (c *ClusterConfig) FindAlert(idOrName string) *Alert { diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index e1abe0b..150f41e 100644 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -195,6 +195,12 @@ func (d *Daemon) Run(ctx context.Context) error { d.scheduler.Start(ctx) }() + d.wg.Add(1) + go func() { + defer d.wg.Done() + d.watchManualEdits(ctx) + }() + select { case <-ctx.Done(): case err := <-servErr: diff --git a/internal/daemon/handlers.go b/internal/daemon/handlers.go index 112b685..87c1e32 100644 --- a/internal/daemon/handlers.go +++ b/internal/daemon/handlers.go @@ -169,6 +169,7 @@ func (d *Daemon) buildStatus() transport.StatusResponse { }) } for _, c := range snap.Checks { + check := c cs := transport.CheckSnapshot{CheckID: c.ID, Name: c.Name, State: "unknown"} if agg, ok := d.aggregator.SnapshotFor(c.ID); ok { cs.State = string(agg.State) @@ -176,6 +177,13 @@ func (d *Daemon) buildStatus() transport.StatusResponse { cs.Total = agg.Reports cs.Detail = agg.Detail } + for _, a := range d.cluster.EffectiveAlertsFor(&check) { + label := a.Name + if a.Default { + label += "*" + } + cs.Alerts = append(cs.Alerts, label) + } out.Checks = append(out.Checks, cs) } return out diff --git a/internal/daemon/watcher.go b/internal/daemon/watcher.go new file mode 100644 index 0000000..0dd8ab0 --- /dev/null +++ b/internal/daemon/watcher.go @@ -0,0 +1,85 @@ +package daemon + +import ( + "context" + "crypto/sha256" + "os" + "reflect" + "time" + + "gopkg.in/yaml.v3" + + "git.cer.sh/axodouble/quptime/internal/config" + "git.cer.sh/axodouble/quptime/internal/transport" +) + +// manualEditPollInterval is how often the daemon checks cluster.yaml's +// hash against the last value it wrote. Short enough that an operator +// `vim`-ing the file sees their change applied within a few seconds. +const manualEditPollInterval = 2 * time.Second + +// watchManualEdits polls cluster.yaml. When the on-disk content +// diverges from what the daemon last wrote, the file is parsed and +// pushed through the master as a MutationReplaceConfig — so a +// hand-edit on any node ends up replicated everywhere. +// +// The poll uses sha256 of the file contents rather than mtime so we +// don't race against `os.Rename` from our own AtomicWrite or against +// editors that touch mtime without changing content. 
+func (d *Daemon) watchManualEdits(ctx context.Context) { + t := time.NewTicker(manualEditPollInterval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + d.checkManualEdit(ctx) + } + } +} + +func (d *Daemon) checkManualEdit(ctx context.Context) { + raw, err := os.ReadFile(config.ClusterFilePath()) + if err != nil { + // A missing file during early boot or temp-file races is fine; + // the next tick will re-read it. + return + } + sum := sha256.Sum256(raw) + if sum == d.cluster.LastSavedSum() { + return + } + + var edited config.ClusterConfig + if err := yaml.Unmarshal(raw, &edited); err != nil { + d.logger.Printf("manual-edit: parse cluster.yaml: %v — ignoring", err) + // Pin the hash so we don't loop on a broken file. The operator + // must save a valid YAML for the next attempt. + d.cluster.SetLastSavedSum(sum) + return + } + + current := d.cluster.Snapshot() + if reflect.DeepEqual(current.Peers, edited.Peers) && + reflect.DeepEqual(current.Checks, edited.Checks) && + reflect.DeepEqual(current.Alerts, edited.Alerts) { + // Only cosmetic (whitespace/comments) — accept it. + d.cluster.SetLastSavedSum(sum) + return + } + + d.logger.Printf("manual-edit: cluster.yaml changed externally — replicating via master") + d.cluster.SetLastSavedSum(sum) + + callCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + payload := &config.ClusterConfig{ + Peers: edited.Peers, + Checks: edited.Checks, + Alerts: edited.Alerts, + } + if _, err := d.replicator.LocalMutate(callCtx, transport.MutationReplaceConfig, payload); err != nil { + d.logger.Printf("manual-edit: forward to master: %v", err) + } +} diff --git a/internal/replicate/replicator.go b/internal/replicate/replicator.go index 2048025..e6b39a2 100644 --- a/internal/replicate/replicator.go +++ b/internal/replicate/replicator.go @@ -275,6 +275,16 @@ func (r *Replicator) applyLocally(kind transport.MutationKind, payload json.RawM c.Peers = append(c.Peers, p) return nil + case transport.MutationReplaceConfig: + var incoming config.ClusterConfig + if err := json.Unmarshal(payload, &incoming); err != nil { + return fmt.Errorf("decode replace_config: %w", err) + } + c.Peers = append([]config.PeerInfo(nil), incoming.Peers...) + c.Checks = append([]config.Check(nil), incoming.Checks...) + c.Alerts = append([]config.Alert(nil), incoming.Alerts...) + return nil + case transport.MutationRemovePeer: var target string if err := json.Unmarshal(payload, &target); err != nil { diff --git a/internal/transport/messages.go b/internal/transport/messages.go index 710fe51..1ed24a3 100644 --- a/internal/transport/messages.go +++ b/internal/transport/messages.go @@ -136,6 +136,13 @@ const ( MutationRemoveAlert MutationKind = "remove_alert" MutationAddPeer MutationKind = "add_peer" MutationRemovePeer MutationKind = "remove_peer" + // MutationReplaceConfig overwrites the editable portions + // (peers/checks/alerts) of cluster.yaml in one shot. The replicated + // version, updated_at, and updated_by are still set by the master. + // Used by the manual-edit watcher: an operator edits cluster.yaml + // directly, the daemon detects it, and forwards the parsed snapshot + // to the master through this mutation. + MutationReplaceConfig MutationKind = "replace_config" ) // ProposeMutationRequest is a follower-to-master message. 
The payload
@@ -199,4 +206,8 @@ type CheckSnapshot struct {
 	OKCount int    `json:"ok_count"`
 	Total   int    `json:"total"`
 	Detail  string `json:"detail,omitempty"`
+	// Alerts holds one display string per effective alert. Names of
+	// default-attached alerts are suffixed with "*" so the operator can
+	// tell which alerts will fire, and which are defaults, at a glance.
+	Alerts []string `json:"alerts,omitempty"`
 }
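
For illustration only (not applied by this patch): a minimal sketch of how the new Default and SuppressAlertIDs fields are meant to combine in config.EffectiveAlertsFor. The alert and check values below are invented; only the types, constants, and the method itself come from this change.

package config

import "testing"

// TestEffectiveAlertsForDefaults checks that a default alert attaches to every
// check unless the check suppresses it, and that explicit AlertIDs and
// defaults are de-duplicated by alert ID.
func TestEffectiveAlertsForDefaults(t *testing.T) {
	c := &ClusterConfig{
		Alerts: []Alert{
			{ID: "a1", Name: "ops-mail", Type: AlertSMTP, Default: true},
			{ID: "a2", Name: "dev-discord", Type: AlertDiscord},
		},
		Checks: []Check{
			{ID: "c1", Name: "api", AlertIDs: []string{"dev-discord"}},
			{ID: "c2", Name: "batch", SuppressAlertIDs: []string{"ops-mail"}},
		},
	}

	// "api" lists one alert explicitly and also picks up the default.
	if got := c.EffectiveAlertsFor(&c.Checks[0]); len(got) != 2 {
		t.Fatalf("api: want 2 effective alerts, got %d", len(got))
	}

	// "batch" lists nothing explicitly and suppresses the only default by name.
	if got := c.EffectiveAlertsFor(&c.Checks[1]); len(got) != 0 {
		t.Fatalf("batch: want 0 effective alerts, got %d", len(got))
	}
}

Suppression matches by ID or by name, mirroring FindAlert, so a hand-edited cluster.yaml can say suppress_alert_ids: [ops-mail] without knowing the alert's UUID.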