Added default alerts, updated UI to show alerts with checks, added auto sync for manual file editing
This commit is contained in:
@@ -31,13 +31,13 @@ func (d *Dispatcher) OnTransition(check *config.Check, from, to checks.State, sn
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
msg := Render(d.selfID, check, from, to, snap)
|
msg := Render(d.selfID, check, from, to, snap)
|
||||||
for _, alertID := range check.AlertIDs {
|
alerts := d.cluster.EffectiveAlertsFor(check)
|
||||||
alert := d.cluster.FindAlert(alertID)
|
if len(alerts) == 0 && len(check.AlertIDs) > 0 {
|
||||||
if alert == nil {
|
d.logger.Printf("alerts: check %q references alerts but none resolved", check.Name)
|
||||||
d.logger.Printf("alerts: check %q references unknown alert %q", check.Name, alertID)
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
if err := d.dispatchOne(alert, msg); err != nil {
|
for i := range alerts {
|
||||||
|
alert := alerts[i]
|
||||||
|
if err := d.dispatchOne(&alert, msg); err != nil {
|
||||||
d.logger.Printf("alerts: %q via %s: %v", alert.Name, alert.Type, err)
|
d.logger.Printf("alerts: %q via %s: %v", alert.Name, alert.Type, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+48
-4
@@ -36,9 +36,9 @@ func addAlertCmd(root *cobra.Command) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
tw := tabwriter.NewWriter(cmd.OutOrStdout(), 0, 0, 2, ' ', 0)
|
tw := tabwriter.NewWriter(cmd.OutOrStdout(), 0, 0, 2, ' ', 0)
|
||||||
fmt.Fprintln(tw, "ID\tTYPE\tNAME")
|
fmt.Fprintln(tw, "ID\tTYPE\tDEFAULT\tNAME")
|
||||||
for _, a := range cluster.Alerts {
|
for _, a := range cluster.Alerts {
|
||||||
fmt.Fprintf(tw, "%s\t%s\t%s\n", a.ID, a.Type, a.Name)
|
fmt.Fprintf(tw, "%s\t%s\t%v\t%s\n", a.ID, a.Type, a.Default, a.Name)
|
||||||
}
|
}
|
||||||
return tw.Flush()
|
return tw.Flush()
|
||||||
},
|
},
|
||||||
@@ -80,7 +80,46 @@ func addAlertCmd(root *cobra.Command) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
alert.AddCommand(addParent, listCmd, removeCmd, testCmd)
|
defaultCmd := &cobra.Command{
|
||||||
|
Use: "default <id-or-name> <on|off>",
|
||||||
|
Short: "Toggle whether an alert is attached to every check by default",
|
||||||
|
Args: cobra.ExactArgs(2),
|
||||||
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
|
ctx, cancel := context.WithTimeout(cmd.Context(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
var on bool
|
||||||
|
switch args[1] {
|
||||||
|
case "on", "true", "yes", "1":
|
||||||
|
on = true
|
||||||
|
case "off", "false", "no", "0":
|
||||||
|
on = false
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("second arg must be on/off, got %q", args[1])
|
||||||
|
}
|
||||||
|
cluster, err := config.LoadClusterConfig()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
existing := cluster.FindAlert(args[0])
|
||||||
|
if existing == nil {
|
||||||
|
return fmt.Errorf("no alert named %q", args[0])
|
||||||
|
}
|
||||||
|
existing.Default = on
|
||||||
|
payload, _ := json.Marshal(existing)
|
||||||
|
body := daemon.MutateBody{Kind: transport.MutationAddAlert, Payload: payload}
|
||||||
|
raw, err := callDaemon(ctx, daemon.CtrlMutate, body)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var res daemon.MutateResult
|
||||||
|
_ = json.Unmarshal(raw, &res)
|
||||||
|
fmt.Fprintf(cmd.OutOrStdout(), "alert %s default=%v — cluster version %d\n",
|
||||||
|
existing.Name, on, res.Version)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
alert.AddCommand(addParent, listCmd, removeCmd, testCmd, defaultCmd)
|
||||||
root.AddCommand(alert)
|
root.AddCommand(alert)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -88,7 +127,7 @@ func buildSMTPAddCmd() *cobra.Command {
|
|||||||
var host, user, password, from string
|
var host, user, password, from string
|
||||||
var port int
|
var port int
|
||||||
var to []string
|
var to []string
|
||||||
var startTLS bool
|
var startTLS, makeDefault bool
|
||||||
|
|
||||||
cmd := &cobra.Command{
|
cmd := &cobra.Command{
|
||||||
Use: "smtp <name>",
|
Use: "smtp <name>",
|
||||||
@@ -101,6 +140,7 @@ func buildSMTPAddCmd() *cobra.Command {
|
|||||||
ID: uuid.NewString(),
|
ID: uuid.NewString(),
|
||||||
Name: args[0],
|
Name: args[0],
|
||||||
Type: config.AlertSMTP,
|
Type: config.AlertSMTP,
|
||||||
|
Default: makeDefault,
|
||||||
SMTPHost: host,
|
SMTPHost: host,
|
||||||
SMTPPort: port,
|
SMTPPort: port,
|
||||||
SMTPUser: user,
|
SMTPUser: user,
|
||||||
@@ -129,6 +169,7 @@ func buildSMTPAddCmd() *cobra.Command {
|
|||||||
cmd.Flags().StringVar(&from, "from", "", "envelope From address")
|
cmd.Flags().StringVar(&from, "from", "", "envelope From address")
|
||||||
cmd.Flags().StringSliceVar(&to, "to", nil, "recipient address (repeat or comma-separate)")
|
cmd.Flags().StringSliceVar(&to, "to", nil, "recipient address (repeat or comma-separate)")
|
||||||
cmd.Flags().BoolVar(&startTLS, "starttls", true, "negotiate STARTTLS")
|
cmd.Flags().BoolVar(&startTLS, "starttls", true, "negotiate STARTTLS")
|
||||||
|
cmd.Flags().BoolVar(&makeDefault, "default", false, "attach this alert to every check automatically")
|
||||||
_ = cmd.MarkFlagRequired("host")
|
_ = cmd.MarkFlagRequired("host")
|
||||||
_ = cmd.MarkFlagRequired("from")
|
_ = cmd.MarkFlagRequired("from")
|
||||||
_ = cmd.MarkFlagRequired("to")
|
_ = cmd.MarkFlagRequired("to")
|
||||||
@@ -137,6 +178,7 @@ func buildSMTPAddCmd() *cobra.Command {
|
|||||||
|
|
||||||
func buildDiscordAddCmd() *cobra.Command {
|
func buildDiscordAddCmd() *cobra.Command {
|
||||||
var webhook string
|
var webhook string
|
||||||
|
var makeDefault bool
|
||||||
cmd := &cobra.Command{
|
cmd := &cobra.Command{
|
||||||
Use: "discord <name>",
|
Use: "discord <name>",
|
||||||
Short: "Add a Discord webhook alert",
|
Short: "Add a Discord webhook alert",
|
||||||
@@ -148,6 +190,7 @@ func buildDiscordAddCmd() *cobra.Command {
|
|||||||
ID: uuid.NewString(),
|
ID: uuid.NewString(),
|
||||||
Name: args[0],
|
Name: args[0],
|
||||||
Type: config.AlertDiscord,
|
Type: config.AlertDiscord,
|
||||||
|
Default: makeDefault,
|
||||||
DiscordWebhook: webhook,
|
DiscordWebhook: webhook,
|
||||||
}
|
}
|
||||||
payload, _ := json.Marshal(a)
|
payload, _ := json.Marshal(a)
|
||||||
@@ -164,6 +207,7 @@ func buildDiscordAddCmd() *cobra.Command {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
cmd.Flags().StringVar(&webhook, "webhook", "", "discord webhook URL")
|
cmd.Flags().StringVar(&webhook, "webhook", "", "discord webhook URL")
|
||||||
|
cmd.Flags().BoolVar(&makeDefault, "default", false, "attach this alert to every check automatically")
|
||||||
_ = cmd.MarkFlagRequired("webhook")
|
_ = cmd.MarkFlagRequired("webhook")
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|||||||
+21
-7
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
"text/tabwriter"
|
"text/tabwriter"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -69,12 +70,24 @@ func runStatusPrint(ctx context.Context, cmd *cobra.Command, peersOnly bool) err
|
|||||||
fmt.Fprintln(out)
|
fmt.Fprintln(out)
|
||||||
fmt.Fprintln(out, "CHECKS")
|
fmt.Fprintln(out, "CHECKS")
|
||||||
tw2 := tabwriter.NewWriter(out, 0, 0, 2, ' ', 0)
|
tw2 := tabwriter.NewWriter(out, 0, 0, 2, ' ', 0)
|
||||||
fmt.Fprintln(tw2, "ID\tNAME\tSTATE\tOK/TOTAL\tDETAIL")
|
fmt.Fprintln(tw2, "ID\tNAME\tSTATE\tOK/TOTAL\tALERTS\tDETAIL")
|
||||||
for _, c := range st.Checks {
|
for _, c := range st.Checks {
|
||||||
fmt.Fprintf(tw2, "%s\t%s\t%s\t%d/%d\t%s\n",
|
fmt.Fprintf(tw2, "%s\t%s\t%s\t%d/%d\t%s\t%s\n",
|
||||||
c.CheckID, c.Name, c.State, c.OKCount, c.Total, c.Detail)
|
c.CheckID, c.Name, c.State, c.OKCount, c.Total,
|
||||||
|
alertsCol(c.Alerts), c.Detail)
|
||||||
}
|
}
|
||||||
return tw2.Flush()
|
if err := tw2.Flush(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fmt.Fprintln(out, "(alerts marked * are attached as defaults)")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func alertsCol(names []string) string {
|
||||||
|
if len(names) == 0 {
|
||||||
|
return "-"
|
||||||
|
}
|
||||||
|
return strings.Join(names, ",")
|
||||||
}
|
}
|
||||||
|
|
||||||
// runStatusPrintChecks renders only the checks block (used by
|
// runStatusPrintChecks renders only the checks block (used by
|
||||||
@@ -90,10 +103,11 @@ func runStatusPrintChecks(ctx context.Context, cmd *cobra.Command) error {
|
|||||||
}
|
}
|
||||||
out := cmd.OutOrStdout()
|
out := cmd.OutOrStdout()
|
||||||
tw := tabwriter.NewWriter(out, 0, 0, 2, ' ', 0)
|
tw := tabwriter.NewWriter(out, 0, 0, 2, ' ', 0)
|
||||||
fmt.Fprintln(tw, "ID\tNAME\tSTATE\tOK/TOTAL\tDETAIL")
|
fmt.Fprintln(tw, "ID\tNAME\tSTATE\tOK/TOTAL\tALERTS\tDETAIL")
|
||||||
for _, c := range st.Checks {
|
for _, c := range st.Checks {
|
||||||
fmt.Fprintf(tw, "%s\t%s\t%s\t%d/%d\t%s\n",
|
fmt.Fprintf(tw, "%s\t%s\t%s\t%d/%d\t%s\t%s\n",
|
||||||
c.CheckID, c.Name, c.State, c.OKCount, c.Total, c.Detail)
|
c.CheckID, c.Name, c.State, c.OKCount, c.Total,
|
||||||
|
alertsCol(c.Alerts), c.Detail)
|
||||||
}
|
}
|
||||||
return tw.Flush()
|
return tw.Flush()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/sha256"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -49,6 +50,9 @@ type Check struct {
|
|||||||
// AlertIDs lists which configured alerts fire when this check
|
// AlertIDs lists which configured alerts fire when this check
|
||||||
// transitions state.
|
// transitions state.
|
||||||
AlertIDs []string `yaml:"alert_ids,omitempty"`
|
AlertIDs []string `yaml:"alert_ids,omitempty"`
|
||||||
|
|
||||||
|
// SuppressAlertIDs lets a check opt out of specific default alerts.
|
||||||
|
SuppressAlertIDs []string `yaml:"suppress_alert_ids,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// AlertType enumerates supported notifier kinds.
|
// AlertType enumerates supported notifier kinds.
|
||||||
@@ -65,6 +69,11 @@ type Alert struct {
|
|||||||
Name string `yaml:"name"`
|
Name string `yaml:"name"`
|
||||||
Type AlertType `yaml:"type"`
|
Type AlertType `yaml:"type"`
|
||||||
|
|
||||||
|
// Default attaches this alert to every check automatically, on top
|
||||||
|
// of any explicit AlertIDs the check lists. A check that wants to
|
||||||
|
// opt out of a default alert can list it under SuppressAlertIDs.
|
||||||
|
Default bool `yaml:"default,omitempty"`
|
||||||
|
|
||||||
// SMTP options.
|
// SMTP options.
|
||||||
SMTPHost string `yaml:"smtp_host,omitempty"`
|
SMTPHost string `yaml:"smtp_host,omitempty"`
|
||||||
SMTPPort int `yaml:"smtp_port,omitempty"`
|
SMTPPort int `yaml:"smtp_port,omitempty"`
|
||||||
@@ -92,6 +101,7 @@ type ClusterConfig struct {
|
|||||||
|
|
||||||
mu sync.RWMutex `yaml:"-"`
|
mu sync.RWMutex `yaml:"-"`
|
||||||
onChange []func() // fired after any successful Mutate/Replace
|
onChange []func() // fired after any successful Mutate/Replace
|
||||||
|
lastSum [32]byte // sha256 of the bytes most recently written
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnChange registers a callback fired after every successful Mutate
|
// OnChange registers a callback fired after every successful Mutate
|
||||||
@@ -128,19 +138,43 @@ func LoadClusterConfig() (*ClusterConfig, error) {
|
|||||||
if err := yaml.Unmarshal(raw, cfg); err != nil {
|
if err := yaml.Unmarshal(raw, cfg); err != nil {
|
||||||
return nil, fmt.Errorf("parse cluster.yaml: %w", err)
|
return nil, fmt.Errorf("parse cluster.yaml: %w", err)
|
||||||
}
|
}
|
||||||
|
cfg.lastSum = sha256.Sum256(raw)
|
||||||
return cfg, nil
|
return cfg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save writes cluster.yaml atomically. Caller is responsible for
|
// Save writes cluster.yaml atomically. Caller is responsible for
|
||||||
// having already taken any external locks.
|
// having already taken any external locks.
|
||||||
func (c *ClusterConfig) Save() error {
|
func (c *ClusterConfig) Save() error {
|
||||||
c.mu.RLock()
|
c.mu.Lock()
|
||||||
defer c.mu.RUnlock()
|
defer c.mu.Unlock()
|
||||||
out, err := yaml.Marshal(c)
|
out, err := yaml.Marshal(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return AtomicWrite(ClusterFilePath(), out, 0o600)
|
if err := AtomicWrite(ClusterFilePath(), out, 0o600); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
c.lastSum = sha256.Sum256(out)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// LastSavedSum returns the sha256 of the bytes most recently written
|
||||||
|
// to disk. The manual-edit watcher uses this to distinguish edits
|
||||||
|
// originating from this daemon (where the on-disk hash matches) from
|
||||||
|
// edits made externally by the operator.
|
||||||
|
func (c *ClusterConfig) LastSavedSum() [32]byte {
|
||||||
|
c.mu.RLock()
|
||||||
|
defer c.mu.RUnlock()
|
||||||
|
return c.lastSum
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetLastSavedSum lets the manual-edit watcher record that it has
|
||||||
|
// observed (and either applied or rejected) a specific on-disk hash,
|
||||||
|
// so the same edit isn't reprocessed on every poll.
|
||||||
|
func (c *ClusterConfig) SetLastSavedSum(sum [32]byte) {
|
||||||
|
c.mu.Lock()
|
||||||
|
c.lastSum = sum
|
||||||
|
c.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Snapshot returns a deep-enough copy of the config that can be
|
// Snapshot returns a deep-enough copy of the config that can be
|
||||||
@@ -179,6 +213,7 @@ func (c *ClusterConfig) Mutate(byNode string, fn func(*ClusterConfig) error) err
|
|||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
c.lastSum = sha256.Sum256(out)
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
c.fireOnChange()
|
c.fireOnChange()
|
||||||
return nil
|
return nil
|
||||||
@@ -208,11 +243,59 @@ func (c *ClusterConfig) Replace(incoming *ClusterConfig) (bool, error) {
|
|||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
c.lastSum = sha256.Sum256(out)
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
c.fireOnChange()
|
c.fireOnChange()
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EffectiveAlertsFor returns the alerts that should fire when a check
|
||||||
|
// transitions: every alert explicitly listed in check.AlertIDs, plus
|
||||||
|
// every alert flagged Default=true, minus anything the check listed
|
||||||
|
// under SuppressAlertIDs. Result is de-duplicated by alert ID.
|
||||||
|
func (c *ClusterConfig) EffectiveAlertsFor(check *Check) []Alert {
|
||||||
|
c.mu.RLock()
|
||||||
|
defer c.mu.RUnlock()
|
||||||
|
if check == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
suppress := map[string]struct{}{}
|
||||||
|
for _, s := range check.SuppressAlertIDs {
|
||||||
|
suppress[s] = struct{}{}
|
||||||
|
}
|
||||||
|
seen := map[string]struct{}{}
|
||||||
|
var out []Alert
|
||||||
|
|
||||||
|
add := func(a Alert) {
|
||||||
|
if _, dup := seen[a.ID]; dup {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, off := suppress[a.ID]; off {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, off := suppress[a.Name]; off {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
seen[a.ID] = struct{}{}
|
||||||
|
out = append(out, a)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, want := range check.AlertIDs {
|
||||||
|
for i := range c.Alerts {
|
||||||
|
if c.Alerts[i].ID == want || c.Alerts[i].Name == want {
|
||||||
|
add(c.Alerts[i])
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for i := range c.Alerts {
|
||||||
|
if c.Alerts[i].Default {
|
||||||
|
add(c.Alerts[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
// FindAlert returns the alert with the given ID or name, or nil if
|
// FindAlert returns the alert with the given ID or name, or nil if
|
||||||
// no entry matches.
|
// no entry matches.
|
||||||
func (c *ClusterConfig) FindAlert(idOrName string) *Alert {
|
func (c *ClusterConfig) FindAlert(idOrName string) *Alert {
|
||||||
|
|||||||
@@ -195,6 +195,12 @@ func (d *Daemon) Run(ctx context.Context) error {
|
|||||||
d.scheduler.Start(ctx)
|
d.scheduler.Start(ctx)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
d.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer d.wg.Done()
|
||||||
|
d.watchManualEdits(ctx)
|
||||||
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
case err := <-servErr:
|
case err := <-servErr:
|
||||||
|
|||||||
@@ -169,6 +169,7 @@ func (d *Daemon) buildStatus() transport.StatusResponse {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
for _, c := range snap.Checks {
|
for _, c := range snap.Checks {
|
||||||
|
check := c
|
||||||
cs := transport.CheckSnapshot{CheckID: c.ID, Name: c.Name, State: "unknown"}
|
cs := transport.CheckSnapshot{CheckID: c.ID, Name: c.Name, State: "unknown"}
|
||||||
if agg, ok := d.aggregator.SnapshotFor(c.ID); ok {
|
if agg, ok := d.aggregator.SnapshotFor(c.ID); ok {
|
||||||
cs.State = string(agg.State)
|
cs.State = string(agg.State)
|
||||||
@@ -176,6 +177,13 @@ func (d *Daemon) buildStatus() transport.StatusResponse {
|
|||||||
cs.Total = agg.Reports
|
cs.Total = agg.Reports
|
||||||
cs.Detail = agg.Detail
|
cs.Detail = agg.Detail
|
||||||
}
|
}
|
||||||
|
for _, a := range d.cluster.EffectiveAlertsFor(&check) {
|
||||||
|
label := a.Name
|
||||||
|
if a.Default {
|
||||||
|
label += "*"
|
||||||
|
}
|
||||||
|
cs.Alerts = append(cs.Alerts, label)
|
||||||
|
}
|
||||||
out.Checks = append(out.Checks, cs)
|
out.Checks = append(out.Checks, cs)
|
||||||
}
|
}
|
||||||
return out
|
return out
|
||||||
|
|||||||
@@ -0,0 +1,85 @@
|
|||||||
|
package daemon
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/sha256"
|
||||||
|
"os"
|
||||||
|
"reflect"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gopkg.in/yaml.v3"
|
||||||
|
|
||||||
|
"git.cer.sh/axodouble/quptime/internal/config"
|
||||||
|
"git.cer.sh/axodouble/quptime/internal/transport"
|
||||||
|
)
|
||||||
|
|
||||||
|
// manualEditPollInterval is how often the daemon checks cluster.yaml's
|
||||||
|
// hash against the last value it wrote. Short enough that an operator
|
||||||
|
// `vim`-ing the file sees their change applied within a few seconds.
|
||||||
|
const manualEditPollInterval = 2 * time.Second
|
||||||
|
|
||||||
|
// watchManualEdits polls cluster.yaml. When the on-disk content
|
||||||
|
// diverges from what the daemon last wrote, the file is parsed and
|
||||||
|
// pushed through the master as a MutationReplaceConfig — so a
|
||||||
|
// hand-edit on any node ends up replicated everywhere.
|
||||||
|
//
|
||||||
|
// The poll uses sha256 of the file contents rather than mtime so we
|
||||||
|
// don't race against `os.Rename` from our own AtomicWrite or against
|
||||||
|
// editors that touch mtime without changing content.
|
||||||
|
func (d *Daemon) watchManualEdits(ctx context.Context) {
|
||||||
|
t := time.NewTicker(manualEditPollInterval)
|
||||||
|
defer t.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-t.C:
|
||||||
|
d.checkManualEdit(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Daemon) checkManualEdit(ctx context.Context) {
|
||||||
|
raw, err := os.ReadFile(config.ClusterFilePath())
|
||||||
|
if err != nil {
|
||||||
|
// A missing file during early boot or temp-file races is fine;
|
||||||
|
// the next tick will re-read it.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sum := sha256.Sum256(raw)
|
||||||
|
if sum == d.cluster.LastSavedSum() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var edited config.ClusterConfig
|
||||||
|
if err := yaml.Unmarshal(raw, &edited); err != nil {
|
||||||
|
d.logger.Printf("manual-edit: parse cluster.yaml: %v — ignoring", err)
|
||||||
|
// Pin the hash so we don't loop on a broken file. The operator
|
||||||
|
// must save a valid YAML for the next attempt.
|
||||||
|
d.cluster.SetLastSavedSum(sum)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
current := d.cluster.Snapshot()
|
||||||
|
if reflect.DeepEqual(current.Peers, edited.Peers) &&
|
||||||
|
reflect.DeepEqual(current.Checks, edited.Checks) &&
|
||||||
|
reflect.DeepEqual(current.Alerts, edited.Alerts) {
|
||||||
|
// Only cosmetic (whitespace/comments) — accept it.
|
||||||
|
d.cluster.SetLastSavedSum(sum)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
d.logger.Printf("manual-edit: cluster.yaml changed externally — replicating via master")
|
||||||
|
d.cluster.SetLastSavedSum(sum)
|
||||||
|
|
||||||
|
callCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
payload := &config.ClusterConfig{
|
||||||
|
Peers: edited.Peers,
|
||||||
|
Checks: edited.Checks,
|
||||||
|
Alerts: edited.Alerts,
|
||||||
|
}
|
||||||
|
if _, err := d.replicator.LocalMutate(callCtx, transport.MutationReplaceConfig, payload); err != nil {
|
||||||
|
d.logger.Printf("manual-edit: forward to master: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -275,6 +275,16 @@ func (r *Replicator) applyLocally(kind transport.MutationKind, payload json.RawM
|
|||||||
c.Peers = append(c.Peers, p)
|
c.Peers = append(c.Peers, p)
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
|
case transport.MutationReplaceConfig:
|
||||||
|
var incoming config.ClusterConfig
|
||||||
|
if err := json.Unmarshal(payload, &incoming); err != nil {
|
||||||
|
return fmt.Errorf("decode replace_config: %w", err)
|
||||||
|
}
|
||||||
|
c.Peers = append([]config.PeerInfo(nil), incoming.Peers...)
|
||||||
|
c.Checks = append([]config.Check(nil), incoming.Checks...)
|
||||||
|
c.Alerts = append([]config.Alert(nil), incoming.Alerts...)
|
||||||
|
return nil
|
||||||
|
|
||||||
case transport.MutationRemovePeer:
|
case transport.MutationRemovePeer:
|
||||||
var target string
|
var target string
|
||||||
if err := json.Unmarshal(payload, &target); err != nil {
|
if err := json.Unmarshal(payload, &target); err != nil {
|
||||||
|
|||||||
@@ -136,6 +136,13 @@ const (
|
|||||||
MutationRemoveAlert MutationKind = "remove_alert"
|
MutationRemoveAlert MutationKind = "remove_alert"
|
||||||
MutationAddPeer MutationKind = "add_peer"
|
MutationAddPeer MutationKind = "add_peer"
|
||||||
MutationRemovePeer MutationKind = "remove_peer"
|
MutationRemovePeer MutationKind = "remove_peer"
|
||||||
|
// MutationReplaceConfig overwrites the editable portions
|
||||||
|
// (peers/checks/alerts) of cluster.yaml in one shot. The replicated
|
||||||
|
// version, updated_at, and updated_by are still set by the master.
|
||||||
|
// Used by the manual-edit watcher: an operator edits cluster.yaml
|
||||||
|
// directly, the daemon detects it, and forwards the parsed snapshot
|
||||||
|
// to the master through this mutation.
|
||||||
|
MutationReplaceConfig MutationKind = "replace_config"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ProposeMutationRequest is a follower-to-master message. The payload
|
// ProposeMutationRequest is a follower-to-master message. The payload
|
||||||
@@ -199,4 +206,8 @@ type CheckSnapshot struct {
|
|||||||
OKCount int `json:"ok_count"`
|
OKCount int `json:"ok_count"`
|
||||||
Total int `json:"total"`
|
Total int `json:"total"`
|
||||||
Detail string `json:"detail,omitempty"`
|
Detail string `json:"detail,omitempty"`
|
||||||
|
// Alerts holds one display string per effective alert. Names of
|
||||||
|
// default-attached alerts are suffixed with "*" so the operator can
|
||||||
|
// see which fired without lookup.
|
||||||
|
Alerts []string `json:"alerts,omitempty"`
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user